From 01bfb1d668e21f1d0d71e2d9b12d5715a69f5c56 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 11 Jul 2022 09:09:44 +0200 Subject: [PATCH] Pull request #40: 2.0.0 documentation Merge in RR/pyinfra from 2.0.0-documentation to 2.0.0 Squashed commit of the following: commit 7a794bdcc987631cdc4d89b5620359464e2e018e Author: Matthias Bisping Date: Mon Jul 4 13:05:26 2022 +0200 removed obsolete imports commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a Author: Matthias Bisping Date: Mon Jul 4 11:47:12 2022 +0200 enable docker-compose fixture commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42 Author: Matthias Bisping Date: Mon Jul 4 11:46:53 2022 +0200 renaming commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5 Author: Matthias Bisping Date: Thu Jun 30 12:47:57 2022 +0200 refactoring: added cached pipeline factory commit 90e735852af2f86e35be845fabf28494de952edb Author: Matthias Bisping Date: Wed Jun 29 13:47:08 2022 +0200 renaming commit 93b3d4b202b41183ed8cabe193a4bfa03f520787 Author: Matthias Bisping Date: Wed Jun 29 13:25:03 2022 +0200 further refactored server setup code: moving and decomplecting commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e Author: Matthias Bisping Date: Wed Jun 29 12:53:09 2022 +0200 refactored server setup code: factored out and decoupled operation registry and prometheus summary registry commit da2dce762bdd6889165fbb320dc9ee8a0bd089b2 Author: Matthias Bisping Date: Tue Jun 28 19:40:04 2022 +0200 adjusted test target commit 70df7911b9b92f4b72afd7d4b33ca2bbf136295e Author: Matthias Bisping Date: Tue Jun 28 19:32:38 2022 +0200 minor refactoring commit 0937b63dc000346559bde353381304b273244109 Author: Matthias Bisping Date: Mon Jun 27 13:59:59 2022 +0200 support for empty operation suffix commit 5e56917970962a2e69bbd66a324bdb4618c040bd Author: Matthias Bisping Date: Mon Jun 27 12:52:36 2022 +0200 minor refactoring commit 40665a7815ae5927b3877bda14fb77deef37d667 Author: Matthias Bisping Date: Mon Jun 27 10:57:04 2022 +0200 optimization: prefix 
filtering via storage API for get_all_object_names commit af0892a899d09023eb0e61eecb63e03dc2fd3b60 Author: Matthias Bisping Date: Mon Jun 27 10:55:47 2022 +0200 topological sorting of definitions by caller hierarchy --- pyinfra/callback.py | 53 +++++++--------- pyinfra/component_factory.py | 17 +++--- pyinfra/file_descriptor_manager.py | 9 +++ pyinfra/pipeline_factory.py | 18 ++++++ pyinfra/server/buffering/stream.py | 4 +- pyinfra/server/monitoring.py | 39 ++++++++++++ pyinfra/server/operation_dispatcher.py | 33 ++++++++++ pyinfra/server/server.py | 61 ++++++++----------- .../server/stream/queued_stream_function.py | 11 +++- pyinfra/server/stream/rest.py | 11 +++- pyinfra/storage/adapters/adapter.py | 2 +- pyinfra/storage/adapters/azure.py | 4 +- pyinfra/storage/adapters/s3.py | 4 +- pyinfra/storage/storage.py | 4 +- pyinfra/visitor/downloader.py | 40 +++++++++--- test/config.yaml | 9 +-- test/conftest.py | 2 - test/fixtures/input.py | 20 +++--- test/fixtures/server.py | 2 +- test/storage/adapter_mock.py | 2 +- test/unit_tests/server/pipeline_test.py | 8 +-- test/unit_tests/server/stream_buffer_test.py | 2 +- 22 files changed, 230 insertions(+), 125 deletions(-) create mode 100644 pyinfra/pipeline_factory.py create mode 100644 pyinfra/server/monitoring.py create mode 100644 pyinfra/server/operation_dispatcher.py diff --git a/pyinfra/callback.py b/pyinfra/callback.py index 0b3d817..cec2d75 100644 --- a/pyinfra/callback.py +++ b/pyinfra/callback.py @@ -3,6 +3,7 @@ import logging from funcy import merge, omit, lmap from pyinfra.exceptions import AnalysisFailure +from pyinfra.pipeline_factory import CachedPipelineFactory logger = logging.getLogger(__name__) @@ -12,30 +13,18 @@ class Callback: endpoint. 
""" - def __init__(self, base_url, pipeline_factory): - self.base_url = base_url + def __init__(self, pipeline_factory: CachedPipelineFactory): self.pipeline_factory = pipeline_factory - self.endpoint2pipeline = {} - - def __make_endpoint(self, operation): - return f"{self.base_url}/{operation}" def __get_pipeline(self, endpoint): - if endpoint in self.endpoint2pipeline: - pipeline = self.endpoint2pipeline[endpoint] - - else: - pipeline = self.pipeline_factory(endpoint) - self.endpoint2pipeline[endpoint] = pipeline - - return pipeline + return self.pipeline_factory.get_pipeline(endpoint) @staticmethod - def __run_pipeline(pipeline, body): + def __run_pipeline(pipeline, analysis_input: dict): """ TODO: Since data and metadata are passed as singletons, there is no buffering and hence no batching happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate - passing non-singletons, to only ack a message, once a response is pulled from the output queue of the + passing non-singletons, to only ack a message, once a response is pulled from the output queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for the queue manager to tell which message to ack. @@ -43,19 +32,19 @@ class Callback: operates on singletons ([data], [metadata]). 
""" - def combine_storage_item_metadata_with_queue_message_metadata(body): - return merge(body["metadata"], omit(body, ["data", "metadata"])) + def combine_storage_item_metadata_with_queue_message_metadata(analysis_input): + return merge(analysis_input["metadata"], omit(analysis_input, ["data", "metadata"])) - def remove_queue_message_metadata(result): - metadata = omit(result["metadata"], queue_message_keys(body)) - return {**result, "metadata": metadata} + def remove_queue_message_metadata(analysis_result): + metadata = omit(analysis_result["metadata"], queue_message_keys(analysis_input)) + return {**analysis_result, "metadata": metadata} - def queue_message_keys(body): - return {*body.keys()}.difference({"data", "metadata"}) + def queue_message_keys(analysis_input): + return {*analysis_input.keys()}.difference({"data", "metadata"}) try: - data = body["data"] - metadata = combine_storage_item_metadata_with_queue_message_metadata(body) + data = analysis_input["data"] + metadata = combine_storage_item_metadata_with_queue_message_metadata(analysis_input) analysis_response_stream = pipeline([data], [metadata]) analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream) return analysis_response_stream @@ -64,13 +53,13 @@ class Callback: logger.error(err) raise AnalysisFailure from err - def __call__(self, body: dict): - operation = body.get("operation", "submit") - endpoint = self.__make_endpoint(operation) - pipeline = self.__get_pipeline(endpoint) + def __call__(self, analysis_input: dict): + """data_metadata_pack: {'dossierId': ..., 'fileId': ..., 'pages': ..., 'operation': ...}""" + operation = analysis_input.get("operation", "") + pipeline = self.__get_pipeline(operation) try: - logging.debug(f"Requesting analysis from {endpoint}...") - return self.__run_pipeline(pipeline, body) + logging.debug(f"Requesting analysis for operation '{operation}'...") + return self.__run_pipeline(pipeline, analysis_input) except AnalysisFailure: - 
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") + logging.warning(f"Exception caught when calling analysis endpoint for operation '{operation}'.") diff --git a/pyinfra/component_factory.py b/pyinfra/component_factory.py index 17733e0..564c554 100644 --- a/pyinfra/component_factory.py +++ b/pyinfra/component_factory.py @@ -6,6 +6,7 @@ from funcy import project, identity, rcompose from pyinfra.callback import Callback from pyinfra.config import parse_disjunction_string from pyinfra.file_descriptor_manager import FileDescriptorManager +from pyinfra.pipeline_factory import CachedPipelineFactory from pyinfra.queue.consumer import Consumer from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager from pyinfra.server.client_pipeline import ClientPipeline @@ -36,7 +37,7 @@ class ComponentFactory: def get_callback(self, analysis_base_url=None): analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint - callback = Callback(analysis_base_url, self.get_pipeline) + callback = Callback(CachedPipelineFactory(base_url=analysis_base_url, pipeline_factory=self.get_pipeline)) def wrapped(body): body_repr = project(body, ["dossierId", "fileId", "operation"]) @@ -60,6 +61,13 @@ class ComponentFactory: def get_queue_manager(self): return PikaQueueManager(self.config.rabbitmq.queues.input, self.config.rabbitmq.queues.output) + @staticmethod + @lru_cache(maxsize=None) + def get_pipeline(endpoint): + return ClientPipeline( + RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver()) + ) + @lru_cache(maxsize=None) def get_storage(self): return storages.get_storage(self.config.storage.backend) @@ -100,10 +108,3 @@ class ComponentFactory: bucket_name=parse_disjunction_string(self.config.storage.bucket), file_descriptor_manager=self.get_file_descriptor_manager(), ) - - @staticmethod - @lru_cache(maxsize=None) - def get_pipeline(endpoint): - return ClientPipeline( - 
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver()) - ) diff --git a/pyinfra/file_descriptor_manager.py b/pyinfra/file_descriptor_manager.py index ab2f18e..1ca8292 100644 --- a/pyinfra/file_descriptor_manager.py +++ b/pyinfra/file_descriptor_manager.py @@ -87,3 +87,12 @@ class FileDescriptorManager: def build_storage_upload_info(analysis_payload, request_metadata): storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} return storage_upload_info + + def get_path_prefix(self, queue_item_body): + + prefix = "/".join( + itemgetter("dossierId", "fileId")( + self.build_file_descriptor(queue_item_body, end="input"), + ) + ) + return prefix diff --git a/pyinfra/pipeline_factory.py b/pyinfra/pipeline_factory.py new file mode 100644 index 0000000..fce742e --- /dev/null +++ b/pyinfra/pipeline_factory.py @@ -0,0 +1,18 @@ +class CachedPipelineFactory: + def __init__(self, base_url, pipeline_factory): + self.base_url = base_url + self.operation2pipeline = {} + self.pipeline_factory = pipeline_factory + + def get_pipeline(self, operation: str): + pipeline = self.operation2pipeline.get(operation, None) or self.__register_pipeline(operation) + return pipeline + + def __register_pipeline(self, operation): + endpoint = self.__make_endpoint(operation) + pipeline = self.pipeline_factory(endpoint) + self.operation2pipeline[operation] = pipeline + return pipeline + + def __make_endpoint(self, operation): + return f"{self.base_url}/{operation}" diff --git a/pyinfra/server/buffering/stream.py b/pyinfra/server/buffering/stream.py index e1cd669..1033968 100644 --- a/pyinfra/server/buffering/stream.py +++ b/pyinfra/server/buffering/stream.py @@ -1,7 +1,7 @@ from itertools import chain, takewhile from typing import Iterable -from funcy import first, repeatedly, flatten +from funcy import first, repeatedly, mapcat from pyinfra.server.buffering.bufferize import bufferize from 
pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing @@ -16,7 +16,7 @@ class FlatStreamBuffer: def __call__(self, items): items = chain(items, [Nothing]) - yield from flatten(map(self.stream_buffer, items)) + yield from mapcat(self.stream_buffer, items) class StreamBuffer: diff --git a/pyinfra/server/monitoring.py b/pyinfra/server/monitoring.py new file mode 100644 index 0000000..42d1cf8 --- /dev/null +++ b/pyinfra/server/monitoring.py @@ -0,0 +1,39 @@ +from functools import lru_cache + +from funcy import identity +from prometheus_client import CollectorRegistry, Summary + +from pyinfra.server.operation_dispatcher import OperationDispatcher + + +class OperationDispatcherMonitoringDecorator: + def __init__(self, operation_dispatcher: OperationDispatcher, naming_policy=identity): + self.operation_dispatcher = operation_dispatcher + self.operation2metric = {} + self.naming_policy = naming_policy + + @property + @lru_cache(maxsize=None) + def registry(self): + return CollectorRegistry(auto_describe=True) + + def make_summary_instance(self, op: str): + return Summary(f"{self.naming_policy(op)}_seconds", f"Time spent on {op}.", registry=self.registry) + + def submit(self, operation, request): + return self.operation_dispatcher.submit(operation, request) + + def pickup(self, operation): + with self.get_monitor(operation): + return self.operation_dispatcher.pickup(operation) + + def get_monitor(self, operation): + monitor = self.operation2metric.get(operation, None) or self.register_operation(operation) + return monitor.time() + + def register_operation(self, operation): + summary = self.make_summary_instance(operation) + self.operation2metric[operation] = summary + return summary + + diff --git a/pyinfra/server/operation_dispatcher.py b/pyinfra/server/operation_dispatcher.py new file mode 100644 index 0000000..e885a2e --- /dev/null +++ b/pyinfra/server/operation_dispatcher.py @@ -0,0 +1,33 @@ +from itertools import starmap, tee +from typing import Dict + 
+from funcy import juxt, zipdict, cat + +from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction +from pyinfra.server.stream.rest import LazyRestProcessor + + +class OperationDispatcher: + def __init__(self, operation2function: Dict[str, QueuedStreamFunction]): + submit_suffixes, pickup_suffixes = zip(*map(juxt(submit_suffix, pickup_suffix), operation2function)) + processors = starmap(LazyRestProcessor, zip(operation2function.values(), submit_suffixes, pickup_suffixes)) + self.operation2processor = zipdict(submit_suffixes + pickup_suffixes, cat(tee(processors))) + + @classmethod + @property + def pickup_suffix(cls): + return pickup_suffix("") + + def submit(self, operation, request): + return self.operation2processor[operation].push(request) + + def pickup(self, operation): + return self.operation2processor[operation].pop() + + +def submit_suffix(op: str): + return "" if not op else op + + +def pickup_suffix(op: str): + return "pickup" if not op else f"{op}_pickup" diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 7c7553a..68fb804 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -2,13 +2,13 @@ from functools import singledispatch from typing import Dict, Callable, Union from flask import Flask, jsonify, request -from funcy import merge -from prometheus_client import generate_latest, Summary, CollectorRegistry +from prometheus_client import generate_latest from pyinfra.config import CONFIG from pyinfra.server.buffering.stream import FlatStreamBuffer +from pyinfra.server.monitoring import OperationDispatcherMonitoringDecorator +from pyinfra.server.operation_dispatcher import OperationDispatcher from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction -from pyinfra.server.stream.rest import LazyRestProcessor @singledispatch @@ -52,58 +52,49 @@ def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size): def __set_up_processing_server(operation2function: Dict[str, 
QueuedStreamFunction]): app = Flask(__name__) - registry = CollectorRegistry(auto_describe=True) - operation2processor = { - op: LazyRestProcessor(fn, **build_endpoint_suffixes(op)) for op, fn in operation2function.items() - } + dispatcher = OperationDispatcherMonitoringDecorator( + OperationDispatcher(operation2function), + naming_policy=naming_policy, + ) - def make_summary_instance(op: str): - op = op.replace("_pickup", "") if op != "pickup" else "default" - service_name = CONFIG.service.name - return Summary(f"redactmanager_{service_name}_{op}_seconds", f"Time spent on {op}.", registry=registry) - - submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()} - pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()} - operation2processor = merge(submit_operation2processor, pickup_operation2processor) - - operation2metric = {op: make_summary_instance(op) for op in pickup_operation2processor} + def ok(): + resp = jsonify("OK") + resp.status_code = 200 + return resp @app.route("/ready", methods=["GET"]) def ready(): - resp = jsonify("OK") - resp.status_code = 200 - return resp + return ok() @app.route("/health", methods=["GET"]) def healthy(): - resp = jsonify("OK") - resp.status_code = 200 - return resp + return ok() @app.route("/prometheus", methods=["GET"]) def prometheus(): - return generate_latest(registry=registry) + return generate_latest(registry=dispatcher.registry) @app.route("/<operation>", methods=["POST", "PATCH"]) def submit(operation): - return operation2processor[operation].push(request) + return dispatcher.submit(operation, request) + + @app.route("/", methods=["POST", "PATCH"]) + def submit_default(): + return dispatcher.submit("", request) @app.route("/<operation>", methods=["GET"]) def pickup(operation): - with operation2metric[operation].time(): - return operation2processor[operation].pop() + return dispatcher.pickup(operation) return app -def build_endpoint_suffixes(op: str): - return 
{"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)} +def naming_policy(op_name: str): + pop_suffix = OperationDispatcher.pickup_suffix + prefix = f"redactmanager_{CONFIG.service.name}" + op_display_name = op_name.replace(f"_{pop_suffix}", "") if op_name != pop_suffix else "default" + complete_display_name = f"{prefix}_{op_display_name}" -def submit_suffix(op: str): - return "submit" if not op else op - - -def pickup_suffix(op: str): - return "pickup" if not op else f"{op}_pickup" + return complete_display_name diff --git a/pyinfra/server/stream/queued_stream_function.py b/pyinfra/server/stream/queued_stream_function.py index 81563b6..dc93962 100644 --- a/pyinfra/server/stream/queued_stream_function.py +++ b/pyinfra/server/stream/queued_stream_function.py @@ -4,13 +4,18 @@ from pyinfra.server.buffering.queue import stream_queue, Queue class QueuedStreamFunction: - def __init__(self, fn): + def __init__(self, stream_function): + """Combines a stream function with a queue. + + Args: + stream_function: Needs to operate on iterables. 
+ """ self.queue = Queue() - self.fn = fn + self.stream_function = stream_function def push(self, item): self.queue.append(item) def pop(self): items = stream_queue(self.queue) - return first(self.fn(items)) + return first(self.stream_function(items)) diff --git a/pyinfra/server/stream/rest.py b/pyinfra/server/stream/rest.py index 3674913..8a9f3ff 100644 --- a/pyinfra/server/stream/rest.py +++ b/pyinfra/server/stream/rest.py @@ -1,6 +1,7 @@ import logging from flask import jsonify +from funcy import drop from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction @@ -16,7 +17,7 @@ class LazyRestProcessor: def push(self, request): self.queued_stream_function.push(request.json) - return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) + return jsonify(replace_suffix(request.base_url, self.submit_suffix, self.pickup_suffix)) def pop(self): result = self.queued_stream_function.pop() or Nothing @@ -38,3 +39,11 @@ class LazyRestProcessor: def valid(result): return isinstance(result, dict) or result is Nothing + + +def replace_suffix(strn, suf, repl): + return remove_last_n(strn, len(suf)) + repl + + +def remove_last_n(strn, n): + return "".join(reversed(list(drop(n, reversed(strn))))) diff --git a/pyinfra/storage/adapters/adapter.py b/pyinfra/storage/adapters/adapter.py index 57cff28..7eb31a9 100644 --- a/pyinfra/storage/adapters/adapter.py +++ b/pyinfra/storage/adapters/adapter.py @@ -30,5 +30,5 @@ class StorageAdapter(ABC): raise NotImplementedError @abstractmethod - def get_all_object_names(self, bucket_name): + def get_all_object_names(self, bucket_name, prefix=None): raise NotImplementedError diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py index b3d498c..074a9d6 100644 --- a/pyinfra/storage/adapters/azure.py +++ b/pyinfra/storage/adapters/azure.py @@ -58,7 +58,7 @@ class AzureStorageAdapter(StorageAdapter): blobs = 
container_client.list_blobs() container_client.delete_blobs(*blobs) - def get_all_object_names(self, bucket_name): + def get_all_object_names(self, bucket_name, prefix=None): container_client = self.__provide_container_client(bucket_name) - blobs = container_client.list_blobs() + blobs = container_client.list_blobs(name_starts_with=prefix) return map(attrgetter("name"), blobs) diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/adapters/s3.py index 2fe5dcc..a7729df 100644 --- a/pyinfra/storage/adapters/s3.py +++ b/pyinfra/storage/adapters/s3.py @@ -53,6 +53,6 @@ class S3StorageAdapter(StorageAdapter): for obj in objects: self.__client.remove_object(bucket_name, obj.object_name) - def get_all_object_names(self, bucket_name): - objs = self.__client.list_objects(bucket_name, recursive=True) + def get_all_object_names(self, bucket_name, prefix=None): + objs = self.__client.list_objects(bucket_name, recursive=True, prefix=prefix) return map(attrgetter("object_name"), objs) diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index a0026c3..2bc4eff 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -40,5 +40,5 @@ class Storage: def clear_bucket(self, bucket_name): return self.__adapter.clear_bucket(bucket_name) - def get_all_object_names(self, bucket_name): - return self.__adapter.get_all_object_names(bucket_name) + def get_all_object_names(self, bucket_name, prefix=None): + return self.__adapter.get_all_object_names(bucket_name, prefix=prefix) diff --git a/pyinfra/visitor/downloader.py b/pyinfra/visitor/downloader.py index 230e392..ec5573e 100644 --- a/pyinfra/visitor/downloader.py +++ b/pyinfra/visitor/downloader.py @@ -1,3 +1,4 @@ +import logging from functools import partial from funcy import compose @@ -7,6 +8,8 @@ from pyinfra.storage.storage import Storage from pyinfra.utils.encoding import decompress from pyinfra.utils.func import flift +logger = logging.getLogger(__name__) + class Downloader: def __init__(self, 
storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager): @@ -17,19 +20,36 @@ class Downloader: def __call__(self, queue_item_body): return self.download(queue_item_body) - def get_names_of_objects_by_pattern(self, file_pattern: str): - matches_pattern = flift(file_pattern) - page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name) - return page_object_names + def download(self, queue_item_body): + names_of_relevant_objects = self.get_names_of_objects_by_pattern(queue_item_body) + objects = self.download_and_decompress_object(names_of_relevant_objects) + + return objects + + def get_names_of_objects_by_pattern(self, queue_item_body): + logger.debug(f"Filtering objects in bucket {self.bucket_name} by pattern...") + + names_of_relevant_objects = compose( + self.get_pattern_filter(queue_item_body), + self.get_names_of_all_associated_objects, + )(queue_item_body) + + logger.debug(f"Completed filtering.") + + return names_of_relevant_objects def download_and_decompress_object(self, object_names): download = partial(self.storage.get_object, self.bucket_name) return map(compose(decompress, download), object_names) - def download(self, queue_item_body): + def get_names_of_all_associated_objects(self, queue_item_body): + prefix = self.file_descriptor_manager.get_path_prefix(queue_item_body) + # TODO: performance tests for the following situations: + # 1) dossier with very many files + # 2) prefix matches very many files, independent of dossier cardinality + yield from self.storage.get_all_object_names(self.bucket_name, prefix=prefix) + + def get_pattern_filter(self, queue_item_body): file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body) - - page_object_names = self.get_names_of_objects_by_pattern(file_pattern) - objects = self.download_and_decompress_object(page_object_names) - - return objects + matches_pattern = flift(file_pattern) + return matches_pattern diff --git 
a/test/config.yaml b/test/config.yaml index 894a84f..fc5ea30 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -8,19 +8,12 @@ service: output: subdir: op_outp_files extension: OUT.gz - single_inp_op: - input: - subdir: "" - extension: IN.gz - output: - subdir: op_outp_files - extension: OUT.gz default: input: subdir: "" extension: IN.gz output: - subdir: "" + subdir: op_outp_files extension: OUT.gz storage: diff --git a/test/conftest.py b/test/conftest.py index 43e3806..b5aa614 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,8 +1,6 @@ import json from pyinfra.config import CONFIG as MAIN_CONFIG -from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter -from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 00e5c90..6ef1365 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -4,7 +4,7 @@ from itertools import starmap, repeat import numpy as np import pytest from PIL import Image -from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip +from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip, merge from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.normalization import normalize_item @@ -85,9 +85,7 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si try: response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata))))) - queue_message_keys = ["id"] * (not server_side_test) + [ - *first(queue_message_metadata).keys() - ] + queue_message_keys = ["id"] * (not server_side_test) + [*first(queue_message_metadata).keys()] response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata) expected = lzip(response_data, response_metadata) @@ -150,12 +148,14 @@ def 
metadata(n_items, many_to_n): @pytest.fixture def queue_message_metadata(n_items, operation_name): def metadata(i): - return { - "dossierId": "folder", - "fileId": f"file{i}", - "pages": [0, 2, 3], - "operation": operation_name, - } + return merge( + { + "dossierId": "folder", + "fileId": f"file{i}", + "pages": [0, 2, 3], + }, + ({"operation": operation_name} if operation_name else {}), + ) return lmap(metadata, range(n_items)) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 4626deb..ebac679 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -52,7 +52,7 @@ def server(server_stream_function, buffer_size, operation_name): @pytest.fixture def operation_name(many_to_n): - return "multi_inp_op" if many_to_n else "single_inp_op" + return "multi_inp_op" if many_to_n else "" @pytest.fixture diff --git a/test/storage/adapter_mock.py b/test/storage/adapter_mock.py index 5443ade..6b60a27 100644 --- a/test/storage/adapter_mock.py +++ b/test/storage/adapter_mock.py @@ -26,5 +26,5 @@ class StorageAdapterMock(StorageAdapter): def clear_bucket(self, bucket_name): return self.__client.clear_bucket(bucket_name) - def get_all_object_names(self, bucket_name): + def get_all_object_names(self, bucket_name, prefix=None): return self.__client.get_all_object_names(bucket_name) diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 0197d95..50d60e6 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -1,5 +1,5 @@ import pytest -from funcy import rcompose, compose, project, second, merge, lpluck +from funcy import rcompose, compose, project, merge from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.client_pipeline import ClientPipeline @@ -40,9 +40,9 @@ def test_pipeline( for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata) ] - output = compose(llift(unpack), client_pipeline)(input_data_items, metadata) - assert 
n_items == 0 or len(output) > 0 - assert output == targets + outputs = compose(llift(unpack), client_pipeline)(input_data_items, metadata) + assert n_items == 0 or len(outputs) > 0 + assert outputs == targets @pytest.mark.parametrize("item_type", ["string"]) diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py index 7d2f683..3f2fa98 100644 --- a/test/unit_tests/server/stream_buffer_test.py +++ b/test/unit_tests/server/stream_buffer_test.py @@ -35,7 +35,7 @@ def test_stream_buffer_catches_type_error(func, inputs, outputs): def test_flat_stream_buffer(func, inputs, outputs, buffer_size): flat_stream_buffer = FlatStreamBuffer(lift(func), buffer_size=buffer_size) - assert list(flat_stream_buffer(inputs)) == lflatten(outputs) + assert list(flat_stream_buffer(inputs)) == outputs assert list(flat_stream_buffer([])) == []