diff --git a/README.md b/README.md index 33f8e90..11336d2 100755 --- a/README.md +++ b/README.md @@ -36,8 +36,6 @@ A configuration is located in `/config.yaml`. All relevant variables can be conf { "dossierId": "", "fileId": "", - "targetFileExtension": "", - "responseFileExtension": "" } ``` diff --git a/config.yaml b/config.yaml index 4a52b80..3cd4291 100755 --- a/config.yaml +++ b/config.yaml @@ -1,13 +1,38 @@ service: logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger - target_file_extension: $TARGET_FILE_EXTENSION|"ORIGIN.pdf.gz" # Extension for files to download from storage and process - response_file_extension: $RESPONSE_FILE_EXTENSION|"json.gz" # Extension for response files to upload to storage - # In case a service is not supposed to place response files under the root folder `dossierId/fileId` and requests do - # not specify an operation, this parameter can specify an output folder - response_folder: $RESPONSE_FOLDER|null - # Specifies, how to handle the `page` key of a request. "multi" will download all pages matching the list of pages - # specified in the request - download_strategy: $DOWNLOAD_STRATEGY|single + name: $SERVICE_NAME|research # Default service name for research service, used for prometheus metric name + response_formatter: default # formats analysis payloads of response messages + upload_formatter: projecting # formats analysis payloads of objects uploaded to storage + # Note: This is not really the right place for this. It should be configured on a per-service basis. + operations: + conversion: + input: + subdir: "" + extension: ORIGIN.pdf.gz + output: + subdir: "pages_as_images" + extension: json.gz + extraction: + input: + subdir: "" + extension: ORIGIN.pdf.gz + output: + subdir: "extracted_images" + extension: json.gz + table_parsing: + input: + subdir: "pages_as_images" + extension: json.gz + output: + subdir: "table_parses" + extension: json.gz + default: + input: + subdir: "" + extension: IN.gz + output: + subdir: "" + extension: out.gz probing_webserver: host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address diff --git a/pyinfra/callback.py b/pyinfra/callback.py new file mode 100644 index 0000000..0b3d817 --- /dev/null +++ b/pyinfra/callback.py @@ -0,0 +1,76 @@ +import logging + +from funcy import merge, omit, lmap + +from pyinfra.exceptions import AnalysisFailure + +logger = logging.getLogger(__name__) + + +class Callback: + """This is the callback that is applied to items pulled from the storage. It forwards these items to an analysis + endpoint. + """ + + def __init__(self, base_url, pipeline_factory): + self.base_url = base_url + self.pipeline_factory = pipeline_factory + self.endpoint2pipeline = {} + + def __make_endpoint(self, operation): + return f"{self.base_url}/{operation}" + + def __get_pipeline(self, endpoint): + if endpoint in self.endpoint2pipeline: + pipeline = self.endpoint2pipeline[endpoint] + + else: + pipeline = self.pipeline_factory(endpoint) + self.endpoint2pipeline[endpoint] = pipeline + + return pipeline + + @staticmethod + def __run_pipeline(pipeline, body): + """ + TODO: Since data and metadata are passed as singletons, there is no buffering and hence no batching happening + within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate + passing non-singletons, to only ack a message, once a response is pulled from the output queue of the + pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for + the queue manager to tell which message to ack. + + TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline + operates on singletons ([data], [metadata]). + """ + + def combine_storage_item_metadata_with_queue_message_metadata(body): + return merge(body["metadata"], omit(body, ["data", "metadata"])) + + def remove_queue_message_metadata(result): + metadata = omit(result["metadata"], queue_message_keys(body)) + return {**result, "metadata": metadata} + + def queue_message_keys(body): + return {*body.keys()}.difference({"data", "metadata"}) + + try: + data = body["data"] + metadata = combine_storage_item_metadata_with_queue_message_metadata(body) + analysis_response_stream = pipeline([data], [metadata]) + analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream) + return analysis_response_stream + + except Exception as err: + logger.error(err) + raise AnalysisFailure from err + + def __call__(self, body: dict): + operation = body.get("operation", "submit") + endpoint = self.__make_endpoint(operation) + pipeline = self.__get_pipeline(endpoint) + + try: + logging.debug(f"Requesting analysis from {endpoint}...") + return self.__run_pipeline(pipeline, body) + except AnalysisFailure: + logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") diff --git a/pyinfra/component_factory.py b/pyinfra/component_factory.py new file mode 100644 index 0000000..17733e0 --- /dev/null +++ b/pyinfra/component_factory.py @@ -0,0 +1,109 @@ +import logging +from functools import lru_cache + +from funcy import project, identity, rcompose + +from pyinfra.callback import Callback +from pyinfra.config import parse_disjunction_string +from pyinfra.file_descriptor_manager import FileDescriptorManager +from pyinfra.queue.consumer import Consumer +from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager +from pyinfra.server.client_pipeline import ClientPipeline +from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher +from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer +from pyinfra.server.packer.packers.rest import RestPacker +from pyinfra.server.receiver.receivers.rest import RestReceiver +from pyinfra.storage import storages +from pyinfra.visitor import QueueVisitor +from pyinfra.visitor.downloader import Downloader +from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter +from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter +from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy, ProjectingUploadFormatter + +logger = logging.getLogger(__name__) + + +class ComponentFactory: + def __init__(self, config): + self.config = config + + @lru_cache(maxsize=None) + def get_consumer(self, callback=None): + callback = callback or self.get_callback() + return Consumer(self.get_visitor(callback), self.get_queue_manager()) + + @lru_cache(maxsize=None) + def get_callback(self, analysis_base_url=None): + analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint + + callback = Callback(analysis_base_url, self.get_pipeline) + + def wrapped(body): + body_repr = project(body, ["dossierId", "fileId", "operation"]) + logger.info(f"Processing {body_repr}...") + result = callback(body) + logger.info(f"Completed processing {body_repr}...") + return result + + return wrapped + + @lru_cache(maxsize=None) + def get_visitor(self, callback): + return QueueVisitor( + callback=callback, + data_loader=self.get_downloader(), + response_strategy=self.get_response_strategy(), + response_formatter=self.get_response_formatter(), + ) + + @lru_cache(maxsize=None) + def get_queue_manager(self): + return PikaQueueManager(self.config.rabbitmq.queues.input, self.config.rabbitmq.queues.output) + + @lru_cache(maxsize=None) + def get_storage(self): + return storages.get_storage(self.config.storage.backend) + + @lru_cache(maxsize=None) + def get_response_strategy(self, storage=None): + return AggregationStorageStrategy( + storage=storage or self.get_storage(), + file_descriptor_manager=self.get_file_descriptor_manager(), + upload_formatter=self.get_upload_formatter(), + ) + + @lru_cache(maxsize=None) + def get_file_descriptor_manager(self): + return FileDescriptorManager( + bucket_name=parse_disjunction_string(self.config.storage.bucket), + operation2file_patterns=self.get_operation2file_patterns(), + ) + + @lru_cache(maxsize=None) + def get_upload_formatter(self): + return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter] + + @lru_cache(maxsize=None) + def get_response_formatter(self): + return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[ + self.config.service.response_formatter + ] + + @lru_cache(maxsize=None) + def get_operation2file_patterns(self): + return self.config.service.operations + + @lru_cache(maxsize=None) + def get_downloader(self, storage=None): + return Downloader( + storage=storage or self.get_storage(), + bucket_name=parse_disjunction_string(self.config.storage.bucket), + file_descriptor_manager=self.get_file_descriptor_manager(), + ) + + @staticmethod + @lru_cache(maxsize=None) + def get_pipeline(endpoint): + return ClientPipeline( + RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver()) + ) diff --git a/pyinfra/config.py b/pyinfra/config.py index 3e55ea8..6e82b61 100644 --- a/pyinfra/config.py +++ b/pyinfra/config.py @@ -1,10 +1,13 @@ """Implements a config object with dot-indexing syntax.""" import os +from functools import partial from itertools import chain from operator import truth +from typing import Iterable from envyaml import EnvYAML -from funcy import first, juxt, butlast, last +from frozendict import frozendict +from funcy import first, juxt, butlast, last, lmap from pyinfra.locations import CONFIG_FILE @@ -45,6 +48,25 @@ class Config: def __setitem__(self, key, value): self.__config.key = value + def to_dict(self, frozen=True): + return to_dict(self.__config.export(), frozen=frozen) + + def __hash__(self): + return hash(self.to_dict()) + + +def to_dict(v, frozen=True): + def make_dict(*args, **kwargs): + return (frozendict if frozen else dict)(*args, **kwargs) + + if isinstance(v, list): + return tuple(map(partial(to_dict, frozen=frozen), v)) + elif isinstance(v, DotIndexable): + return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.x.items()}) + elif isinstance(v, dict): + return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.items()}) + else: + return v CONFIG = Config(CONFIG_FILE) diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 9026b9f..2dec7c6 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -1,122 +1,8 @@ -import logging from functools import lru_cache -from funcy import rcompose, omit, merge, lmap - -from pyinfra.config import CONFIG -from pyinfra.exceptions import AnalysisFailure -from pyinfra.queue.consumer import Consumer -from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager -from pyinfra.server.client_pipeline import ClientPipeline -from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher -from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer -from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.receiver.receivers.rest import RestReceiver -from pyinfra.storage import storages -from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy - -logger = logging.getLogger(__name__) -logger.setLevel(logging.ERROR) +from pyinfra.component_factory import ComponentFactory @lru_cache(maxsize=None) -def get_consumer(callback=None): - callback = callback or get_callback() - return Consumer(get_visitor(callback), get_queue_manager()) - - -@lru_cache(maxsize=None) -def get_visitor(callback): - return QueueVisitor(storage=get_storage(), callback=callback, response_strategy=get_response_strategy()) - - -@lru_cache(maxsize=None) -def get_queue_manager(): - return PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output) - - -@lru_cache(maxsize=None) -def get_storage(): - return storages.get_storage(CONFIG.storage.backend) - - -@lru_cache(maxsize=None) -def get_callback(analysis_base_url=None): - analysis_base_url = analysis_base_url or CONFIG.rabbitmq.callback.analysis_endpoint - return Callback(analysis_base_url) - - -@lru_cache(maxsize=None) -def get_response_strategy(storage=None): - return AggregationStorageStrategy(storage or get_storage()) - - -class Callback: - def __init__(self, base_url): - self.base_url = base_url - self.endpoint2pipeline = {} - - def __make_endpoint(self, operation): - return f"{self.base_url}/{operation}" - - def __get_pipeline(self, endpoint): - if endpoint in self.endpoint2pipeline: - pipeline = self.endpoint2pipeline[endpoint] - - else: - pipeline = get_pipeline(endpoint) - self.endpoint2pipeline[endpoint] = pipeline - - return pipeline - - @staticmethod - def __run_pipeline(pipeline, body): - """ - TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching happening - within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate - passing non-singletons, to only ack a message, once a response is pulled from the output queue of the - pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for - the queue manager to tell which message to ack. - - TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline - operates on singletons ([data], [metadata]). - """ - - def combine_storage_item_metadata_with_queue_message_metadata(body): - return merge(body["metadata"], omit(body, ["data", "metadata"])) - - def remove_queue_message_metadata(result): - metadata = omit(result["metadata"], queue_message_keys(body)) - return {**result, "metadata": metadata} - - def queue_message_keys(body): - return {*body.keys()}.difference({"data", "metadata"}) - - try: - data = body["data"] - metadata = combine_storage_item_metadata_with_queue_message_metadata(body) - analysis_response_stream = pipeline([data], [metadata]) - analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream) - return analysis_response_stream - - except Exception as err: - logger.error(err) - raise AnalysisFailure from err - - def __call__(self, body: dict): - operation = body.get("operation", "submit") - endpoint = self.__make_endpoint(operation) - pipeline = self.__get_pipeline(endpoint) - - try: - logging.debug(f"Requesting analysis from {endpoint}...") - return self.__run_pipeline(pipeline, body) - except AnalysisFailure: - logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") - - -@lru_cache(maxsize=None) -def get_pipeline(endpoint): - return ClientPipeline( - RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver()) - ) +def get_component_factory(config): + return ComponentFactory(config) diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py index 0fbbe54..5857415 100644 --- a/pyinfra/exceptions.py +++ b/pyinfra/exceptions.py @@ -44,3 +44,7 @@ class NoBufferCapacity(ValueError): class InvalidMessage(ValueError): pass + + +class InvalidStorageItemFormat(ValueError): + pass diff --git a/pyinfra/file_descriptor_manager.py b/pyinfra/file_descriptor_manager.py new file mode 100644 index 0000000..ab2f18e --- /dev/null +++ b/pyinfra/file_descriptor_manager.py @@ -0,0 +1,89 @@ +import os +from _operator import itemgetter + +from funcy import project + + +class FileDescriptorManager: + def __init__(self, bucket_name, operation2file_patterns: dict = None): + self.bucket_name = bucket_name + self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns() + + @staticmethod + def get_default_operation2file_patterns(): + return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}} + + def get_input_object_name(self, queue_item_body: dict): + return self.get_object_name(queue_item_body, end="input") + + def get_output_object_name(self, queue_item_body: dict): + return self.get_object_name(queue_item_body, end="output") + + def get_object_name(self, queue_item_body: dict, end): + + file_descriptor = self.build_file_descriptor(queue_item_body, end=end) + file_descriptor["pages"] = [queue_item_body.get("id", 0)] + + object_name = self.__build_matcher(file_descriptor) + + return object_name + + @staticmethod + def __build_matcher(file_descriptor): + def make_filename(file_id, subdir, suffix): + return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}" + + dossier_id, file_id, subdir, pages, extension = itemgetter( + "dossierId", "fileId", "subdir", "pages", "extension" + )(file_descriptor) + + if len(pages) > 1 and subdir: + page_re = "id:(" + "|".join(map(str, pages)) + ")." + elif len(pages) == 1 and subdir: + page_re = f"id:{pages[0]}." + else: + page_re = "" + + matcher = os.path.join(dossier_id, make_filename(file_id, subdir, page_re + extension)) + + return matcher + + def build_file_descriptor(self, queue_item_body, end="input"): + operation = queue_item_body.get("operation", "default") + + file_pattern = self.operation2file_patterns[operation][end] + + file_descriptor = { + **project(queue_item_body, ["dossierId", "fileId", "pages"]), + "pages": queue_item_body.get("pages", []), + "extension": file_pattern["extension"], + "subdir": file_pattern["subdir"], + } + return file_descriptor + + def build_input_matcher(self, queue_item_body): + return self.build_matcher(queue_item_body, end="input") + + def build_output_matcher(self, queue_item_body): + return self.build_matcher(queue_item_body, end="output") + + def build_matcher(self, queue_item_body, end): + file_descriptor = self.build_file_descriptor(queue_item_body, end=end) + return self.__build_matcher(file_descriptor) + + def get_input_object_descriptor(self, queue_item_body): + return self.get_object_descriptor(queue_item_body, end="input") + + def get_output_object_descriptor(self, storage_upload_info): + return self.get_object_descriptor(storage_upload_info, end="output") + + def get_object_descriptor(self, queue_item_body, end): + return { + "bucket_name": self.bucket_name, + "object_name": self.get_object_name(queue_item_body, end=end), + } + + @staticmethod + def build_storage_upload_info(analysis_payload, request_metadata): + storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} + return storage_upload_info diff --git a/pyinfra/parser/parser_composer.py b/pyinfra/parser/parser_composer.py index a0b7bc9..0e3f931 100644 --- a/pyinfra/parser/parser_composer.py +++ b/pyinfra/parser/parser_composer.py @@ -29,9 +29,9 @@ class EitherParserWrapper: def __log(self, result): if isinstance(result, Right): - logger.debug(f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}") + logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}") else: - logger.debug(f"{self.parser.__class__.__name__} failed on {result.bind()}") + logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} failed on {result.bind()}") return result def parse(self, item: Either): diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 8e471f4..7c7553a 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,50 +1,56 @@ from functools import singledispatch -from typing import Dict +from typing import Dict, Callable, Union from flask import Flask, jsonify, request from funcy import merge from prometheus_client import generate_latest, Summary, CollectorRegistry +from pyinfra.config import CONFIG from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.stream.rest import LazyRestProcessor -def set_up_processing_server(server_stream_function, buffer_size): - flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) - queued_stream_function = QueuedStreamFunction(flat_stream_buffer) - return __set_up_processing_server(queued_stream_function) - - -def build_endpoint_suffixes(op: str): - return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)} - - -def submit_suffix(op: str): - return "submit" if not op else op - - -def pickup_suffix(op: str): - return "pickup" if not op else f"{op}_pickup" - - @singledispatch -def __set_up_processing_server(arg): +def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1): + """Produces a processing server given a streamable function or a mapping from operations to streamable functions. + Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a + function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata. + If the function doesn't produce data, data should be an empty byte string. + If the function doesn't produce metadata, metadata should be an empty dictionary. + + Args: + arg: streamable function or mapping of operations: str to streamable functions + buffer_size: If your function operates on batches this parameter controls how many items are aggregated before + your function is applied. + + TODO: buffer_size has to be controllable on per function basis. + + Returns: + Processing server: flask app + """ pass -@__set_up_processing_server.register -def _(operation2function: dict): - return __set_up_processing_server_impl(operation2function) +@set_up_processing_server.register +def _(operation2stream_fn: dict, buffer_size=1): + return __stream_fn_to_processing_server(operation2stream_fn, buffer_size) -@__set_up_processing_server.register -def _(queued_stream_function: object): - operation2function = {None: queued_stream_function} - return __set_up_processing_server_impl(operation2function) +@set_up_processing_server.register +def _(stream_fn: object, buffer_size=1): + operation2stream_fn = {None: stream_fn} + return __stream_fn_to_processing_server(operation2stream_fn, buffer_size) -def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]): +def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size): + operation2stream_fn = { + op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items() + } + return __set_up_processing_server(operation2stream_fn) + + +def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]): app = Flask(__name__) registry = CollectorRegistry(auto_describe=True) @@ -53,8 +59,9 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu } def make_summary_instance(op: str): - op = op.replace("_pickup", "") - return Summary(f"redactmanager_{op}_seconds", f"Time spent on {op}.", registry=registry) + op = op.replace("_pickup", "") if op != "pickup" else "default" + service_name = CONFIG.service.name + return Summary(f"redactmanager_{service_name}_{op}_seconds", f"Time spent on {op}.", registry=registry) submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()} pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()} @@ -88,3 +95,15 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu return operation2processor[operation].pop() return app + + +def build_endpoint_suffixes(op: str): + return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)} + + +def submit_suffix(op: str): + return "submit" if not op else op + + +def pickup_suffix(op: str): + return "pickup" if not op else f"{op}_pickup" diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py index 9f908d8..b3d498c 100644 --- a/pyinfra/storage/adapters/azure.py +++ b/pyinfra/storage/adapters/azure.py @@ -1,5 +1,4 @@ import logging -from itertools import repeat from operator import attrgetter from azure.storage.blob import ContainerClient, BlobServiceClient @@ -20,15 +19,15 @@ class AzureStorageAdapter(StorageAdapter): container_client = self.__client.get_container_client(bucket_name) return container_client.exists() - def make_bucket(self, bucket_name): - container_client = self.__client.get_container_client(bucket_name) - container_client if container_client.exists() else self.__client.create_container(bucket_name) - def __provide_container_client(self, bucket_name) -> ContainerClient: self.make_bucket(bucket_name) container_client = self.__client.get_container_client(bucket_name) return container_client + def make_bucket(self, bucket_name): + container_client = self.__client.get_container_client(bucket_name) + container_client if container_client.exists() else self.__client.create_container(bucket_name) + def put_object(self, bucket_name, object_name, data): logger.debug(f"Uploading '{object_name}'...") container_client = self.__provide_container_client(bucket_name) diff --git a/pyinfra/utils/encoding.py b/pyinfra/utils/encoding.py index d6ed7d3..57f60bb 100644 --- a/pyinfra/utils/encoding.py +++ b/pyinfra/utils/encoding.py @@ -1,8 +1,14 @@ import gzip import json -from pyinfra.server.packing import bytes_to_string + +def pack_analysis_payload(analysis_payload): + return compress(json.dumps(analysis_payload).encode()) -def pack_for_upload(data: bytes): - return gzip.compress(json.dumps(bytes_to_string(data)).encode()) +def compress(data: bytes): + return gzip.compress(data) + + +def decompress(data: bytes): + return gzip.decompress(data) diff --git a/pyinfra/utils/func.py b/pyinfra/utils/func.py index cf5e308..163f802 100644 --- a/pyinfra/utils/func.py +++ b/pyinfra/utils/func.py @@ -1,6 +1,6 @@ from itertools import starmap, tee -from funcy import curry, compose +from funcy import curry, compose, filter def lift(fn): @@ -36,6 +36,10 @@ def foreach(fn, iterable): fn(itm) +def flift(pred): + return curry(filter)(pred) + + def parallel_map(f1, f2): """Applies functions to a stream in parallel and yields a stream of tuples: parallel_map :: a -> b, a -> c -> [a] -> [(b, c)] diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py deleted file mode 100644 index 554abcc..0000000 --- a/pyinfra/visitor.py +++ /dev/null @@ -1,361 +0,0 @@ -import abc -import gzip -import json -import logging -from collections import deque -from copy import deepcopy -from operator import itemgetter -from typing import Callable, Dict, Union - -from funcy import omit, filter, lflatten -from more_itertools import peekable - -from pyinfra.config import CONFIG, parse_disjunction_string -from pyinfra.exceptions import DataLoadingFailure, InvalidMessage -from pyinfra.parser.parser_composer import EitherParserComposer -from pyinfra.parser.parsers.identity import IdentityBlobParser -from pyinfra.parser.parsers.json import JsonBlobParser -from pyinfra.parser.parsers.string import StringBlobParser -from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing -from pyinfra.server.packing import string_to_bytes -from pyinfra.storage.storage import Storage - -logger = logging.getLogger(__name__) - - -class ResponseStrategy(abc.ABC): - @abc.abstractmethod - def handle_response(self, analysis_response: dict): - pass - - def __call__(self, analysis_response: dict): - return self.handle_response(analysis_response) - - def get_response_object_descriptor(self, body): - return { - "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), - "object_name": self.get_response_object_name(body), - } - - @staticmethod - def get_response_object_name(body): - - if "pages" not in body: - body["pages"] = [] - - if "id" not in body: - body["id"] = 0 - - dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body) - - object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" - - return object_name - - -class StorageStrategy(ResponseStrategy): - def __init__(self, storage): - self.storage = storage - - def handle_response(self, body: dict): - response_object_descriptor = self.get_response_object_descriptor(body) - self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode())) - body.pop("data") - body["responseFile"] = response_object_descriptor["object_name"] - return body - - -class ForwardingStrategy(ResponseStrategy): - def handle_response(self, analysis_response): - return analysis_response - - -class DispatchCallback(abc.ABC): - @abc.abstractmethod - def __call__(self, payload): - pass - - -class IdentifierDispatchCallback(DispatchCallback): - def __init__(self): - self.identifier = None - - def has_new_identifier(self, metadata): - - identifier = ":".join(itemgetter("fileId", "dossierId")(metadata)) - - if not self.identifier: - self.identifier = identifier - - return identifier != self.identifier - - def __call__(self, metadata): - - return self.has_new_identifier(metadata) - - -class AggregationStorageStrategy(ResponseStrategy): - def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None): - self.storage = storage - self.merger = merger or list - self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback() - self.buffer = deque() - - def put_object(self, data: bytes, storage_upload_info): - object_descriptor = self.get_response_object_descriptor(storage_upload_info) - self.storage.put_object(**object_descriptor, data=gzip.compress(data)) - return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} - - def merge_queue_items(self): - merged_buffer_content = self.merger(self.buffer) - self.buffer.clear() - return merged_buffer_content - - def upload_queue_items(self, storage_upload_info): - data = json.dumps(self.merge_queue_items()).encode() - return self.put_object(data, storage_upload_info) - - def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): - """analysis_payload : {data: ..., metadata: ...}""" - - storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata) - analysis_payload["metadata"].pop("id") - - if analysis_payload["data"]: - return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info) - - else: - self.buffer.append(analysis_payload) - if last or self.dispatch_callback(storage_upload_info): - return self.upload_queue_items(storage_upload_info) - else: - return Nothing - - def handle_response(self, analysis_response, final=False): - def upload_or_aggregate(analysis_payload): - return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) - - request_metadata = omit(analysis_response, ["data"]) - result_data = peekable(analysis_response["data"]) - - yield from filter(is_not_nothing, map(upload_or_aggregate, result_data)) - - -def build_storage_upload_info(analysis_payload, request_metadata): - storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} - storage_upload_info["fileId"] = build_file_path( - storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder) - ) - return storage_upload_info - - -def build_file_path(storage_upload_info, folder): - return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "") - - -class InvalidStorageItemFormat(ValueError): - pass - - -class ParsingStrategy(abc.ABC): - @abc.abstractmethod - def parse(self, data: bytes): - pass - - @abc.abstractmethod - def parse_and_wrap(self, data: bytes): - pass - - def __call__(self, data: bytes): - return self.parse_and_wrap(data) - - -# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found -# on the storage. This class is only a temporary trial-and-error->fallback type of solution. -class DynamicParsingStrategy(ParsingStrategy): - def __init__(self): - self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser()) - - def parse(self, data: bytes) -> Union[bytes, dict]: - return self.parser(data) - - def parse_and_wrap(self, data): - return self.parse(data) - - -def validate(data): - if not ("data" in data and "metadata" in data): - raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") - - -def wrap(data): - return {"data": data, "metadata": {}} - - -class QueueVisitor: - def __init__( - self, - storage: Storage, - callback: Callable, - response_strategy: ResponseStrategy, - parsing_strategy: ParsingStrategy = None, - download_strategy=None, - ): - self.storage = storage - self.callback = callback - self.download_strategy = download_strategy or get_download_strategy() - self.response_strategy = response_strategy - self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() - - def load_data(self, queue_item_body): - data = self.download_strategy(self.storage, queue_item_body) - data = map(self.parsing_strategy, data) - data = map(standardize, data) - return data - - def process_storage_item(self, data_metadata_pack): - return self.callback(data_metadata_pack) - - def load_item_from_storage_and_process_with_callback(self, queue_item_body): - """Bundles the result from processing a storage item with the body of the corresponding queue item.""" - - def process_storage_item(storage_item): - analysis_input = {**storage_item, **queue_item_body} - return self.process_storage_item(analysis_input) - - storage_items = self.load_data(queue_item_body) - results = lflatten(map(process_storage_item, storage_items)) - return {"data": results, **queue_item_body} - - def __call__(self, queue_item_body): - analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) - return self.response_strategy(analysis_result_body) - - -def standardize(data) -> Dict: - """Storage items can be a blob or a blob with metadata. Standardizes to the latter. - - Cases: - 1) backend upload: data as bytes - 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", - where value of key 'data' was encoded with bytes_to_string(...) - - Returns: - {"data": bytes, "metadata": dict} - """ - - def is_blob_without_metadata(data): - return isinstance(data, bytes) - - def is_blob_with_metadata(data: Dict): - return isinstance(data, dict) - - if is_blob_without_metadata(data): - return wrap(data) - - elif is_blob_with_metadata(data): - validate(data) - return data - - else: # Fallback / used for testing with simple data - logger.warning("Encountered storage data in unexpected format.") - assert isinstance(data, str) - return wrap(string_to_bytes(data)) - - -def get_download_strategy(download_strategy_type=None): - download_strategies = { - "single": SingleDownloadStrategy(), - "multi": MultiDownloadStrategy(), - } - return download_strategies.get(download_strategy_type or CONFIG.service.download_strategy, SingleDownloadStrategy()) - - -class DownloadStrategy(abc.ABC): - def _load_data(self, storage, queue_item_body): - object_descriptor = self.get_object_descriptor(queue_item_body) - logging.debug(f"Downloading {object_descriptor}...") - data = self.__download(storage, object_descriptor) - logging.debug(f"Downloaded {object_descriptor}.") - assert isinstance(data, bytes) - data = gzip.decompress(data) - return [data] - - @staticmethod - def __download(storage, object_descriptor): - try: - data = storage.get_object(**object_descriptor) - except Exception as err: - logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure from err - - return data - - @staticmethod - @abc.abstractmethod - def get_object_name(body: dict): - raise NotImplementedError - - def get_object_descriptor(self, body): - return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": self.get_object_name(body)} - - -class SingleDownloadStrategy(DownloadStrategy): - def download(self, storage, queue_item_body): - return self._load_data(storage, queue_item_body) - - @staticmethod - def get_object_name(body: dict): - - # TODO: deepcopy still necessary? - body = deepcopy(body) - - dossier_id, file_id = itemgetter("dossierId", "fileId")(body) - - object_name = f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}" - - return object_name - - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) - - -class MultiDownloadStrategy(DownloadStrategy): - def __init__(self): - # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket - self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket) - - def download(self, storage: Storage, queue_item_body): - pages = "|".join(map(str, queue_item_body["pages"])) - matches_page = r".*id:(" + pages + r").*" - - object_names = storage.get_all_object_names(self.bucket_name) - object_names = filter(matches_page, object_names) - objects = (storage.get_object(self.bucket_name, objn) for objn in object_names) - objects = map(gzip.decompress, objects) - - return objects - - @staticmethod - def get_object_name(body: dict): - - def get_key(key): - return key if key in body else False - - # TODO: deepcopy still necessary? - body = deepcopy(body) - - folder = f"/{get_key('pages') or get_key('images')}/" - if not folder: - raise InvalidMessage("Expected a folder like 'images' oder 'pages' to be specified in message.") - - idnt = f"id:{body.get('id', 0)}" - - dossier_id, file_id = itemgetter("dossierId", "fileId")(body) - - object_name = f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}" - - return object_name - - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) diff --git a/pyinfra/visitor/__init__.py b/pyinfra/visitor/__init__.py new file mode 100644 index 0000000..e8fbecb --- /dev/null +++ b/pyinfra/visitor/__init__.py @@ -0,0 +1 @@ +from .visitor import QueueVisitor \ No newline at end of file diff --git a/pyinfra/visitor/downloader.py b/pyinfra/visitor/downloader.py new file mode 100644 index 0000000..230e392 --- /dev/null +++ b/pyinfra/visitor/downloader.py @@ -0,0 +1,35 @@ +from functools import partial + +from funcy import compose + +from pyinfra.file_descriptor_manager import FileDescriptorManager +from pyinfra.storage.storage import Storage +from pyinfra.utils.encoding import decompress +from pyinfra.utils.func import flift + + +class Downloader: + def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager): + self.storage = storage + self.bucket_name = bucket_name + self.file_descriptor_manager = file_descriptor_manager + + def __call__(self, queue_item_body): + return self.download(queue_item_body) + + def get_names_of_objects_by_pattern(self, file_pattern: str): + matches_pattern = flift(file_pattern) + page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name) + return page_object_names + + def download_and_decompress_object(self, object_names): + download = partial(self.storage.get_object, self.bucket_name) + return map(compose(decompress, download), object_names) + + def download(self, queue_item_body): + file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body) + + page_object_names = self.get_names_of_objects_by_pattern(file_pattern) + objects = self.download_and_decompress_object(page_object_names) + + return objects diff --git a/pyinfra/visitor/response_formatter/__init__.py b/pyinfra/visitor/response_formatter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/response_formatter/formatter.py b/pyinfra/visitor/response_formatter/formatter.py new file mode 100644 index 0000000..ebb4832 --- /dev/null +++ b/pyinfra/visitor/response_formatter/formatter.py @@ -0,0 +1,14 @@ +import abc + +from funcy import identity + +from pyinfra.utils.func import lift + + +class ResponseFormatter(abc.ABC): + def __call__(self, message): + return (identity if isinstance(message, dict) else lift)(self.format)(message) + + @abc.abstractmethod + def format(self, message): + pass diff --git a/pyinfra/visitor/response_formatter/formatters/__init__.py b/pyinfra/visitor/response_formatter/formatters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/response_formatter/formatters/default.py b/pyinfra/visitor/response_formatter/formatters/default.py new file mode 100644 index 0000000..c2c2f0e --- /dev/null +++ b/pyinfra/visitor/response_formatter/formatters/default.py @@ -0,0 +1,13 @@ +from funcy import first, omit + +from pyinfra.visitor.response_formatter.formatter import ResponseFormatter + + +class DefaultResponseFormatter(ResponseFormatter): + """ + TODO: Extend via using enums throughout the codebase instead of strings. + See the enum-formatter in image-prediction service for reference. + """ + + def format(self, message): + return {**omit(message, ["response_files"]), "responseFile": first(message["response_files"])} diff --git a/pyinfra/visitor/response_formatter/formatters/identity.py b/pyinfra/visitor/response_formatter/formatters/identity.py new file mode 100644 index 0000000..6d92014 --- /dev/null +++ b/pyinfra/visitor/response_formatter/formatters/identity.py @@ -0,0 +1,6 @@ +from pyinfra.visitor.response_formatter.formatter import ResponseFormatter + + +class IdentityResponseFormatter(ResponseFormatter): + def format(self, message): + return message diff --git a/pyinfra/visitor/strategies/__init__.py b/pyinfra/visitor/strategies/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/blob_parsing/__init__.py b/pyinfra/visitor/strategies/blob_parsing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/blob_parsing/blob_parsing.py b/pyinfra/visitor/strategies/blob_parsing/blob_parsing.py new file mode 100644 index 0000000..baed5dc --- /dev/null +++ b/pyinfra/visitor/strategies/blob_parsing/blob_parsing.py @@ -0,0 +1,14 @@ +import abc + + +class BlobParsingStrategy(abc.ABC): + @abc.abstractmethod + def parse(self, data: bytes): + pass + + @abc.abstractmethod + def parse_and_wrap(self, data: bytes): + pass + + def __call__(self, data: bytes): + return self.parse_and_wrap(data) diff --git a/pyinfra/visitor/strategies/blob_parsing/dynamic.py b/pyinfra/visitor/strategies/blob_parsing/dynamic.py new file mode 100644 index 0000000..850ee87 --- /dev/null +++ b/pyinfra/visitor/strategies/blob_parsing/dynamic.py @@ -0,0 +1,20 @@ +from typing import Union + +from pyinfra.parser.parser_composer import EitherParserComposer +from pyinfra.parser.parsers.identity import IdentityBlobParser +from pyinfra.parser.parsers.json import JsonBlobParser +from pyinfra.parser.parsers.string import StringBlobParser +from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy + + +# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found +# on the storage. This class is only a temporary trial-and-error->fallback type of solution. +class DynamicParsingStrategy(BlobParsingStrategy): + def __init__(self): + self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser()) + + def parse(self, data: bytes) -> Union[bytes, dict]: + return self.parser(data) + + def parse_and_wrap(self, data): + return self.parse(data) diff --git a/pyinfra/visitor/strategies/response/__init__.py b/pyinfra/visitor/strategies/response/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py new file mode 100644 index 0000000..b8ac6a7 --- /dev/null +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -0,0 +1,92 @@ +import abc +from collections import deque +from typing import Callable + +from funcy import omit, filter, first, lpluck, identity +from more_itertools import peekable + +from pyinfra.file_descriptor_manager import FileDescriptorManager +from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing +from pyinfra.utils.encoding import pack_analysis_payload +from pyinfra.visitor.strategies.response.response import ResponseStrategy + + +class UploadFormatter(abc.ABC): + @abc.abstractmethod + def format(self, items): + raise NotImplementedError + + def __call__(self, items): + return self.format(items) + + +class ProjectingUploadFormatter(UploadFormatter): + def format(self, items): + + head = first(items) + if head["data"]: + assert len(items) == 1 + return head + else: + items = lpluck("metadata", items) + return items + + +class AggregationStorageStrategy(ResponseStrategy): + def __init__( + self, + storage, + file_descriptor_manager: FileDescriptorManager, + merger: Callable = list, + upload_formatter: UploadFormatter = identity, + ): + self.storage = storage + self.file_descriptor_manager = file_descriptor_manager + self.merger = merger + self.upload_formatter = upload_formatter + + self.buffer = deque() + self.response_files = deque() + + def handle_response(self, analysis_response, final=False): + def upload_or_aggregate(analysis_payload): + request_metadata = omit(analysis_response, ["analysis_payloads"]) + return self.upload_or_aggregate(analysis_payload, request_metadata, last=not analysis_payloads.peek(False)) + + analysis_payloads = peekable(analysis_response["analysis_payloads"]) + yield from filter(is_not_nothing, map(upload_or_aggregate, analysis_payloads)) + + def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): + """analysis_payload : {data: ..., metadata: ...}""" + storage_upload_info = self.file_descriptor_manager.build_storage_upload_info(analysis_payload, request_metadata) + object_descriptor = self.file_descriptor_manager.get_output_object_descriptor(storage_upload_info) + + self.add_analysis_payload_to_buffer(analysis_payload) + + if analysis_payload["data"] or last: + self.upload_aggregated_items(object_descriptor) + self.response_files.append(object_descriptor["object_name"]) + + return self.build_response_message(storage_upload_info) if last else Nothing + + def add_analysis_payload_to_buffer(self, analysis_payload): + self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])}) + + def upload_aggregated_items(self, object_descriptor): + items_to_upload = self.upload_formatter(self.merge_queue_items()) + self.upload_item(items_to_upload, object_descriptor) + + def build_response_message(self, storage_upload_info): + + response_files = [*self.response_files] + self.response_files.clear() + + return {**storage_upload_info, "response_files": response_files} + + def upload_item(self, analysis_payload, object_descriptor): + self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload)) + + def merge_queue_items(self): + merged_buffer_content = self.merger(self.buffer) + self.buffer.clear() + return merged_buffer_content diff --git a/pyinfra/visitor/strategies/response/forwarding.py b/pyinfra/visitor/strategies/response/forwarding.py new file mode 100644 index 0000000..c71604d --- /dev/null +++ b/pyinfra/visitor/strategies/response/forwarding.py @@ -0,0 +1,6 @@ +from pyinfra.visitor.strategies.response.response import ResponseStrategy + + +class ForwardingStrategy(ResponseStrategy): + def handle_response(self, analysis_response): + return analysis_response diff --git a/pyinfra/visitor/strategies/response/response.py b/pyinfra/visitor/strategies/response/response.py new file mode 100644 index 0000000..d8ae847 --- /dev/null +++ b/pyinfra/visitor/strategies/response/response.py @@ -0,0 +1,10 @@ +import abc + + +class ResponseStrategy(abc.ABC): + @abc.abstractmethod + def handle_response(self, analysis_response: dict): + pass + + def __call__(self, analysis_response: dict): + return self.handle_response(analysis_response) diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py new file mode 100644 index 0000000..77326ec --- /dev/null +++ b/pyinfra/visitor/strategies/response/storage.py @@ -0,0 +1,33 @@ +import json +from operator import itemgetter + +from pyinfra.config import parse_disjunction_string, CONFIG +from pyinfra.utils.encoding import compress +from pyinfra.visitor.strategies.response.response import ResponseStrategy + + +class StorageStrategy(ResponseStrategy): + def __init__(self, storage, response_file_extension="out"): + self.storage = storage + self.response_file_extension = response_file_extension + + def handle_response(self, body: dict): + response_object_descriptor = self.get_response_object_descriptor(body) + self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode())) + body.pop("analysis_payloads") + body["response_files"] = [response_object_descriptor["object_name"]] + return body + + def get_response_object_descriptor(self, body): + return { + "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), + "object_name": self.get_response_object_name(body), + } + + def get_response_object_name(self, body): + + dossier_id, file_id = itemgetter("dossierId", "fileId")(body) + + object_name = f"{dossier_id}/{file_id}/.{self.response_file_extension}" + + return object_name diff --git a/pyinfra/visitor/utils.py b/pyinfra/visitor/utils.py new file mode 100644 index 0000000..72a7b57 --- /dev/null +++ b/pyinfra/visitor/utils.py @@ -0,0 +1,51 @@ +import logging +from typing import Dict + +from pyinfra.exceptions import InvalidStorageItemFormat +from pyinfra.server.packing import string_to_bytes + +logger = logging.getLogger() + + +def build_file_path(storage_upload_info, folder): + return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "") + + +def standardize(storage_item) -> Dict: + """Storage items can be a blob or a blob with metadata. Standardizes to the latter. + + Cases: + 1) backend upload: data as bytes + 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", + where value of key 'data' was encoded with bytes_to_string(...) + + Returns: + {"data": bytes, "metadata": dict} + """ + + def is_blob_without_metadata(storage_item): + return isinstance(storage_item, bytes) + + def is_blob_with_metadata(storage_item: Dict): + return isinstance(storage_item, dict) + + if is_blob_without_metadata(storage_item): + return wrap(storage_item) + + elif is_blob_with_metadata(storage_item): + validate(storage_item) + return storage_item + + else: # Fallback / used for testing with simple data + logger.warning("Encountered storage data in unexpected format.") + assert isinstance(storage_item, str) + return wrap(string_to_bytes(storage_item)) + + +def wrap(data): + return {"data": data, "metadata": {}} + + +def validate(storage_item): + if not ("data" in storage_item and "metadata" in storage_item): + raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {storage_item}.") diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py new file mode 100644 index 0000000..a50ed26 --- /dev/null +++ b/pyinfra/visitor/visitor.py @@ -0,0 +1,84 @@ +from typing import Callable + +from funcy import lflatten, compose, itervalues, lfilter + +from pyinfra.utils.func import lift +from pyinfra.visitor.response_formatter.formatter import ResponseFormatter +from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter +from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy +from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy +from pyinfra.visitor.strategies.response.response import ResponseStrategy +from pyinfra.visitor.utils import standardize + + +class QueueVisitor: + def __init__( + self, + callback: Callable, + data_loader: Callable, + response_strategy: ResponseStrategy, + parsing_strategy: BlobParsingStrategy = None, + response_formatter: ResponseFormatter = None, + ): + """Processes queue messages with a given callback. + + Args: + callback: callback to apply + data_loader: loads data specified in message and passes to callback + parsing_strategy: behaviour for interpreting loaded items + response_strategy: behaviour for response production + + TODO: merge all dependencies into a single pipeline like: getter -> parser -> processor -> formatter -> putter + + Returns: + depends on response strategy + """ + self.callback = callback + self.data_loader = data_loader + self.response_strategy = response_strategy + self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() + self.response_formatter = response_formatter or IdentityResponseFormatter() + + def __call__(self, queue_item_body): + analysis_response = compose( + self.response_formatter, + self.response_strategy, + self.load_items_from_storage_and_process_with_callback, + )(queue_item_body) + + return analysis_response + + def load_items_from_storage_and_process_with_callback(self, queue_item_body): + """Bundles the result from processing a storage item with the body of the corresponding queue item.""" + + callback_results = compose( + self.remove_empty_results, + lflatten, + lift(self.get_item_processor(queue_item_body)), + self.load_data, + )(queue_item_body) + + return {"analysis_payloads": callback_results, **queue_item_body} + + def load_data(self, queue_item_body): + data = compose( + lift(standardize), + lift(self.parsing_strategy), + self.data_loader, + )(queue_item_body) + + return data + + def get_item_processor(self, queue_item_body): + def process_storage_item(storage_item): + analysis_input = {**storage_item, **queue_item_body} + return self.process_storage_item(analysis_input) + + return process_storage_item + + @staticmethod + def remove_empty_results(results): + return lfilter(compose(any, itervalues), results) + + def process_storage_item(self, data_metadata_pack): + return self.callback(data_metadata_pack) diff --git a/requirements.txt b/requirements.txt index 59682fe..19cdbeb 100755 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ more-itertools==8.12.0 numpy==1.22.3 Pillow==9.0.1 prometheus-client==0.13.1 +frozendict==2.3.2 diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 88e1127..d9e7eaa 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -1,5 +1,4 @@ import argparse -import gzip import os from pathlib import Path @@ -7,6 +6,7 @@ from tqdm import tqdm from pyinfra.config import CONFIG, parse_disjunction_string from pyinfra.storage.storages import get_s3_storage +from pyinfra.utils.encoding import compress def parse_args(): @@ -31,7 +31,7 @@ def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension) def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: - data = gzip.compress(result.encode()) + data = compress(result.encode()) path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) storage.put_object(bucket_name, path_gz, data) @@ -44,7 +44,7 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz) with open(path, "rb") as f: - data = gzip.compress(f.read()) + data = compress(f.read()) storage.put_object(bucket_name, path_gz, data) diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 7b05c5d..34e007b 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -13,7 +13,7 @@ def parse_args(): parser.add_argument( "--analysis_container", "-a", - choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"], + choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"], required=True, ) args = parser.parse_args() @@ -48,11 +48,11 @@ def make_connection() -> pika.BlockingConnection: return connection -def build_message_bodies(analyse_container_type, bucket_name): +def build_message_bodies(analysis_container, bucket_name): def update_message(message_dict): - if analyse_container_type == "detr" or analyse_container_type == "image": + if analysis_container == "detr" or analysis_container == "image": message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"}) - if analyse_container_type == "conversion": + if analysis_container == "conversion": message_dict.update( { "targetFileExtension": "ORIGIN.pdf.gz", @@ -61,13 +61,20 @@ def build_message_bodies(analyse_container_type, bucket_name): "pages": [1, 2, 3], } ) - if analyse_container_type == "extraction": + if analysis_container == "table_parsing": + message_dict.update( + { + "operation": "table_parsing", + "pages": [1, 2, 3], + } + ) + if analysis_container == "extraction": message_dict.update( {"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"} ) - if analyse_container_type == "dl_error": + if analysis_container == "dl_error": message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"}) - if analyse_container_type == "ner": + if analysis_container == "ner": message_dict.update( {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"} ) diff --git a/scripts/show_compressed_json.py b/scripts/show_compressed_json.py new file mode 100644 index 0000000..ce74729 --- /dev/null +++ b/scripts/show_compressed_json.py @@ -0,0 +1,26 @@ +import argparse +import gzip +import json + +from pyinfra.server.packing import bytes_to_string + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--compressed_json", "-j", required=True) + return parser.parse_args() + + +def main(fp): + with open(fp, "rb") as f: + compressed_json = f.read() + + json_str = gzip.decompress(compressed_json) + json_dict = json.loads(json_str) + + print(json.dumps(json_dict, indent=2)) + + +if __name__ == "__main__": + fp = parse_args().compressed_json + main(fp) diff --git a/src/serve.py b/src/serve.py index 2e52624..6db2ef6 100644 --- a/src/serve.py +++ b/src/serve.py @@ -4,7 +4,7 @@ from multiprocessing import Process from pyinfra.utils.retry import retry from pyinfra.config import CONFIG -from pyinfra.default_objects import get_consumer +from pyinfra.component_factory import ComponentFactory from pyinfra.exceptions import ConsumerError from pyinfra.flask import run_probing_webserver, set_up_probing_webserver from pyinfra.utils.banner import show_banner @@ -15,7 +15,7 @@ logger = logging.getLogger() @retry(ConsumerError) def consume(): try: - consumer = get_consumer() + consumer = ComponentFactory(CONFIG).get_consumer() consumer.basic_consume_and_publish() except Exception as err: logger.exception(err) diff --git a/test/config.yaml b/test/config.yaml index ba8404c..894a84f 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -1,3 +1,28 @@ +service: + response_formatter: identity + operations: + multi_inp_op: + input: + subdir: op_inp_files + extension: IN.gz + output: + subdir: op_outp_files + extension: OUT.gz + single_inp_op: + input: + subdir: "" + extension: IN.gz + output: + subdir: op_outp_files + extension: OUT.gz + default: + input: + subdir: "" + extension: IN.gz + output: + subdir: "" + extension: OUT.gz + storage: minio: endpoint: "http://127.0.0.1:9000" @@ -21,8 +46,7 @@ webserver: port: $SERVER_PORT|5000 # webserver port mode: $SERVER_MODE|production # webserver mode: {development, production} - mock_analysis_endpoint: "http://127.0.0.1:5000" -use_docker_fixture: True -logging: False \ No newline at end of file +use_docker_fixture: 1 +logging: 0 \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 69b86e4..43e3806 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,9 +1,15 @@ import json from pyinfra.config import CONFIG as MAIN_CONFIG +from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter +from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) + +from pyinfra.default_objects import get_component_factory +from test.config import CONFIG as TEST_CONFIG + import logging import time from unittest.mock import Mock @@ -21,8 +27,9 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter from pyinfra.storage.clients.azure import get_azure_client from pyinfra.storage.clients.s3 import get_s3_client from pyinfra.storage.storage import Storage -from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor -from test.config import CONFIG +from pyinfra.visitor import QueueVisitor +from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy +from pyinfra.visitor.strategies.response.storage import StorageStrategy from test.queue.queue_manager_mock import QueueManagerMock from test.storage.adapter_mock import StorageAdapterMock from test.storage.client_mock import StorageClientMock @@ -48,7 +55,7 @@ logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1) @pytest.fixture(autouse=True) def mute_logger(): - if not CONFIG.logging: + if not TEST_CONFIG.logging: logger.setLevel(logging.CRITICAL + 1) @@ -106,7 +113,7 @@ def s3_backend(request): @pytest.fixture(scope="session", autouse=True) def docker_compose(sleep_seconds=30): - if CONFIG.use_docker_fixture: + if TEST_CONFIG.use_docker_fixture: logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") compose.start() @@ -124,7 +131,7 @@ def get_pika_connection_params(): def get_s3_params(s3_backend): - params = CONFIG.storage[s3_backend] + params = TEST_CONFIG.storage[s3_backend] return params @@ -133,7 +140,7 @@ def get_adapter(client_name, s3_backend): if client_name == "mock": return StorageAdapterMock(StorageClientMock()) if client_name == "azure": - return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string)) + return AzureStorageAdapter(get_azure_client(TEST_CONFIG.storage.azure.connection_string)) if client_name == "s3": return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend))) else: @@ -194,5 +201,22 @@ def response_strategy(response_strategy_name, storage): @pytest.fixture() -def visitor(storage, analysis_callback, response_strategy): - return QueueVisitor(storage, analysis_callback, response_strategy) +def visitor(storage, analysis_callback, response_strategy, component_factory): + + return QueueVisitor( + callback=analysis_callback, + data_loader=component_factory.get_downloader(storage), + response_strategy=response_strategy, + ) + + +@pytest.fixture +def file_descriptor_manager(component_factory): + return component_factory.get_file_descriptor_manager() + + +@pytest.fixture +def component_factory(): + MAIN_CONFIG["service"]["operations"] = TEST_CONFIG.service.operations + + return get_component_factory(MAIN_CONFIG) diff --git a/test/fixtures/consumer.py b/test/fixtures/consumer.py index 8b663b1..d4e3220 100644 --- a/test/fixtures/consumer.py +++ b/test/fixtures/consumer.py @@ -1,9 +1,9 @@ -from _operator import itemgetter +from operator import itemgetter +from typing import Iterable import pytest from pyinfra.queue.consumer import Consumer -from test.utils.input import pair_data_with_queue_message @pytest.fixture(scope="session") @@ -20,3 +20,18 @@ def access_callback(): def items(): numbers = [f"{i}".encode() for i in range(3)] return pair_data_with_queue_message(numbers) + + +def pair_data_with_queue_message(data: Iterable[bytes]): + def inner(): + for i, d in enumerate(data): + body = { + "dossierId": "folder", + "fileId": f"file{i}", + "targetFileExtension": "in.gz", + "responseFileExtension": "out.gz", + "pages": [0, 2, 3], + } + yield d, body + + return list(inner()) diff --git a/test/fixtures/input.py b/test/fixtures/input.py index e635568..00e5c90 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -11,7 +11,6 @@ from pyinfra.server.normalization import normalize_item from pyinfra.server.packing import pack, unpack from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes -from test.utils.input import pair_data_with_queue_message from test.utils.pdf import pdf_stream @@ -74,7 +73,7 @@ def strings_to_bytes(strings): @pytest.fixture -def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test): +def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test, queue_message_metadata): """TODO: this has become super wonky""" metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)] @@ -87,7 +86,7 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata))))) queue_message_keys = ["id"] * (not server_side_test) + [ - *second(first(pair_data_with_queue_message([b""]))).keys() + *first(queue_message_metadata).keys() ] response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata) expected = lzip(response_data, response_metadata) @@ -99,8 +98,8 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si @pytest.fixture -def endpoint(url): - return f"{url}/submit" +def endpoint(url, operation_name): + return f"{url}/{operation_name}" @pytest.fixture(params=["rest", "basic"]) @@ -141,10 +140,26 @@ def images_to_bytes(images): @pytest.fixture -def metadata(n_items): +def metadata(n_items, many_to_n): + """storage metadata + TODO: rename + """ return list(repeat({"key": "value"}, times=n_items)) +@pytest.fixture +def queue_message_metadata(n_items, operation_name): + def metadata(i): + return { + "dossierId": "folder", + "fileId": f"file{i}", + "pages": [0, 2, 3], + "operation": operation_name, + } + + return lmap(metadata, range(n_items)) + + @pytest.fixture def packages(input_data_items, metadata): return lstarlift(pack)(zip(input_data_items, metadata)) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 84346c5..4626deb 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -46,8 +46,13 @@ def url(host, port): @pytest.fixture -def server(server_stream_function, buffer_size): - return set_up_processing_server(server_stream_function, buffer_size) +def server(server_stream_function, buffer_size, operation_name): + return set_up_processing_server({operation_name: server_stream_function}, buffer_size) + + +@pytest.fixture +def operation_name(many_to_n): + return "multi_inp_op" if many_to_n else "single_inp_op" @pytest.fixture diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 21c0e34..6ce0647 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -1,23 +1,16 @@ -import gzip import json import re from itertools import starmap, repeat, chain from operator import itemgetter import pytest -from funcy import compose, lpluck, first, second +from funcy import compose, first, second, pluck, lflatten, lzip -from pyinfra.default_objects import ( - get_callback, - get_response_strategy, - get_consumer, - get_queue_manager, - get_storage, -) -from pyinfra.queue.consumer import Consumer +from pyinfra.config import CONFIG +from pyinfra.default_objects import get_component_factory from pyinfra.server.packing import unpack, pack -from pyinfra.visitor import QueueVisitor, get_download_strategy -from test.utils.input import pair_data_with_queue_message +from pyinfra.utils.encoding import compress, decompress +from test.config import CONFIG as TEST_CONFIG @pytest.mark.parametrize( @@ -37,7 +30,7 @@ from test.utils.input import pair_data_with_queue_message @pytest.mark.parametrize( "analysis_task", [ - # False, + False, True, ], ) @@ -85,15 +78,13 @@ from test.utils.input import pair_data_with_queue_message @pytest.mark.parametrize( "many_to_n", [ - False, True, + # False, ], ) -def test_serving( - server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n, download_strategy -): +def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n): - storage, queue_manager, consumer = components + storage, queue_manager, consumer, file_descriptor_manager = components assert queue_manager.input_queue.to_list() == [] assert queue_manager.output_queue.to_list() == [] @@ -104,18 +95,18 @@ def test_serving( if many_to_n: upload_data_to_folder_in_storage_and_publish_single_request_to_queue( - storage, queue_manager, data_message_pairs, download_strategy + storage, queue_manager, data_message_pairs, file_descriptor_manager ) else: upload_data_to_storage_and_publish_requests_to_queue( - storage, queue_manager, data_message_pairs, download_strategy + storage, queue_manager, data_message_pairs, file_descriptor_manager ) consumer.consume_and_publish(n=int(many_to_n) or n_items) outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name) - # TODO: correctness of target should be validated as well, since production was become non-trivial + # TODO: correctness of target should be validated as well, since production has become non-trivial assert sorted(outputs) == sorted(targets) @@ -125,26 +116,29 @@ def data_metadata_packs(input_data_items, metadata): @pytest.fixture -def data_message_pairs(data_metadata_packs): - data_message_pairs = pair_data_with_queue_message(data_metadata_packs) - return data_message_pairs +def data_message_pairs(data_metadata_packs, queue_message_metadata): + return lzip(data_metadata_packs, queue_message_metadata) # TODO: refactor; too many params -def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs, download_strategy): +def upload_data_to_storage_and_publish_requests_to_queue( + storage, queue_manager, data_message_pairs, file_descriptor_manager +): for data, message in data_message_pairs: - upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy) + upload_data_to_storage_and_publish_request_to_queue( + storage, queue_manager, data, message, file_descriptor_manager + ) # TODO: refactor; too many params -def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy): - storage.put_object(**download_strategy.get_object_descriptor(message), data=gzip.compress(data)) +def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager): + storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=compress(data)) queue_manager.publish_request(message) # TODO: refactor body; too long and scripty def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( - storage, queue_manager, data_message_pairs, download_strategy + storage, queue_manager, data_message_pairs, file_descriptor_manager ): assert data_message_pairs @@ -152,10 +146,10 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( pages = ref_message["pages"] for data, page in zip(map(first, data_message_pairs), pages): - object_descriptor = download_strategy.get_object_descriptor(ref_message) + object_descriptor = file_descriptor_manager.get_input_object_descriptor(ref_message) object_descriptor["object_name"] = build_filepath(object_descriptor, page) - storage.put_object(**object_descriptor, data=gzip.compress(data)) + storage.put_object(**object_descriptor, data=compress(data)) queue_manager.publish_request(ref_message) @@ -163,15 +157,14 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( def build_filepath(object_descriptor, page): object_name = object_descriptor["object_name"] parts = object_name.split("/") - parts.insert(-1, "pages") path = "/".join(parts) - path = re.sub("id:\d", f"id:{page}", path) + path = re.sub(r"id:\d", f"id:{page}", path) return path def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name): - names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list()) + names_of_uploaded_files = lflatten(pluck("response_files", queue_manager.output_queue.to_list())) uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files)) outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0)) return outputs @@ -186,20 +179,20 @@ def components(components_type, real_components, test_components, bucket_name): else: raise ValueError(f"Unknown component type '{components_type}'.") - storage, queue_manager, consumer = components + storage, queue_manager, consumer, file_descriptor_manager = components queue_manager.clear() storage.make_bucket(bucket_name) storage.clear_bucket(bucket_name) - yield storage, queue_manager, consumer + yield storage, queue_manager, consumer, file_descriptor_manager queue_manager.clear() storage.clear_bucket(bucket_name) def decode(storage_item): - storage_item = json.loads(gzip.decompress(storage_item).decode()) + storage_item = json.loads(decompress(storage_item).decode()) if not isinstance(storage_item, list): storage_item = [storage_item] @@ -212,26 +205,36 @@ def components_type(request): @pytest.fixture -def real_components(url, download_strategy): - callback = get_callback(url) - consumer = get_consumer(callback) - queue_manager = get_queue_manager() - storage = get_storage() +def real_components(url): - consumer.visitor.download_strategy = download_strategy - return storage, queue_manager, consumer + CONFIG["service"]["operations"] = TEST_CONFIG.service.operations + CONFIG["service"]["response_formatter"] = TEST_CONFIG.service.response_formatter + CONFIG["service"]["upload_formatter"] = "identity" + component_factory = get_component_factory(CONFIG) -@pytest.fixture -def download_strategy(many_to_n): - return get_download_strategy("multi" if many_to_n else "single") + callback = component_factory.get_callback(url) + consumer = component_factory.get_consumer(callback) + queue_manager = component_factory.get_queue_manager() + storage = component_factory.get_storage() + file_descriptor_manager = component_factory.get_file_descriptor_manager() + + return storage, queue_manager, consumer, file_descriptor_manager @pytest.fixture def test_components(url, queue_manager, storage): - - callback = get_callback(url) - visitor = QueueVisitor(storage, callback, get_response_strategy(storage)) - consumer = Consumer(visitor, queue_manager) - - return storage, queue_manager, consumer + pass + # + # component_factory = ComponentFactory(CONFIG) + # + # file_descriptor_manager = component_factory.get_file_descriptor_manager() + # + # visitor = QueueVisitor( + # storage=storage, + # callback=component_factory.get_callback(url), + # response_strategy=component_factory.get_response_strategy(storage), + # ) + # consumer = Consumer(visitor, queue_manager) + # + # return storage, queue_manager, consumer, file_descriptor_manager diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py index 95f0c62..4975b89 100644 --- a/test/unit_tests/consumer_test.py +++ b/test/unit_tests/consumer_test.py @@ -1,19 +1,20 @@ import logging -from operator import itemgetter import pytest -from funcy import lmapcat +from funcy import pluck, lflatten +from pyinfra.config import CONFIG +from pyinfra.component_factory import ComponentFactory from pyinfra.exceptions import ProcessingFailure -from pyinfra.utils.encoding import pack_for_upload -from pyinfra.visitor import ForwardingStrategy, SingleDownloadStrategy +from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy +from test.utils.storage import pack_for_upload logger = logging.getLogger() @pytest.mark.xfail( reason="NOTE: Something is messed up in the test setups." - "These tests fail when run together with other tests. Do not know yet which ones and why." + " These tests fail when run together with other tests. Do not know yet which ones and why." ) class TestConsumer: @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") @@ -25,7 +26,7 @@ class TestConsumer: @pytest.mark.parametrize( "queue_manager_name", [ - "pika", # NOTE: pika must come first. Test fails IFF pika is in second place, for whatever reason. + "pika", # NOTE: pika must come first. Test fails IFF pika is in second position, for whatever reason. "mock", ], scope="session", @@ -63,8 +64,9 @@ class TestConsumer: @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( - self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder + self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder, ): + file_descriptor_manager = ComponentFactory(CONFIG).get_file_descriptor_manager() visitor.response_strategy = ForwardingStrategy() @@ -72,7 +74,7 @@ class TestConsumer: storage.clear_bucket(bucket_name) for data, message in items: - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(message), data=pack_for_upload(data)) + storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=pack_for_upload(data)) queue_manager.publish_request(message) requests = consumer.consume(inactivity_timeout=5) @@ -81,7 +83,7 @@ class TestConsumer: logger.debug(f"Processing item {itm}") queue_manager.publish_response(req, visitor) - assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"] + assert lflatten(pluck("analysis_payloads", queue_manager.output_queue.to_list())) == ["00", "11", "22"] @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") def test_message_is_republished_when_callback_raises_processing_failure_exception( @@ -109,7 +111,7 @@ class TestConsumer: logger.addFilter(lambda record: False) # TODO: - # Note above says order of mock and pika matters. Note, that you can produce an error for debugging, when + # The note above says order of mock and pika matters. Note, that you can produce an error for debugging, when # using `queue_manager.publish_response(next(requests), callback)` without the-with and while-block, where it # becomes obvious, that the test with the note above then uses the data from THIS here test. with pytest.raises(DebugError): diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index aec7c96..4ad298c 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -1,41 +1,41 @@ -import gzip import json import pytest +from funcy import first -from pyinfra.utils.encoding import pack_for_upload -from pyinfra.visitor import SingleDownloadStrategy +from pyinfra.utils.encoding import decompress +from test.utils.storage import pack_for_upload @pytest.fixture() def body(): - return {"dossierId": "folder", "fileId": "file", "targetFileExtension": "in.gz", "responseFileExtension": "out.gz"} + return {"dossierId": "folder", "fileId": "file"} @pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session") class TestVisitor: @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_given_a_input_queue_message_callback_pulls_the_data_from_storage( - self, visitor, body, storage, bucket_name + self, visitor, body, storage, bucket_name, file_descriptor_manager ): storage.clear_bucket(bucket_name) - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"content")) + storage.put_object(**file_descriptor_manager.get_input_object_descriptor(body), data=pack_for_upload(b"content")) data_received = list(visitor.load_data(body)) assert [{"data": b"content", "metadata": {}}] == data_received @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") - def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name): + def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name, file_descriptor_manager): storage.clear_bucket(bucket_name) - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2")) - response_body = visitor.load_item_from_storage_and_process_with_callback(body) - assert response_body["data"] == ["22"] + storage.put_object(**file_descriptor_manager.get_input_object_descriptor(body), data=pack_for_upload(b"2")) + response_body = visitor.load_items_from_storage_and_process_with_callback(body) + assert response_body["analysis_payloads"] == ["22"] @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session") - def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name): + def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name, file_descriptor_manager): storage.clear_bucket(bucket_name) - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2")) + storage.put_object(**file_descriptor_manager.get_input_object_descriptor(body), data=pack_for_upload(b"2")) response_body = visitor(body) assert "data" not in response_body assert json.loads( - gzip.decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"])) - )["data"] == ["22"] + decompress(storage.get_object(bucket_name=bucket_name, object_name=first(response_body["response_files"]))) + )["analysis_payloads"] == ["22"] diff --git a/test/unit_tests/server/aggregation_strategy_tets.py b/test/unit_tests/server/aggregation_strategy_tets.py new file mode 100644 index 0000000..3c40527 --- /dev/null +++ b/test/unit_tests/server/aggregation_strategy_tets.py @@ -0,0 +1,54 @@ +import json + +import pytest + +from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy + + +@pytest.mark.parametrize("client_name", ["mock"], scope="session") +class TestAggregationStorageStrategy: + def test_aggregation_strategy_with_no_empty_data_field_causes_two_uploads(self, storage): + strat = AggregationStorageStrategy(storage=storage) + + analysis_response = { + "analysis_payloads": [ + {"data": [1], "metadata": {"id": 1}}, + {"data": [3], "metadata": {"id": 3}}, + ], + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + } + response_message_bodies = [*strat(analysis_response)] + assert response_message_bodies == [ + { + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + "id": 3, + "response_files": ["dossier0/file0/id:1.json.gz", "dossier0/file0/id:3.json.gz"], + } + ] + + def test_aggregation_strategy_with_empty_data_field_causes_single_uploads(self, storage): + strat = AggregationStorageStrategy(storage=storage) + + analysis_response = { + "analysis_payloads": [ + {"data": None, "metadata": {"id": 1}}, + {"data": [3], "metadata": {"id": 3}}, + ], + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + } + response_message_bodies = [*strat(analysis_response)] + assert response_message_bodies == [ + { + "dossierId": "dossier0", + "fileId": "file0", + "pages": [0, 2], + "id": 3, + "response_files": ["dossier0/file0/id:3.json.gz"], + } + ] diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 1508c57..0197d95 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -1,5 +1,5 @@ import pytest -from funcy import rcompose, compose, project, second, merge +from funcy import rcompose, compose, project, second, merge, lpluck from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.client_pipeline import ClientPipeline @@ -15,14 +15,13 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic from pyinfra.utils.func import lift, llift -from test.utils.input import pair_data_with_queue_message def test_mock_pipeline(): data = [1, 2, 3] - f, g, h, u = map(lift, [lambda x: x**2, lambda x: x + 2, lambda x: x / 2, lambda x: x]) + f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x]) pipeline = ClientPipeline(f, g, h, u) @@ -31,23 +30,21 @@ def test_mock_pipeline(): @pytest.mark.parametrize("client_pipeline_type", ["basic", "rest"]) @pytest.mark.parametrize("server_side_test", [True]) -def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, n_items): +def test_pipeline( + core_operation, client_pipeline, input_data_items, metadata, queue_message_metadata, targets, n_items +): assert core_operation is not Nothing - metadata = add_queue_metadata_to_server_side_metadata(metadata) + metadata = [ + merge(storage_mdt, project(queue_mdt, ["operation", "pages"])) + for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata) + ] output = compose(llift(unpack), client_pipeline)(input_data_items, metadata) assert n_items == 0 or len(output) > 0 assert output == targets -def add_queue_metadata_to_server_side_metadata(metadata): - return [ - merge(project(second(mdt), [*mdt_o.keys(), "pages"]), mdt_o) - for mdt, mdt_o in zip(pair_data_with_queue_message(metadata), metadata) - ] - - @pytest.mark.parametrize("item_type", ["string"]) @pytest.mark.parametrize("n_items", [1]) def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size): diff --git a/test/utils/input.py b/test/utils/input.py deleted file mode 100644 index 77003bc..0000000 --- a/test/utils/input.py +++ /dev/null @@ -1,16 +0,0 @@ -from typing import Iterable - - -def pair_data_with_queue_message(data: Iterable[bytes]): - def inner(): - for i, d in enumerate(data): - body = { - "dossierId": "folder", - "fileId": f"file{i}", - "targetFileExtension": "in.gz", - "responseFileExtension": "out.gz", - "pages": [0, 2, 3] - } - yield d, body - - return list(inner()) diff --git a/test/utils/storage.py b/test/utils/storage.py new file mode 100644 index 0000000..e13fee8 --- /dev/null +++ b/test/utils/storage.py @@ -0,0 +1,8 @@ +import json + +from pyinfra.server.packing import bytes_to_string +from pyinfra.utils.encoding import compress + + +def pack_for_upload(data: bytes): + return compress(json.dumps(bytes_to_string(data)).encode())