Pull request #38: 2.0.0 input output file pattern for download strategy

Merge in RR/pyinfra from 2.0.0-input-output-file-pattern-for-download-strategy to 2.0.0

Squashed commit of the following:

commit c7ce79ebbeace6a8cb7925ed69eda2d7cd2a4783
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Jun 24 12:35:29 2022 +0200

    refactor

commit 80f04e544962760adb2dc60c9dd03ccca22167d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Jun 24 11:06:10 2022 +0200

    refactoring of component factory, callback and client-pipeline getter

commit 6c024e1a789e1d55f0739c6846e5c02e8b7c943d
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 20:04:10 2022 +0200

    operations section in config cleaned up; added upload formatter

commit c85800aefc224967cea591c1ec4cf1aaa3ac8215
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 19:22:51 2022 +0200

    refactoring; removed obsolete config entries and code

commit 4be125952d82dc868935c8c73ad87fd8f0bd1d6c
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 19:14:47 2022 +0200

    removed obsolete code

commit ac69a5c8e3f1e2fd7e828a17eeab97984f4f9746
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 18:58:41 2022 +0200

    refactoring: rm dl strat module

commit efd36d0fc4f8f36d267bfa9d35415811fe723ccc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 18:33:51 2022 +0200

    refactoring: multi dl strat -> downloader, rm single dl strat

commit afffdeb993500a6abdb6fe85a549e3d6e97e9ee7
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 16:39:22 2022 +0200

    operations section in config cleaned up

commit 671129af3e343490e0fb277a2b0329aa3027fd73
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Jun 23 16:09:16 2022 +0200

    rename prometheus metric name to include service name

commit 932a3e314b382315492aecab95b1f02f2916f8a6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 14:43:23 2022 +0200

    cleaned up file descr mngr

commit 79350b4ce71fcd095ed6a5e1d3a598ea246fae53
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 12:26:15 2022 +0200

    refactoring WIP: moving response strategy logic into storage strategy (needs to be refactored as well, later) and file descr mngr. Here the moved code needs to be cleaned up.

commit 7e48c66f0c378b25a433a4034eefdc8a0957e775
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 12:00:48 2022 +0200

    refactoring; removed operation / response folder from output path

commit 8e6cbdaf23c48f6eeb52512b7f382d5727e206d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 11:08:37 2022 +0200

    refactoring; added operation -> file pattern mapping to file descr mngr (mainly for self-documentation purposes)

commit 2c80d7cec0cc171e099e5b13aadd2ae0f9bf4f02
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 10:59:57 2022 +0200

    refactoring: introduced input- and output-file specific methods to file descr mngr

commit ecced37150eaac3008cc1b01b235e5f7135e504b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 10:43:26 2022 +0200

    refactoring

commit 3828341e98861ff8d63035ee983309ad5064bb30
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 10:42:46 2022 +0200

    refactoring

commit 9a7c412523d467af40feb6924823ca89e28aadfe
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Jun 22 17:04:54 2022 +0200

    add prometheus metric name for default operation

commit d207b2e274ba53b2a21a18c367bb130fb05ee1cd
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Jun 22 17:02:55 2022 +0200

    Merge config

commit d3fdf36b12d8def18810454765e731599b833bfc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 22 17:01:12 2022 +0200

    added fixmes / todos

commit f49d0b9cb7764473ef9d127bc5d88525a4a16a23
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Jun 22 16:28:25 2022 +0200

    update script

... and 47 more commits
Matthias Bisping 2022-06-24 12:59:26 +02:00
parent 0d87c60fce
commit 94254e1681
49 changed files with 1088 additions and 668 deletions


@ -36,8 +36,6 @@ A configuration is located in `/config.yaml`. All relevant variables can be conf
{
"dossierId": "",
"fileId": "",
"targetFileExtension": "",
"responseFileExtension": ""
}
```


@ -1,13 +1,38 @@
service:
  logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger
  target_file_extension: $TARGET_FILE_EXTENSION|"ORIGIN.pdf.gz" # Extension for files to download from storage and process
  response_file_extension: $RESPONSE_FILE_EXTENSION|"json.gz" # Extension for response files to upload to storage
  # In case a service is not supposed to place response files under the root folder `dossierId/fileId` and requests do
  # not specify an operation, this parameter can specify an output folder
  response_folder: $RESPONSE_FOLDER|null
  # Specifies, how to handle the `page` key of a request. "multi" will download all pages matching the list of pages
  # specified in the request
  download_strategy: $DOWNLOAD_STRATEGY|single
  name: $SERVICE_NAME|research # Default service name for research service, used for prometheus metric name
  response_formatter: default # formats analysis payloads of response messages
  upload_formatter: projecting # formats analysis payloads of objects uploaded to storage
  # Note: This is not really the right place for this. It should be configured on a per-service basis.
  operations:
    conversion:
      input:
        subdir: ""
        extension: ORIGIN.pdf.gz
      output:
        subdir: "pages_as_images"
        extension: json.gz
    extraction:
      input:
        subdir: ""
        extension: ORIGIN.pdf.gz
      output:
        subdir: "extracted_images"
        extension: json.gz
    table_parsing:
      input:
        subdir: "pages_as_images"
        extension: json.gz
      output:
        subdir: "table_parses"
        extension: json.gz
    default:
      input:
        subdir: ""
        extension: IN.gz
      output:
        subdir: ""
        extension: out.gz
probing_webserver:
  host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address

pyinfra/callback.py Normal file

@ -0,0 +1,76 @@
import logging

from funcy import merge, omit, lmap

from pyinfra.exceptions import AnalysisFailure

logger = logging.getLogger(__name__)


class Callback:
    """This is the callback that is applied to items pulled from the storage. It forwards these items to an analysis
    endpoint.
    """

    def __init__(self, base_url, pipeline_factory):
        self.base_url = base_url
        self.pipeline_factory = pipeline_factory
        self.endpoint2pipeline = {}

    def __make_endpoint(self, operation):
        return f"{self.base_url}/{operation}"

    def __get_pipeline(self, endpoint):
        if endpoint in self.endpoint2pipeline:
            pipeline = self.endpoint2pipeline[endpoint]
        else:
            pipeline = self.pipeline_factory(endpoint)
            self.endpoint2pipeline[endpoint] = pipeline
        return pipeline

    @staticmethod
    def __run_pipeline(pipeline, body):
        """
        TODO: Since data and metadata are passed as singletons, there is no buffering and hence no batching happening
         within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate
         passing non-singletons, to only ack a message, once a response is pulled from the output queue of the
         pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for
         the queue manager to tell which message to ack.
        TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline
         operates on singletons ([data], [metadata]).
        """

        def combine_storage_item_metadata_with_queue_message_metadata(body):
            return merge(body["metadata"], omit(body, ["data", "metadata"]))

        def remove_queue_message_metadata(result):
            metadata = omit(result["metadata"], queue_message_keys(body))
            return {**result, "metadata": metadata}

        def queue_message_keys(body):
            return {*body.keys()}.difference({"data", "metadata"})

        try:
            data = body["data"]
            metadata = combine_storage_item_metadata_with_queue_message_metadata(body)
            analysis_response_stream = pipeline([data], [metadata])
            analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream)
            return analysis_response_stream
        except Exception as err:
            logger.error(err)
            raise AnalysisFailure from err

    def __call__(self, body: dict):
        operation = body.get("operation", "submit")
        endpoint = self.__make_endpoint(operation)
        pipeline = self.__get_pipeline(endpoint)
        try:
            logging.debug(f"Requesting analysis from {endpoint}...")
            return self.__run_pipeline(pipeline, body)
        except AnalysisFailure:
            logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
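
The metadata round trip inside `__run_pipeline` is easier to follow in isolation. The sketch below is not the project's code: it redoes the merge and omit steps with plain dict operations instead of funcy, and `fake_pipeline` is a hypothetical stand-in for the client pipeline that simply echoes its input.

```python
def combine_metadata(body):
    # Every queue-message key except "data"/"metadata" is merged over the storage-item metadata.
    queue_part = {k: v for k, v in body.items() if k not in ("data", "metadata")}
    return {**body["metadata"], **queue_part}


def strip_queue_keys(result, body):
    # Remove the queue-message keys again so only analysis metadata remains.
    queue_keys = set(body) - {"data", "metadata"}
    metadata = {k: v for k, v in result["metadata"].items() if k not in queue_keys}
    return {**result, "metadata": metadata}


def fake_pipeline(data_items, metadata_items):
    # Hypothetical pipeline: upper-cases the data and tags each result with an id.
    for data, metadata in zip(data_items, metadata_items):
        yield {"data": data.upper(), "metadata": {**metadata, "id": 0}}


body = {
    "dossierId": "d1",
    "fileId": "f1",
    "operation": "conversion",
    "data": "abc",
    "metadata": {"page": 3},
}
metadata = combine_metadata(body)
responses = [strip_queue_keys(r, body) for r in fake_pipeline([body["data"]], [metadata])]
```

The pipeline sees the request keys while it runs, but the response metadata handed back to the visitor contains only what the storage item and the analysis contributed.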


@ -0,0 +1,109 @@
import logging
from functools import lru_cache

from funcy import project, identity, rcompose

from pyinfra.callback import Callback
from pyinfra.config import parse_disjunction_string
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.storage import storages
from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.downloader import Downloader
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy, ProjectingUploadFormatter

logger = logging.getLogger(__name__)


class ComponentFactory:
    def __init__(self, config):
        self.config = config

    @lru_cache(maxsize=None)
    def get_consumer(self, callback=None):
        callback = callback or self.get_callback()
        return Consumer(self.get_visitor(callback), self.get_queue_manager())

    @lru_cache(maxsize=None)
    def get_callback(self, analysis_base_url=None):
        analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint
        callback = Callback(analysis_base_url, self.get_pipeline)

        def wrapped(body):
            body_repr = project(body, ["dossierId", "fileId", "operation"])
            logger.info(f"Processing {body_repr}...")
            result = callback(body)
            logger.info(f"Completed processing {body_repr}...")
            return result

        return wrapped

    @lru_cache(maxsize=None)
    def get_visitor(self, callback):
        return QueueVisitor(
            callback=callback,
            data_loader=self.get_downloader(),
            response_strategy=self.get_response_strategy(),
            response_formatter=self.get_response_formatter(),
        )

    @lru_cache(maxsize=None)
    def get_queue_manager(self):
        return PikaQueueManager(self.config.rabbitmq.queues.input, self.config.rabbitmq.queues.output)

    @lru_cache(maxsize=None)
    def get_storage(self):
        return storages.get_storage(self.config.storage.backend)

    @lru_cache(maxsize=None)
    def get_response_strategy(self, storage=None):
        return AggregationStorageStrategy(
            storage=storage or self.get_storage(),
            file_descriptor_manager=self.get_file_descriptor_manager(),
            upload_formatter=self.get_upload_formatter(),
        )

    @lru_cache(maxsize=None)
    def get_file_descriptor_manager(self):
        return FileDescriptorManager(
            bucket_name=parse_disjunction_string(self.config.storage.bucket),
            operation2file_patterns=self.get_operation2file_patterns(),
        )

    @lru_cache(maxsize=None)
    def get_upload_formatter(self):
        return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter]

    @lru_cache(maxsize=None)
    def get_response_formatter(self):
        return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[
            self.config.service.response_formatter
        ]

    @lru_cache(maxsize=None)
    def get_operation2file_patterns(self):
        return self.config.service.operations

    @lru_cache(maxsize=None)
    def get_downloader(self, storage=None):
        return Downloader(
            storage=storage or self.get_storage(),
            bucket_name=parse_disjunction_string(self.config.storage.bucket),
            file_descriptor_manager=self.get_file_descriptor_manager(),
        )

    @staticmethod
    @lru_cache(maxsize=None)
    def get_pipeline(endpoint):
        return ClientPipeline(
            RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
        )
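
Decorating the factory methods with `lru_cache` makes each getter behave like a per-instance singleton, because `self` is part of the cache key. A minimal sketch of that behaviour, using a hypothetical `Factory` class and plain `object()` placeholders rather than the real components:

```python
from functools import lru_cache


class Factory:
    """Hypothetical stand-in for a component factory; not the pyinfra class."""

    def __init__(self, name):
        self.name = name

    @lru_cache(maxsize=None)
    def get_storage(self):
        # Built once per factory instance, then served from the cache.
        return object()

    @lru_cache(maxsize=None)
    def get_downloader(self):
        # Components that depend on the storage receive the cached instance.
        return {"storage": self.get_storage()}


factory = Factory("a")
same = factory.get_storage() is factory.get_storage()
shared = factory.get_downloader()["storage"] is factory.get_storage()
other = Factory("b").get_storage() is factory.get_storage()
```

Two things follow from this pattern in general: every argument, including `self`, must be hashable, and the cache holds a reference to each instance it has seen, so cached factories are not garbage collected while the cache lives.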


@ -1,10 +1,13 @@
"""Implements a config object with dot-indexing syntax."""
import os
from functools import partial
from itertools import chain
from operator import truth
from typing import Iterable
from envyaml import EnvYAML
from funcy import first, juxt, butlast, last
from frozendict import frozendict
from funcy import first, juxt, butlast, last, lmap
from pyinfra.locations import CONFIG_FILE
@ -45,6 +48,25 @@ class Config:
    def __setitem__(self, key, value):
        self.__config.key = value

    def to_dict(self, frozen=True):
        return to_dict(self.__config.export(), frozen=frozen)

    def __hash__(self):
        return hash(self.to_dict())


def to_dict(v, frozen=True):
    def make_dict(*args, **kwargs):
        return (frozendict if frozen else dict)(*args, **kwargs)

    if isinstance(v, list):
        return tuple(map(partial(to_dict, frozen=frozen), v))
    elif isinstance(v, DotIndexable):
        return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.x.items()})
    elif isinstance(v, dict):
        return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.items()})
    else:
        return v


CONFIG = Config(CONFIG_FILE)
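
The point of the frozen conversion is to give the config a stable hash: nested dicts and lists are unhashable, so they are recursively replaced by immutable equivalents. The sketch below illustrates the idea with the standard library only. It swaps `frozendict` for sorted tuples of key-value pairs, so it is an analogue of `to_dict`, not a copy of it, and `freeze` is a hypothetical name.

```python
def freeze(value):
    # Lists become tuples, dicts become sorted tuples of (key, frozen value) pairs.
    if isinstance(value, list):
        return tuple(freeze(v) for v in value)
    if isinstance(value, dict):
        return tuple(sorted((k, freeze(v)) for k, v in value.items()))
    return value


config = {"service": {"name": "research", "operations": ["conversion", "extraction"]}}
frozen = freeze(config)

try:
    hash(config)
    plain_dict_hashable = True
except TypeError:
    # Plain dicts cannot be hashed, which is why the conversion is needed.
    plain_dict_hashable = False
```

Equal configurations freeze to equal values and therefore hash identically, which is what a cache keyed on the config object relies on.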


@ -1,122 +1,8 @@
import logging
from functools import lru_cache
from funcy import rcompose, omit, merge, lmap
from pyinfra.config import CONFIG
from pyinfra.exceptions import AnalysisFailure
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.storage import storages
from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
from pyinfra.component_factory import ComponentFactory
@lru_cache(maxsize=None)
def get_consumer(callback=None):
callback = callback or get_callback()
return Consumer(get_visitor(callback), get_queue_manager())
@lru_cache(maxsize=None)
def get_visitor(callback):
return QueueVisitor(storage=get_storage(), callback=callback, response_strategy=get_response_strategy())
@lru_cache(maxsize=None)
def get_queue_manager():
return PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)
@lru_cache(maxsize=None)
def get_storage():
return storages.get_storage(CONFIG.storage.backend)
@lru_cache(maxsize=None)
def get_callback(analysis_base_url=None):
analysis_base_url = analysis_base_url or CONFIG.rabbitmq.callback.analysis_endpoint
return Callback(analysis_base_url)
@lru_cache(maxsize=None)
def get_response_strategy(storage=None):
return AggregationStorageStrategy(storage or get_storage())
class Callback:
def __init__(self, base_url):
self.base_url = base_url
self.endpoint2pipeline = {}
def __make_endpoint(self, operation):
return f"{self.base_url}/{operation}"
def __get_pipeline(self, endpoint):
if endpoint in self.endpoint2pipeline:
pipeline = self.endpoint2pipeline[endpoint]
else:
pipeline = get_pipeline(endpoint)
self.endpoint2pipeline[endpoint] = pipeline
return pipeline
@staticmethod
def __run_pipeline(pipeline, body):
"""
TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching happening
within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate
passing non-singletons, to only ack a message, once a response is pulled from the output queue of the
pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for
the queue manager to tell which message to ack.
TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline
operates on singletons ([data], [metadata]).
"""
def combine_storage_item_metadata_with_queue_message_metadata(body):
return merge(body["metadata"], omit(body, ["data", "metadata"]))
def remove_queue_message_metadata(result):
metadata = omit(result["metadata"], queue_message_keys(body))
return {**result, "metadata": metadata}
def queue_message_keys(body):
return {*body.keys()}.difference({"data", "metadata"})
try:
data = body["data"]
metadata = combine_storage_item_metadata_with_queue_message_metadata(body)
analysis_response_stream = pipeline([data], [metadata])
analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream)
return analysis_response_stream
except Exception as err:
logger.error(err)
raise AnalysisFailure from err
def __call__(self, body: dict):
operation = body.get("operation", "submit")
endpoint = self.__make_endpoint(operation)
pipeline = self.__get_pipeline(endpoint)
try:
logging.debug(f"Requesting analysis from {endpoint}...")
return self.__run_pipeline(pipeline, body)
except AnalysisFailure:
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
@lru_cache(maxsize=None)
def get_pipeline(endpoint):
return ClientPipeline(
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
)
def get_component_factory(config):
return ComponentFactory(config)


@ -44,3 +44,7 @@ class NoBufferCapacity(ValueError):
class InvalidMessage(ValueError):
pass
class InvalidStorageItemFormat(ValueError):
pass


@ -0,0 +1,89 @@
import os
from _operator import itemgetter

from funcy import project


class FileDescriptorManager:
    def __init__(self, bucket_name, operation2file_patterns: dict = None):
        self.bucket_name = bucket_name
        self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()

    @staticmethod
    def get_default_operation2file_patterns():
        return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}

    def get_input_object_name(self, queue_item_body: dict):
        return self.get_object_name(queue_item_body, end="input")

    def get_output_object_name(self, queue_item_body: dict):
        return self.get_object_name(queue_item_body, end="output")

    def get_object_name(self, queue_item_body: dict, end):
        file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
        file_descriptor["pages"] = [queue_item_body.get("id", 0)]
        object_name = self.__build_matcher(file_descriptor)
        return object_name

    @staticmethod
    def __build_matcher(file_descriptor):
        def make_filename(file_id, subdir, suffix):
            return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"

        dossier_id, file_id, subdir, pages, extension = itemgetter(
            "dossierId", "fileId", "subdir", "pages", "extension"
        )(file_descriptor)

        if len(pages) > 1 and subdir:
            page_re = "id:(" + "|".join(map(str, pages)) + ")."
        elif len(pages) == 1 and subdir:
            page_re = f"id:{pages[0]}."
        else:
            page_re = ""

        matcher = os.path.join(dossier_id, make_filename(file_id, subdir, page_re + extension))
        return matcher

    def build_file_descriptor(self, queue_item_body, end="input"):
        operation = queue_item_body.get("operation", "default")
        file_pattern = self.operation2file_patterns[operation][end]
        file_descriptor = {
            **project(queue_item_body, ["dossierId", "fileId", "pages"]),
            "pages": queue_item_body.get("pages", []),
            "extension": file_pattern["extension"],
            "subdir": file_pattern["subdir"],
        }
        return file_descriptor

    def build_input_matcher(self, queue_item_body):
        return self.build_matcher(queue_item_body, end="input")

    def build_output_matcher(self, queue_item_body):
        return self.build_matcher(queue_item_body, end="output")

    def build_matcher(self, queue_item_body, end):
        file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
        return self.__build_matcher(file_descriptor)

    def get_input_object_descriptor(self, queue_item_body):
        return self.get_object_descriptor(queue_item_body, end="input")

    def get_output_object_descriptor(self, storage_upload_info):
        return self.get_object_descriptor(storage_upload_info, end="output")

    def get_object_descriptor(self, queue_item_body, end):
        return {
            "bucket_name": self.bucket_name,
            "object_name": self.get_object_name(queue_item_body, end=end),
        }

    @staticmethod
    def build_storage_upload_info(analysis_payload, request_metadata):
        storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
        return storage_upload_info
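
To see which object names the operation-to-file-pattern mapping produces, the matcher logic can be traced on a concrete request. The sketch below is a standalone re-statement of `__build_matcher` for illustration only. It assumes POSIX-style separators (it uses `posixpath` where the class uses `os.path`), reuses two entries from the `operations` config section, and the helper names `build_matcher` and `object_name` are hypothetical.

```python
import posixpath

# Two entries copied from the `operations` section of the config above.
OPERATION2FILE_PATTERNS = {
    "conversion": {
        "input": {"subdir": "", "extension": "ORIGIN.pdf.gz"},
        "output": {"subdir": "pages_as_images", "extension": "json.gz"},
    },
    "table_parsing": {
        "input": {"subdir": "pages_as_images", "extension": "json.gz"},
        "output": {"subdir": "table_parses", "extension": "json.gz"},
    },
}


def build_matcher(dossier_id, file_id, subdir, pages, extension):
    # The page prefix is only used when the file lives in a subdirectory.
    if len(pages) > 1 and subdir:
        page_re = "id:(" + "|".join(map(str, pages)) + ")."
    elif len(pages) == 1 and subdir:
        page_re = f"id:{pages[0]}."
    else:
        page_re = ""
    suffix = page_re + extension
    filename = posixpath.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
    return posixpath.join(dossier_id, filename)


def object_name(body, end):
    pattern = OPERATION2FILE_PATTERNS[body["operation"]][end]
    pages = [body.get("id", 0)]  # object names are pinned to a single id
    return build_matcher(body["dossierId"], body["fileId"], pattern["subdir"], pages, pattern["extension"])


body = {"dossierId": "d1", "fileId": "f1", "operation": "conversion", "id": 2}
input_name = object_name(body, "input")
output_name = object_name(body, "output")
multi_page = build_matcher("d1", "f1", "pages_as_images", [1, 2, 3], "json.gz")
```

With these inputs, the input name is `d1/f1.ORIGIN.pdf.gz`, the output name is `d1/f1/pages_as_images/id:2.json.gz`, and the multi-page matcher is `d1/f1/pages_as_images/id:(1|2|3).json.gz`. The output of `conversion` lands in the same subdirectory that `table_parsing` reads from, which is how the operations chain.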


@ -29,9 +29,9 @@ class EitherParserWrapper:
def __log(self, result):
if isinstance(result, Right):
logger.debug(f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}")
logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}")
else:
logger.debug(f"{self.parser.__class__.__name__} failed on {result.bind()}")
logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} failed on {result.bind()}")
return result
def parse(self, item: Either):


@ -1,50 +1,56 @@
from functools import singledispatch
from typing import Dict
from typing import Dict, Callable, Union
from flask import Flask, jsonify, request
from funcy import merge
from prometheus_client import generate_latest, Summary, CollectorRegistry
from pyinfra.config import CONFIG
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
def set_up_processing_server(server_stream_function, buffer_size):
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
return __set_up_processing_server(queued_stream_function)
def build_endpoint_suffixes(op: str):
return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
def submit_suffix(op: str):
return "submit" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"
@singledispatch
def __set_up_processing_server(arg):
def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
"""Produces a processing server given a streamable function or a mapping from operations to streamable functions.
Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a
function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata.
If the function doesn't produce data, data should be an empty byte string.
If the function doesn't produce metadata, metadata should be an empty dictionary.
Args:
arg: streamable function or mapping of operations: str to streamable functions
buffer_size: If your function operates on batches this parameter controls how many items are aggregated before
your function is applied.
TODO: buffer_size has to be controllable on per function basis.
Returns:
Processing server: flask app
"""
pass
@__set_up_processing_server.register
def _(operation2function: dict):
return __set_up_processing_server_impl(operation2function)
@set_up_processing_server.register
def _(operation2stream_fn: dict, buffer_size=1):
return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
@__set_up_processing_server.register
def _(queued_stream_function: object):
operation2function = {None: queued_stream_function}
return __set_up_processing_server_impl(operation2function)
@set_up_processing_server.register
def _(stream_fn: object, buffer_size=1):
operation2stream_fn = {None: stream_fn}
return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]):
def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
operation2stream_fn = {
op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items()
}
return __set_up_processing_server(operation2stream_fn)
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
registry = CollectorRegistry(auto_describe=True)
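
The public `set_up_processing_server` now dispatches on the type of its first argument, so callers can pass either one stream function or a mapping from operation names to stream functions. A minimal sketch of that dispatch pattern with `functools.singledispatch`; `normalize` and `double` are hypothetical names, and the sketch only normalizes the argument rather than building a server:

```python
from functools import singledispatch


@singledispatch
def normalize(arg):
    # Fallback for any non-dict argument: treat it as the single default
    # operation, keyed by None as in the hunk above.
    return {None: arg}


@normalize.register(dict)
def _(operation2fn):
    # A mapping is already in the target shape; copy it to avoid aliasing.
    return dict(operation2fn)


def double(x):
    return 2 * x


single = normalize(double)
multi = normalize({"conversion": double})
```

Both call styles end up as an operation-to-function mapping, so the rest of the setup code only has to handle one shape.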
@ -53,8 +59,9 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu
}
def make_summary_instance(op: str):
op = op.replace("_pickup", "")
return Summary(f"redactmanager_{op}_seconds", f"Time spent on {op}.", registry=registry)
op = op.replace("_pickup", "") if op != "pickup" else "default"
service_name = CONFIG.service.name
return Summary(f"redactmanager_{service_name}_{op}_seconds", f"Time spent on {op}.", registry=registry)
submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()}
pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()}
@ -88,3 +95,15 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu
return operation2processor[operation].pop()
return app
def build_endpoint_suffixes(op: str):
return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
def submit_suffix(op: str):
return "submit" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"
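
The renamed Prometheus metric can be traced from an operation name to its final metric name. The suffix helpers below are copied from the hunk above; `metric_name` is a hypothetical helper that isolates the string logic of `make_summary_instance`, with the service name passed in explicitly instead of read from `CONFIG`, and under the assumption that the summary is built from a pickup suffix.

```python
def submit_suffix(op):
    return "submit" if not op else op


def pickup_suffix(op):
    return "pickup" if not op else f"{op}_pickup"


def metric_name(service_name, endpoint_suffix):
    # A bare "pickup" suffix belongs to the default (unnamed) operation.
    op = endpoint_suffix.replace("_pickup", "") if endpoint_suffix != "pickup" else "default"
    return f"redactmanager_{service_name}_{op}_seconds"


default_metric = metric_name("research", pickup_suffix(None))
named_metric = metric_name("research", pickup_suffix("table_parsing"))
```

For the default service name `research`, the default operation yields `redactmanager_research_default_seconds` and `table_parsing` yields `redactmanager_research_table_parsing_seconds`.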


@ -1,5 +1,4 @@
import logging
from itertools import repeat
from operator import attrgetter
from azure.storage.blob import ContainerClient, BlobServiceClient
@ -20,15 +19,15 @@ class AzureStorageAdapter(StorageAdapter):
container_client = self.__client.get_container_client(bucket_name)
return container_client.exists()
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def __provide_container_client(self, bucket_name) -> ContainerClient:
self.make_bucket(bucket_name)
container_client = self.__client.get_container_client(bucket_name)
return container_client
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def put_object(self, bucket_name, object_name, data):
logger.debug(f"Uploading '{object_name}'...")
container_client = self.__provide_container_client(bucket_name)


@ -1,8 +1,14 @@
import gzip
import json

from pyinfra.server.packing import bytes_to_string


def pack_analysis_payload(analysis_payload):
    return compress(json.dumps(analysis_payload).encode())


def pack_for_upload(data: bytes):
    return gzip.compress(json.dumps(bytes_to_string(data)).encode())


def compress(data: bytes):
    return gzip.compress(data)


def decompress(data: bytes):
    return gzip.decompress(data)
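
A quick round trip shows what `pack_analysis_payload` puts on the storage: JSON text, UTF-8 encoded, then gzip-compressed. The first three functions mirror the hunk above; `unpack_analysis_payload` is a hypothetical inverse added only for this illustration.

```python
import gzip
import json


def compress(data: bytes):
    return gzip.compress(data)


def decompress(data: bytes):
    return gzip.decompress(data)


def pack_analysis_payload(analysis_payload):
    return compress(json.dumps(analysis_payload).encode())


def unpack_analysis_payload(blob: bytes):
    # Hypothetical inverse: undo the compression, then parse the JSON text.
    return json.loads(decompress(blob).decode())


payload = {"data": "abc", "metadata": {"id": 0}}
blob = pack_analysis_payload(payload)
restored = unpack_analysis_payload(blob)
```

Because the payload goes through JSON, it has to be JSON-serializable; raw `bytes` values would need to be converted to strings first.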


@ -1,6 +1,6 @@
from itertools import starmap, tee
from funcy import curry, compose
from funcy import curry, compose, filter
def lift(fn):
@ -36,6 +36,10 @@ def foreach(fn, iterable):
fn(itm)
def flift(pred):
return curry(filter)(pred)
def parallel_map(f1, f2):
"""Applies functions to a stream in parallel and yields a stream of tuples:
parallel_map :: a -> b, a -> c -> [a] -> [(b, c)]
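
The `flift` helper added in this hunk lifts a predicate into a function that filters a whole stream. A rough equivalent using only the standard library, with `functools.partial` and the built-in `filter` standing in for funcy's `curry` and `filter`:

```python
from functools import partial


def flift(pred):
    # Bind the predicate now, supply the iterable later.
    return partial(filter, pred)


keep_even = flift(lambda n: n % 2 == 0)
evens = list(keep_even(range(6)))
```

The lifted function returns a lazy iterator, so it composes with other stream functions without materializing intermediate lists.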


@ -1,361 +0,0 @@
import abc
import gzip
import json
import logging
from collections import deque
from copy import deepcopy
from operator import itemgetter
from typing import Callable, Dict, Union
from funcy import omit, filter, lflatten
from more_itertools import peekable
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.exceptions import DataLoadingFailure, InvalidMessage
from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.server.packing import string_to_bytes
from pyinfra.storage.storage import Storage
logger = logging.getLogger(__name__)
class ResponseStrategy(abc.ABC):
@abc.abstractmethod
def handle_response(self, analysis_response: dict):
pass
def __call__(self, analysis_response: dict):
return self.handle_response(analysis_response)
def get_response_object_descriptor(self, body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": self.get_response_object_name(body),
}
@staticmethod
def get_response_object_name(body):
if "pages" not in body:
body["pages"] = []
if "id" not in body:
body["id"] = 0
dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body)
object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
return object_name
class StorageStrategy(ResponseStrategy):
def __init__(self, storage):
self.storage = storage
def handle_response(self, body: dict):
response_object_descriptor = self.get_response_object_descriptor(body)
self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode()))
body.pop("data")
body["responseFile"] = response_object_descriptor["object_name"]
return body
class ForwardingStrategy(ResponseStrategy):
def handle_response(self, analysis_response):
return analysis_response
class DispatchCallback(abc.ABC):
@abc.abstractmethod
def __call__(self, payload):
pass
class IdentifierDispatchCallback(DispatchCallback):
def __init__(self):
self.identifier = None
def has_new_identifier(self, metadata):
identifier = ":".join(itemgetter("fileId", "dossierId")(metadata))
if not self.identifier:
self.identifier = identifier
return identifier != self.identifier
def __call__(self, metadata):
return self.has_new_identifier(metadata)
class AggregationStorageStrategy(ResponseStrategy):
def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None):
self.storage = storage
self.merger = merger or list
self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
self.buffer = deque()
def put_object(self, data: bytes, storage_upload_info):
object_descriptor = self.get_response_object_descriptor(storage_upload_info)
self.storage.put_object(**object_descriptor, data=gzip.compress(data))
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)
self.buffer.clear()
return merged_buffer_content
def upload_queue_items(self, storage_upload_info):
data = json.dumps(self.merge_queue_items()).encode()
return self.put_object(data, storage_upload_info)
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
"""analysis_payload : {data: ..., metadata: ...}"""
storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata)
analysis_payload["metadata"].pop("id")
if analysis_payload["data"]:
return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info)
else:
self.buffer.append(analysis_payload)
if last or self.dispatch_callback(storage_upload_info):
return self.upload_queue_items(storage_upload_info)
else:
return Nothing
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))
request_metadata = omit(analysis_response, ["data"])
result_data = peekable(analysis_response["data"])
yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))
def build_storage_upload_info(analysis_payload, request_metadata):
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
storage_upload_info["fileId"] = build_file_path(
storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder)
)
return storage_upload_info
def build_file_path(storage_upload_info, folder):
return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")
class InvalidStorageItemFormat(ValueError):
pass
class ParsingStrategy(abc.ABC):
@abc.abstractmethod
def parse(self, data: bytes):
pass
@abc.abstractmethod
def parse_and_wrap(self, data: bytes):
pass
def __call__(self, data: bytes):
return self.parse_and_wrap(data)
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
class DynamicParsingStrategy(ParsingStrategy):
def __init__(self):
self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
def parse(self, data: bytes) -> Union[bytes, dict]:
return self.parser(data)
def parse_and_wrap(self, data):
return self.parse(data)
def validate(data):
if not ("data" in data and "metadata" in data):
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
def wrap(data):
return {"data": data, "metadata": {}}
class QueueVisitor:
def __init__(
self,
storage: Storage,
callback: Callable,
response_strategy: ResponseStrategy,
parsing_strategy: ParsingStrategy = None,
download_strategy=None,
):
self.storage = storage
self.callback = callback
self.download_strategy = download_strategy or get_download_strategy()
self.response_strategy = response_strategy
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
def load_data(self, queue_item_body):
data = self.download_strategy(self.storage, queue_item_body)
data = map(self.parsing_strategy, data)
data = map(standardize, data)
return data
def process_storage_item(self, data_metadata_pack):
return self.callback(data_metadata_pack)
def load_item_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
def process_storage_item(storage_item):
analysis_input = {**storage_item, **queue_item_body}
return self.process_storage_item(analysis_input)
storage_items = self.load_data(queue_item_body)
results = lflatten(map(process_storage_item, storage_items))
return {"data": results, **queue_item_body}
def __call__(self, queue_item_body):
analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
return self.response_strategy(analysis_result_body)
def standardize(data) -> Dict:
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
Cases:
1) backend upload: data as bytes
2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
where value of key 'data' was encoded with bytes_to_string(...)
Returns:
{"data": bytes, "metadata": dict}
"""
def is_blob_without_metadata(data):
return isinstance(data, bytes)
def is_blob_with_metadata(data: Dict):
return isinstance(data, dict)
if is_blob_without_metadata(data):
return wrap(data)
elif is_blob_with_metadata(data):
validate(data)
return data
else: # Fallback / used for testing with simple data
logger.warning("Encountered storage data in unexpected format.")
assert isinstance(data, str)
return wrap(string_to_bytes(data))
def get_download_strategy(download_strategy_type=None):
download_strategies = {
"single": SingleDownloadStrategy(),
"multi": MultiDownloadStrategy(),
}
return download_strategies.get(download_strategy_type or CONFIG.service.download_strategy, SingleDownloadStrategy())
class DownloadStrategy(abc.ABC):
def _load_data(self, storage, queue_item_body):
object_descriptor = self.get_object_descriptor(queue_item_body)
logging.debug(f"Downloading {object_descriptor}...")
data = self.__download(storage, object_descriptor)
logging.debug(f"Downloaded {object_descriptor}.")
assert isinstance(data, bytes)
data = gzip.decompress(data)
return [data]
@staticmethod
def __download(storage, object_descriptor):
try:
data = storage.get_object(**object_descriptor)
except Exception as err:
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
raise DataLoadingFailure from err
return data
@staticmethod
@abc.abstractmethod
def get_object_name(body: dict):
raise NotImplementedError
def get_object_descriptor(self, body):
return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": self.get_object_name(body)}
class SingleDownloadStrategy(DownloadStrategy):
def download(self, storage, queue_item_body):
return self._load_data(storage, queue_item_body)
@staticmethod
def get_object_name(body: dict):
# TODO: deepcopy still necessary?
body = deepcopy(body)
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
object_name = f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}"
return object_name
def __call__(self, storage, queue_item_body):
return self.download(storage, queue_item_body)
class MultiDownloadStrategy(DownloadStrategy):
def __init__(self):
# TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket
self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket)
def download(self, storage: Storage, queue_item_body):
pages = "|".join(map(str, queue_item_body["pages"]))
matches_page = r".*id:(" + pages + r").*"
object_names = storage.get_all_object_names(self.bucket_name)
object_names = filter(matches_page, object_names)
objects = (storage.get_object(self.bucket_name, objn) for objn in object_names)
objects = map(gzip.decompress, objects)
return objects
@staticmethod
def get_object_name(body: dict):
def get_key(key):
return key if key in body else False
# TODO: deepcopy still necessary?
body = deepcopy(body)
folder = f"/{get_key('pages') or get_key('images')}/"
if not folder:
raise InvalidMessage("Expected a folder like 'images' or 'pages' to be specified in message.")
idnt = f"id:{body.get('id', 0)}"
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
object_name = f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}"
return object_name
def __call__(self, storage, queue_item_body):
return self.download(storage, queue_item_body)


@ -0,0 +1 @@
from .visitor import QueueVisitor


@ -0,0 +1,35 @@
from functools import partial
from funcy import compose
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.storage.storage import Storage
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift
class Downloader:
def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
self.storage = storage
self.bucket_name = bucket_name
self.file_descriptor_manager = file_descriptor_manager
def __call__(self, queue_item_body):
return self.download(queue_item_body)
def get_names_of_objects_by_pattern(self, file_pattern: str):
matches_pattern = flift(file_pattern)
page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name)
return page_object_names
def download_and_decompress_object(self, object_names):
download = partial(self.storage.get_object, self.bucket_name)
return map(compose(decompress, download), object_names)
def download(self, queue_item_body):
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
page_object_names = self.get_names_of_objects_by_pattern(file_pattern)
objects = self.download_and_decompress_object(page_object_names)
return objects
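
Reviewer note: the Downloader above selects object names by pattern and then downloads and decompresses each match lazily. A minimal standard-library sketch of that flow follows. It assumes that `flift` turns a regular expression into a filter over names and that `decompress` is gzip-based (the removed strategies used `gzip.decompress`). `InMemoryStorage`, `download_matching` and the object-name layout are hypothetical and only serve the illustration.

```python
import gzip
import re
from typing import Dict, Iterable, Iterator


class InMemoryStorage:
    """Hypothetical stand-in for Storage; implements only the two calls used here."""

    def __init__(self, objects: Dict[str, bytes]):
        self._objects = objects

    def get_all_object_names(self, bucket_name: str) -> Iterable[str]:
        return sorted(self._objects)

    def get_object(self, bucket_name: str, object_name: str) -> bytes:
        return self._objects[object_name]


def download_matching(storage, bucket_name: str, file_pattern: str) -> Iterator[bytes]:
    """Lazily yield the decompressed content of every object whose name matches file_pattern."""
    matcher = re.compile(file_pattern)
    names = (name for name in storage.get_all_object_names(bucket_name) if matcher.fullmatch(name))
    return (gzip.decompress(storage.get_object(bucket_name, name)) for name in names)


storage = InMemoryStorage(
    {
        "folder/file0/op_inp_files/id:0.IN.gz": gzip.compress(b"page 0"),
        "folder/file0/op_inp_files/id:2.IN.gz": gzip.compress(b"page 2"),
        "folder/file0/op_inp_files/id:5.IN.gz": gzip.compress(b"page 5"),
    }
)
# Assumed pattern shape; the real one comes from build_input_matcher, which is not shown in this diff.
pages = list(download_matching(storage, "bucket", r"folder/file0/op_inp_files/id:(0|2)\.IN\.gz"))
```

Nothing is fetched until the returned generator is consumed, which matches the `map`-based return value of `download_and_decompress_object`.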


@ -0,0 +1,14 @@
import abc
from funcy import identity
from pyinfra.utils.func import lift
class ResponseFormatter(abc.ABC):
def __call__(self, message):
return (identity if isinstance(message, dict) else lift)(self.format)(message)
@abc.abstractmethod
def format(self, message):
pass


@ -0,0 +1,13 @@
from funcy import first, omit
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
class DefaultResponseFormatter(ResponseFormatter):
"""
TODO: Extend via using enums throughout the codebase instead of strings.
See the enum-formatter in image-prediction service for reference.
"""
def format(self, message):
return {**omit(message, ["response_files"]), "responseFile": first(message["response_files"])}
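
Reviewer note: the formatter above projects the list under `response_files` onto a single `responseFile` entry. The sketch below shows the same projection with plain dict operations instead of `funcy.omit` and `funcy.first`. The function name `format_response` and the sample message are hypothetical.

```python
def format_response(message: dict) -> dict:
    """Drop 'response_files' and expose its first entry (or None) as 'responseFile'."""
    rest = {key: value for key, value in message.items() if key != "response_files"}
    # Like funcy.first, fall back to None when the list is empty.
    return {**rest, "responseFile": next(iter(message["response_files"]), None)}


formatted = format_response(
    {"dossierId": "folder", "fileId": "file0", "response_files": ["folder/file0.OUT.gz"]}
)
```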


@ -0,0 +1,6 @@
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
class IdentityResponseFormatter(ResponseFormatter):
def format(self, message):
return message



@ -0,0 +1,14 @@
import abc
class BlobParsingStrategy(abc.ABC):
@abc.abstractmethod
def parse(self, data: bytes):
pass
@abc.abstractmethod
def parse_and_wrap(self, data: bytes):
pass
def __call__(self, data: bytes):
return self.parse_and_wrap(data)


@ -0,0 +1,20 @@
from typing import Union
from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
class DynamicParsingStrategy(BlobParsingStrategy):
def __init__(self):
self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
def parse(self, data: bytes) -> Union[bytes, dict]:
return self.parser(data)
def parse_and_wrap(self, data):
return self.parse(data)


@ -0,0 +1,92 @@
import abc
from collections import deque
from typing import Callable
from funcy import omit, filter, first, lpluck, identity
from more_itertools import peekable
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.encoding import pack_analysis_payload
from pyinfra.visitor.strategies.response.response import ResponseStrategy
class UploadFormatter(abc.ABC):
@abc.abstractmethod
def format(self, items):
raise NotImplementedError
def __call__(self, items):
return self.format(items)
class ProjectingUploadFormatter(UploadFormatter):
def format(self, items):
head = first(items)
if head["data"]:
assert len(items) == 1
return head
else:
items = lpluck("metadata", items)
return items
class AggregationStorageStrategy(ResponseStrategy):
def __init__(
self,
storage,
file_descriptor_manager: FileDescriptorManager,
merger: Callable = list,
upload_formatter: UploadFormatter = identity,
):
self.storage = storage
self.file_descriptor_manager = file_descriptor_manager
self.merger = merger
self.upload_formatter = upload_formatter
self.buffer = deque()
self.response_files = deque()
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
request_metadata = omit(analysis_response, ["analysis_payloads"])
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not analysis_payloads.peek(False))
analysis_payloads = peekable(analysis_response["analysis_payloads"])
yield from filter(is_not_nothing, map(upload_or_aggregate, analysis_payloads))
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
"""analysis_payload : {data: ..., metadata: ...}"""
storage_upload_info = self.file_descriptor_manager.build_storage_upload_info(analysis_payload, request_metadata)
object_descriptor = self.file_descriptor_manager.get_output_object_descriptor(storage_upload_info)
self.add_analysis_payload_to_buffer(analysis_payload)
if analysis_payload["data"] or last:
self.upload_aggregated_items(object_descriptor)
self.response_files.append(object_descriptor["object_name"])
return self.build_response_message(storage_upload_info) if last else Nothing
def add_analysis_payload_to_buffer(self, analysis_payload):
self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])})
def upload_aggregated_items(self, object_descriptor):
items_to_upload = self.upload_formatter(self.merge_queue_items())
self.upload_item(items_to_upload, object_descriptor)
def build_response_message(self, storage_upload_info):
response_files = [*self.response_files]
self.response_files.clear()
return {**storage_upload_info, "response_files": response_files}
def upload_item(self, analysis_payload, object_descriptor):
self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload))
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)
self.buffer.clear()
return merged_buffer_content
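
Reviewer note: `upload_or_aggregate` buffers every payload and flushes the buffer when a payload carries data or when the payload is the last one, which the strategy detects through `peekable`. A minimal standard-library sketch of that control flow is given below. `with_last_flag` and `aggregate_and_upload` are hypothetical names, and the upload is modelled by collecting batches in a list.

```python
from collections import deque
from typing import Iterable, Iterator, List, Tuple


def with_last_flag(items: Iterable) -> Iterator[Tuple[object, bool]]:
    """Yield (item, is_last) pairs using one-item look-ahead."""
    iterator = iter(items)
    try:
        current = next(iterator)
    except StopIteration:
        return
    for upcoming in iterator:
        yield current, False
        current = upcoming
    yield current, True


def aggregate_and_upload(payloads: Iterable[dict]) -> List[list]:
    """Return the batches that would be uploaded: flush on a payload with data or on the last payload."""
    buffer, uploads = deque(), []
    for payload, is_last in with_last_flag(payloads):
        buffer.append(payload)
        if payload["data"] or is_last:
            uploads.append(list(buffer))
            buffer.clear()
    return uploads


payloads = [
    {"data": b"", "metadata": {"n": 1}},
    {"data": b"", "metadata": {"n": 2}},
    {"data": b"x", "metadata": {"n": 3}},
    {"data": b"", "metadata": {"n": 4}},
]
batches = aggregate_and_upload(payloads)
```

With this input the first three payloads end up in one batch (flushed by the payload that carries data) and the trailing metadata-only payload is flushed on its own because it is last.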


@ -0,0 +1,6 @@
from pyinfra.visitor.strategies.response.response import ResponseStrategy
class ForwardingStrategy(ResponseStrategy):
def handle_response(self, analysis_response):
return analysis_response


@ -0,0 +1,10 @@
import abc
class ResponseStrategy(abc.ABC):
@abc.abstractmethod
def handle_response(self, analysis_response: dict):
pass
def __call__(self, analysis_response: dict):
return self.handle_response(analysis_response)


@ -0,0 +1,33 @@
import json
from operator import itemgetter
from pyinfra.config import parse_disjunction_string, CONFIG
from pyinfra.utils.encoding import compress
from pyinfra.visitor.strategies.response.response import ResponseStrategy
class StorageStrategy(ResponseStrategy):
def __init__(self, storage, response_file_extension="out"):
self.storage = storage
self.response_file_extension = response_file_extension
def handle_response(self, body: dict):
response_object_descriptor = self.get_response_object_descriptor(body)
self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode()))
body.pop("analysis_payloads")
body["response_files"] = [response_object_descriptor["object_name"]]
return body
def get_response_object_descriptor(self, body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": self.get_response_object_name(body),
}
def get_response_object_name(self, body):
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
object_name = f"{dossier_id}/{file_id}/.{self.response_file_extension}"
return object_name

pyinfra/visitor/utils.py

@ -0,0 +1,51 @@
import logging
from typing import Dict
from pyinfra.exceptions import InvalidStorageItemFormat
from pyinfra.server.packing import string_to_bytes
logger = logging.getLogger()
def build_file_path(storage_upload_info, folder):
return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")
def standardize(storage_item) -> Dict:
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
Cases:
1) backend upload: data as bytes
2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
where value of key 'data' was encoded with bytes_to_string(...)
Returns:
{"data": bytes, "metadata": dict}
"""
def is_blob_without_metadata(storage_item):
return isinstance(storage_item, bytes)
def is_blob_with_metadata(storage_item: Dict):
return isinstance(storage_item, dict)
if is_blob_without_metadata(storage_item):
return wrap(storage_item)
elif is_blob_with_metadata(storage_item):
validate(storage_item)
return storage_item
else: # Fallback / used for testing with simple data
logger.warning("Encountered storage data in unexpected format.")
assert isinstance(storage_item, str)
return wrap(string_to_bytes(storage_item))
def wrap(data):
return {"data": data, "metadata": {}}
def validate(storage_item):
if not ("data" in storage_item and "metadata" in storage_item):
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {storage_item}.")


@ -0,0 +1,84 @@
from typing import Callable
from funcy import lflatten, compose, itervalues, lfilter
from pyinfra.utils.func import lift
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.utils import standardize
class QueueVisitor:
def __init__(
self,
callback: Callable,
data_loader: Callable,
response_strategy: ResponseStrategy,
parsing_strategy: BlobParsingStrategy = None,
response_formatter: ResponseFormatter = None,
):
"""Processes queue messages with a given callback.
Args:
callback: callback to apply
data_loader: loads data specified in message and passes to callback
parsing_strategy: behaviour for interpreting loaded items
response_strategy: behaviour for response production
response_formatter: formats the message returned by the response strategy (defaults to identity)
TODO: merge all dependencies into a single pipeline like: getter -> parser -> processor -> formatter -> putter
Returns:
depends on response strategy
"""
self.callback = callback
self.data_loader = data_loader
self.response_strategy = response_strategy
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
self.response_formatter = response_formatter or IdentityResponseFormatter()
def __call__(self, queue_item_body):
analysis_response = compose(
self.response_formatter,
self.response_strategy,
self.load_items_from_storage_and_process_with_callback,
)(queue_item_body)
return analysis_response
def load_items_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
callback_results = compose(
self.remove_empty_results,
lflatten,
lift(self.get_item_processor(queue_item_body)),
self.load_data,
)(queue_item_body)
return {"analysis_payloads": callback_results, **queue_item_body}
def load_data(self, queue_item_body):
data = compose(
lift(standardize),
lift(self.parsing_strategy),
self.data_loader,
)(queue_item_body)
return data
def get_item_processor(self, queue_item_body):
def process_storage_item(storage_item):
analysis_input = {**storage_item, **queue_item_body}
return self.process_storage_item(analysis_input)
return process_storage_item
@staticmethod
def remove_empty_results(results):
return lfilter(compose(any, itervalues), results)
def process_storage_item(self, data_metadata_pack):
return self.callback(data_metadata_pack)
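
Reviewer note: the visitor chains its stages with `funcy.compose`, which applies functions right to left, so the last argument runs first. The sketch below rebuilds that ordering with the standard library. It assumes that `lift` from `pyinfra.utils.func` maps a function over an iterable. The stage functions `load`, `parse` and `standardize` are hypothetical stand-ins for the real dependencies.

```python
from functools import reduce
from typing import Callable, Iterable


def compose(*functions: Callable) -> Callable:
    """Right-to-left composition: compose(f, g)(x) == f(g(x))."""
    return lambda x: reduce(lambda acc, fn: fn(acc), reversed(functions), x)


def lift(fn: Callable) -> Callable:
    """Assumed behaviour of pyinfra's lift: apply fn to every element of an iterable."""
    return lambda items: map(fn, items)


def load(queue_item_body: dict) -> Iterable[bytes]:
    return [b"a", b"b"]  # stand-in for the data loader


def parse(blob: bytes) -> str:
    return blob.decode()  # stand-in for the parsing strategy


def standardize(item) -> dict:
    return {"data": item, "metadata": {}}


# Same stage order as QueueVisitor.load_data: loader first, then parser, then standardize.
load_data = compose(lift(standardize), lift(parse), load)
items = list(load_data({"fileId": "file0"}))

# Equivalent of remove_empty_results: keep only dicts with at least one truthy value.
non_empty = [result for result in [{"data": b"", "metadata": {}}, *items] if any(result.values())]
```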


@ -18,3 +18,4 @@ more-itertools==8.12.0
numpy==1.22.3
Pillow==9.0.1
prometheus-client==0.13.1
frozendict==2.3.2


@ -1,5 +1,4 @@
import argparse
import gzip
import os
from pathlib import Path
@ -7,6 +6,7 @@ from tqdm import tqdm
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.storage.storages import get_s3_storage
from pyinfra.utils.encoding import compress
def parse_args():
@ -31,7 +31,7 @@ def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension)
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
data = gzip.compress(result.encode())
data = compress(result.encode())
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
storage.put_object(bucket_name, path_gz, data)
@ -44,7 +44,7 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz)
with open(path, "rb") as f:
data = gzip.compress(f.read())
data = compress(f.read())
storage.put_object(bucket_name, path_gz, data)


@ -13,7 +13,7 @@ def parse_args():
parser.add_argument(
"--analysis_container",
"-a",
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
required=True,
)
args = parser.parse_args()
@ -48,11 +48,11 @@ def make_connection() -> pika.BlockingConnection:
return connection
def build_message_bodies(analyse_container_type, bucket_name):
def build_message_bodies(analysis_container, bucket_name):
def update_message(message_dict):
if analyse_container_type == "detr" or analyse_container_type == "image":
if analysis_container == "detr" or analysis_container == "image":
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
if analyse_container_type == "conversion":
if analysis_container == "conversion":
message_dict.update(
{
"targetFileExtension": "ORIGIN.pdf.gz",
@ -61,13 +61,20 @@ def build_message_bodies(analyse_container_type, bucket_name):
"pages": [1, 2, 3],
}
)
if analyse_container_type == "extraction":
if analysis_container == "table_parsing":
message_dict.update(
{
"operation": "table_parsing",
"pages": [1, 2, 3],
}
)
if analysis_container == "extraction":
message_dict.update(
{"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
)
if analyse_container_type == "dl_error":
if analysis_container == "dl_error":
message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
if analyse_container_type == "ner":
if analysis_container == "ner":
message_dict.update(
{"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
)


@ -0,0 +1,26 @@
import argparse
import gzip
import json
from pyinfra.server.packing import bytes_to_string
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--compressed_json", "-j", required=True)
return parser.parse_args()
def main(fp):
with open(fp, "rb") as f:
compressed_json = f.read()
json_str = gzip.decompress(compressed_json)
json_dict = json.loads(json_str)
print(json.dumps(json_dict, indent=2))
if __name__ == "__main__":
fp = parse_args().compressed_json
main(fp)


@ -4,7 +4,7 @@ from multiprocessing import Process
from pyinfra.utils.retry import retry
from pyinfra.config import CONFIG
from pyinfra.default_objects import get_consumer
from pyinfra.component_factory import ComponentFactory
from pyinfra.exceptions import ConsumerError
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.utils.banner import show_banner
@ -15,7 +15,7 @@ logger = logging.getLogger()
@retry(ConsumerError)
def consume():
try:
consumer = get_consumer()
consumer = ComponentFactory(CONFIG).get_consumer()
consumer.basic_consume_and_publish()
except Exception as err:
logger.exception(err)


@ -1,3 +1,28 @@
service:
response_formatter: identity
operations:
multi_inp_op:
input:
subdir: op_inp_files
extension: IN.gz
output:
subdir: op_outp_files
extension: OUT.gz
single_inp_op:
input:
subdir: ""
extension: IN.gz
output:
subdir: op_outp_files
extension: OUT.gz
default:
input:
subdir: ""
extension: IN.gz
output:
subdir: ""
extension: OUT.gz
storage:
minio:
endpoint: "http://127.0.0.1:9000"
@ -21,8 +46,7 @@ webserver:
port: $SERVER_PORT|5000 # webserver port
mode: $SERVER_MODE|production # webserver mode: {development, production}
mock_analysis_endpoint: "http://127.0.0.1:5000"
use_docker_fixture: True
logging: False
use_docker_fixture: 1
logging: 0
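
Reviewer note: the `operations` section of the test config gives every operation an input and an output file pattern made of a subdirectory and an extension. The FileDescriptorManager that consumes this section is not part of this excerpt, so the sketch below is only one plausible reading, modelled on the naming of the removed SingleDownloadStrategy and MultiDownloadStrategy. `OPERATIONS` and `build_object_name` are hypothetical names.

```python
from typing import Dict

# Mirrors the 'operations' section of the test config as a plain dict.
OPERATIONS: Dict[str, dict] = {
    "multi_inp_op": {
        "input": {"subdir": "op_inp_files", "extension": "IN.gz"},
        "output": {"subdir": "op_outp_files", "extension": "OUT.gz"},
    },
    "single_inp_op": {
        "input": {"subdir": "", "extension": "IN.gz"},
        "output": {"subdir": "op_outp_files", "extension": "OUT.gz"},
    },
    "default": {
        "input": {"subdir": "", "extension": "IN.gz"},
        "output": {"subdir": "", "extension": "OUT.gz"},
    },
}


def build_object_name(body: dict, direction: str, operations: Dict[str, dict] = OPERATIONS) -> str:
    """Assumed naming: <dossier>/<file>.<ext> without subdir, <dossier>/<file>/<subdir>/id:<n>.<ext> with one."""
    spec = operations.get(body.get("operation"), operations["default"])[direction]
    base = f"{body['dossierId']}/{body['fileId']}"
    if spec["subdir"]:
        return f"{base}/{spec['subdir']}/id:{body.get('id', 0)}.{spec['extension']}"
    return f"{base}.{spec['extension']}"


multi_input = build_object_name(
    {"dossierId": "folder", "fileId": "file0", "operation": "multi_inp_op", "id": 2}, "input"
)
single_input = build_object_name({"dossierId": "folder", "fileId": "file1", "operation": "single_inp_op"}, "input")
fallback_output = build_object_name({"dossierId": "folder", "fileId": "file1"}, "output")
```

Messages without an `operation` key, or with an unknown one, fall back to the `default` entry in this sketch.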


@ -1,9 +1,15 @@
import json
from pyinfra.config import CONFIG as MAIN_CONFIG
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
MAIN_CONFIG["retry"]["delay"] = 0.1
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)
from pyinfra.default_objects import get_component_factory
from test.config import CONFIG as TEST_CONFIG
import logging
import time
from unittest.mock import Mock
@ -21,8 +27,9 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter
from pyinfra.storage.clients.azure import get_azure_client
from pyinfra.storage.clients.s3 import get_s3_client
from pyinfra.storage.storage import Storage
from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor
from test.config import CONFIG
from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
from pyinfra.visitor.strategies.response.storage import StorageStrategy
from test.queue.queue_manager_mock import QueueManagerMock
from test.storage.adapter_mock import StorageAdapterMock
from test.storage.client_mock import StorageClientMock
@ -48,7 +55,7 @@ logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1)
@pytest.fixture(autouse=True)
def mute_logger():
if not CONFIG.logging:
if not TEST_CONFIG.logging:
logger.setLevel(logging.CRITICAL + 1)
@ -106,7 +113,7 @@ def s3_backend(request):
@pytest.fixture(scope="session", autouse=True)
def docker_compose(sleep_seconds=30):
if CONFIG.use_docker_fixture:
if TEST_CONFIG.use_docker_fixture:
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
compose.start()
@ -124,7 +131,7 @@ def get_pika_connection_params():
def get_s3_params(s3_backend):
params = CONFIG.storage[s3_backend]
params = TEST_CONFIG.storage[s3_backend]
return params
@ -133,7 +140,7 @@ def get_adapter(client_name, s3_backend):
if client_name == "mock":
return StorageAdapterMock(StorageClientMock())
if client_name == "azure":
return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
return AzureStorageAdapter(get_azure_client(TEST_CONFIG.storage.azure.connection_string))
if client_name == "s3":
return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend)))
else:
@ -194,5 +201,22 @@ def response_strategy(response_strategy_name, storage):
@pytest.fixture()
def visitor(storage, analysis_callback, response_strategy):
return QueueVisitor(storage, analysis_callback, response_strategy)
def visitor(storage, analysis_callback, response_strategy, component_factory):
return QueueVisitor(
callback=analysis_callback,
data_loader=component_factory.get_downloader(storage),
response_strategy=response_strategy,
)
@pytest.fixture
def file_descriptor_manager(component_factory):
return component_factory.get_file_descriptor_manager()
@pytest.fixture
def component_factory():
MAIN_CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
return get_component_factory(MAIN_CONFIG)


@ -1,9 +1,9 @@
from _operator import itemgetter
from operator import itemgetter
from typing import Iterable
import pytest
from pyinfra.queue.consumer import Consumer
from test.utils.input import pair_data_with_queue_message
@pytest.fixture(scope="session")
@ -20,3 +20,18 @@ def access_callback():
def items():
numbers = [f"{i}".encode() for i in range(3)]
return pair_data_with_queue_message(numbers)
def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {
"dossierId": "folder",
"fileId": f"file{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
"pages": [0, 2, 3],
}
yield d, body
return list(inner())


@ -11,7 +11,6 @@ from pyinfra.server.normalization import normalize_item
from pyinfra.server.packing import pack, unpack
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
from test.utils.input import pair_data_with_queue_message
from test.utils.pdf import pdf_stream
@ -74,7 +73,7 @@ def strings_to_bytes(strings):
@pytest.fixture
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test):
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test, queue_message_metadata):
"""TODO: this has become super wonky"""
metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)]
@ -87,7 +86,7 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si
response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata)))))
queue_message_keys = ["id"] * (not server_side_test) + [
*second(first(pair_data_with_queue_message([b""]))).keys()
*first(queue_message_metadata).keys()
]
response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata)
expected = lzip(response_data, response_metadata)
@ -99,8 +98,8 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si
@pytest.fixture
def endpoint(url):
return f"{url}/submit"
def endpoint(url, operation_name):
return f"{url}/{operation_name}"
@pytest.fixture(params=["rest", "basic"])
@ -141,10 +140,26 @@ def images_to_bytes(images):
@pytest.fixture
def metadata(n_items):
def metadata(n_items, many_to_n):
"""storage metadata
TODO: rename
"""
return list(repeat({"key": "value"}, times=n_items))
@pytest.fixture
def queue_message_metadata(n_items, operation_name):
def metadata(i):
return {
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3],
"operation": operation_name,
}
return lmap(metadata, range(n_items))
@pytest.fixture
def packages(input_data_items, metadata):
return lstarlift(pack)(zip(input_data_items, metadata))


@ -46,8 +46,13 @@ def url(host, port):
@pytest.fixture
def server(server_stream_function, buffer_size):
return set_up_processing_server(server_stream_function, buffer_size)
def server(server_stream_function, buffer_size, operation_name):
return set_up_processing_server({operation_name: server_stream_function}, buffer_size)
@pytest.fixture
def operation_name(many_to_n):
return "multi_inp_op" if many_to_n else "single_inp_op"
@pytest.fixture


@ -1,23 +1,16 @@
import gzip
import json
import re
from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, lpluck, first, second
from funcy import compose, first, second, pluck, lflatten, lzip
from pyinfra.default_objects import (
get_callback,
get_response_strategy,
get_consumer,
get_queue_manager,
get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.config import CONFIG
from pyinfra.default_objects import get_component_factory
from pyinfra.server.packing import unpack, pack
from pyinfra.visitor import QueueVisitor, get_download_strategy
from test.utils.input import pair_data_with_queue_message
from pyinfra.utils.encoding import compress, decompress
from test.config import CONFIG as TEST_CONFIG
@pytest.mark.parametrize(
@ -37,7 +30,7 @@ from test.utils.input import pair_data_with_queue_message
@pytest.mark.parametrize(
"analysis_task",
[
# False,
False,
True,
],
)
@ -85,15 +78,13 @@ from test.utils.input import pair_data_with_queue_message
@pytest.mark.parametrize(
"many_to_n",
[
False,
True,
# False,
],
)
def test_serving(
server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n, download_strategy
):
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
storage, queue_manager, consumer = components
storage, queue_manager, consumer, file_descriptor_manager = components
assert queue_manager.input_queue.to_list() == []
assert queue_manager.output_queue.to_list() == []
@ -104,18 +95,18 @@ def test_serving(
if many_to_n:
upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
storage, queue_manager, data_message_pairs, download_strategy
storage, queue_manager, data_message_pairs, file_descriptor_manager
)
else:
upload_data_to_storage_and_publish_requests_to_queue(
storage, queue_manager, data_message_pairs, download_strategy
storage, queue_manager, data_message_pairs, file_descriptor_manager
)
consumer.consume_and_publish(n=int(many_to_n) or n_items)
outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
# TODO: correctness of target should be validated as well, since production was become non-trivial
# TODO: correctness of target should be validated as well, since production has become non-trivial
assert sorted(outputs) == sorted(targets)
@ -125,26 +116,29 @@ def data_metadata_packs(input_data_items, metadata):
@pytest.fixture
def data_message_pairs(data_metadata_packs):
data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
return data_message_pairs
def data_message_pairs(data_metadata_packs, queue_message_metadata):
return lzip(data_metadata_packs, queue_message_metadata)
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs, download_strategy):
def upload_data_to_storage_and_publish_requests_to_queue(
storage, queue_manager, data_message_pairs, file_descriptor_manager
):
for data, message in data_message_pairs:
upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy)
upload_data_to_storage_and_publish_request_to_queue(
storage, queue_manager, data, message, file_descriptor_manager
)
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy):
storage.put_object(**download_strategy.get_object_descriptor(message), data=gzip.compress(data))
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager):
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=compress(data))
queue_manager.publish_request(message)
# TODO: refactor body; too long and scripty
def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
storage, queue_manager, data_message_pairs, download_strategy
storage, queue_manager, data_message_pairs, file_descriptor_manager
):
assert data_message_pairs
@ -152,10 +146,10 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
pages = ref_message["pages"]
for data, page in zip(map(first, data_message_pairs), pages):
object_descriptor = download_strategy.get_object_descriptor(ref_message)
object_descriptor = file_descriptor_manager.get_input_object_descriptor(ref_message)
object_descriptor["object_name"] = build_filepath(object_descriptor, page)
storage.put_object(**object_descriptor, data=gzip.compress(data))
storage.put_object(**object_descriptor, data=compress(data))
queue_manager.publish_request(ref_message)
@ -163,15 +157,14 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
def build_filepath(object_descriptor, page):
object_name = object_descriptor["object_name"]
parts = object_name.split("/")
parts.insert(-1, "pages")
path = "/".join(parts)
path = re.sub("id:\d", f"id:{page}", path)
path = re.sub(r"id:\d", f"id:{page}", path)
return path
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
names_of_uploaded_files = lflatten(pluck("response_files", queue_manager.output_queue.to_list()))
uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
return outputs
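
For reviewers: a standalone sketch of what `build_filepath` in this hunk does to an object name. The descriptor below is hypothetical; the real object name pattern comes from the file descriptor manager and the config. Note that `id:\d` matches a single digit only, so a name that already contains a multi-digit id would keep its trailing digits.

```python
import re


def build_filepath(object_descriptor, page):
    # Same steps as the helper in the diff: add a "pages" folder before the
    # file name, then point the id at the requested page.
    parts = object_descriptor["object_name"].split("/")
    parts.insert(-1, "pages")
    path = "/".join(parts)
    # The raw string keeps "\d" a regex digit class instead of a string escape.
    return re.sub(r"id:\d", f"id:{page}", path)


# Hypothetical descriptor, for illustration only.
descriptor = {"bucket_name": "bucket", "object_name": "dossier0/file0/id:0.json.gz"}
result = build_filepath(descriptor, 2)
print(result)
```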
@ -186,20 +179,20 @@ def components(components_type, real_components, test_components, bucket_name):
else:
raise ValueError(f"Unknown component type '{components_type}'.")
storage, queue_manager, consumer = components
storage, queue_manager, consumer, file_descriptor_manager = components
queue_manager.clear()
storage.make_bucket(bucket_name)
storage.clear_bucket(bucket_name)
yield storage, queue_manager, consumer
yield storage, queue_manager, consumer, file_descriptor_manager
queue_manager.clear()
storage.clear_bucket(bucket_name)
def decode(storage_item):
storage_item = json.loads(gzip.decompress(storage_item).decode())
storage_item = json.loads(decompress(storage_item).decode())
if not isinstance(storage_item, list):
storage_item = [storage_item]
@ -212,26 +205,36 @@ def components_type(request):
@pytest.fixture
def real_components(url, download_strategy):
callback = get_callback(url)
consumer = get_consumer(callback)
queue_manager = get_queue_manager()
storage = get_storage()
def real_components(url):
consumer.visitor.download_strategy = download_strategy
return storage, queue_manager, consumer
CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
CONFIG["service"]["response_formatter"] = TEST_CONFIG.service.response_formatter
CONFIG["service"]["upload_formatter"] = "identity"
component_factory = get_component_factory(CONFIG)
@pytest.fixture
def download_strategy(many_to_n):
return get_download_strategy("multi" if many_to_n else "single")
callback = component_factory.get_callback(url)
consumer = component_factory.get_consumer(callback)
queue_manager = component_factory.get_queue_manager()
storage = component_factory.get_storage()
file_descriptor_manager = component_factory.get_file_descriptor_manager()
return storage, queue_manager, consumer, file_descriptor_manager
@pytest.fixture
def test_components(url, queue_manager, storage):
callback = get_callback(url)
visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
consumer = Consumer(visitor, queue_manager)
return storage, queue_manager, consumer
pass
#
# component_factory = ComponentFactory(CONFIG)
#
# file_descriptor_manager = component_factory.get_file_descriptor_manager()
#
# visitor = QueueVisitor(
# storage=storage,
# callback=component_factory.get_callback(url),
# response_strategy=component_factory.get_response_strategy(storage),
# )
# consumer = Consumer(visitor, queue_manager)
#
# return storage, queue_manager, consumer, file_descriptor_manager

@ -1,19 +1,20 @@
import logging
from operator import itemgetter
import pytest
from funcy import lmapcat
from funcy import pluck, lflatten
from pyinfra.config import CONFIG
from pyinfra.component_factory import ComponentFactory
from pyinfra.exceptions import ProcessingFailure
from pyinfra.utils.encoding import pack_for_upload
from pyinfra.visitor import ForwardingStrategy, SingleDownloadStrategy
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
from test.utils.storage import pack_for_upload
logger = logging.getLogger()
@pytest.mark.xfail(
reason="NOTE: Something is messed up in the test setups."
"These tests fail when run together with other tests. Do not know yet which ones and why."
" These tests fail when run together with other tests. Do not know yet which ones and why."
)
class TestConsumer:
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
@ -25,7 +26,7 @@ class TestConsumer:
@pytest.mark.parametrize(
"queue_manager_name",
[
"pika", # NOTE: pika must come first. Test fails IFF pika is in second place, for whatever reason.
"pika", # NOTE: pika must come first. Test fails IFF pika is in second position, for whatever reason.
"mock",
],
scope="session",
@ -63,8 +64,9 @@ class TestConsumer:
@pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder,
):
file_descriptor_manager = ComponentFactory(CONFIG).get_file_descriptor_manager()
visitor.response_strategy = ForwardingStrategy()
@ -72,7 +74,7 @@ class TestConsumer:
storage.clear_bucket(bucket_name)
for data, message in items:
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(message), data=pack_for_upload(data))
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=pack_for_upload(data))
queue_manager.publish_request(message)
requests = consumer.consume(inactivity_timeout=5)
@ -81,7 +83,7 @@ class TestConsumer:
logger.debug(f"Processing item {itm}")
queue_manager.publish_response(req, visitor)
assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"]
assert lflatten(pluck("analysis_payloads", queue_manager.output_queue.to_list())) == ["00", "11", "22"]
@pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
def test_message_is_republished_when_callback_raises_processing_failure_exception(
@ -109,7 +111,7 @@ class TestConsumer:
logger.addFilter(lambda record: False)
# TODO:
# Note above says order of mock and pika matters. Note, that you can produce an error for debugging, when
# The note above says order of mock and pika matters. Note, that you can produce an error for debugging, when
# using `queue_manager.publish_response(next(requests), callback)` without the-with and while-block, where it
# becomes obvious, that the test with the note above then uses the data from THIS here test.
with pytest.raises(DebugError):
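
For reviewers: the updated assertion in `TestConsumer` reads a list-valued field from every output message and flattens the result. A minimal standard-library sketch of the same idea, with made-up messages. funcy's `lflatten` also flattens deeper nesting, which this one-level version does not.

```python
from itertools import chain

# Hypothetical output-queue contents; each message carries a list of payloads.
messages = [
    {"analysis_payloads": ["00"]},
    {"analysis_payloads": ["11"]},
    {"analysis_payloads": ["22"]},
]

# One level of flattening is enough here, since each field is a flat list.
payloads = list(chain.from_iterable(m["analysis_payloads"] for m in messages))
print(payloads)
```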

@ -1,41 +1,41 @@
import gzip
import json
import pytest
from funcy import first
from pyinfra.utils.encoding import pack_for_upload
from pyinfra.visitor import SingleDownloadStrategy
from pyinfra.utils.encoding import decompress
from test.utils.storage import pack_for_upload
@pytest.fixture()
def body():
return {"dossierId": "folder", "fileId": "file", "targetFileExtension": "in.gz", "responseFileExtension": "out.gz"}
return {"dossierId": "folder", "fileId": "file"}
@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
class TestVisitor:
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
def test_given_a_input_queue_message_callback_pulls_the_data_from_storage(
self, visitor, body, storage, bucket_name
self, visitor, body, storage, bucket_name, file_descriptor_manager
):
storage.clear_bucket(bucket_name)
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"content"))
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(body), data=pack_for_upload(b"content"))
data_received = list(visitor.load_data(body))
assert [{"data": b"content", "metadata": {}}] == data_received
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name):
def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name, file_descriptor_manager):
storage.clear_bucket(bucket_name)
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2"))
response_body = visitor.load_item_from_storage_and_process_with_callback(body)
assert response_body["data"] == ["22"]
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(body), data=pack_for_upload(b"2"))
response_body = visitor.load_items_from_storage_and_process_with_callback(body)
assert response_body["analysis_payloads"] == ["22"]
@pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name):
def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name, file_descriptor_manager):
storage.clear_bucket(bucket_name)
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2"))
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(body), data=pack_for_upload(b"2"))
response_body = visitor(body)
assert "data" not in response_body
assert json.loads(
gzip.decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"]))
)["data"] == ["22"]
decompress(storage.get_object(bucket_name=bucket_name, object_name=first(response_body["response_files"])))
)["analysis_payloads"] == ["22"]

@ -0,0 +1,54 @@
import json
import pytest
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
@pytest.mark.parametrize("client_name", ["mock"], scope="session")
class TestAggregationStorageStrategy:
def test_aggregation_strategy_with_no_empty_data_field_causes_two_uploads(self, storage):
strat = AggregationStorageStrategy(storage=storage)
analysis_response = {
"analysis_payloads": [
{"data": [1], "metadata": {"id": 1}},
{"data": [3], "metadata": {"id": 3}},
],
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
}
response_message_bodies = [*strat(analysis_response)]
assert response_message_bodies == [
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 3,
"response_files": ["dossier0/file0/id:1.json.gz", "dossier0/file0/id:3.json.gz"],
}
]
def test_aggregation_strategy_with_empty_data_field_causes_single_uploads(self, storage):
strat = AggregationStorageStrategy(storage=storage)
analysis_response = {
"analysis_payloads": [
{"data": None, "metadata": {"id": 1}},
{"data": [3], "metadata": {"id": 3}},
],
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
}
response_message_bodies = [*strat(analysis_response)]
assert response_message_bodies == [
{
"dossierId": "dossier0",
"fileId": "file0",
"pages": [0, 2],
"id": 3,
"response_files": ["dossier0/file0/id:3.json.gz"],
}
]
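
For reviewers: the two tests above suggest that payloads whose `data` is `None` are not uploaded and that response files are named after the payload id. The sketch below only mirrors those expectations; `response_file_names` is a hypothetical helper and not the `AggregationStorageStrategy` implementation. The name pattern and the `.json.gz` extension are copied from the expected values, and the `"id": 3` entry of the response body is not modelled.

```python
def response_file_names(analysis_response):
    """Hypothetical helper: file names for the payloads that carry data."""
    prefix = f"{analysis_response['dossierId']}/{analysis_response['fileId']}"
    return [
        f"{prefix}/id:{payload['metadata']['id']}.json.gz"
        for payload in analysis_response["analysis_payloads"]
        # Assumption: "empty" means data is None, as in the second test.
        if payload["data"] is not None
    ]


response = {
    "analysis_payloads": [
        {"data": None, "metadata": {"id": 1}},
        {"data": [3], "metadata": {"id": 3}},
    ],
    "dossierId": "dossier0",
    "fileId": "file0",
    "pages": [0, 2],
}
names = response_file_names(response)
print(names)
```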

@ -1,5 +1,5 @@
import pytest
from funcy import rcompose, compose, project, second, merge
from funcy import rcompose, compose, project, second, merge, lpluck
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
@ -15,14 +15,13 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from pyinfra.utils.func import lift, llift
from test.utils.input import pair_data_with_queue_message
def test_mock_pipeline():
data = [1, 2, 3]
f, g, h, u = map(lift, [lambda x: x**2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
pipeline = ClientPipeline(f, g, h, u)
@ -31,23 +30,21 @@ def test_mock_pipeline():
@pytest.mark.parametrize("client_pipeline_type", ["basic", "rest"])
@pytest.mark.parametrize("server_side_test", [True])
def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, n_items):
def test_pipeline(
core_operation, client_pipeline, input_data_items, metadata, queue_message_metadata, targets, n_items
):
assert core_operation is not Nothing
metadata = add_queue_metadata_to_server_side_metadata(metadata)
metadata = [
merge(storage_mdt, project(queue_mdt, ["operation", "pages"]))
for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata)
]
output = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
assert n_items == 0 or len(output) > 0
assert output == targets
def add_queue_metadata_to_server_side_metadata(metadata):
return [
merge(project(second(mdt), [*mdt_o.keys(), "pages"]), mdt_o)
for mdt, mdt_o in zip(pair_data_with_queue_message(metadata), metadata)
]
@pytest.mark.parametrize("item_type", ["string"])
@pytest.mark.parametrize("n_items", [1])
def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size):
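
For reviewers: the new `test_pipeline` body merges each storage metadata dict with the `operation` and `pages` fields of the matching queue message. A dependency-free sketch of that step, assuming funcy's documented behaviour for `project` (keep only the listed keys that exist) and `merge` (later dicts win). `add_queue_fields` and the sample dicts are hypothetical.

```python
def add_queue_fields(storage_metadata, queue_metadata, keys=("operation", "pages")):
    # Keep only the selected keys that are actually present (like funcy.project).
    selected = {k: queue_metadata[k] for k in keys if k in queue_metadata}
    # Later dicts win on key clashes (like funcy.merge).
    return {**storage_metadata, **selected}


# Made-up metadata; the real fixtures live in the test configuration.
storage_mdt = {"id": 0}
queue_mdt = {"dossierId": "folder", "fileId": "file0", "pages": [0, 2, 3]}
merged = add_queue_fields(storage_mdt, queue_mdt)
print(merged)
```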

@ -1,16 +0,0 @@
from typing import Iterable
def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {
"dossierId": "folder",
"fileId": f"file{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
"pages": [0, 2, 3]
}
yield d, body
return list(inner())

8
test/utils/storage.py Normal file

@ -0,0 +1,8 @@
import json
from pyinfra.server.packing import bytes_to_string
from pyinfra.utils.encoding import compress
def pack_for_upload(data: bytes):
return compress(json.dumps(bytes_to_string(data)).encode())
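
For reviewers: a round-trip sketch of the new `pack_for_upload` test helper. It assumes that `pyinfra.utils.encoding.compress` is a gzip wrapper (the diff replaces `gzip.compress` calls with it) and that `bytes_to_string` is a plain decode; both stand-ins below are assumptions, and the real helpers may differ. `unpack` is a hypothetical inverse that mirrors the `decode` helper in the serving test.

```python
import gzip
import json


def bytes_to_string(data: bytes) -> str:
    # Assumption: a plain decode; the real pyinfra helper may encode differently.
    return data.decode()


def pack_for_upload(data: bytes) -> bytes:
    # Assumption: pyinfra's compress is gzip.
    return gzip.compress(json.dumps(bytes_to_string(data)).encode())


def unpack(blob: bytes):
    # Hypothetical inverse: decompress, then parse the JSON string.
    return json.loads(gzip.decompress(blob).decode())


packed = pack_for_upload(b"content")
print(unpack(packed))
```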