
Commit 0403d14

Create elasticsearch_middleware.py
1 parent f90a94a commit 0403d14

File tree

1 file changed: +105 −0


elasticsearch_middleware.py

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
import json
import logging
import time
from datetime import datetime, timezone

from elasticsearch import Elasticsearch
from fastapi import Request
from fastapi.datastructures import Headers
from starlette.types import ASGIApp, Receive, Scope, Send


class LogLevelEnum:
    # Maps the first digit of the HTTP status code to a log level.
    level_mapping = {
        2: "Info",
        3: "Warning",
        4: "Warning",
        5: "Error"
    }


class ElasticsearchLoggerMiddleware:
    def __init__(self, app: ASGIApp, config: dict) -> None:
        """
        Initializes an Elasticsearch Logger Middleware for FastAPI.

        Args:
            app (ASGIApp): The FastAPI ASGI application.
            config (dict): Configuration settings for Elasticsearch logging.
                {
                    'url': str,          # Elasticsearch server URL
                    'index': str,        # Elasticsearch index name
                    'environment': str,  # Environment identifier for logs
                    'debug': bool        # When True, logs aren't sent to Elasticsearch
                }
        """
        self.elasticsearch_client = Elasticsearch([config.get('url')])
        self.index = config.get('index')
        self.environment = config.get('environment')
        self.debug = config.get('debug')
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope.get('type') != 'http' or self.debug:
            await self.app(scope, receive, send)
            return

        request = Request(scope)

        # Read the first request message so its body can be logged. It is replayed
        # to the downstream app through `replay_receive`; without that, the
        # application would never see the body consumed here.
        first_message = await receive()
        raw_body = first_message.get('body', b'')
        try:
            request_received_body = json.dumps(json.loads(raw_body), ensure_ascii=False) if raw_body else None
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Non-JSON bodies (e.g. form data or binary uploads) are logged as text.
            request_received_body = raw_body.decode(errors="replace")

        replayed = False

        async def replay_receive():
            nonlocal replayed
            if not replayed:
                replayed = True
                return first_message
            return await receive()

        start_time = time.time()

        log_data = {
            "@timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
            "environment": self.environment,
            "method": request.method,
            "path": request.url.path,
            "request": {"body": request_received_body},
            "response": {}
        }

        async def intercept_send(response):
            nonlocal log_data

            if response['type'] == 'http.response.start':
                # Request/response metadata is captured once the response starts.
                request_headers = dict(request.headers) if 'headers' in scope else None
                request_query_parameters = dict(request.query_params) if len(request.query_params) > 0 else None
                response_headers = dict(Headers(raw=response.get('headers'))) if 'headers' in response else None

                log_data["status_code"] = response['status']
                log_data["level"] = LogLevelEnum.level_mapping.get(int(str(response['status'])[0]))
                log_data["request"]["headers"] = request_headers
                log_data["request"]["query_parameters"] = request_query_parameters
                log_data["response"]["headers"] = response_headers

            elif response['type'] == 'http.response.body':
                if "response" not in log_data:
                    # log_data was already flushed, so this is a later chunk of a
                    # streaming response; we don't want to log it.
                    await send(response)
                    return

                log_data["elapsed_time"] = time.time() - start_time

                content_type = (log_data["response"].get("headers") or {}).get('content-type', '')
                body = response.get('body')
                if content_type.startswith('application/json'):
                    log_data["response"]["body"] = json.dumps(json.loads(body), ensure_ascii=False) if body else None
                elif content_type.startswith('application/octet-stream'):
                    log_data["response"]["body"] = str(body) if body else None

                # Note: the Elasticsearch client call below is synchronous and runs on
                # the event loop; log_data is cleared after a successful index call.
                self.log_to_elasticsearch(log_data)

            await send(response)

        await self.app(scope, replay_receive, intercept_send)

    def log_to_elasticsearch(self, log_data):
        try:
            self.elasticsearch_client.index(index=self.index, body=log_data)
            log_data.clear()
        except Exception as e:
            logging.error(f"Failed to log to Elasticsearch: {str(e)}")
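
For context, a minimal usage sketch (not part of this commit) showing how the middleware might be wired into a FastAPI application. The Elasticsearch URL, index name, environment value, and the /items route below are illustrative placeholders, assuming a reachable Elasticsearch instance:

# Illustrative only: registering the middleware on a FastAPI app.
# The config values here are placeholders, not taken from the commit.
from fastapi import FastAPI

from elasticsearch_middleware import ElasticsearchLoggerMiddleware

app = FastAPI()
app.add_middleware(
    ElasticsearchLoggerMiddleware,
    config={
        "url": "http://localhost:9200",   # assumed local Elasticsearch instance
        "index": "fastapi-logs",          # assumed index name
        "environment": "development",
        "debug": False,                   # set True to skip indexing while developing
    },
)

@app.post("/items")
async def create_item(payload: dict):
    # Request and response bodies of this JSON endpoint would be indexed by the middleware.
    return {"received": payload}

Starlette's add_middleware constructs the class as ElasticsearchLoggerMiddleware(app, config=...), which matches the __init__ signature above.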
