From c6c1b121a364821156d17d383eb61ad445c6c680 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Sat, 18 Jun 2022 23:17:13 +0200 Subject: [PATCH] topological sorting of definitions by caller hierarchy --- pyinfra/utils/encoding.py | 8 ++--- .../strategies/response/aggregation.py | 35 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pyinfra/utils/encoding.py b/pyinfra/utils/encoding.py index 1aedb22..146853b 100644 --- a/pyinfra/utils/encoding.py +++ b/pyinfra/utils/encoding.py @@ -4,13 +4,13 @@ import json from pyinfra.server.packing import bytes_to_string +def pack_for_upload(data: bytes): + return compress(json.dumps(bytes_to_string(data)).encode()) + + def compress(data: bytes): return gzip.compress(data) def decompress(data: bytes): return gzip.decompress(data) - - -def pack_for_upload(data: bytes): - return compress(json.dumps(bytes_to_string(data)).encode()) diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index fb8024d..e3dc72a 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -20,19 +20,14 @@ class AggregationStorageStrategy(ResponseStrategy): self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback() self.buffer = deque() - def put_object(self, data: bytes, storage_upload_info): - object_descriptor = self.get_response_object_descriptor(storage_upload_info) - self.storage.put_object(**object_descriptor, data=compress(data)) - return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} + def handle_response(self, analysis_response, final=False): + def upload_or_aggregate(analysis_payload): + return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) - def merge_queue_items(self): - merged_buffer_content = self.merger(self.buffer) - self.buffer.clear() - return merged_buffer_content + request_metadata = omit(analysis_response, ["data"]) + result_data = peekable(analysis_response["data"]) - def upload_queue_items(self, storage_upload_info): - data = json.dumps(self.merge_queue_items()).encode() - return self.put_object(data, storage_upload_info) + yield from filter(is_not_nothing, map(upload_or_aggregate, result_data)) def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): """analysis_payload : {data: ..., metadata: ...}""" @@ -50,11 +45,17 @@ class AggregationStorageStrategy(ResponseStrategy): else: return Nothing - def handle_response(self, analysis_response, final=False): - def upload_or_aggregate(analysis_payload): - return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) + def put_object(self, data: bytes, storage_upload_info): + object_descriptor = self.get_response_object_descriptor(storage_upload_info) + self.storage.put_object(**object_descriptor, data=compress(data)) + return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} - request_metadata = omit(analysis_response, ["data"]) - result_data = peekable(analysis_response["data"]) + def merge_queue_items(self): + merged_buffer_content = self.merger(self.buffer) + self.buffer.clear() + return merged_buffer_content + + def upload_queue_items(self, storage_upload_info): + data = json.dumps(self.merge_queue_items()).encode() + return self.put_object(data, storage_upload_info) - yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))