reorganized queue message metadata in request
This commit is contained in:
parent
ecff50ae7c
commit
bfe8bbb8cb
@ -1,8 +1,7 @@
|
||||
import logging
|
||||
from functools import lru_cache
|
||||
from operator import itemgetter
|
||||
|
||||
from funcy import rcompose
|
||||
from funcy import rcompose, omit, merge, lmap
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.exceptions import AnalysisFailure
|
||||
@ -16,6 +15,9 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver
|
||||
from pyinfra.storage import storages
|
||||
from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.ERROR)
|
||||
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_consumer(callback=None):
|
||||
@ -76,15 +78,29 @@ class Callback:
|
||||
pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for
|
||||
the queue manager to tell which message to ack.
|
||||
|
||||
TODO: casting list on `analysis_response_stream` is a temporary solution, while the client pipeline operates
|
||||
on singletons ([data], [metadata]).
|
||||
TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline
|
||||
operates on singletons ([data], [metadata]).
|
||||
"""
|
||||
|
||||
def combine_storage_item_metadata_with_queue_message_metadata(body):
|
||||
return merge(body["metadata"], omit(body, ["data", "metadata"]))
|
||||
|
||||
def remove_queue_message_metadata(result):
|
||||
metadata = omit(result["metadata"], queue_message_keys(body))
|
||||
return {**result, "metadata": metadata}
|
||||
|
||||
def queue_message_keys(body):
|
||||
return {*body.keys()}.difference({"data", "metadata"})
|
||||
|
||||
try:
|
||||
data, metadata = itemgetter("data", "metadata")(body)
|
||||
data = body["data"]
|
||||
metadata = combine_storage_item_metadata_with_queue_message_metadata(body)
|
||||
analysis_response_stream = pipeline([data], [metadata])
|
||||
return list(analysis_response_stream)
|
||||
analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream)
|
||||
return analysis_response_stream
|
||||
|
||||
except Exception as err:
|
||||
logger.error(err)
|
||||
raise AnalysisFailure from err
|
||||
|
||||
def __call__(self, body: dict):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user