topological sorting of definitions by caller hierarchy

This commit is contained in:
Matthias Bisping 2022-06-18 23:17:13 +02:00
parent 02da828268
commit c6c1b121a3
2 changed files with 22 additions and 21 deletions

View File

@ -4,13 +4,13 @@ import json
from pyinfra.server.packing import bytes_to_string
def pack_for_upload(data: bytes):
return compress(json.dumps(bytes_to_string(data)).encode())
def compress(data: bytes):
return gzip.compress(data)
def decompress(data: bytes):
return gzip.decompress(data)
def pack_for_upload(data: bytes):
return compress(json.dumps(bytes_to_string(data)).encode())

View File

@ -20,19 +20,14 @@ class AggregationStorageStrategy(ResponseStrategy):
self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
self.buffer = deque()
def put_object(self, data: bytes, storage_upload_info):
object_descriptor = self.get_response_object_descriptor(storage_upload_info)
self.storage.put_object(**object_descriptor, data=compress(data))
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)
self.buffer.clear()
return merged_buffer_content
request_metadata = omit(analysis_response, ["data"])
result_data = peekable(analysis_response["data"])
def upload_queue_items(self, storage_upload_info):
data = json.dumps(self.merge_queue_items()).encode()
return self.put_object(data, storage_upload_info)
yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
"""analysis_payload : {data: ..., metadata: ...}"""
@ -50,11 +45,17 @@ class AggregationStorageStrategy(ResponseStrategy):
else:
return Nothing
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))
def put_object(self, data: bytes, storage_upload_info):
object_descriptor = self.get_response_object_descriptor(storage_upload_info)
self.storage.put_object(**object_descriptor, data=compress(data))
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
request_metadata = omit(analysis_response, ["data"])
result_data = peekable(analysis_response["data"])
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)
self.buffer.clear()
return merged_buffer_content
def upload_queue_items(self, storage_upload_info):
data = json.dumps(self.merge_queue_items()).encode()
return self.put_object(data, storage_upload_info)
yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))