pyinfra/pyinfra/visitor/visitor.py
Matthias Bisping 5978c7d09e renaming
2022-06-19 00:53:53 +02:00

75 lines
2.9 KiB
Python

from typing import Callable, Optional

from funcy import lflatten, compose

from pyinfra.storage.storage import Storage
from pyinfra.utils.func import lift
from pyinfra.visitor.strategies.download.download import DownloadStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.strategies.response.storage import StorageStrategy
from pyinfra.visitor.utils import standardize, get_download_strategy
class QueueVisitor:
    """Processes queue messages that specify items on a storage to process with a given callback.

    Calling an instance with a queue item body downloads and parses the storage
    items the body refers to, applies the callback to each of them and passes
    the bundled result to the response strategy.
    """

    def __init__(
        self,
        storage: Storage,
        callback: Callable,
        download_strategy: Optional[DownloadStrategy] = None,
        parsing_strategy: Optional[BlobParsingStrategy] = None,
        response_strategy: Optional[ResponseStrategy] = None,
    ):
        """Stores the collaborators and fills in default strategies.

        Args:
            storage: storage to pull items specified by queue message from
            callback: callback to apply to storage items
            download_strategy: behaviour for loading items from the storage;
                defaults to the result of `get_download_strategy()`
            parsing_strategy: behaviour for interpreting storage items;
                defaults to `DynamicParsingStrategy()`
            response_strategy: behaviour for response production;
                defaults to `StorageStrategy()`
        """
        # Compare against None explicitly: a strategy object that happens to be
        # falsy (e.g. defines __bool__ or __len__) must not be silently replaced.
        if download_strategy is None:
            download_strategy = get_download_strategy()
        if parsing_strategy is None:
            parsing_strategy = DynamicParsingStrategy()
        if response_strategy is None:
            response_strategy = StorageStrategy()

        self.storage = storage
        self.callback = callback
        self.download_strategy = download_strategy
        self.parsing_strategy = parsing_strategy
        self.response_strategy = response_strategy

    def __call__(self, queue_item_body):
        """Processes one queue item.

        Args:
            queue_item_body: mapping describing the queue message

        Returns:
            whatever the response strategy returns for the bundled analysis
            result, i.e. it depends on the response strategy
        """
        analysis_result_body = self.load_items_from_storage_and_process_with_callback(queue_item_body)
        return self.response_strategy(analysis_result_body)

    def load_items_from_storage_and_process_with_callback(self, queue_item_body):
        """Bundles the result from processing a storage item with the body of the corresponding queue item.

        Args:
            queue_item_body: mapping describing the queue message

        Returns:
            dict with the flattened callback results under "data", merged with
            the entries of `queue_item_body`
        """
        # funcy's compose applies right to left: load the data, run the item
        # processor over it via `lift`, then flatten the results into a list.
        callback_results = compose(
            lflatten,
            lift(self.get_item_processor(queue_item_body)),
            self.load_data,
        )(queue_item_body)

        # NOTE(review): the queue item body is unpacked last, so a "data" key in
        # it would override the callback results - confirm this is intended.
        return {"data": callback_results, **queue_item_body}

    def get_item_processor(self, queue_item_body):
        """Builds a function that processes a single storage item in the context of the queue item.

        Args:
            queue_item_body: mapping describing the queue message

        Returns:
            function taking a storage item (a mapping) and returning the
            callback result for the storage item merged with the queue item body
        """

        def process_storage_item(storage_item):
            # Entries of the queue item body take precedence over equally named
            # entries of the storage item.
            analysis_input = {**storage_item, **queue_item_body}
            return self.process_storage_item(analysis_input)

        return process_storage_item

    def load_data(self, queue_item_body):
        """Downloads, parses and standardizes the storage items referenced by the queue item.

        Args:
            queue_item_body: mapping describing the queue message

        Returns:
            result of applying the lifted `standardize` to the parsed download
            output (presumably an iterable of storage items - exact type
            depends on `lift`; verify against pyinfra.utils.func)
        """
        # Right to left: the download strategy receives (storage, queue_item_body);
        # its output is then run through the parsing strategy and `standardize`.
        data = compose(
            lift(standardize),
            lift(self.parsing_strategy),
            self.download_strategy,
        )(self.storage, queue_item_body)
        return data

    def process_storage_item(self, data_metadata_pack):
        """Applies the callback to a storage item that was merged with the queue item body.

        Args:
            data_metadata_pack: input handed to the callback

        Returns:
            the callback's return value
        """
        return self.callback(data_metadata_pack)