refactored visitor init
This commit is contained in:
parent
4df5b8f72c
commit
2c902f3129
@ -22,15 +22,6 @@ def build_file_path(storage_upload_info, folder):
|
||||
return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")
|
||||
|
||||
|
||||
def validate(data):
|
||||
if not ("data" in data and "metadata" in data):
|
||||
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
|
||||
|
||||
|
||||
def wrap(data):
|
||||
return {"data": data, "metadata": {}}
|
||||
|
||||
|
||||
def standardize(data) -> Dict:
|
||||
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
|
||||
|
||||
@ -62,6 +53,15 @@ def standardize(data) -> Dict:
|
||||
return wrap(string_to_bytes(data))
|
||||
|
||||
|
||||
def wrap(data):
|
||||
return {"data": data, "metadata": {}}
|
||||
|
||||
|
||||
def validate(data):
|
||||
if not ("data" in data and "metadata" in data):
|
||||
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
|
||||
|
||||
|
||||
def get_download_strategy(download_strategy_type=None):
|
||||
download_strategies = {
|
||||
"single": SingleDownloadStrategy(),
|
||||
|
||||
@ -8,6 +8,7 @@ from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
from pyinfra.visitor.strategies.parsing.dynamic import DynamicParsingStrategy
|
||||
from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
from pyinfra.visitor.strategies.response.storage import StorageStrategy
|
||||
from pyinfra.visitor.utils import standardize, get_download_strategy
|
||||
|
||||
|
||||
@ -16,21 +17,33 @@ class QueueVisitor:
|
||||
self,
|
||||
storage: Storage,
|
||||
callback: Callable,
|
||||
response_strategy: ResponseStrategy,
|
||||
parsing_strategy: ParsingStrategy = None,
|
||||
download_strategy: DownloadStrategy = None,
|
||||
parsing_strategy: ParsingStrategy = None,
|
||||
response_strategy: ResponseStrategy = None,
|
||||
):
|
||||
"""Processes queue messages that specify items on a storage to process with a given callback.
|
||||
|
||||
Args:
|
||||
storage: storage to pull items specified by queue message from
|
||||
callback: callback to apply to storage items
|
||||
download_strategy: behaviour for loading items from the storage
|
||||
parsing_strategy: behaviour for interpreting storage items
|
||||
response_strategy: behaviour for response production
|
||||
|
||||
Returns:
|
||||
depends on response strategy
|
||||
"""
|
||||
self.storage = storage
|
||||
self.callback = callback
|
||||
self.response_strategy = response_strategy
|
||||
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
|
||||
self.download_strategy = download_strategy or get_download_strategy()
|
||||
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
|
||||
self.response_strategy = response_strategy or StorageStrategy()
|
||||
|
||||
def __call__(self, queue_item_body):
|
||||
analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
|
||||
analysis_result_body = self.load_items_from_storage_and_process_with_callback(queue_item_body)
|
||||
return self.response_strategy(analysis_result_body)
|
||||
|
||||
def load_item_from_storage_and_process_with_callback(self, queue_item_body):
|
||||
def load_items_from_storage_and_process_with_callback(self, queue_item_body):
|
||||
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
|
||||
|
||||
callback_results = compose(
|
||||
@ -54,6 +67,7 @@ class QueueVisitor:
|
||||
lift(self.parsing_strategy),
|
||||
self.download_strategy,
|
||||
)(self.storage, queue_item_body)
|
||||
|
||||
return data
|
||||
|
||||
def process_storage_item(self, data_metadata_pack):
|
||||
|
||||
@ -197,4 +197,4 @@ def response_strategy(response_strategy_name, storage):
|
||||
|
||||
@pytest.fixture()
|
||||
def visitor(storage, analysis_callback, response_strategy):
|
||||
return QueueVisitor(storage, analysis_callback, response_strategy)
|
||||
return QueueVisitor(storage=storage, callback=analysis_callback, response_strategy=response_strategy)
|
||||
|
||||
@ -232,7 +232,7 @@ def download_strategy(many_to_n):
|
||||
def test_components(url, queue_manager, storage):
|
||||
|
||||
callback = get_callback(url)
|
||||
visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
|
||||
visitor = QueueVisitor(storage=storage, callback=callback, response_strategy=get_response_strategy(storage))
|
||||
consumer = Consumer(visitor, queue_manager)
|
||||
|
||||
return storage, queue_manager, consumer
|
||||
|
||||
@ -26,7 +26,7 @@ class TestVisitor:
|
||||
def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name):
|
||||
storage.clear_bucket(bucket_name)
|
||||
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2"))
|
||||
response_body = visitor.load_item_from_storage_and_process_with_callback(body)
|
||||
response_body = visitor.load_items_from_storage_and_process_with_callback(body)
|
||||
assert response_body["data"] == ["22"]
|
||||
|
||||
@pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user