From 4df5b8f72cb2e9502f49b1fcd9117efaded01a57 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Sat, 18 Jun 2022 23:53:51 +0200 Subject: [PATCH] misc refactoring --- pyinfra/storage/adapters/azure.py | 8 ++-- .../visitor/strategies/download/download.py | 10 ++--- .../strategies/response/aggregation.py | 1 - pyinfra/visitor/visitor.py | 40 ++++++++++++------- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py index 1aad2af..b3d498c 100644 --- a/pyinfra/storage/adapters/azure.py +++ b/pyinfra/storage/adapters/azure.py @@ -19,15 +19,15 @@ class AzureStorageAdapter(StorageAdapter): container_client = self.__client.get_container_client(bucket_name) return container_client.exists() - def make_bucket(self, bucket_name): - container_client = self.__client.get_container_client(bucket_name) - container_client if container_client.exists() else self.__client.create_container(bucket_name) - def __provide_container_client(self, bucket_name) -> ContainerClient: self.make_bucket(bucket_name) container_client = self.__client.get_container_client(bucket_name) return container_client + def make_bucket(self, bucket_name): + container_client = self.__client.get_container_client(bucket_name) + container_client if container_client.exists() else self.__client.create_container(bucket_name) + def put_object(self, bucket_name, object_name, data): logger.debug(f"Uploading '{object_name}'...") container_client = self.__provide_container_client(bucket_name) diff --git a/pyinfra/visitor/strategies/download/download.py b/pyinfra/visitor/strategies/download/download.py index 0f7bc50..260ab84 100644 --- a/pyinfra/visitor/strategies/download/download.py +++ b/pyinfra/visitor/strategies/download/download.py @@ -26,13 +26,13 @@ class DownloadStrategy(abc.ABC): return data - @staticmethod - @abc.abstractmethod - def get_object_name(body: dict): - raise NotImplementedError - def get_object_descriptor(self, body): return { "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": self.get_object_name(body), } + + @staticmethod + @abc.abstractmethod + def get_object_name(body: dict): + raise NotImplementedError diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index e3dc72a..b9489a5 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -58,4 +58,3 @@ class AggregationStorageStrategy(ResponseStrategy): def upload_queue_items(self, storage_upload_info): data = json.dumps(self.merge_queue_items()).encode() return self.put_object(data, storage_upload_info) - diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index df36235..c0f6ca0 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -1,8 +1,9 @@ from typing import Callable -from funcy import lflatten +from funcy import lflatten, compose from pyinfra.storage.storage import Storage +from pyinfra.utils.func import lift from pyinfra.visitor.strategies.download.download import DownloadStrategy from pyinfra.visitor.strategies.parsing.dynamic import DynamicParsingStrategy from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy @@ -25,26 +26,35 @@ class QueueVisitor: self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() self.download_strategy = download_strategy or get_download_strategy() - def load_data(self, queue_item_body): - data = self.download_strategy(self.storage, queue_item_body) - data = map(self.parsing_strategy, data) - data = map(standardize, data) - return data - - def process_storage_item(self, data_metadata_pack): - return self.callback(data_metadata_pack) + def __call__(self, queue_item_body): + analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) + return self.response_strategy(analysis_result_body) def load_item_from_storage_and_process_with_callback(self, queue_item_body): """Bundles the result from processing a storage item with the body of the corresponding queue item.""" + callback_results = compose( + lflatten, + lift(self.get_item_processor(queue_item_body)), + self.load_data, + )(queue_item_body) + + return {"data": callback_results, **queue_item_body} + + def get_item_processor(self, queue_item_body): def process_storage_item(storage_item): analysis_input = {**storage_item, **queue_item_body} return self.process_storage_item(analysis_input) - storage_items = self.load_data(queue_item_body) - results = lflatten(map(process_storage_item, storage_items)) - return {"data": results, **queue_item_body} + return process_storage_item - def __call__(self, queue_item_body): - analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) - return self.response_strategy(analysis_result_body) + def load_data(self, queue_item_body): + data = compose( + lift(standardize), + lift(self.parsing_strategy), + self.download_strategy, + )(self.storage, queue_item_body) + return data + + def process_storage_item(self, data_metadata_pack): + return self.callback(data_metadata_pack)