Skip to content

Commit eed7df0

Browse files
authored
adding support for POST method and applying limit in payload lenght
1 parent 0403d14 commit eed7df0

File tree

1 file changed

+89
-25
lines changed

1 file changed

+89
-25
lines changed

elasticsearch_middleware.py

Lines changed: 89 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,37 +25,30 @@ def __init__(self, app: ASGIApp, config: dict) -> None:
2525
config (dict): Configuration settings for Elasticsearch logging.
2626
{
2727
'url': str, # Elasticsearch server URL
28+
'user': str, # Elasticsearch API user
29+
'password': str # Elasticsearch API password
2830
'index': str, # Elasticsearch index name
2931
'environment': str, # Environment identifier for logs
32+
'limit': bool, # Limit Elasticsearch payload array and string lenght
3033
'debug': bool # When True logs aren't sent to Elasticsearch
3134
}
3235
"""
3336
elastic_config = config
34-
self.elasticsearch_client = Elasticsearch([elastic_config.get('url')])
37+
elastic_user = config.get('user')
38+
elastic_password = config.get('password')
39+
basic_auth = (elastic_user, elastic_password) if elastic_user and elastic_password else None
40+
self.elasticsearch_client = Elasticsearch([elastic_config.get('url')], basic_auth=basic_auth)
3541
self.index = elastic_config.get('index')
36-
self.environment = elastic_config.get('environment')
37-
self.debug = elastic_config.get('debug')
42+
self.limit = elastic_config.get('limit', False)
43+
self.environment = elastic_config.get('environment', 'Development')
44+
self.debug = elastic_config.get('debug', False)
3845
self.app = app
3946

40-
async def __call__(self, scope: Scope, receive: Receive, send: Send):
47+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
48+
4149
if scope.get('type') == 'http' and not self.debug:
42-
request = Request(scope)
43-
44-
request_data_received = await receive()
45-
request_received_body = json.dumps(json.loads(request_data_received.get('body')), ensure_ascii=False) if len(request_data_received.get('body')) > 0 else None
46-
47-
start_time = time.time()
48-
49-
log_data = {
50-
"@timestamp": datetime.utcnow().replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
51-
"environment": self.environment,
52-
"method": request.method,
53-
"path": request.url.path,
54-
"request": {},
55-
"response": {}
56-
}
5750

58-
log_data["request"]["body"] = request_received_body
51+
request = Request(scope)
5952

6053
async def intercept_send(response):
6154
nonlocal log_data
@@ -64,22 +57,26 @@ async def intercept_send(response):
6457
return
6558
if response['type'] == 'http.response.body' and "response" in log_data.keys(): # Response part
6659

60+
# Finishes telemetry
6761
end_time = time.time()
6862
elapsed_time = end_time - start_time
6963

7064
log_data["elapsed_time"] = elapsed_time
7165

7266
if log_data["response"]["headers"].get('content-type') == 'application/json':
73-
log_data["response"]["body"] = json.dumps(json.loads(response.get('body')), ensure_ascii=False) if 'body' in response.keys() else None
74-
67+
response_body = json.loads(response.get('body'))
68+
response_body = self.limit_array_length(response_body)
69+
70+
response_body = json.dumps(response_body, ensure_ascii=False) if 'body' in response.keys() else None
71+
log_data["response"]["body"] = response_body
72+
7573
elif log_data["response"]["headers"].get('content-type') == 'application/octet-stream':
7674
log_data["response"]["body"] = str(response.get('body')) if 'body' in response.keys() else None
7775

7876
self.log_to_elasticsearch(log_data)
7977

8078
if response['type'] == 'http.response.start': # Request part
8179

82-
8380
request_headers = dict(request.headers) if 'headers' in request.keys() else None
8481
request_query_parameters = dict(request.query_params) if len(request.query_params._list) > 0 else None
8582

@@ -93,13 +90,80 @@ async def intercept_send(response):
9390

9491
await send(response)
9592

96-
await self.app(scope, receive, intercept_send)
93+
start_time = time.time()
94+
95+
log_data = {
96+
"@timestamp": datetime.utcnow().replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
97+
"environment": self.environment,
98+
"method": request.method,
99+
"path": request.url.path,
100+
"request": {},
101+
"response": {}
102+
}
103+
104+
# Starts telemetry
105+
start_time = time.time()
106+
107+
async def intercept_receive():
108+
message = await receive()
109+
110+
more_body = message.get("more_body", False)
111+
body = message.get("body", "")
112+
while more_body:
113+
message = await receive()
114+
body += message.get("body", b"")
115+
more_body = message.get("more_body", False)
116+
117+
message["body"] = body
118+
request_body = ''
119+
120+
if len(message["body"]) > 0:
121+
request_body = json.loads(body.decode('utf-8'))
122+
request_body = self.limit_string_length(request_body)
123+
request_body = json.dumps(request_body, ensure_ascii=False) if len(body.decode('utf-8')) > 0 else None
124+
125+
log_data["request"]["body"] = request_body
126+
127+
return message
128+
129+
await self.app(scope, intercept_receive, intercept_send)
130+
97131
else:
98132
await self.app(scope, receive, send)
99133

100-
def log_to_elasticsearch(self, log_data):
134+
def log_to_elasticsearch(self, log_data) -> None:
101135
try:
102136
self.elasticsearch_client.index(index=self.index, body=log_data)
103137
log_data.clear()
104138
except Exception as e:
105139
logging.error(f"Failed to log to Elasticsearch: {str(e)}")
140+
141+
def limit_string_length(self, data, max_lines=50):
142+
if not self.limit:
143+
return data
144+
if isinstance(data, dict):
145+
for key, value in data.items():
146+
data[key] = self.limit_string_length(value, max_lines)
147+
elif isinstance(data, list):
148+
for i in range(len(data)):
149+
data[i] = self.limit_string_length(data[i], max_lines)
150+
elif isinstance(data, str) and len(data.split('\n')) > max_lines:
151+
data_splitted = data.split('\n')[:max_lines]
152+
data_splitted.append(f' [...] value limited in {max_lines} lines')
153+
data = '\n'.join(data_splitted)
154+
elif isinstance(data, str) and len(data.split('/')) > max_lines: # Base64 files
155+
data_splitted = data.split('/')[:max_lines]
156+
data_splitted.append(f' [...] value limited in {max_lines} lines')
157+
data = '/'.join(data_splitted)
158+
return data
159+
160+
def limit_array_length(self, data, max_length=3):
161+
if not self.limit:
162+
return data
163+
if isinstance(data, dict):
164+
for key, value in data.items():
165+
data[key] = self.limit_array_length(value, max_length)
166+
elif isinstance(data, list) and len(data) > max_length:
167+
data = data[:max_length]
168+
data.append(f'[...] array limited in {max_length} objects')
169+
return data

0 commit comments

Comments
 (0)