From ac69a5c8e3f1e2fd7e828a17eeab97984f4f9746 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 23 Jun 2022 18:58:41 +0200 Subject: [PATCH] refactoring: rm dl strat module --- pyinfra/default_objects.py | 5 +-- pyinfra/visitor/downloader.py | 35 +++++++++++++++++++ .../visitor/strategies/download/__init__.py | 0 .../visitor/strategies/download/download.py | 8 ----- pyinfra/visitor/strategies/download/multi.py | 35 ------------------- pyinfra/visitor/visitor.py | 27 +++++++------- test/conftest.py | 4 ++- 7 files changed, 54 insertions(+), 60 deletions(-) delete mode 100644 pyinfra/visitor/strategies/download/__init__.py delete mode 100644 pyinfra/visitor/strategies/download/download.py delete mode 100644 pyinfra/visitor/strategies/download/multi.py diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 28de59a..4704212 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -17,7 +17,8 @@ from pyinfra.storage import storages from pyinfra.visitor import QueueVisitor from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter -from pyinfra.visitor.strategies.download.multi import Downloader +from pyinfra.visitor.downloader import Downloader +from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy logger = logging.getLogger(__name__) @@ -54,7 +55,7 @@ class ComponentFactory: def get_visitor(self, callback): return QueueVisitor( callback=callback, - downloader=self.get_downloader(), + data_loader=self.get_downloader(), response_strategy=self.get_response_strategy(), response_formatter=self.get_response_formatter(), ) diff --git a/pyinfra/visitor/downloader.py b/pyinfra/visitor/downloader.py index e69de29..230e392 100644 --- a/pyinfra/visitor/downloader.py +++ b/pyinfra/visitor/downloader.py @@ -0,0 +1,35 @@ +from functools import partial + +from funcy import compose + +from pyinfra.file_descriptor_manager import FileDescriptorManager +from pyinfra.storage.storage import Storage +from pyinfra.utils.encoding import decompress +from pyinfra.utils.func import flift + + +class Downloader: + def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager): + self.storage = storage + self.bucket_name = bucket_name + self.file_descriptor_manager = file_descriptor_manager + + def __call__(self, queue_item_body): + return self.download(queue_item_body) + + def get_names_of_objects_by_pattern(self, file_pattern: str): + matches_pattern = flift(file_pattern) + page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name) + return page_object_names + + def download_and_decompress_object(self, object_names): + download = partial(self.storage.get_object, self.bucket_name) + return map(compose(decompress, download), object_names) + + def download(self, queue_item_body): + file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body) + + page_object_names = self.get_names_of_objects_by_pattern(file_pattern) + objects = self.download_and_decompress_object(page_object_names) + + return objects diff --git a/pyinfra/visitor/strategies/download/__init__.py b/pyinfra/visitor/strategies/download/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyinfra/visitor/strategies/download/download.py b/pyinfra/visitor/strategies/download/download.py deleted file mode 100644 index 1f6c09b..0000000 --- a/pyinfra/visitor/strategies/download/download.py +++ /dev/null @@ -1,8 +0,0 @@ -import abc - -from pyinfra.file_descriptor_manager import FileDescriptorManager - - -class DownloadStrategy(abc.ABC): - def __init__(self, file_descriptor_manager: FileDescriptorManager): - self.file_descriptor_manager = file_descriptor_manager diff --git a/pyinfra/visitor/strategies/download/multi.py b/pyinfra/visitor/strategies/download/multi.py deleted file mode 100644 index 230e392..0000000 --- a/pyinfra/visitor/strategies/download/multi.py +++ /dev/null @@ -1,35 +0,0 @@ -from functools import partial - -from funcy import compose - -from pyinfra.file_descriptor_manager import FileDescriptorManager -from pyinfra.storage.storage import Storage -from pyinfra.utils.encoding import decompress -from pyinfra.utils.func import flift - - -class Downloader: - def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager): - self.storage = storage - self.bucket_name = bucket_name - self.file_descriptor_manager = file_descriptor_manager - - def __call__(self, queue_item_body): - return self.download(queue_item_body) - - def get_names_of_objects_by_pattern(self, file_pattern: str): - matches_pattern = flift(file_pattern) - page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name) - return page_object_names - - def download_and_decompress_object(self, object_names): - download = partial(self.storage.get_object, self.bucket_name) - return map(compose(decompress, download), object_names) - - def download(self, queue_item_body): - file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body) - - page_object_names = self.get_names_of_objects_by_pattern(file_pattern) - objects = self.download_and_decompress_object(page_object_names) - - return objects diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index e6f7433..a50ed26 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -2,13 +2,11 @@ from typing import Callable from funcy import lflatten, compose, itervalues, lfilter -from pyinfra.server.debugging import inspect from pyinfra.utils.func import lift from pyinfra.visitor.response_formatter.formatter import ResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy -from pyinfra.visitor.strategies.download.multi import Downloader from pyinfra.visitor.strategies.response.response import ResponseStrategy from pyinfra.visitor.utils import standardize @@ -17,27 +15,28 @@ class QueueVisitor: def __init__( self, callback: Callable, - downloader: Downloader, + data_loader: Callable, + response_strategy: ResponseStrategy, parsing_strategy: BlobParsingStrategy = None, - response_strategy: ResponseStrategy = None, response_formatter: ResponseFormatter = None, ): - """Processes queue messages that specify items on a storage to process with a given callback. + """Processes queue messages with a given callback. Args: - callback: callback to apply to storage items - downloader: behaviour for loading items from the storage - parsing_strategy: behaviour for interpreting storage items + callback: callback to apply + data_loader: loads data specified in message and passes to callback + parsing_strategy: behaviour for interpreting loaded items response_strategy: behaviour for response production + TODO: merge all dependencies into a single pipeline like: getter -> parser -> processor -> formatter -> putter + Returns: depends on response strategy """ self.callback = callback - self.downloader = downloader - self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() + self.data_loader = data_loader self.response_strategy = response_strategy - assert self.response_strategy is not None + self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() self.response_formatter = response_formatter or IdentityResponseFormatter() def __call__(self, queue_item_body): @@ -53,7 +52,7 @@ class QueueVisitor: """Bundles the result from processing a storage item with the body of the corresponding queue item.""" callback_results = compose( - self.remove_empty_restuls, + self.remove_empty_results, lflatten, lift(self.get_item_processor(queue_item_body)), self.load_data, @@ -65,7 +64,7 @@ class QueueVisitor: data = compose( lift(standardize), lift(self.parsing_strategy), - self.downloader, + self.data_loader, )(queue_item_body) return data @@ -78,7 +77,7 @@ class QueueVisitor: return process_storage_item @staticmethod - def remove_empty_restuls(results): + def remove_empty_results(results): return lfilter(compose(any, itervalues), results) def process_storage_item(self, data_metadata_pack): diff --git a/test/conftest.py b/test/conftest.py index 1d49f56..7805a84 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,6 +1,8 @@ import json from pyinfra.config import CONFIG as MAIN_CONFIG +from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter +from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) @@ -203,7 +205,7 @@ def visitor(storage, analysis_callback, response_strategy, component_factory): return QueueVisitor( callback=analysis_callback, - downloader=component_factory.get_downloader(storage), + data_loader=component_factory.get_downloader(storage), response_strategy=response_strategy, )