misc refactoring

This commit is contained in:
Matthias Bisping 2022-06-18 23:53:51 +02:00
parent c6c1b121a3
commit 4df5b8f72c
4 changed files with 34 additions and 25 deletions

View File

@ -19,15 +19,15 @@ class AzureStorageAdapter(StorageAdapter):
container_client = self.__client.get_container_client(bucket_name)
return container_client.exists()
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def __provide_container_client(self, bucket_name) -> ContainerClient:
self.make_bucket(bucket_name)
container_client = self.__client.get_container_client(bucket_name)
return container_client
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def put_object(self, bucket_name, object_name, data):
logger.debug(f"Uploading '{object_name}'...")
container_client = self.__provide_container_client(bucket_name)

View File

@ -26,13 +26,13 @@ class DownloadStrategy(abc.ABC):
return data
@staticmethod
@abc.abstractmethod
def get_object_name(body: dict):
raise NotImplementedError
def get_object_descriptor(self, body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": self.get_object_name(body),
}
@staticmethod
@abc.abstractmethod
def get_object_name(body: dict):
raise NotImplementedError

View File

@ -58,4 +58,3 @@ class AggregationStorageStrategy(ResponseStrategy):
def upload_queue_items(self, storage_upload_info):
data = json.dumps(self.merge_queue_items()).encode()
return self.put_object(data, storage_upload_info)

View File

@ -1,8 +1,9 @@
from typing import Callable
from funcy import lflatten
from funcy import lflatten, compose
from pyinfra.storage.storage import Storage
from pyinfra.utils.func import lift
from pyinfra.visitor.strategies.download.download import DownloadStrategy
from pyinfra.visitor.strategies.parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy
@ -25,26 +26,35 @@ class QueueVisitor:
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
self.download_strategy = download_strategy or get_download_strategy()
def load_data(self, queue_item_body):
data = self.download_strategy(self.storage, queue_item_body)
data = map(self.parsing_strategy, data)
data = map(standardize, data)
return data
def process_storage_item(self, data_metadata_pack):
return self.callback(data_metadata_pack)
def __call__(self, queue_item_body):
analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
return self.response_strategy(analysis_result_body)
def load_item_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
callback_results = compose(
lflatten,
lift(self.get_item_processor(queue_item_body)),
self.load_data,
)(queue_item_body)
return {"data": callback_results, **queue_item_body}
def get_item_processor(self, queue_item_body):
def process_storage_item(storage_item):
analysis_input = {**storage_item, **queue_item_body}
return self.process_storage_item(analysis_input)
storage_items = self.load_data(queue_item_body)
results = lflatten(map(process_storage_item, storage_items))
return {"data": results, **queue_item_body}
return process_storage_item
def __call__(self, queue_item_body):
analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
return self.response_strategy(analysis_result_body)
def load_data(self, queue_item_body):
data = compose(
lift(standardize),
lift(self.parsing_strategy),
self.download_strategy,
)(self.storage, queue_item_body)
return data
def process_storage_item(self, data_metadata_pack):
return self.callback(data_metadata_pack)