From 3828341e98861ff8d63035ee983309ad5064bb30 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 23 Jun 2022 10:42:46 +0200 Subject: [PATCH] refactoring --- pyinfra/visitor/visitor.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index c872d14..9a7d806 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -1,10 +1,9 @@ from typing import Callable -from funcy import lflatten, compose, itervalues +from funcy import lflatten, compose, itervalues, lfilter -from pyinfra.server.debugging import inspect from pyinfra.storage.storage import Storage -from pyinfra.utils.func import lift, flift +from pyinfra.utils.func import lift from pyinfra.visitor.response_formatter.formatter import ResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy @@ -45,17 +44,19 @@ class QueueVisitor: self.response_formatter = response_formatter or IdentityResponseFormatter() def __call__(self, queue_item_body): - analysis_response = self.load_items_from_storage_and_process_with_callback(queue_item_body) - response = self.response_strategy(analysis_response) - response = self.response_formatter(response) + analysis_response = compose( + self.response_formatter, + self.response_strategy, + self.load_items_from_storage_and_process_with_callback, + )(queue_item_body) - return response + return analysis_response def load_items_from_storage_and_process_with_callback(self, queue_item_body): """Bundles the result from processing a storage item with the body of the corresponding queue item.""" callback_results = compose( - flift(compose(any, itervalues)), + self.remove_empty_restuls, lflatten, lift(self.get_item_processor(queue_item_body)), self.load_data, @@ -63,13 +64,6 @@ class QueueVisitor: return {"analysis_payloads": callback_results, **queue_item_body} - def get_item_processor(self, queue_item_body): - def process_storage_item(storage_item): - analysis_input = {**storage_item, **queue_item_body} - return self.process_storage_item(analysis_input) - - return process_storage_item - def load_data(self, queue_item_body): data = compose( lift(standardize), @@ -79,5 +73,15 @@ class QueueVisitor: return data + def get_item_processor(self, queue_item_body): + def process_storage_item(storage_item): + analysis_input = {**storage_item, **queue_item_body} + return self.process_storage_item(analysis_input) + + return process_storage_item + + def remove_empty_restuls(self, results): + return lfilter(compose(any, itervalues), results) + def process_storage_item(self, data_metadata_pack): return self.callback(data_metadata_pack)