diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py
index 8e471f4..8d1c06d 100644
--- a/pyinfra/server/server.py
+++ b/pyinfra/server/server.py
@@ -1,5 +1,5 @@
 from functools import singledispatch
-from typing import Dict
+from typing import Dict, Callable, Union
 
 from flask import Flask, jsonify, request
 from funcy import merge
@@ -10,41 +10,46 @@ from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
 from pyinfra.server.stream.rest import LazyRestProcessor
 
 
-def set_up_processing_server(server_stream_function, buffer_size):
-    flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
-    queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
-    return __set_up_processing_server(queued_stream_function)
-
-
-def build_endpoint_suffixes(op: str):
-    return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
-
-
-def submit_suffix(op: str):
-    return "submit" if not op else op
-
-
-def pickup_suffix(op: str):
-    return "pickup" if not op else f"{op}_pickup"
-
-
 @singledispatch
-def __set_up_processing_server(arg):
+def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
+    """Produces a processing server given a streamable function or a mapping from operations to streamable functions.
+    Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a
+    function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata.
+    If the function doesn't produce data, data should be an empty byte string.
+    If the function doesn't produce metadata, metadata should be an empty dictionary.
+
+    Args:
+        arg: streamable function or mapping of operations: str to streamable functions
+        buffer_size: If your function operates on batches this parameter controls how many items are aggregated before
+            your function is applied.
+
+    TODO: buffer_size has to be controllable on per function basis.
+
+    Returns:
+        Processing server: flask app
+    """
     pass
 
 
-@__set_up_processing_server.register
-def _(operation2function: dict):
-    return __set_up_processing_server_impl(operation2function)
+@set_up_processing_server.register
+def _(operation2stream_fn: dict, buffer_size=1):
+    return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
 
 
-@__set_up_processing_server.register
-def _(queued_stream_function: object):
-    operation2function = {None: queued_stream_function}
-    return __set_up_processing_server_impl(operation2function)
+@set_up_processing_server.register
+def _(stream_fn: object, buffer_size=1):
+    operation2stream_fn = {None: stream_fn}
+    return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
 
 
-def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]):
+def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
+    operation2stream_fn = {
+        op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items()
+    }
+    return __set_up_processing_server(operation2stream_fn)
+
+
+def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
     app = Flask(__name__)
 
     registry = CollectorRegistry(auto_describe=True)
@@ -88,3 +93,15 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu
         return operation2processor[operation].pop()
 
     return app
+
+
+def build_endpoint_suffixes(op: str):
+    return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
+
+
+def submit_suffix(op: str):
+    return "submit" if not op else op
+
+
+def pickup_suffix(op: str):
+    return "pickup" if not op else f"{op}_pickup"
diff --git a/scripts/mock_client.py b/scripts/mock_client.py
index 7b05c5d..c38b1f0 100644
--- a/scripts/mock_client.py
+++ b/scripts/mock_client.py
@@ -13,7 +13,7 @@ def parse_args():
     parser.add_argument(
         "--analysis_container",
         "-a",
-        choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
+        choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
        required=True,
    )
    args = parser.parse_args()
@@ -61,6 +61,13 @@ def build_message_bodies(analyse_container_type, bucket_name):
                 "pages": [1, 2, 3],
             }
         )
+    if analyse_container_type == "table_parsing":
+        message_dict.update(
+            {
+                "operation": "table_parsing",
+                "pages": [1, 2, 3],
+            }
+        )
     if analyse_container_type == "extraction":
         message_dict.update(
             {"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
@@ -89,6 +96,9 @@ def main(args):
     channel = make_channel(connection)
     declare_queue(channel, CONFIG.rabbitmq.queues.input)
     declare_queue(channel, CONFIG.rabbitmq.queues.output)
+    if args.analysis_container == "table_parsing":
+        CONFIG["service"]["target_file_extension"] = "json.gz"
+        # CONFIG["service"]["download_strategy"] = "multi"
 
     for body in build_message_bodies(args.analysis_container, args.bucket_name):
         channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)