refactoring: rm dl strat module

This commit is contained in:
Matthias Bisping 2022-06-23 18:58:41 +02:00
parent efd36d0fc4
commit ac69a5c8e3
7 changed files with 54 additions and 60 deletions

View File

@ -17,7 +17,8 @@ from pyinfra.storage import storages
from pyinfra.visitor import QueueVisitor from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.download.multi import Downloader from pyinfra.visitor.downloader import Downloader
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -54,7 +55,7 @@ class ComponentFactory:
def get_visitor(self, callback): def get_visitor(self, callback):
return QueueVisitor( return QueueVisitor(
callback=callback, callback=callback,
downloader=self.get_downloader(), data_loader=self.get_downloader(),
response_strategy=self.get_response_strategy(), response_strategy=self.get_response_strategy(),
response_formatter=self.get_response_formatter(), response_formatter=self.get_response_formatter(),
) )

View File

@ -0,0 +1,35 @@
from functools import partial
from funcy import compose
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.storage.storage import Storage
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift
class Downloader:
def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
self.storage = storage
self.bucket_name = bucket_name
self.file_descriptor_manager = file_descriptor_manager
def __call__(self, queue_item_body):
return self.download(queue_item_body)
def get_names_of_objects_by_pattern(self, file_pattern: str):
matches_pattern = flift(file_pattern)
page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name)
return page_object_names
def download_and_decompress_object(self, object_names):
download = partial(self.storage.get_object, self.bucket_name)
return map(compose(decompress, download), object_names)
def download(self, queue_item_body):
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
page_object_names = self.get_names_of_objects_by_pattern(file_pattern)
objects = self.download_and_decompress_object(page_object_names)
return objects

View File

@ -1,8 +0,0 @@
import abc
from pyinfra.file_descriptor_manager import FileDescriptorManager
class DownloadStrategy(abc.ABC):
def __init__(self, file_descriptor_manager: FileDescriptorManager):
self.file_descriptor_manager = file_descriptor_manager

View File

@ -1,35 +0,0 @@
from functools import partial
from funcy import compose
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.storage.storage import Storage
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift
class Downloader:
def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
self.storage = storage
self.bucket_name = bucket_name
self.file_descriptor_manager = file_descriptor_manager
def __call__(self, queue_item_body):
return self.download(queue_item_body)
def get_names_of_objects_by_pattern(self, file_pattern: str):
matches_pattern = flift(file_pattern)
page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name)
return page_object_names
def download_and_decompress_object(self, object_names):
download = partial(self.storage.get_object, self.bucket_name)
return map(compose(decompress, download), object_names)
def download(self, queue_item_body):
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
page_object_names = self.get_names_of_objects_by_pattern(file_pattern)
objects = self.download_and_decompress_object(page_object_names)
return objects

View File

@ -2,13 +2,11 @@ from typing import Callable
from funcy import lflatten, compose, itervalues, lfilter from funcy import lflatten, compose, itervalues, lfilter
from pyinfra.server.debugging import inspect
from pyinfra.utils.func import lift from pyinfra.utils.func import lift
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.download.multi import Downloader
from pyinfra.visitor.strategies.response.response import ResponseStrategy from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.utils import standardize from pyinfra.visitor.utils import standardize
@ -17,27 +15,28 @@ class QueueVisitor:
def __init__( def __init__(
self, self,
callback: Callable, callback: Callable,
downloader: Downloader, data_loader: Callable,
response_strategy: ResponseStrategy,
parsing_strategy: BlobParsingStrategy = None, parsing_strategy: BlobParsingStrategy = None,
response_strategy: ResponseStrategy = None,
response_formatter: ResponseFormatter = None, response_formatter: ResponseFormatter = None,
): ):
"""Processes queue messages that specify items on a storage to process with a given callback. """Processes queue messages with a given callback.
Args: Args:
callback: callback to apply to storage items callback: callback to apply
downloader: behaviour for loading items from the storage data_loader: loads data specified in message and passes to callback
parsing_strategy: behaviour for interpreting storage items parsing_strategy: behaviour for interpreting loaded items
response_strategy: behaviour for response production response_strategy: behaviour for response production
TODO: merge all dependencies into a single pipeline like: getter -> parser -> processor -> formatter -> putter
Returns: Returns:
depends on response strategy depends on response strategy
""" """
self.callback = callback self.callback = callback
self.downloader = downloader self.data_loader = data_loader
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
self.response_strategy = response_strategy self.response_strategy = response_strategy
assert self.response_strategy is not None self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
self.response_formatter = response_formatter or IdentityResponseFormatter() self.response_formatter = response_formatter or IdentityResponseFormatter()
def __call__(self, queue_item_body): def __call__(self, queue_item_body):
@ -53,7 +52,7 @@ class QueueVisitor:
"""Bundles the result from processing a storage item with the body of the corresponding queue item.""" """Bundles the result from processing a storage item with the body of the corresponding queue item."""
callback_results = compose( callback_results = compose(
self.remove_empty_restuls, self.remove_empty_results,
lflatten, lflatten,
lift(self.get_item_processor(queue_item_body)), lift(self.get_item_processor(queue_item_body)),
self.load_data, self.load_data,
@ -65,7 +64,7 @@ class QueueVisitor:
data = compose( data = compose(
lift(standardize), lift(standardize),
lift(self.parsing_strategy), lift(self.parsing_strategy),
self.downloader, self.data_loader,
)(queue_item_body) )(queue_item_body)
return data return data
@ -78,7 +77,7 @@ class QueueVisitor:
return process_storage_item return process_storage_item
@staticmethod @staticmethod
def remove_empty_restuls(results): def remove_empty_results(results):
return lfilter(compose(any, itervalues), results) return lfilter(compose(any, itervalues), results)
def process_storage_item(self, data_metadata_pack): def process_storage_item(self, data_metadata_pack):

View File

@ -1,6 +1,8 @@
import json import json
from pyinfra.config import CONFIG as MAIN_CONFIG from pyinfra.config import CONFIG as MAIN_CONFIG
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["delay"] = 0.1
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)
@ -203,7 +205,7 @@ def visitor(storage, analysis_callback, response_strategy, component_factory):
return QueueVisitor( return QueueVisitor(
callback=analysis_callback, callback=analysis_callback,
downloader=component_factory.get_downloader(storage), data_loader=component_factory.get_downloader(storage),
response_strategy=response_strategy, response_strategy=response_strategy,
) )