From 586871a26fe1f3796d9b33d5b19a332817deb54f Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 31 May 2022 18:40:40 +0200 Subject: [PATCH] added queue message body to analysis input dict --- pyinfra/visitor.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index 61ffbd1..c4e6b2b 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -175,7 +175,7 @@ class QueueVisitor: return data @staticmethod - def standardize(data: bytes): + def standardize(data: bytes, queue_item_body): """Storage items can be a blob or a blob with metadata. Standardizes to the latter. Cases: @@ -211,13 +211,13 @@ class QueueVisitor: data["data"] = string_to_bytes(data["data"]) return data - def load_data(self, body): - object_descriptor = get_object_descriptor(body) + def load_data(self, queue_item_body): + object_descriptor = get_object_descriptor(queue_item_body) logging.debug(f"Downloading {object_descriptor}...") data = self.download(object_descriptor) logging.debug(f"Downloaded {object_descriptor}.") data = gzip.decompress(data) - data = self.standardize(data) + data = self.standardize(data, queue_item_body) return data def process_storage_item(self, data_metadata_pack): @@ -226,7 +226,8 @@ class QueueVisitor: def load_item_from_storage_and_process_with_callback(self, queue_item_body): """Bundles the result from processing a storage item with the body of the corresponding queue item.""" storage_item = self.load_data(queue_item_body) - result = self.process_storage_item(storage_item) + analysis_input = {**storage_item, **queue_item_body} + result = self.process_storage_item(analysis_input) result_body = {"data": result, **queue_item_body} return result_body