Pull request #43: Image prediction v2 support

Merge in RR/pyinfra from image-prediction-v2-support to 2.0.0

Squashed commit of the following:

commit 37c536324e847357e86dd9b72d1e07ad792ed90f
Merge: 77d1db8 01bfb1d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:53:56 2022 +0200

    Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into image-prediction-v2-support

commit 77d1db8e8630de8822c124eb39f4cd817ed1d3e1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:07:41 2022 +0200

    add operation assignment via config if operation is not defined by caller

commit 36c8ca48a8c6151f713c093a23de110901ba6b02
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:33:34 2022 +0200

    refactor nothing part 2

commit f6cd0ef986802554dd544b9b7a24073d3b3f05b5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:28:49 2022 +0200

    refactor nothing

commit 1e70d49531e89613c70903be49290b94ee014f65
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:42:12 2022 +0200

    enable docker-compose fixture

commit 9fee32cecdd120cfac3e065fb8ad2b4f37b49226
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:40:35 2022 +0200

    added 'multi' key to actual operation configurations

commit 4287f6d9878dd361489b8490eafd06f81df472ce
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:56:12 2022 +0200

    removed debug prints

commit 23a533e8f99222c7e598fb0864f65e9aa3508a3b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:31:50 2022 +0200

    completed correcting / cleaning upload and download logic with regard to operations and ids. next: remove debug code

commit 33246d1ff94989d2ea70242c7ae2e58afa4d35c1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 14:37:17 2022 +0200

    corrected / cleaned upload and download logic with regard to operations and ids

commit 7f2b4e882022c6843cb2f80df202caa495c54ee9
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 18:41:07 2022 +0200

    partially decomplected file descriptor manager from concrete and non-generic descriptor code

commit 40b892da17670dae3b8eba1700877c1dcf219852
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:53:46 2022 +0200

    typo

commit ec4fa8e6f4551ff1f8d4f78c484b7a260f274898
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:52:41 2022 +0200

    typo

commit 701b43403c328161fd96a73ce388a66035cca348
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 17:26:53 2022 +0200

    made adjustments for image classification with pyinfra 2.x; added related fixmes

commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 13:05:26 2022 +0200

    removed obsolete imports

commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:47:12 2022 +0200

    enable docker-compose fixture

commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:46:53 2022 +0200

    renaming

commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 30 12:47:57 2022 +0200

    refactoring: added cached pipeline factory

commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:47:08 2022 +0200

    renaming

commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:25:03 2022 +0200

    further refactored server setup code: moving and decomplecting

commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 12:53:09 2022 +0200

    refactored server setup code: factored out and decoupled operation registry and prometheus summary registry

... and 6 more commits
Julius Unverfehrt 2022-07-11 14:17:59 +02:00
parent 01bfb1d668
commit a1bfec765c
27 changed files with 280 additions and 156 deletions

View File

@ -4,9 +4,12 @@ service:
response_formatter: default # formats analysis payloads of response messages
upload_formatter: projecting # formats analysis payloads of objects uploaded to storage
# Note: This is not really the right place for this. It should be configured on a per-service basis.
operation: $OPERATION|default
# operation needs to be specified in deployment config for services that are called without an operation specified
operations:
conversion:
input:
multi: False
subdir: ""
extension: ORIGIN.pdf.gz
output:
@ -14,6 +17,7 @@ service:
extension: json.gz
extraction:
input:
multi: False
subdir: ""
extension: ORIGIN.pdf.gz
output:
@ -21,15 +25,25 @@ service:
extension: json.gz
table_parsing:
input:
multi: True
subdir: "pages_as_images"
extension: json.gz
output:
subdir: "table_parses"
extension: json.gz
image_classification:
input:
multi: True
subdir: "extracted_images"
extension: json.gz
output:
subdir: ""
extension: IMAGE_INFO.json.gz
default:
input:
multi: False
subdir: ""
extension: IN.gz
extension: in.gz
output:
subdir: ""
extension: out.gz
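
Each operation in this config pairs an input and an output file pattern; the new multi flag marks operations that consume several per-page objects at once (table_parsing, image_classification), while single-object operations keep multi: False. A minimal sketch of how such an entry could be looked up once the YAML is loaded into a plain dict (the config path and the helper name pattern_for are illustrative, not part of pyinfra):

import yaml  # assumes PyYAML is available

def pattern_for(config: dict, operation: str, end: str) -> dict:
    """Return the multi/subdir/extension pattern for one end of an operation."""
    operations = config["service"]["operations"]
    # unknown operations fall back to the "default" entry
    return operations.get(operation, operations["default"])[end]

with open("config.yaml") as f:  # hypothetical config path
    config = yaml.safe_load(f)

print(pattern_for(config, "table_parsing", "input"))
# -> {'multi': True, 'subdir': 'pages_as_images', 'extension': 'json.gz'}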

View File

@ -5,6 +5,7 @@ from funcy import project, identity, rcompose
from pyinfra.callback import Callback
from pyinfra.config import parse_disjunction_string
from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.pipeline_factory import CachedPipelineFactory
from pyinfra.queue.consumer import Consumer
@ -84,13 +85,20 @@ class ComponentFactory:
def get_file_descriptor_manager(self):
return FileDescriptorManager(
bucket_name=parse_disjunction_string(self.config.storage.bucket),
operation2file_patterns=self.get_operation2file_patterns(),
file_descriptor_builder=self.get_operation_file_descriptor_builder(),
)
@lru_cache(maxsize=None)
def get_upload_formatter(self):
return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter]
@lru_cache(maxsize=None)
def get_operation_file_descriptor_builder(self):
return RedFileDescriptorBuilder(
operation2file_patterns=self.get_operation2file_patterns(),
default_operation_name=self.config.service.operation,
)
@lru_cache(maxsize=None)
def get_response_formatter(self):
return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[
@ -99,6 +107,8 @@ class ComponentFactory:
@lru_cache(maxsize=None)
def get_operation2file_patterns(self):
if self.config.service.operation != "default":
self.config.service.operations["default"] = self.config.service.operations[self.config.service.operation]
return self.config.service.operations
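
The aliasing above means that queue messages arriving without an operation field resolve to whatever operation the deployment config names for this service. The same idea as a standalone sketch (the dicts below are illustrative, not the real Config object):

def alias_default_operation(operations: dict, configured_operation: str) -> dict:
    # point "default" at the configured operation's patterns so callers
    # that omit "operation" still resolve to the deployed operation
    if configured_operation != "default":
        operations["default"] = operations[configured_operation]
    return operations

operations = {
    "image_classification": {"input": {"multi": True}},
    "default": {"input": {"multi": False}},
}
aliased = alias_default_operation(operations, "image_classification")
assert aliased["default"]["input"]["multi"] is True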
@lru_cache(maxsize=None)

View File

@ -0,0 +1,99 @@
import abc
import os
from operator import itemgetter
from funcy import project
class FileDescriptorBuilder:
@abc.abstractmethod
def build_file_descriptor(self, queue_item_body, end="input"):
raise NotImplementedError
@abc.abstractmethod
def build_matcher(self, file_descriptor):
raise NotImplementedError
@staticmethod
@abc.abstractmethod
def build_storage_upload_info(analysis_payload, request_metadata):
raise NotImplementedError
@abc.abstractmethod
def get_path_prefix(self, queue_item_body):
raise NotImplementedError
class RedFileDescriptorBuilder(FileDescriptorBuilder):
"""Defines concrete descriptors for storage objects based on queue messages"""
def __init__(self, operation2file_patterns, default_operation_name):
self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()
self.default_operation_name = default_operation_name
@staticmethod
def get_default_operation2file_patterns():
return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}
def build_file_descriptor(self, queue_item_body, end="input"):
def pages():
if end == "input":
if "id" in queue_item_body:
return [queue_item_body["id"]]
else:
return queue_item_body["pages"] if file_pattern["multi"] else []
elif end == "output":
return [queue_item_body["id"]]
else:
raise ValueError(f"Invalid argument: {end=}") # TODO: use an enum for `end`
operation = queue_item_body.get("operation", self.default_operation_name)
file_pattern = self.operation2file_patterns[operation][end]
file_descriptor = {
**project(queue_item_body, ["dossierId", "fileId", "pages"]),
"pages": pages(),
"extension": file_pattern["extension"],
"subdir": file_pattern["subdir"],
}
return file_descriptor
def build_matcher(self, file_descriptor):
def make_filename(file_id, subdir, suffix):
return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
dossier_id, file_id, subdir, pages, extension = itemgetter(
"dossierId", "fileId", "subdir", "pages", "extension"
)(file_descriptor)
matcher = os.path.join(
dossier_id, make_filename(file_id, subdir, self.__build_page_regex(pages, subdir) + extension)
)
return matcher
@staticmethod
def __build_page_regex(pages, subdir):
n_pages = len(pages)
if n_pages > 1:
page_re = "id:(" + "|".join(map(str, pages)) + ")."
elif n_pages == 1:
page_re = f"id:{pages[0]}."
else: # no pages specified -> either all pages or no pages, depending on whether a subdir is specified
page_re = r"id:\d+." if subdir else ""
return page_re
@staticmethod
def build_storage_upload_info(analysis_payload, request_metadata):
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
return storage_upload_info
def get_path_prefix(self, queue_item_body):
prefix = "/".join(itemgetter("dossierId", "fileId")(self.build_file_descriptor(queue_item_body, end="input")))
return prefix
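
A hedged usage sketch of the builder defined above, with a made-up queue message and an operation pattern copied from the service config; the expected matcher is indicative only:

from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder

patterns = {
    "table_parsing": {
        "input": {"multi": True, "subdir": "pages_as_images", "extension": "json.gz"},
        "output": {"multi": False, "subdir": "table_parses", "extension": "json.gz"},
    },
}
builder = RedFileDescriptorBuilder(operation2file_patterns=patterns, default_operation_name="table_parsing")

message = {"dossierId": "d1", "fileId": "f1", "pages": [0, 2], "operation": "table_parsing"}
descriptor = builder.build_file_descriptor(message, end="input")
print(builder.build_matcher(descriptor))
# roughly: d1/f1/pages_as_images/id:(0|2).json.gz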

View File

@ -1,17 +1,18 @@
import os
from _operator import itemgetter
from funcy import project
from pyinfra.file_descriptor_builder import FileDescriptorBuilder
class FileDescriptorManager:
def __init__(self, bucket_name, operation2file_patterns: dict = None):
self.bucket_name = bucket_name
self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()
"""Decorates a file descriptor builder with additional convenience functionality and this way provides a
comprehensive interface for all file descriptor related operations, while the concrete descriptor logic is
implemented in a file descriptor builder.
@staticmethod
def get_default_operation2file_patterns():
return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}
TODO: This is supposed to be fully decoupled from the concrete file descriptor builder implementation, however some
bad coupling is still left.
"""
def __init__(self, bucket_name, file_descriptor_builder: FileDescriptorBuilder):
self.bucket_name = bucket_name
self.operation_file_descriptor_builder = file_descriptor_builder
def get_input_object_name(self, queue_item_body: dict):
return self.get_object_name(queue_item_body, end="input")
@ -20,46 +21,13 @@ class FileDescriptorManager:
return self.get_object_name(queue_item_body, end="output")
def get_object_name(self, queue_item_body: dict, end):
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
file_descriptor["pages"] = [queue_item_body.get("id", 0)]
object_name = self.__build_matcher(file_descriptor)
return object_name
@staticmethod
def __build_matcher(file_descriptor):
def make_filename(file_id, subdir, suffix):
return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
dossier_id, file_id, subdir, pages, extension = itemgetter(
"dossierId", "fileId", "subdir", "pages", "extension"
)(file_descriptor)
if len(pages) > 1 and subdir:
page_re = "id:(" + "|".join(map(str, pages)) + ")."
elif len(pages) == 1 and subdir:
page_re = f"id:{pages[0]}."
else:
page_re = ""
matcher = os.path.join(dossier_id, make_filename(file_id, subdir, page_re + extension))
return matcher
def build_file_descriptor(self, queue_item_body, end="input"):
operation = queue_item_body.get("operation", "default")
file_pattern = self.operation2file_patterns[operation][end]
file_descriptor = {
**project(queue_item_body, ["dossierId", "fileId", "pages"]),
"pages": queue_item_body.get("pages", []),
"extension": file_pattern["extension"],
"subdir": file_pattern["subdir"],
}
return file_descriptor
return self.operation_file_descriptor_builder.build_file_descriptor(queue_item_body, end=end)
def build_input_matcher(self, queue_item_body):
return self.build_matcher(queue_item_body, end="input")
@ -71,6 +39,9 @@ class FileDescriptorManager:
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
return self.__build_matcher(file_descriptor)
def __build_matcher(self, file_descriptor):
return self.operation_file_descriptor_builder.build_matcher(file_descriptor)
def get_input_object_descriptor(self, queue_item_body):
return self.get_object_descriptor(queue_item_body, end="input")
@ -78,21 +49,15 @@ class FileDescriptorManager:
return self.get_object_descriptor(storage_upload_info, end="output")
def get_object_descriptor(self, queue_item_body, end):
# TODO: this is complected with the Storage class API
# FIXME: bad coupling
return {
"bucket_name": self.bucket_name,
"object_name": self.get_object_name(queue_item_body, end=end),
}
@staticmethod
def build_storage_upload_info(analysis_payload, request_metadata):
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
return storage_upload_info
def build_storage_upload_info(self, analysis_payload, request_metadata):
return self.operation_file_descriptor_builder.build_storage_upload_info(analysis_payload, request_metadata)
def get_path_prefix(self, queue_item_body):
prefix = "/".join(
itemgetter("dossierId", "fileId")(
self.build_file_descriptor(queue_item_body, end="input"),
)
)
return prefix
return self.operation_file_descriptor_builder.get_path_prefix(queue_item_body)
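
With this refactoring the manager composes a builder instead of owning the pattern logic itself, mirroring ComponentFactory.get_file_descriptor_manager. A hedged composition sketch (the bucket name and message values are made up):

from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder
from pyinfra.file_descriptor_manager import FileDescriptorManager

builder = RedFileDescriptorBuilder(
    operation2file_patterns=None,  # None falls back to the built-in "default" patterns
    default_operation_name="default",
)
manager = FileDescriptorManager(bucket_name="analysis-bucket", file_descriptor_builder=builder)

message = {"dossierId": "d1", "fileId": "f1", "id": 0}
print(manager.get_input_object_descriptor(message))
# expected shape: {'bucket_name': 'analysis-bucket', 'object_name': 'd1/f1.id:0..in'}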

View File

@ -4,7 +4,7 @@ from collections import deque
from funcy import repeatedly, identity
from pyinfra.exceptions import NoBufferCapacity
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.nothing import Nothing
logger = logging.getLogger(__name__)

View File

@ -3,7 +3,7 @@ from itertools import takewhile
from funcy import repeatedly
from pyinfra.server.dispatcher.dispatcher import is_not_nothing, Nothing
from pyinfra.server.nothing import is_not_nothing, Nothing
def stream_queue(queue):

View File

@ -4,7 +4,7 @@ from typing import Iterable
from funcy import first, repeatedly, mapcat
from pyinfra.server.buffering.bufferize import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.server.nothing import Nothing, is_not_nothing
class FlatStreamBuffer:

View File

@ -3,13 +3,7 @@ from typing import Iterable
from more_itertools import peekable
class Nothing:
pass
def is_not_nothing(x):
return x is not Nothing
from pyinfra.server.nothing import Nothing
def has_next(peekable_iter):

View File

@ -0,0 +1,6 @@
class Nothing:
pass
def is_not_nothing(x):
return x is not Nothing
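
Nothing now lives in its own module and serves as the end-of-stream sentinel across the server code; is_not_nothing is the matching predicate. A tiny illustration with made-up stream items:

from itertools import takewhile

from pyinfra.server.nothing import Nothing, is_not_nothing

stream = iter([b"page-0", b"page-1", Nothing, Nothing])
items = list(takewhile(is_not_nothing, stream))
assert items == [b"page-0", b"page-1"]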

View File

@ -1,39 +0,0 @@
# from typing import Iterable
#
# from funcy import rcompose
#
# from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
# from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
# from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
# from pyinfra.server.packer.packers.rest import RestPacker
# from pyinfra.server.client_pipeline import ClientPipeline
# from pyinfra.server.receiver.receivers.rest import RestReceiver
#
#
# def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
# """Posts `data` to `url` and aggregates responses for each element of `data`."""
# pipeline = get_eager_pipeline(endpoint)
# yield from pipeline(data, metadata)
#
#
# def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
# """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
#
# Requires:
# - responses must provide return pickup_endpoint as JSON payload
# - responses must have status code 206 for more responses coming and 204 for the last response already sent
# """
# pipeline = get_lazy_pipeline(endpoint)
# yield from pipeline(data, metadata)
#
#
# def get_eager_pipeline(endpoint):
# return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter())
#
#
# def get_lazy_pipeline(endpoint):
# return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
#
#
# def pipeline_head(url):
# return RestPacker(), RestDispatcher(url), RestReceiver()

View File

@ -3,10 +3,10 @@ import logging
from flask import jsonify
from funcy import drop
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.nothing import Nothing
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
logger = logging.getLogger()
logger = logging.getLogger(__name__)
class LazyRestProcessor:
@ -27,10 +27,12 @@ class LazyRestProcessor:
result = Nothing
if result is Nothing:
logger.info("Analysis completed successfully.")
resp = jsonify("No more items left")
resp.status_code = 204
else:
logger.debug("Partial analysis completed.")
resp = jsonify(result)
resp.status_code = 206
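
This keeps the pickup contract documented in the deleted client module above: 206 while partial results remain, 204 once the analysis is finished. A hedged sketch of a client loop honoring that contract (the pickup URL is illustrative and requests is assumed to be available):

import requests

def drain_pickup(pickup_url):
    """Yield partial results until the server answers 204 (nothing left)."""
    while True:
        response = requests.get(pickup_url)
        if response.status_code == 204:
            break  # analysis completed, no more items
        if response.status_code == 206:
            yield response.json()  # partial result, keep polling
        else:
            raise RuntimeError(f"Unexpected status code: {response.status_code}")

for partial in drain_pickup("http://localhost:8080/pickup"):
    print(partial)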

View File

@ -12,4 +12,5 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched):
def make_streamable(fn, batched):
# FIXME: something broken with batched == True
return compose(normalize, (identity if batched else starlift)(fn))

View File

@ -5,7 +5,7 @@ from azure.storage.blob import ContainerClient, BlobServiceClient
from pyinfra.storage.adapters.adapter import StorageAdapter
logger = logging.getLogger()
logger = logging.getLogger(__name__)
logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

View File

@ -8,7 +8,7 @@ from minio import Minio
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.adapters.adapter import StorageAdapter
logger = logging.getLogger()
logger = logging.getLogger(__name__)
class S3StorageAdapter(StorageAdapter):

View File

@ -30,11 +30,12 @@ class Downloader:
logger.debug(f"Filtering objects in bucket {self.bucket_name} by pattern...")
names_of_relevant_objects = compose(
list,
self.get_pattern_filter(queue_item_body),
self.get_names_of_all_associated_objects,
)(queue_item_body)
logger.debug(f"Completed filtering.")
logger.debug(f"Found {len(names_of_relevant_objects)} objects matching filter.")
return names_of_relevant_objects
@ -51,5 +52,7 @@ class Downloader:
def get_pattern_filter(self, queue_item_body):
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
logger.debug(f"Filtering pattern: {file_pattern if len(file_pattern) <= 120 else (file_pattern[:120]+'...')}")
matches_pattern = flift(file_pattern)
return matches_pattern

View File

@ -6,7 +6,7 @@ from funcy import omit, filter, first, lpluck, identity
from more_itertools import peekable
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.server.nothing import Nothing, is_not_nothing
from pyinfra.utils.encoding import pack_analysis_payload
from pyinfra.visitor.strategies.response.response import ResponseStrategy

View File

@ -9,7 +9,7 @@ from pyinfra.storage.storages import get_s3_storage
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--bucket_name", "-b", required=True)
parser.add_argument("--bucket_name", "-b")
parser.add_argument(
"--analysis_container",
"-a",
@ -51,7 +51,7 @@ def make_connection() -> pika.BlockingConnection:
def build_message_bodies(analysis_container, bucket_name):
def update_message(message_dict):
if analysis_container == "detr" or analysis_container == "image":
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
message_dict.update({"pages": []})
if analysis_container == "conversion":
message_dict.update(
{
@ -97,7 +97,9 @@ def main(args):
declare_queue(channel, CONFIG.rabbitmq.queues.input)
declare_queue(channel, CONFIG.rabbitmq.queues.output)
for body in build_message_bodies(args.analysis_container, args.bucket_name):
bucket_name = args.bucket_name or parse_disjunction_string(CONFIG.storage.bucket)
for body in build_message_bodies(args.analysis_container, bucket_name):
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")

View File

@ -2,25 +2,36 @@ import argparse
import gzip
import json
from pyinfra.server.packing import bytes_to_string
from funcy import lmap
from pyinfra.server.packing import string_to_bytes
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--compressed_json", "-j", required=True)
parser.add_argument("compressed_json_path", help="Path to compressed JSON file")
return parser.parse_args()
def interpret(parsed):
try:
return {**parsed, "data": str(string_to_bytes(parsed["data"]))}
except KeyError:
return parsed
def main(fp):
with open(fp, "rb") as f:
compressed_json = f.read()
compressed_json_path = f.read()
json_str = gzip.decompress(compressed_json)
json_dict = json.loads(json_str)
json_str = gzip.decompress(compressed_json_path)
parsed = json.loads(json_str)
parsed = [parsed] if isinstance(parsed, dict) else parsed
parsed = lmap(interpret, parsed)
print(json.dumps(json_dict, indent=2))
print(json.dumps(parsed, indent=2))
if __name__ == "__main__":
fp = parse_args().compressed_json
main(fp)
args = parse_args()
main(args.compressed_json_path)

View File

@ -1,19 +1,53 @@
service:
response_formatter: identity
operations:
multi_inp_op:
upper:
input:
subdir: op_inp_files
extension: IN.gz
subdir: ""
extension: up_in.gz
multi: False
output:
subdir: op_outp_files
extension: OUT.gz
subdir: ""
extension: up_out.gz
extract:
input:
subdir: ""
extension: extr_in.gz
multi: False
output:
subdir: "extractions"
extension: gz
rotate:
input:
subdir: ""
extension: rot_in.gz
multi: False
output:
subdir: ""
extension: rot_out.gz
classify:
input:
subdir: ""
extension: cls_in.gz
multi: True
output:
subdir: ""
extension: cls_out.gz
stream_pages:
input:
subdir: ""
extension: pgs_in.gz
multi: False
output:
subdir: "pages"
extension: pgs_out.gz
default:
input:
subdir: ""
extension: IN.gz
multi: False
output:
subdir: op_outp_files
subdir: ""
extension: OUT.gz
storage:

View File

@ -26,8 +26,8 @@ def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {
"dossierId": "folder",
"fileId": f"file{i}",
"dossierId": "dossier_id",
"fileId": f"file_id_{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
"pages": [0, 2, 3],

View File

@ -6,8 +6,8 @@ import pytest
from PIL import Image
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip, merge
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.normalization import normalize_item
from pyinfra.server.nothing import Nothing
from pyinfra.server.packing import pack, unpack
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
@ -150,11 +150,11 @@ def queue_message_metadata(n_items, operation_name):
def metadata(i):
return merge(
{
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3],
"dossierId": "dossier_id",
"fileId": f"file_id_{i}",
},
({"operation": operation_name} if operation_name else {}),
({"pages": [0, 2, 3]} if n_items > 1 else {}),
)
return lmap(metadata, range(n_items))

View File

@ -13,7 +13,7 @@ from PIL import Image
from funcy import retry, project, omit
from waitress import serve
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.nothing import Nothing
from pyinfra.server.server import (
set_up_processing_server,
)
@ -51,8 +51,9 @@ def server(server_stream_function, buffer_size, operation_name):
@pytest.fixture
def operation_name(many_to_n):
return "multi_inp_op" if many_to_n else ""
def operation_name(core_operation):
operation_name = core_operation.__name__
return operation_name
@pytest.fixture
@ -99,16 +100,19 @@ def server_side_test(request):
@pytest.fixture
def core_operation(item_type, one_to_many, analysis_task):
def core_operation(many_to_n, item_type, one_to_many, analysis_task):
def duplicate(string: bytes, metadata):
for _ in range(2):
yield upper(string, metadata), metadata
def upper(string: bytes, metadata):
return string.decode().upper().encode(), metadata
r = string.decode().upper().encode(), metadata
return r
def extract(string: bytes, metadata):
for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items():
for i, c in project(
dict(enumerate(string.decode())), metadata.get("pages", range(len(string.decode())))
).items():
metadata["id"] = i
yield c.encode(), metadata
@ -126,18 +130,34 @@ def core_operation(item_type, one_to_many, analysis_task):
yield f"page_{i}".encode(), metadata
params2op = {
False: {
"string": {False: upper},
"image": {False: rotate, True: classify},
},
True: {
"string": {False: extract},
"pdf": {False: stream_pages},
True: {},
False: {
"image": {True: classify},
},
},
False: {
True: {
"string": {False: extract},
"pdf": {False: stream_pages},
},
False: {
"string": {False: upper},
"image": {False: rotate},
},
},
# False: {
# "string": {False: upper},
# "image": {False: rotate, True: classify},
# },
# True: {
# "string": {False: extract},
# "pdf": {False: stream_pages},
# },
}
try:
return params2op[one_to_many][item_type][analysis_task]
return params2op[many_to_n][one_to_many][item_type][analysis_task]
except KeyError:
msg = f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}]."
pytest.skip(msg)

View File

@ -37,7 +37,7 @@ from test.config import CONFIG as TEST_CONFIG
@pytest.mark.parametrize(
"n_items",
[
1,
# 1,
3,
],
)
@ -79,10 +79,12 @@ from test.config import CONFIG as TEST_CONFIG
"many_to_n",
[
True,
# False,
False,
],
)
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
def test_serving(
operation_name, server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n
):
storage, queue_manager, consumer, file_descriptor_manager = components
@ -146,7 +148,7 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
pages = ref_message["pages"]
for data, page in zip(map(first, data_message_pairs), pages):
object_descriptor = file_descriptor_manager.get_input_object_descriptor(ref_message)
object_descriptor = file_descriptor_manager.get_input_object_descriptor({**ref_message, "id": page})
object_descriptor["object_name"] = build_filepath(object_descriptor, page)
storage.put_object(**object_descriptor, data=compress(data))

View File

@ -9,7 +9,7 @@ from test.utils.storage import pack_for_upload
@pytest.fixture()
def body():
return {"dossierId": "folder", "fileId": "file"}
return {"dossierId": "dossier_id", "fileId": "file_id"}
@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")

View File

@ -3,7 +3,7 @@ from funcy import compose, lmapcat, compact, flatten, identity
from pyinfra.exceptions import NoBufferCapacity
from pyinfra.server.buffering.bufferize import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.nothing import Nothing
def test_buffer(buffer_size):

View File

@ -3,7 +3,7 @@ from funcy import rcompose, compose, project, merge
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.nothing import Nothing
from pyinfra.server.dispatcher.dispatchers.queue import QueuedStreamFunctionDispatcher
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter

View File

@ -2,7 +2,7 @@ import pytest
from funcy import repeatedly, takewhile, notnone, lmap, lmapcat, lflatten
from pyinfra.server.buffering.stream import FlatStreamBuffer, StreamBuffer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.nothing import Nothing
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.utils.func import lift, foreach, starlift