diff --git a/config.yaml b/config.yaml
index 3cd4291..592d0e2 100755
--- a/config.yaml
+++ b/config.yaml
@@ -4,9 +4,12 @@ service:
   response_formatter: default # formats analysis payloads of response messages
   upload_formatter: projecting # formats analysis payloads of objects uploaded to storage
   # Note: This is not really the right place for this. It should be configured on a per-service basis.
+  operation: $OPERATION|default
+  # operation must be specified in the deployment config for services that are called without an explicit operation
   operations:
     conversion:
       input:
+        multi: False
         subdir: ""
         extension: ORIGIN.pdf.gz
       output:
@@ -14,6 +17,7 @@ service:
         extension: json.gz
     extraction:
       input:
+        multi: False
         subdir: ""
         extension: ORIGIN.pdf.gz
       output:
@@ -21,15 +25,25 @@ service:
         extension: json.gz
     table_parsing:
       input:
+        multi: True
        subdir: "pages_as_images"
         extension: json.gz
       output:
         subdir: "table_parses"
         extension: json.gz
+    image_classification:
+      input:
+        multi: True
+        subdir: "extracted_images"
+        extension: json.gz
+      output:
+        subdir: ""
+        extension: IMAGE_INFO.json.gz
     default:
       input:
+        multi: False
         subdir: ""
-        extension: IN.gz
+        extension: in.gz
       output:
         subdir: ""
         extension: out.gz
diff --git a/pyinfra/component_factory.py b/pyinfra/component_factory.py
index 564c554..80d9069 100644
--- a/pyinfra/component_factory.py
+++ b/pyinfra/component_factory.py
@@ -5,6 +5,7 @@ from funcy import project, identity, rcompose
 
 from pyinfra.callback import Callback
 from pyinfra.config import parse_disjunction_string
+from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder
 from pyinfra.file_descriptor_manager import FileDescriptorManager
 from pyinfra.pipeline_factory import CachedPipelineFactory
 from pyinfra.queue.consumer import Consumer
@@ -84,13 +85,20 @@ class ComponentFactory:
     def get_file_descriptor_manager(self):
         return FileDescriptorManager(
             bucket_name=parse_disjunction_string(self.config.storage.bucket),
-            operation2file_patterns=self.get_operation2file_patterns(),
+            file_descriptor_builder=self.get_operation_file_descriptor_builder(),
         )
 
     @lru_cache(maxsize=None)
     def get_upload_formatter(self):
         return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter]
 
+    @lru_cache(maxsize=None)
+    def get_operation_file_descriptor_builder(self):
+        return RedFileDescriptorBuilder(
+            operation2file_patterns=self.get_operation2file_patterns(),
+            default_operation_name=self.config.service.operation,
+        )
+
     @lru_cache(maxsize=None)
     def get_response_formatter(self):
         return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[
@@ -99,6 +107,8 @@ class ComponentFactory:
 
     @lru_cache(maxsize=None)
     def get_operation2file_patterns(self):
+        if self.config.service.operation != "default":
+            self.config.service.operations["default"] = self.config.service.operations[self.config.service.operation]
         return self.config.service.operations
 
     @lru_cache(maxsize=None)
diff --git a/pyinfra/file_descriptor_builder.py b/pyinfra/file_descriptor_builder.py
new file mode 100644
index 0000000..0dec1e9
--- /dev/null
+++ b/pyinfra/file_descriptor_builder.py
@@ -0,0 +1,99 @@
+import abc
+import os
+from operator import itemgetter
+
+from funcy import project
+
+
+class FileDescriptorBuilder(abc.ABC):
+    @abc.abstractmethod
+    def build_file_descriptor(self, queue_item_body, end="input"):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def build_matcher(self, file_descriptor):
+        raise NotImplementedError
+
+    @staticmethod
+    @abc.abstractmethod
+    def build_storage_upload_info(analysis_payload, request_metadata):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def get_path_prefix(self, queue_item_body):
+        raise NotImplementedError
+
+
+class RedFileDescriptorBuilder(FileDescriptorBuilder):
+    """Defines concrete descriptors for storage objects based on queue messages"""
+
+    def __init__(self, operation2file_patterns, default_operation_name):
+
+        self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()
+        self.default_operation_name = default_operation_name
+
+    @staticmethod
+    def get_default_operation2file_patterns():
+        return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}
+
+    def build_file_descriptor(self, queue_item_body, end="input"):
+
+        def pages():
+            if end == "input":
+                if "id" in queue_item_body:
+                    return [queue_item_body["id"]]
+                else:
+                    return queue_item_body["pages"] if file_pattern["multi"] else []
+            elif end == "output":
+                return [queue_item_body["id"]]
+            else:
+                raise ValueError(f"Invalid argument: {end=}")  # TODO: use an enum for `end`
+
+        operation = queue_item_body.get("operation", self.default_operation_name)
+
+        file_pattern = self.operation2file_patterns[operation][end]
+
+        file_descriptor = {
+            **project(queue_item_body, ["dossierId", "fileId", "pages"]),
+            "pages": pages(),
+            "extension": file_pattern["extension"],
+            "subdir": file_pattern["subdir"],
+        }
+
+        return file_descriptor
+
+    def build_matcher(self, file_descriptor):
+        def make_filename(file_id, subdir, suffix):
+            return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
+
+        dossier_id, file_id, subdir, pages, extension = itemgetter(
+            "dossierId", "fileId", "subdir", "pages", "extension"
+        )(file_descriptor)
+
+        matcher = os.path.join(
+            dossier_id, make_filename(file_id, subdir, self.__build_page_regex(pages, subdir) + extension)
+        )
+
+        return matcher
+
+    @staticmethod
+    def __build_page_regex(pages, subdir):
+
+        n_pages = len(pages)
+        if n_pages > 1:
+            page_re = "id:(" + "|".join(map(str, pages)) + ")."
+        elif n_pages == 1:
+            page_re = f"id:{pages[0]}."
+        else:  # no pages specified -> either all pages or no pages, depending on whether a subdir is specified
+            page_re = r"id:\d+." if subdir else ""
+
+        return page_re
+
+    @staticmethod
+    def build_storage_upload_info(analysis_payload, request_metadata):
+        storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
+        return storage_upload_info
+
+    def get_path_prefix(self, queue_item_body):
+        prefix = "/".join(itemgetter("dossierId", "fileId")(self.build_file_descriptor(queue_item_body, end="input")))
+        return prefix
diff --git a/pyinfra/file_descriptor_manager.py b/pyinfra/file_descriptor_manager.py
index 1ca8292..13e715d 100644
--- a/pyinfra/file_descriptor_manager.py
+++ b/pyinfra/file_descriptor_manager.py
@@ -1,17 +1,18 @@
-import os
-from _operator import itemgetter
-
-from funcy import project
+from pyinfra.file_descriptor_builder import FileDescriptorBuilder
 
 
 class FileDescriptorManager:
-    def __init__(self, bucket_name, operation2file_patterns: dict = None):
-        self.bucket_name = bucket_name
-        self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()
+    """Decorates a file descriptor builder with additional convenience functionality and this way provides a
+    comprehensive interface for all file descriptor related operations, while the concrete descriptor logic is
+    implemented in a file descriptor builder.
 
-    @staticmethod
-    def get_default_operation2file_patterns():
-        return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}
+    TODO: This is supposed to be fully decoupled from the concrete file descriptor builder implementation, however some
+    bad coupling is still left.
+    """
+
+    def __init__(self, bucket_name, file_descriptor_builder: FileDescriptorBuilder):
+        self.bucket_name = bucket_name
+        self.operation_file_descriptor_builder = file_descriptor_builder
 
     def get_input_object_name(self, queue_item_body: dict):
         return self.get_object_name(queue_item_body, end="input")
@@ -20,46 +21,13 @@ class FileDescriptorManager:
         return self.get_object_name(queue_item_body, end="output")
 
     def get_object_name(self, queue_item_body: dict, end):
-
         file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
-        file_descriptor["pages"] = [queue_item_body.get("id", 0)]
-
         object_name = self.__build_matcher(file_descriptor)
         return object_name
 
-    @staticmethod
-    def __build_matcher(file_descriptor):
-        def make_filename(file_id, subdir, suffix):
-            return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
-
-        dossier_id, file_id, subdir, pages, extension = itemgetter(
-            "dossierId", "fileId", "subdir", "pages", "extension"
-        )(file_descriptor)
-
-        if len(pages) > 1 and subdir:
-            page_re = "id:(" + "|".join(map(str, pages)) + ")."
-        elif len(pages) == 1 and subdir:
-            page_re = f"id:{pages[0]}."
-        else:
-            page_re = ""
-
-        matcher = os.path.join(dossier_id, make_filename(file_id, subdir, page_re + extension))
-
-        return matcher
-
     def build_file_descriptor(self, queue_item_body, end="input"):
-        operation = queue_item_body.get("operation", "default")
-
-        file_pattern = self.operation2file_patterns[operation][end]
-
-        file_descriptor = {
-            **project(queue_item_body, ["dossierId", "fileId", "pages"]),
-            "pages": queue_item_body.get("pages", []),
-            "extension": file_pattern["extension"],
-            "subdir": file_pattern["subdir"],
-        }
-        return file_descriptor
+        return self.operation_file_descriptor_builder.build_file_descriptor(queue_item_body, end=end)
 
     def build_input_matcher(self, queue_item_body):
         return self.build_matcher(queue_item_body, end="input")
@@ -71,6 +39,9 @@ class FileDescriptorManager:
         file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
         return self.__build_matcher(file_descriptor)
 
+    def __build_matcher(self, file_descriptor):
+        return self.operation_file_descriptor_builder.build_matcher(file_descriptor)
+
     def get_input_object_descriptor(self, queue_item_body):
         return self.get_object_descriptor(queue_item_body, end="input")
 
@@ -78,21 +49,15 @@ class FileDescriptorManager:
         return self.get_object_descriptor(storage_upload_info, end="output")
 
     def get_object_descriptor(self, queue_item_body, end):
+        # TODO: this is complected with the Storage class API
+        # FIXME: bad coupling
         return {
             "bucket_name": self.bucket_name,
             "object_name": self.get_object_name(queue_item_body, end=end),
         }
 
-    @staticmethod
-    def build_storage_upload_info(analysis_payload, request_metadata):
-        storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
-        return storage_upload_info
+    def build_storage_upload_info(self, analysis_payload, request_metadata):
+        return self.operation_file_descriptor_builder.build_storage_upload_info(analysis_payload, request_metadata)
 
     def get_path_prefix(self, queue_item_body):
-
-        prefix = "/".join(
-            itemgetter("dossierId", "fileId")(
-                self.build_file_descriptor(queue_item_body, end="input"),
-            )
-        )
-        return prefix
+        return self.operation_file_descriptor_builder.get_path_prefix(queue_item_body)
diff --git a/pyinfra/server/buffering/bufferize.py b/pyinfra/server/buffering/bufferize.py
index 0b152e0..5b2b4e4 100644
--- a/pyinfra/server/buffering/bufferize.py
+++ b/pyinfra/server/buffering/bufferize.py
@@ -4,7 +4,7 @@ from collections import deque
 
 from funcy import repeatedly, identity
 
 from pyinfra.exceptions import NoBufferCapacity
-from pyinfra.server.dispatcher.dispatcher import Nothing
+from pyinfra.server.nothing import Nothing
 
 logger = logging.getLogger(__name__)
diff --git a/pyinfra/server/buffering/queue.py b/pyinfra/server/buffering/queue.py
index b7c66a4..c6bd187 100644
--- a/pyinfra/server/buffering/queue.py
+++ b/pyinfra/server/buffering/queue.py
@@ -3,7 +3,7 @@ from itertools import takewhile
 
 from funcy import repeatedly
 
-from pyinfra.server.dispatcher.dispatcher import is_not_nothing, Nothing
+from pyinfra.server.nothing import is_not_nothing, Nothing
 
 
 def stream_queue(queue):
diff --git a/pyinfra/server/buffering/stream.py b/pyinfra/server/buffering/stream.py
index 1033968..f8a4adb 100644
--- a/pyinfra/server/buffering/stream.py
+++ b/pyinfra/server/buffering/stream.py
@@ -4,7 +4,7 @@ from typing import Iterable
 from funcy import first, repeatedly, mapcat
 
 from pyinfra.server.buffering.bufferize import bufferize
-from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
+from pyinfra.server.nothing import Nothing, is_not_nothing
 
 
 class FlatStreamBuffer:
diff --git a/pyinfra/server/dispatcher/dispatcher.py b/pyinfra/server/dispatcher/dispatcher.py
index 62b5fc6..aabfce1 100644
--- a/pyinfra/server/dispatcher/dispatcher.py
+++ b/pyinfra/server/dispatcher/dispatcher.py
@@ -3,13 +3,7 @@ from typing import Iterable
 
 from more_itertools import peekable
 
-
-class Nothing:
-    pass
-
-
-def is_not_nothing(x):
-    return x is not Nothing
+from pyinfra.server.nothing import Nothing
 
 
 def has_next(peekable_iter):
diff --git a/pyinfra/server/nothing.py b/pyinfra/server/nothing.py
new file mode 100644
index 0000000..6d690f8
--- /dev/null
+++ b/pyinfra/server/nothing.py
@@ -0,0 +1,6 @@
+class Nothing:
+    pass
+
+
+def is_not_nothing(x):
+    return x is not Nothing
\ No newline at end of file
diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py
deleted file mode 100644
index 4be6374..0000000
--- a/pyinfra/server/rest.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# from typing import Iterable
-#
-# from funcy import rcompose
-#
-# from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
-# from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
-# from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
-# from pyinfra.server.packer.packers.rest import RestPacker
-# from pyinfra.server.client_pipeline import ClientPipeline
-# from pyinfra.server.receiver.receivers.rest import RestReceiver
-#
-#
-# def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
-#     """Posts `data` to `url` and aggregates responses for each element of `data`."""
-#     pipeline = get_eager_pipeline(endpoint)
-#     yield from pipeline(data, metadata)
-#
-#
-# def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
-#     """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
-#
-#     Requires:
-#     - responses must provide return pickup_endpoint as JSON payload
-#     - responses must have status code 206 for more responses coming and 204 for the last response already sent
-#     """
-#     pipeline = get_lazy_pipeline(endpoint)
-#     yield from pipeline(data, metadata)
-#
-#
-# def get_eager_pipeline(endpoint):
-#     return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter())
-#
-#
-# def get_lazy_pipeline(endpoint):
-#     return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
-#
-#
-# def pipeline_head(url):
-#     return RestPacker(), RestDispatcher(url), RestReceiver()
diff --git a/pyinfra/server/stream/rest.py b/pyinfra/server/stream/rest.py
index 8a9f3ff..0b17266 100644
--- a/pyinfra/server/stream/rest.py
+++ b/pyinfra/server/stream/rest.py
@@ -3,10 +3,10 @@ import logging
 from flask import jsonify
 from funcy import drop
 
-from pyinfra.server.dispatcher.dispatcher import Nothing
+from pyinfra.server.nothing import Nothing
 from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 
 class LazyRestProcessor:
@@ -27,10 +27,12 @@ class LazyRestProcessor:
             result = Nothing
 
         if result is Nothing:
+            logger.info("Analysis completed successfully.")
             resp = jsonify("No more items left")
             resp.status_code = 204
 
         else:
+            logger.debug("Partial analysis completed.")
             resp = jsonify(result)
             resp.status_code = 206
 
diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py
index 57c2fa1..b98b710 100644
--- a/pyinfra/server/utils.py
+++ b/pyinfra/server/utils.py
@@ -12,4 +12,5 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched):
 
 
 def make_streamable(fn, batched):
+    # FIXME: something broken with batched == True
     return compose(normalize, (identity if batched else starlift)(fn))
diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py
index 074a9d6..6e6edcf 100644
--- a/pyinfra/storage/adapters/azure.py
+++ b/pyinfra/storage/adapters/azure.py
@@ -5,7 +5,7 @@
 from azure.storage.blob import ContainerClient, BlobServiceClient
 
 from pyinfra.storage.adapters.adapter import StorageAdapter
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 logging.getLogger("azure").setLevel(logging.WARNING)
 logging.getLogger("urllib3").setLevel(logging.WARNING)
diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/adapters/s3.py
index a7729df..cb06c67 100644
--- a/pyinfra/storage/adapters/s3.py
+++ b/pyinfra/storage/adapters/s3.py
@@ -8,7 +8,7 @@ from minio import Minio
 
 from pyinfra.exceptions import DataLoadingFailure
 from pyinfra.storage.adapters.adapter import StorageAdapter
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 class S3StorageAdapter(StorageAdapter):
diff --git a/pyinfra/visitor/downloader.py b/pyinfra/visitor/downloader.py
index ec5573e..f7a87dc 100644
--- a/pyinfra/visitor/downloader.py
+++ b/pyinfra/visitor/downloader.py
@@ -30,11 +30,12 @@ class Downloader:
         logger.debug(f"Filtering objects in bucket {self.bucket_name} by pattern...")
 
         names_of_relevant_objects = compose(
+            list,
             self.get_pattern_filter(queue_item_body),
             self.get_names_of_all_associated_objects,
         )(queue_item_body)
 
-        logger.debug(f"Completed filtering.")
+        logger.debug(f"Found {len(names_of_relevant_objects)} objects matching filter.")
 
         return names_of_relevant_objects
 
@@ -51,5 +52,7 @@ class Downloader:
     def get_pattern_filter(self, queue_item_body):
         file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
+
+        logger.debug(f"Filtering pattern: {file_pattern if len(file_pattern) <= 120 else (file_pattern[:120]+'...')}")
 
         matches_pattern = flift(file_pattern)
         return matches_pattern
diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py
index b8ac6a7..57557b7 100644
--- a/pyinfra/visitor/strategies/response/aggregation.py
+++ b/pyinfra/visitor/strategies/response/aggregation.py
@@ -6,7 +6,7 @@ from funcy import omit, filter, first, lpluck, identity
 from more_itertools import peekable
 
 from pyinfra.file_descriptor_manager import FileDescriptorManager
-from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
+from pyinfra.server.nothing import Nothing, is_not_nothing
 from pyinfra.utils.encoding import pack_analysis_payload
 from pyinfra.visitor.strategies.response.response import ResponseStrategy
diff --git a/scripts/mock_client.py b/scripts/mock_client.py
index 34e007b..485cc57 100644
--- a/scripts/mock_client.py
+++ b/scripts/mock_client.py
@@ -9,7 +9,7 @@ from pyinfra.storage.storages import get_s3_storage
 
 def parse_args():
     parser = argparse.ArgumentParser()
-    parser.add_argument("--bucket_name", "-b", required=True)
+    parser.add_argument("--bucket_name", "-b")
     parser.add_argument(
         "--analysis_container",
         "-a",
@@ -51,7 +51,7 @@ def make_connection() -> pika.BlockingConnection:
 def build_message_bodies(analysis_container, bucket_name):
     def update_message(message_dict):
         if analysis_container == "detr" or analysis_container == "image":
-            message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
+            message_dict.update({"pages": []})
         if analysis_container == "conversion":
             message_dict.update(
                 {
@@ -97,7 +97,9 @@ def main(args):
     declare_queue(channel, CONFIG.rabbitmq.queues.input)
     declare_queue(channel, CONFIG.rabbitmq.queues.output)
 
-    for body in build_message_bodies(args.analysis_container, args.bucket_name):
+    bucket_name = args.bucket_name or parse_disjunction_string(CONFIG.storage.bucket)
+
+    for body in build_message_bodies(args.analysis_container, bucket_name):
         channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
         print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
diff --git a/scripts/show_compressed_json.py b/scripts/show_compressed_json.py
index ce74729..57976dc 100644
--- a/scripts/show_compressed_json.py
+++ b/scripts/show_compressed_json.py
@@ -2,25 +2,36 @@ import argparse
 import gzip
 import json
 
-from pyinfra.server.packing import bytes_to_string
+from funcy import lmap
+
+from pyinfra.server.packing import string_to_bytes
 
 
 def parse_args():
     parser = argparse.ArgumentParser()
-    parser.add_argument("--compressed_json", "-j", required=True)
+    parser.add_argument("compressed_json_path", help="Path to compressed JSON file")
     return parser.parse_args()
 
 
+def interpret(parsed):
+    try:
+        return {**parsed, "data": str(string_to_bytes(parsed["data"]))}
+    except KeyError:
+        return parsed
+
+
 def main(fp):
     with open(fp, "rb") as f:
-        compressed_json = f.read()
+        compressed_json_path = f.read()
 
-    json_str = gzip.decompress(compressed_json)
-    json_dict = json.loads(json_str)
+    json_str = gzip.decompress(compressed_json_path)
+    parsed = json.loads(json_str)
+    parsed = [parsed] if isinstance(parsed, dict) else parsed
+    parsed = lmap(interpret, parsed)
 
-    print(json.dumps(json_dict, indent=2))
+    print(json.dumps(parsed, indent=2))
 
 
 if __name__ == "__main__":
-    fp = parse_args().compressed_json
-    main(fp)
+    args = parse_args()
+    main(args.compressed_json_path)
diff --git a/test/config.yaml b/test/config.yaml
index fc5ea30..2a14dcb 100644
--- a/test/config.yaml
+++ b/test/config.yaml
@@ -1,19 +1,53 @@
 service:
   response_formatter: identity
   operations:
-    multi_inp_op:
+    upper:
       input:
-        subdir: op_inp_files
-        extension: IN.gz
+        subdir: ""
+        extension: up_in.gz
+        multi: False
       output:
-        subdir: op_outp_files
-        extension: OUT.gz
+        subdir: ""
+        extension: up_out.gz
+    extract:
+      input:
+        subdir: ""
+        extension: extr_in.gz
+        multi: False
+      output:
+        subdir: "extractions"
+        extension: gz
+    rotate:
+      input:
+        subdir: ""
+        extension: rot_in.gz
+        multi: False
+      output:
+        subdir: ""
+        extension: rot_out.gz
+    classify:
+      input:
+        subdir: ""
+        extension: cls_in.gz
+        multi: True
+      output:
+        subdir: ""
+        extension: cls_out.gz
+    stream_pages:
+      input:
+        subdir: ""
+        extension: pgs_in.gz
+        multi: False
+      output:
+        subdir: "pages"
+        extension: pgs_out.gz
     default:
       input:
         subdir: ""
         extension: IN.gz
+        multi: False
       output:
-        subdir: op_outp_files
+        subdir: ""
         extension: OUT.gz
 
 storage:
diff --git a/test/fixtures/consumer.py b/test/fixtures/consumer.py
index d4e3220..7f95699 100644
--- a/test/fixtures/consumer.py
+++ b/test/fixtures/consumer.py
@@ -26,8 +26,8 @@ def pair_data_with_queue_message(data: Iterable[bytes]):
     def inner():
         for i, d in enumerate(data):
             body = {
-                "dossierId": "folder",
-                "fileId": f"file{i}",
+                "dossierId": "dossier_id",
+                "fileId": f"file_id_{i}",
                 "targetFileExtension": "in.gz",
                 "responseFileExtension": "out.gz",
                 "pages": [0, 2, 3],
diff --git a/test/fixtures/input.py b/test/fixtures/input.py
index 6ef1365..e924d71 100644
--- a/test/fixtures/input.py
+++ b/test/fixtures/input.py
@@ -6,8 +6,8 @@ import pytest
 from PIL import Image
 from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip, merge
 
-from pyinfra.server.dispatcher.dispatcher import Nothing
 from pyinfra.server.normalization import normalize_item
+from pyinfra.server.nothing import Nothing
 from pyinfra.server.packing import pack, unpack
 from pyinfra.utils.func import star, lift, lstarlift
 from test.utils.image import image_to_bytes
@@ -150,11 +150,11 @@ def queue_message_metadata(n_items, operation_name):
     def metadata(i):
         return merge(
             {
-                "dossierId": "folder",
-                "fileId": f"file{i}",
-                "pages": [0, 2, 3],
+                "dossierId": "dossier_id",
+                "fileId": f"file_id_{i}",
             },
             ({"operation": operation_name} if operation_name else {}),
+            ({"pages": [0, 2, 3]} if n_items > 1 else {}),
         )
 
     return lmap(metadata, range(n_items))
diff --git a/test/fixtures/server.py b/test/fixtures/server.py
index ebac679..f2b02dd 100644
--- a/test/fixtures/server.py
+++ b/test/fixtures/server.py
@@ -13,7 +13,7 @@ from PIL import Image
 from funcy import retry, project, omit
 from waitress import serve
 
-from pyinfra.server.dispatcher.dispatcher import Nothing
+from pyinfra.server.nothing import Nothing
 from pyinfra.server.server import (
     set_up_processing_server,
 )
@@ -51,8 +51,9 @@ def server(server_stream_function, buffer_size, operation_name):
 
 
 @pytest.fixture
-def operation_name(many_to_n):
-    return "multi_inp_op" if many_to_n else ""
+def operation_name(core_operation):
+    operation_name = core_operation.__name__
+    return operation_name
 
 
 @pytest.fixture
@@ -99,16 +100,19 @@ def server_side_test(request):
 
 
 @pytest.fixture
-def core_operation(item_type, one_to_many, analysis_task):
+def core_operation(many_to_n, item_type, one_to_many, analysis_task):
     def duplicate(string: bytes, metadata):
         for _ in range(2):
             yield upper(string, metadata), metadata
 
     def upper(string: bytes, metadata):
-        return string.decode().upper().encode(), metadata
+        r = string.decode().upper().encode(), metadata
+        return r
 
     def extract(string: bytes, metadata):
-        for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items():
+        for i, c in project(
+            dict(enumerate(string.decode())), metadata.get("pages", range(len(string.decode())))
+        ).items():
             metadata["id"] = i
             yield c.encode(), metadata
@@ -126,18 +130,34 @@ def core_operation(item_type, one_to_many, analysis_task):
             yield f"page_{i}".encode(), metadata
 
     params2op = {
-        False: {
-            "string": {False: upper},
-            "image": {False: rotate, True: classify},
-        },
         True: {
-            "string": {False: extract},
-            "pdf": {False: stream_pages},
-        },
+            True: {},
+            False: {
+                "image": {True: classify},
+            },
+        },
+        False: {
+            True: {
+                "string": {False: extract},
+                "pdf": {False: stream_pages},
+            },
+            False: {
+                "string": {False: upper},
+                "image": {False: rotate},
+            },
+        },
+        # False: {
+        #     "string": {False: upper},
+        #     "image": {False: rotate, True: classify},
+        # },
+        # True: {
+        #     "string": {False: extract},
+        #     "pdf": {False: stream_pages},
+        # },
     }
 
     try:
-        return params2op[one_to_many][item_type][analysis_task]
+        return params2op[many_to_n][one_to_many][item_type][analysis_task]
     except KeyError:
         msg = f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}]."
         pytest.skip(msg)
diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py
index 6ce0647..8fe1d15 100644
--- a/test/integration_tests/serve_test.py
+++ b/test/integration_tests/serve_test.py
@@ -37,7 +37,7 @@ from test.config import CONFIG as TEST_CONFIG
 @pytest.mark.parametrize(
     "n_items",
     [
-        1,
+        # 1,
         3,
     ],
 )
@@ -79,10 +79,12 @@ from test.config import CONFIG as TEST_CONFIG
     "many_to_n",
     [
         True,
-        # False,
+        False,
     ],
 )
-def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
+def test_serving(
+    operation_name, server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n
+):
 
     storage, queue_manager, consumer, file_descriptor_manager = components
 
@@ -146,7 +148,7 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
     pages = ref_message["pages"]
 
     for data, page in zip(map(first, data_message_pairs), pages):
-        object_descriptor = file_descriptor_manager.get_input_object_descriptor(ref_message)
+        object_descriptor = file_descriptor_manager.get_input_object_descriptor({**ref_message, "id": page})
         object_descriptor["object_name"] = build_filepath(object_descriptor, page)
         storage.put_object(**object_descriptor, data=compress(data))
diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py
index 4ad298c..0fd1f01 100644
--- a/test/unit_tests/queue_visitor_test.py
+++ b/test/unit_tests/queue_visitor_test.py
@@ -9,7 +9,7 @@ from test.utils.storage import pack_for_upload
 
 @pytest.fixture()
 def body():
-    return {"dossierId": "folder", "fileId": "file"}
+    return {"dossierId": "dossier_id", "fileId": "file_id"}
 
 
 @pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
diff --git a/test/unit_tests/server/buffer_test.py b/test/unit_tests/server/buffer_test.py
index 828308f..e420343 100644
--- a/test/unit_tests/server/buffer_test.py
+++ b/test/unit_tests/server/buffer_test.py
@@ -3,7 +3,7 @@ from funcy import compose, lmapcat, compact, flatten, identity
 
 from pyinfra.exceptions import NoBufferCapacity
 from pyinfra.server.buffering.bufferize import bufferize
-from pyinfra.server.dispatcher.dispatcher import Nothing
+from pyinfra.server.nothing import Nothing
 
 
 def test_buffer(buffer_size):
diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py
index 50d60e6..a9a83ac 100644
--- a/test/unit_tests/server/pipeline_test.py
+++ b/test/unit_tests/server/pipeline_test.py
@@ -3,7 +3,7 @@ from funcy import rcompose, compose, project, merge
 
 from pyinfra.server.buffering.stream import FlatStreamBuffer
 from pyinfra.server.client_pipeline import ClientPipeline
-from pyinfra.server.dispatcher.dispatcher import Nothing
+from pyinfra.server.nothing import Nothing
 from pyinfra.server.dispatcher.dispatchers.queue import QueuedStreamFunctionDispatcher
 from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
 from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py
index 3f2fa98..ec2ef2c 100644
--- a/test/unit_tests/server/stream_buffer_test.py
+++ b/test/unit_tests/server/stream_buffer_test.py
@@ -2,7 +2,7 @@ import pytest
 from funcy import repeatedly, takewhile, notnone, lmap, lmapcat, lflatten
 
 from pyinfra.server.buffering.stream import FlatStreamBuffer, StreamBuffer
-from pyinfra.server.dispatcher.dispatcher import Nothing
+from pyinfra.server.nothing import Nothing
 from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
 from pyinfra.utils.func import lift, foreach, starlift
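
Usage sketch (illustrative, not part of the patch): how the new RedFileDescriptorBuilder resolves a queue message into a storage object matcher, assuming the table_parsing patterns from config.yaml above; the dossier and file IDs are made-up values.

    from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder

    builder = RedFileDescriptorBuilder(
        operation2file_patterns={
            "table_parsing": {
                "input": {"multi": True, "subdir": "pages_as_images", "extension": "json.gz"},
                "output": {"subdir": "table_parses", "extension": "json.gz"},
            }
        },
        default_operation_name="default",
    )

    # Hypothetical queue message; "pages" is only honoured on input because the pattern has multi: True.
    queue_item_body = {"dossierId": "dossier-1", "fileId": "file-1", "pages": [0, 2], "operation": "table_parsing"}

    descriptor = builder.build_file_descriptor(queue_item_body, end="input")
    print(builder.build_matcher(descriptor))
    # -> dossier-1/file-1/pages_as_images/id:(0|2).json.gz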