From af88e2d7d44499ca4f30288c777b015d4b38a6d5 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 22 Jun 2022 11:18:55 +0200 Subject: [PATCH] file pattern matching WIP: using of different operations to select file pattern config section -- semi-hardcoded multi input file operation. next: refactor fixtures in order to de-hardcode --- pyinfra/default_objects.py | 6 +- pyinfra/server/server.py | 97 ++++++++++++++------ pyinfra/visitor/strategies/download/multi.py | 18 ++-- pyinfra/visitor/utils.py | 2 - pyinfra/visitor/visitor.py | 1 - test/config.yaml | 6 +- test/fixtures/input.py | 5 +- test/fixtures/server.py | 9 +- test/integration_tests/serve_test.py | 32 ++++--- test/utils/input.py | 4 +- 10 files changed, 122 insertions(+), 58 deletions(-) diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 2064d09..aa1e984 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -80,11 +80,15 @@ class ComponentFactory: def get_operation2file_patterns(self): return self.config.service.operations + @lru_cache(maxsize=None) + def get_file_descriptor_manager(self): + return FileDescriptorManager(self.get_operation2file_patterns()) + @lru_cache(maxsize=None) def get_download_strategy(self, download_strategy_type=None): download_strategies = { "single": SingleDownloadStrategy(), - "multi": MultiDownloadStrategy(FileDescriptorManager(self.get_operation2file_patterns())), + "multi": MultiDownloadStrategy(self.get_file_descriptor_manager()), } return download_strategies.get( download_strategy_type or self.config.service.download_strategy, SingleDownloadStrategy() diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 8e471f4..767eb93 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,5 +1,5 @@ from functools import singledispatch -from typing import Dict +from typing import Dict, Union, Callable from flask import Flask, jsonify, request from funcy import merge @@ -10,41 +10,74 @@ from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.stream.rest import LazyRestProcessor -def set_up_processing_server(server_stream_function, buffer_size): - flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) - queued_stream_function = QueuedStreamFunction(flat_stream_buffer) - return __set_up_processing_server(queued_stream_function) - - -def build_endpoint_suffixes(op: str): - return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)} - - -def submit_suffix(op: str): - return "submit" if not op else op - - -def pickup_suffix(op: str): - return "pickup" if not op else f"{op}_pickup" @singledispatch -def __set_up_processing_server(arg): +def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1): + """Produces a processing server given a streamable function or a mapping from operations to streamable functions. + Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a + function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata. + If the function doesn't produce data, data should be an empty byte string. + If the function doesn't produce metadata, metadata should be an empty dictionary. + + Args: + arg: streamable function or mapping of operations: str to streamable functions + buffer_size: If your function operates on batches this parameter controls how many items are aggregated before + your function is applied. + + TODO: buffer_size has to be controllable on per function basis. + + Returns: + Processing server: flask app + """ pass -@__set_up_processing_server.register -def _(operation2function: dict): - return __set_up_processing_server_impl(operation2function) +@set_up_processing_server.register +def _(operation2stream_fn: dict, buffer_size=1): + return __stream_fn_to_processing_server(operation2stream_fn, buffer_size) -@__set_up_processing_server.register -def _(queued_stream_function: object): - operation2function = {None: queued_stream_function} - return __set_up_processing_server_impl(operation2function) +@set_up_processing_server.register +def _(stream_fn: object, buffer_size=1): + operation2stream_fn = {None: stream_fn} + return __stream_fn_to_processing_server(operation2stream_fn, buffer_size) -def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]): +def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size): + operation2stream_fn = { + op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items() + } + return __set_up_processing_server(operation2stream_fn) + + + + +# +# +# def set_up_processing_server(server_stream_function, buffer_size): +# flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) +# queued_stream_function = QueuedStreamFunction(flat_stream_buffer) +# return __set_up_processing_server(queued_stream_function) +# +# +# @singledispatch +# def __set_up_processing_server(arg): +# pass +# +# +# @__set_up_processing_server.register +# def _(operation2function: dict): +# return __set_up_processing_server_impl(operation2function) +# +# +# @__set_up_processing_server.register +# def _(queued_stream_function: object): +# operation2function = {None: queued_stream_function} +# return __set_up_processing_server_impl(operation2function) + + +def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]): app = Flask(__name__) registry = CollectorRegistry(auto_describe=True) @@ -88,3 +121,15 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu return operation2processor[operation].pop() return app + + +def build_endpoint_suffixes(op: str): + return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)} + + +def submit_suffix(op: str): + return "submit" if not op else op + + +def pickup_suffix(op: str): + return "pickup" if not op else f"{op}_pickup" diff --git a/pyinfra/visitor/strategies/download/multi.py b/pyinfra/visitor/strategies/download/multi.py index 2cf16fa..5066532 100644 --- a/pyinfra/visitor/strategies/download/multi.py +++ b/pyinfra/visitor/strategies/download/multi.py @@ -21,7 +21,7 @@ class FileDescriptorManager: file_descriptor = self.build_file_descriptor(queue_item_body) file_descriptor["pages"] = [queue_item_body.get("id", 0)] - object_name = self.build_matcher(file_descriptor) + object_name = self.__build_matcher(file_descriptor) return object_name @@ -39,8 +39,10 @@ class FileDescriptorManager: return file_descriptor def build_matcher(self, queue_item_body): - file_descriptor = self.build_file_descriptor(queue_item_body) + return self.__build_matcher(file_descriptor) + + def __build_matcher(self, file_descriptor): dossier_id, file_id, subdir, pages, extension = itemgetter( "dossierId", "fileId", "subdir", "pages", "extension" @@ -57,6 +59,12 @@ class FileDescriptorManager: return matcher + def get_object_descriptor(self, queue_item_body): + return { + "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), + "object_name": self.get_object_name(queue_item_body), + } + class MultiDownloadStrategy: def __init__(self, file_descriptor_manager: FileDescriptorManager): @@ -83,9 +91,3 @@ class MultiDownloadStrategy: objects = self.download_and_decompress_object(storage, page_object_names) return objects - - def get_object_descriptor(self, body): - return { - "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), - "object_name": self.file_descriptor_manager.get_object_name(body), - } diff --git a/pyinfra/visitor/utils.py b/pyinfra/visitor/utils.py index 736f764..d39525e 100644 --- a/pyinfra/visitor/utils.py +++ b/pyinfra/visitor/utils.py @@ -4,8 +4,6 @@ from typing import Dict from pyinfra.config import CONFIG from pyinfra.exceptions import InvalidStorageItemFormat from pyinfra.server.packing import string_to_bytes -from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy -from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy logger = logging.getLogger() diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py index 760ab8a..b6bc851 100644 --- a/pyinfra/visitor/visitor.py +++ b/pyinfra/visitor/visitor.py @@ -2,7 +2,6 @@ from typing import Callable from funcy import lflatten, compose -from pyinfra.server.debugging import inspect from pyinfra.storage.storage import Storage from pyinfra.utils.func import lift from pyinfra.visitor.response_formatter.formatter import ResponseFormatter diff --git a/test/config.yaml b/test/config.yaml index 31aedc4..40fb706 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -1,9 +1,9 @@ service: response_formatter: identity operations: - default: + multi_inp_op: input: - subdir: pages + subdir: op_inp_files extension: IN.gz storage: @@ -32,4 +32,4 @@ webserver: mock_analysis_endpoint: "http://127.0.0.1:5000" use_docker_fixture: 0 -logging: 0 \ No newline at end of file +logging: 1 \ No newline at end of file diff --git a/test/fixtures/input.py b/test/fixtures/input.py index e635568..b6387ae 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -141,7 +141,10 @@ def images_to_bytes(images): @pytest.fixture -def metadata(n_items): +def metadata(n_items, many_to_n): + """storage metadata + TODO: rename + """ return list(repeat({"key": "value"}, times=n_items)) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 84346c5..4626deb 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -46,8 +46,13 @@ def url(host, port): @pytest.fixture -def server(server_stream_function, buffer_size): - return set_up_processing_server(server_stream_function, buffer_size) +def server(server_stream_function, buffer_size, operation_name): + return set_up_processing_server({operation_name: server_stream_function}, buffer_size) + + +@pytest.fixture +def operation_name(many_to_n): + return "multi_inp_op" if many_to_n else "single_inp_op" @pytest.fixture diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 42c5c74..bd9e6ca 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -87,7 +87,7 @@ from test.utils.input import pair_data_with_queue_message ) def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n): - storage, queue_manager, consumer, download_strategy = components + storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components assert queue_manager.input_queue.to_list() == [] assert queue_manager.output_queue.to_list() == [] @@ -98,11 +98,11 @@ def test_serving(server_process, bucket_name, components, targets, data_message_ if many_to_n: upload_data_to_folder_in_storage_and_publish_single_request_to_queue( - storage, queue_manager, data_message_pairs, download_strategy + storage, queue_manager, data_message_pairs, file_descriptor_manager ) else: upload_data_to_storage_and_publish_requests_to_queue( - storage, queue_manager, data_message_pairs, download_strategy + storage, queue_manager, data_message_pairs, file_descriptor_manager ) consumer.consume_and_publish(n=int(many_to_n) or n_items) @@ -125,20 +125,24 @@ def data_message_pairs(data_metadata_packs): # TODO: refactor; too many params -def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs, download_strategy): +def upload_data_to_storage_and_publish_requests_to_queue( + storage, queue_manager, data_message_pairs, file_descriptor_manager +): for data, message in data_message_pairs: - upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy) + upload_data_to_storage_and_publish_request_to_queue( + storage, queue_manager, data, message, file_descriptor_manager + ) # TODO: refactor; too many params -def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy): - storage.put_object(**download_strategy.get_object_descriptor(message), data=compress(data)) +def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager): + storage.put_object(**file_descriptor_manager.get_object_descriptor(message), data=compress(data)) queue_manager.publish_request(message) # TODO: refactor body; too long and scripty def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( - storage, queue_manager, data_message_pairs, download_strategy + storage, queue_manager, data_message_pairs, file_descriptor_manager ): assert data_message_pairs @@ -146,7 +150,7 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( pages = ref_message["pages"] for data, page in zip(map(first, data_message_pairs), pages): - object_descriptor = download_strategy.get_object_descriptor(ref_message) + object_descriptor = file_descriptor_manager.get_object_descriptor(ref_message) object_descriptor["object_name"] = build_filepath(object_descriptor, page) storage.put_object(**object_descriptor, data=compress(data)) @@ -179,13 +183,13 @@ def components(components_type, real_components, test_components, bucket_name): else: raise ValueError(f"Unknown component type '{components_type}'.") - storage, queue_manager, consumer, download_strategy = components + storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components queue_manager.clear() storage.make_bucket(bucket_name) storage.clear_bucket(bucket_name) - yield storage, queue_manager, consumer, download_strategy + yield storage, queue_manager, consumer, download_strategy, file_descriptor_manager queue_manager.clear() # storage.clear_bucket(bucket_name) @@ -216,11 +220,12 @@ def real_components(url, many_to_n): consumer = component_factory.get_consumer(callback) queue_manager = component_factory.get_queue_manager() storage = component_factory.get_storage() + file_descriptor_manager = component_factory.get_file_descriptor_manager() download_strategy = component_factory.get_download_strategy("multi" if many_to_n else "single") consumer.visitor.download_strategy = download_strategy - return storage, queue_manager, consumer, download_strategy + return storage, queue_manager, consumer, download_strategy, file_descriptor_manager @pytest.fixture @@ -229,6 +234,7 @@ def test_components(url, queue_manager, storage, many_to_n): component_factory = ComponentFactory(CONFIG) download_strategy = component_factory.get_download_strategy("multi" if many_to_n else "single") + file_descriptor_manager = component_factory.get_file_descriptor_manager() visitor = QueueVisitor( storage=storage, @@ -238,4 +244,4 @@ def test_components(url, queue_manager, storage, many_to_n): ) consumer = Consumer(visitor, queue_manager) - return storage, queue_manager, consumer, download_strategy + return storage, queue_manager, consumer, download_strategy, file_descriptor_manager diff --git a/test/utils/input.py b/test/utils/input.py index 99235f4..540aef1 100644 --- a/test/utils/input.py +++ b/test/utils/input.py @@ -1,13 +1,15 @@ from typing import Iterable +# TODO: make into fixture def pair_data_with_queue_message(data: Iterable[bytes]): def inner(): for i, d in enumerate(data): body = { "dossierId": "folder", "fileId": f"file{i}", - "pages": [0, 2, 3] + "pages": [0, 2, 3], + "operation": "multi_inp_op", # TODO: de-hardcode } yield d, body