Refactor processing-server setup so it also works with mappings from operations to functions

This commit is contained in:
Julius Unverfehrt 2022-06-21 11:27:38 +02:00
parent 2362619bef
commit 92b4416f21
2 changed files with 56 additions and 29 deletions

View File

@ -1,5 +1,5 @@
from functools import singledispatch
from typing import Dict
from typing import Dict, Callable, Union
from flask import Flask, jsonify, request
from funcy import merge
@ -10,41 +10,46 @@ from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
def set_up_processing_server(server_stream_function, buffer_size):
    """Wrap *server_stream_function* in a buffered queue and build the processing server app."""
    buffered = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
    return __set_up_processing_server(QueuedStreamFunction(buffered))
def build_endpoint_suffixes(op: str):
    """Return the submit/pickup endpoint suffixes for *op* (defaults when op is empty)."""
    submit = op if op else "submit"
    pickup = f"{op}_pickup" if op else "pickup"
    return {"submit_suffix": submit, "pickup_suffix": pickup}
def submit_suffix(op: str):
    """Return the submit-endpoint suffix: the operation name, or "submit" when *op* is empty."""
    return op or "submit"
def pickup_suffix(op: str):
    """Return the pickup-endpoint suffix: "<op>_pickup", or "pickup" when *op* is empty."""
    if not op:
        return "pickup"
    return f"{op}_pickup"
@singledispatch
def __set_up_processing_server(arg):
def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
    """Produce a processing server from a streamable function or a mapping of operation names to streamable functions.

    Streamable functions are constructed by calling
    pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a function taking a tuple
    of data and metadata and also returning a tuple or yielding tuples of data and metadata.
    If the function doesn't produce data, data should be an empty byte string.
    If the function doesn't produce metadata, metadata should be an empty dictionary.

    Args:
        arg: A streamable function, or a mapping of operation name (str) to streamable functions.
        buffer_size: If your function operates on batches, this parameter controls how many items
            are aggregated before your function is applied.
            TODO: buffer_size has to be controllable on a per-function basis.

    Returns:
        Processing server: flask app
    """
    # Generic fallback body; the concrete implementations for dict and for a plain
    # callable are provided by the .register handlers below.
    pass
@__set_up_processing_server.register
def _(operation2function: dict):
    # Dict input: keys are operation names, values are queued stream functions;
    # delegate straight to the shared server builder.
    return __set_up_processing_server_impl(operation2function)
@set_up_processing_server.register
def _(operation2stream_fn: dict, buffer_size=1):
    """Build a processing server from a mapping of operation name -> streamable function.

    Note: buffer_size must default to 1 here as well — singledispatch dispatches on the
    first argument's type and calls this registered function directly, so the generic
    implementation's default is not inherited; without it,
    set_up_processing_server({...}) would raise TypeError.
    """
    return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
@__set_up_processing_server.register
def _(queued_stream_function: object):
    # Single-function input: register it under the default (None) operation so the
    # server exposes the plain submit/pickup endpoints.
    operation2function = {None: queued_stream_function}
    return __set_up_processing_server_impl(operation2function)
@set_up_processing_server.register
def _(stream_fn: object, buffer_size=1):
    """Build a processing server from a single streamable function.

    The function is registered under the default (None) operation. buffer_size must
    default to 1 here to mirror the generic implementation — singledispatch calls the
    registered function directly, so without a default here
    set_up_processing_server(fn) would raise TypeError.
    """
    operation2stream_fn = {None: stream_fn}
    return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]):
def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
    """Wrap each streamable function in a buffered queue and build the server app.

    Args:
        operation2stream_fn: Mapping of operation name -> streamable function.
        buffer_size: Number of items aggregated in a buffer before a function is applied.

    Returns:
        Processing server: flask app
    """
    # Pass buffer_size by keyword (as the pre-refactor code did) so it cannot silently
    # bind to a different positional parameter of FlatStreamBuffer. Also bind the
    # result to a new name instead of rebinding the input parameter.
    operation2queued_fn = {
        op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size=buffer_size))
        for op, fn in operation2stream_fn.items()
    }
    return __set_up_processing_server(operation2queued_fn)
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
registry = CollectorRegistry(auto_describe=True)
@ -88,3 +93,15 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu
return operation2processor[operation].pop()
return app
def build_endpoint_suffixes(op: str):
    """Bundle the submit/pickup endpoint suffixes derived from *op* into a dict."""
    return dict(submit_suffix=submit_suffix(op), pickup_suffix=pickup_suffix(op))
def submit_suffix(op: str):
    """Suffix of the submit endpoint: the operation name, or "submit" for the default op."""
    if op:
        return op
    return "submit"
def pickup_suffix(op: str):
    """Suffix of the pickup endpoint: "<op>_pickup", or "pickup" for the default op."""
    return "{}_pickup".format(op) if op else "pickup"

View File

@ -13,7 +13,7 @@ def parse_args():
parser.add_argument(
"--analysis_container",
"-a",
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
required=True,
)
args = parser.parse_args()
@ -61,6 +61,13 @@ def build_message_bodies(analyse_container_type, bucket_name):
"pages": [1, 2, 3],
}
)
if analyse_container_type == "table_parsing":
message_dict.update(
{
"operation": "table_parsing",
"pages": [1, 2, 3],
}
)
if analyse_container_type == "extraction":
message_dict.update(
{"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
@ -89,6 +96,9 @@ def main(args):
channel = make_channel(connection)
declare_queue(channel, CONFIG.rabbitmq.queues.input)
declare_queue(channel, CONFIG.rabbitmq.queues.output)
if args.analysis_container == "table_parsing":
CONFIG["service"]["target_file_extension"] = "json.gz"
# CONFIG["service"]["download_strategy"] = "multi"
for body in build_message_bodies(args.analysis_container, args.bucket_name):
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)