From bfe8bbb8cb748b7c1a60c46958b3a4f40b430fe8 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 1 Jun 2022 16:00:46 +0200 Subject: [PATCH] reorganized queue message metadata in request --- pyinfra/default_objects.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 6048829..de43ba8 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -1,8 +1,7 @@ import logging from functools import lru_cache -from operator import itemgetter -from funcy import rcompose +from funcy import rcompose, omit, merge, lmap from pyinfra.config import CONFIG from pyinfra.exceptions import AnalysisFailure @@ -16,6 +15,9 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.storage import storages from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy +logger = logging.getLogger(__name__) +logger.setLevel(logging.ERROR) + @lru_cache(maxsize=None) def get_consumer(callback=None): @@ -76,15 +78,29 @@ class Callback: pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for the queue manager to tell which message to ack. - TODO: casting list on `analysis_response_stream` is a temporary solution, while the client pipeline operates - on singletons ([data], [metadata]). + TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline + operates on singletons ([data], [metadata]). """ + def combine_storage_item_metadata_with_queue_message_metadata(body): + return merge(body["metadata"], omit(body, ["data", "metadata"])) + + def remove_queue_message_metadata(result): + metadata = omit(result["metadata"], queue_message_keys(body)) + return {**result, "metadata": metadata} + + def queue_message_keys(body): + return {*body.keys()}.difference({"data", "metadata"}) + try: - data, metadata = itemgetter("data", "metadata")(body) + data = body["data"] + metadata = combine_storage_item_metadata_with_queue_message_metadata(body) analysis_response_stream = pipeline([data], [metadata]) - return list(analysis_response_stream) + analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream) + return analysis_response_stream + except Exception as err: + logger.error(err) raise AnalysisFailure from err def __call__(self, body: dict):