diff --git a/pyinfra/config.py b/pyinfra/config.py index 155283a..6e82b61 100644 --- a/pyinfra/config.py +++ b/pyinfra/config.py @@ -1,5 +1,6 @@ """Implements a config object with dot-indexing syntax.""" import os +from functools import partial from itertools import chain from operator import truth from typing import Iterable @@ -47,20 +48,23 @@ class Config: def __setitem__(self, key, value): self.__config.key = value - def to_dict(self): - return to_dict(self.__config.export()) + def to_dict(self, frozen=True): + return to_dict(self.__config.export(), frozen=frozen) def __hash__(self): return hash(self.to_dict()) -def to_dict(v): +def to_dict(v, frozen=True): + def make_dict(*args, **kwargs): + return (frozendict if frozen else dict)(*args, **kwargs) + if isinstance(v, list): - return tuple(map(to_dict, v)) + return tuple(map(partial(to_dict, frozen=frozen), v)) elif isinstance(v, DotIndexable): - return frozendict({k: to_dict(v) for k, v in v.x.items()}) + return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.x.items()}) elif isinstance(v, dict): - return frozendict({k: to_dict(v) for k, v in v.items()}) + return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.items()}) else: return v diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index ad699dd..28de59a 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -17,8 +17,7 @@ from pyinfra.storage import storages from pyinfra.visitor import QueueVisitor from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter -from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy -from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy +from pyinfra.visitor.strategies.download.multi import Downloader from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy logger = logging.getLogger(__name__) @@ -54,9 +53,8 @@ class ComponentFactory: @lru_cache(maxsize=None) def get_visitor(self, callback): return QueueVisitor( - storage=self.get_storage(), callback=callback, - download_strategy=self.get_download_strategy(), + downloader=self.get_downloader(), response_strategy=self.get_response_strategy(), response_formatter=self.get_response_formatter(), ) @@ -93,24 +91,13 @@ class ComponentFactory: return self.config.service.operations @lru_cache(maxsize=None) - def get_download_strategy(self, download_strategy_type=None): - download_strategies = { - "single": self.get_single_download_strategy(), - "multi": self.get_multi_download_strategy(), - } - return download_strategies.get( - download_strategy_type or self.config.service.download_strategy, - self.get_single_download_strategy(), + def get_downloader(self, storage=None): + return Downloader( + storage=storage or self.get_storage(), + bucket_name=parse_disjunction_string(self.config.storage.bucket), + file_descriptor_manager=self.get_file_descriptor_manager(), ) - @lru_cache(maxsize=None) - def get_single_download_strategy(self): - return SingleDownloadStrategy(self.get_file_descriptor_manager()) - - @lru_cache(maxsize=None) - def get_multi_download_strategy(self): - return MultiDownloadStrategy(self.get_file_descriptor_manager()) - class Callback: def __init__(self, base_url): diff --git a/pyinfra/visitor/downloader.py b/pyinfra/visitor/downloader.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/download/multi.py b/pyinfra/visitor/strategies/download/multi.py index c7b6d45..230e392 100644 --- a/pyinfra/visitor/strategies/download/multi.py +++ b/pyinfra/visitor/strategies/download/multi.py @@ -2,36 +2,34 @@ from functools import partial from funcy import compose -from pyinfra.config import parse_disjunction_string, CONFIG from pyinfra.file_descriptor_manager import FileDescriptorManager from pyinfra.storage.storage import Storage from pyinfra.utils.encoding import decompress from pyinfra.utils.func import flift -from pyinfra.visitor.strategies.download.download import DownloadStrategy -class MultiDownloadStrategy(DownloadStrategy): - def __init__(self, file_descriptor_manager: FileDescriptorManager): - # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket - self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket) - super().__init__(file_descriptor_manager=file_descriptor_manager) +class Downloader: + def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager): + self.storage = storage + self.bucket_name = bucket_name + self.file_descriptor_manager = file_descriptor_manager - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) + def __call__(self, queue_item_body): + return self.download(queue_item_body) - def get_names_of_objects_by_pages(self, storage, file_pattern: str): + def get_names_of_objects_by_pattern(self, file_pattern: str): matches_pattern = flift(file_pattern) - page_object_names = compose(matches_pattern, storage.get_all_object_names)(self.bucket_name) + page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name) return page_object_names - def download_and_decompress_object(self, storage, object_names): - download = partial(storage.get_object, self.bucket_name) + def download_and_decompress_object(self, object_names): + download = partial(self.storage.get_object, self.bucket_name) return map(compose(decompress, download), object_names) - def download(self, storage: Storage, queue_item_body): + def download(self, queue_item_body): file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body) - page_object_names = self.get_names_of_objects_by_pages(storage, file_pattern) - objects = self.download_and_decompress_object(storage, page_object_names) + page_object_names = self.get_names_of_objects_by_pattern(file_pattern) + objects = self.download_and_decompress_object(page_object_names) return objects diff --git a/pyinfra/visitor/strategies/download/single.py b/pyinfra/visitor/strategies/download/single.py deleted file mode 100644 index 83efbc6..0000000 --- a/pyinfra/visitor/strategies/download/single.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging - -from pyinfra.exceptions import DataLoadingFailure -from pyinfra.file_descriptor_manager import FileDescriptorManager -from pyinfra.utils.encoding import decompress -from pyinfra.visitor.strategies.download.download import DownloadStrategy - - -class SingleDownloadStrategy(DownloadStrategy): - def __init__(self, file_descriptor_manager: FileDescriptorManager): - super().__init__(file_descriptor_manager=file_descriptor_manager) - - def download(self, storage, queue_item_body): - return self._load_data(storage, queue_item_body) - - def _load_data(self, storage, queue_item_body): - object_descriptor = self.file_descriptor_manager.get_input_object_descriptor(queue_item_body) - logging.debug(f"Downloading {object_descriptor}...") - data = self.__download(storage, object_descriptor) - logging.debug(f"Downloaded {object_descriptor}.") - assert isinstance(data, bytes) - data = decompress(data) - return [data] - - @staticmethod - def __download(storage, object_descriptor): - try: - data = storage.get_object(**object_descriptor) - except Exception as err: - logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure from err - - return data - - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py index c427867..658987e 100644 --- a/pyinfra/visitor/strategies/response/storage.py +++ b/pyinfra/visitor/strategies/response/storage.py @@ -28,13 +28,10 @@ class StorageStrategy(ResponseStrategy): def get_response_object_name(body): """TODO: refactor by using FileDescriptorManager""" - if "pages" not in body: - body["pages"] = [] - if "id" not in body: body["id"] = 0 - dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body) + dossier_id, file_id, idnt = itemgetter("dossierId", "fileId", "id")(body) object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index 54c255f..e6f7433 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -2,24 +2,22 @@ from typing import Callable from funcy import lflatten, compose, itervalues, lfilter -from pyinfra.storage.storage import Storage +from pyinfra.server.debugging import inspect from pyinfra.utils.func import lift from pyinfra.visitor.response_formatter.formatter import ResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy -from pyinfra.visitor.strategies.download.download import DownloadStrategy +from pyinfra.visitor.strategies.download.multi import Downloader from pyinfra.visitor.strategies.response.response import ResponseStrategy -from pyinfra.visitor.strategies.response.storage import StorageStrategy from pyinfra.visitor.utils import standardize class QueueVisitor: def __init__( self, - storage: Storage, callback: Callable, - download_strategy: DownloadStrategy, + downloader: Downloader, parsing_strategy: BlobParsingStrategy = None, response_strategy: ResponseStrategy = None, response_formatter: ResponseFormatter = None, @@ -27,20 +25,19 @@ class QueueVisitor: """Processes queue messages that specify items on a storage to process with a given callback. Args: - storage: storage to pull items specified by queue message from callback: callback to apply to storage items - download_strategy: behaviour for loading items from the storage + downloader: behaviour for loading items from the storage parsing_strategy: behaviour for interpreting storage items response_strategy: behaviour for response production Returns: depends on response strategy """ - self.storage = storage self.callback = callback - self.download_strategy = download_strategy + self.downloader = downloader self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() - self.response_strategy = response_strategy or StorageStrategy(storage) + self.response_strategy = response_strategy + assert self.response_strategy is not None self.response_formatter = response_formatter or IdentityResponseFormatter() def __call__(self, queue_item_body): @@ -68,8 +65,8 @@ class QueueVisitor: data = compose( lift(standardize), lift(self.parsing_strategy), - self.download_strategy, - )(self.storage, queue_item_body) + self.downloader, + )(queue_item_body) return data diff --git a/test/conftest.py b/test/conftest.py index 49991e8..1d49f56 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -28,7 +28,6 @@ from pyinfra.storage.storage import Storage from pyinfra.visitor import QueueVisitor from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy from pyinfra.visitor.strategies.response.storage import StorageStrategy -from test.config import CONFIG from test.queue.queue_manager_mock import QueueManagerMock from test.storage.adapter_mock import StorageAdapterMock from test.storage.client_mock import StorageClientMock @@ -54,7 +53,7 @@ logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1) @pytest.fixture(autouse=True) def mute_logger(): - if not CONFIG.logging: + if not TEST_CONFIG.logging: logger.setLevel(logging.CRITICAL + 1) @@ -112,7 +111,7 @@ def s3_backend(request): @pytest.fixture(scope="session", autouse=True) def docker_compose(sleep_seconds=30): - if CONFIG.use_docker_fixture: + if TEST_CONFIG.use_docker_fixture: logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") compose.start() @@ -130,7 +129,7 @@ def get_pika_connection_params(): def get_s3_params(s3_backend): - params = CONFIG.storage[s3_backend] + params = TEST_CONFIG.storage[s3_backend] return params @@ -139,7 +138,7 @@ def get_adapter(client_name, s3_backend): if client_name == "mock": return StorageAdapterMock(StorageClientMock()) if client_name == "azure": - return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string)) + return AzureStorageAdapter(get_azure_client(TEST_CONFIG.storage.azure.connection_string)) if client_name == "s3": return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend))) else: @@ -203,9 +202,8 @@ def response_strategy(response_strategy_name, storage): def visitor(storage, analysis_callback, response_strategy, component_factory): return QueueVisitor( - storage=storage, callback=analysis_callback, - download_strategy=component_factory.get_download_strategy(), + downloader=component_factory.get_downloader(storage), response_strategy=response_strategy, ) @@ -217,6 +215,7 @@ def file_descriptor_manager(component_factory): @pytest.fixture def component_factory(): - CONFIG["service"]["operations"] = TEST_CONFIG.service.operations - CONFIG["service"]["download_strategy"] = "single" - return get_component_factory(CONFIG) + MAIN_CONFIG["service"]["operations"] = TEST_CONFIG.service.operations + MAIN_CONFIG["service"]["download_strategy"] = "single" + + return get_component_factory(MAIN_CONFIG) diff --git a/test/fixtures/consumer.py b/test/fixtures/consumer.py index 80bb4ea..d4e3220 100644 --- a/test/fixtures/consumer.py +++ b/test/fixtures/consumer.py @@ -1,3 +1,6 @@ +from operator import itemgetter +from typing import Iterable + import pytest from pyinfra.queue.consumer import Consumer @@ -6,3 +9,29 @@ from pyinfra.queue.consumer import Consumer @pytest.fixture(scope="session") def consumer(queue_manager, callback): return Consumer(callback, queue_manager) + + +@pytest.fixture(scope="session") +def access_callback(): + return itemgetter("fileId") + + +@pytest.fixture() +def items(): + numbers = [f"{i}".encode() for i in range(3)] + return pair_data_with_queue_message(numbers) + + +def pair_data_with_queue_message(data: Iterable[bytes]): + def inner(): + for i, d in enumerate(data): + body = { + "dossierId": "folder", + "fileId": f"file{i}", + "targetFileExtension": "in.gz", + "responseFileExtension": "out.gz", + "pages": [0, 2, 3], + } + yield d, body + + return list(inner()) diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index ccbe279..7d6f97c 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -6,12 +6,10 @@ from operator import itemgetter import pytest from funcy import compose, first, second, pluck, lflatten, lzip -from pyinfra.config import CONFIG -from pyinfra.default_objects import ComponentFactory, get_component_factory -from pyinfra.queue.consumer import Consumer +from pyinfra.config import CONFIG, DotIndexable +from pyinfra.default_objects import get_component_factory from pyinfra.server.packing import unpack, pack from pyinfra.utils.encoding import compress, decompress -from pyinfra.visitor import QueueVisitor from test.config import CONFIG as TEST_CONFIG diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py index 0fc75f4..64f2a37 100644 --- a/test/unit_tests/consumer_test.py +++ b/test/unit_tests/consumer_test.py @@ -1,13 +1,13 @@ import logging -from operator import itemgetter import pytest -from funcy import lmapcat +from funcy import pluck, lflatten +from pyinfra.config import CONFIG +from pyinfra.default_objects import ComponentFactory from pyinfra.exceptions import ProcessingFailure -from test.utils.storage import pack_for_upload -from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy +from test.utils.storage import pack_for_upload logger = logging.getLogger() @@ -26,7 +26,7 @@ class TestConsumer: @pytest.mark.parametrize( "queue_manager_name", [ - "pika", # NOTE: pika must come first. Test fails IFF pika is in second place, for whatever reason. + "pika", # NOTE: pika must come first. Test fails IFF pika is in second position, for whatever reason. "mock", ], scope="session", @@ -64,8 +64,9 @@ class TestConsumer: @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( - self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder + self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder, ): + file_descriptor_manager = ComponentFactory(CONFIG).get_file_descriptor_manager() visitor.response_strategy = ForwardingStrategy() @@ -73,7 +74,7 @@ class TestConsumer: storage.clear_bucket(bucket_name) for data, message in items: - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(message), data=pack_for_upload(data)) + storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=pack_for_upload(data)) queue_manager.publish_request(message) requests = consumer.consume(inactivity_timeout=5) @@ -82,7 +83,7 @@ class TestConsumer: logger.debug(f"Processing item {itm}") queue_manager.publish_response(req, visitor) - assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"] + assert lflatten(pluck("analysis_payloads", queue_manager.output_queue.to_list())) == ["00", "11", "22"] @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") def test_message_is_republished_when_callback_raises_processing_failure_exception( @@ -110,7 +111,7 @@ class TestConsumer: logger.addFilter(lambda record: False) # TODO: - # Note above says order of mock and pika matters. Note, that you can produce an error for debugging, when + # The note above says order of mock and pika matters. Note, that you can produce an error for debugging, when # using `queue_manager.publish_response(next(requests), callback)` without the-with and while-block, where it # becomes obvious, that the test with the note above then uses the data from THIS here test. with pytest.raises(DebugError):