added queue message body to analysis input dict

This commit is contained in:
Matthias Bisping 2022-05-31 18:40:40 +02:00
parent 3046b4dc26
commit 586871a26f

View File

@ -175,7 +175,7 @@ class QueueVisitor:
return data
@staticmethod
def standardize(data: bytes):
def standardize(data: bytes, queue_item_body):
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
Cases:
@ -211,13 +211,13 @@ class QueueVisitor:
data["data"] = string_to_bytes(data["data"])
return data
def load_data(self, body):
object_descriptor = get_object_descriptor(body)
def load_data(self, queue_item_body):
object_descriptor = get_object_descriptor(queue_item_body)
logging.debug(f"Downloading {object_descriptor}...")
data = self.download(object_descriptor)
logging.debug(f"Downloaded {object_descriptor}.")
data = gzip.decompress(data)
data = self.standardize(data)
data = self.standardize(data, queue_item_body)
return data
def process_storage_item(self, data_metadata_pack):
@ -226,7 +226,8 @@ class QueueVisitor:
def load_item_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
storage_item = self.load_data(queue_item_body)
result = self.process_storage_item(storage_item)
analysis_input = {**storage_item, **queue_item_body}
result = self.process_storage_item(analysis_input)
result_body = {"data": result, **queue_item_body}
return result_body