refactoring

This commit is contained in:
Matthias Bisping 2022-06-23 10:42:46 +02:00
parent 9a7c412523
commit 3828341e98

View File

@ -1,10 +1,9 @@
from typing import Callable
from funcy import lflatten, compose, itervalues
from funcy import lflatten, compose, itervalues, lfilter
from pyinfra.server.debugging import inspect
from pyinfra.storage.storage import Storage
from pyinfra.utils.func import lift, flift
from pyinfra.utils.func import lift
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
@ -45,17 +44,19 @@ class QueueVisitor:
self.response_formatter = response_formatter or IdentityResponseFormatter()
def __call__(self, queue_item_body):
analysis_response = self.load_items_from_storage_and_process_with_callback(queue_item_body)
response = self.response_strategy(analysis_response)
response = self.response_formatter(response)
analysis_response = compose(
self.response_formatter,
self.response_strategy,
self.load_items_from_storage_and_process_with_callback,
)(queue_item_body)
return response
return analysis_response
def load_items_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
callback_results = compose(
flift(compose(any, itervalues)),
self.remove_empty_restuls,
lflatten,
lift(self.get_item_processor(queue_item_body)),
self.load_data,
@ -63,13 +64,6 @@ class QueueVisitor:
return {"analysis_payloads": callback_results, **queue_item_body}
def get_item_processor(self, queue_item_body):
def process_storage_item(storage_item):
analysis_input = {**storage_item, **queue_item_body}
return self.process_storage_item(analysis_input)
return process_storage_item
def load_data(self, queue_item_body):
data = compose(
lift(standardize),
@ -79,5 +73,15 @@ class QueueVisitor:
return data
def get_item_processor(self, queue_item_body):
def process_storage_item(storage_item):
analysis_input = {**storage_item, **queue_item_body}
return self.process_storage_item(analysis_input)
return process_storage_item
def remove_empty_restuls(self, results):
return lfilter(compose(any, itervalues), results)
def process_storage_item(self, data_metadata_pack):
return self.callback(data_metadata_pack)