diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py
index 767eb93..301f239 100644
--- a/pyinfra/server/server.py
+++ b/pyinfra/server/server.py
@@ -1,5 +1,5 @@
 from functools import singledispatch
-from typing import Dict, Union, Callable
+from typing import Dict, Callable, Union
 
 from flask import Flask, jsonify, request
 from funcy import merge
@@ -10,8 +10,6 @@
 from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
 from pyinfra.server.stream.rest import LazyRestProcessor
 
-
-
 @singledispatch
 def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
     """Produces a processing server given a streamable function or a mapping from operations to streamable functions.
@@ -51,32 +49,6 @@ def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
     return __set_up_processing_server(operation2stream_fn)
 
 
-
-
-#
-#
-# def set_up_processing_server(server_stream_function, buffer_size):
-#     flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
-#     queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
-#     return __set_up_processing_server(queued_stream_function)
-#
-#
-# @singledispatch
-# def __set_up_processing_server(arg):
-#     pass
-#
-#
-# @__set_up_processing_server.register
-# def _(operation2function: dict):
-#     return __set_up_processing_server_impl(operation2function)
-#
-#
-# @__set_up_processing_server.register
-# def _(queued_stream_function: object):
-#     operation2function = {None: queued_stream_function}
-#     return __set_up_processing_server_impl(operation2function)
-
-
 def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
     app = Flask(__name__)
     registry = CollectorRegistry(auto_describe=True)
diff --git a/scripts/mock_client.py b/scripts/mock_client.py
index 7b05c5d..c38b1f0 100644
--- a/scripts/mock_client.py
+++ b/scripts/mock_client.py
@@ -13,7 +13,7 @@ def parse_args():
     parser.add_argument(
         "--analysis_container",
         "-a",
-        choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
+        choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
         required=True,
     )
     args = parser.parse_args()
@@ -61,6 +61,13 @@
             "pages": [1, 2, 3],
         }
     )
+    if analyse_container_type == "table_parsing":
+        message_dict.update(
+            {
+                "operation": "table_parsing",
+                "pages": [1, 2, 3],
+            }
+        )
     if analyse_container_type == "extraction":
         message_dict.update(
             {"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
@@ -89,6 +96,9 @@
     channel = make_channel(connection)
     declare_queue(channel, CONFIG.rabbitmq.queues.input)
     declare_queue(channel, CONFIG.rabbitmq.queues.output)
+    if args.analysis_container == "table_parsing":
+        CONFIG["service"]["target_file_extension"] = "json.gz"
+        # CONFIG["service"]["download_strategy"] = "multi"
 
     for body in build_message_bodies(args.analysis_container, args.bucket_name):
         channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
diff --git a/scripts/show_compressed_json.py b/scripts/show_compressed_json.py
new file mode 100644
index 0000000..999dea7
--- /dev/null
+++ b/scripts/show_compressed_json.py
@@ -0,0 +1,26 @@
+import argparse
+import gzip
+import json
+
+from pyinfra.server.packing import bytes_to_string
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--compressed_json", "-j", required=True)
+    return parser.parse_args()
+
+
+def main(fp):
+    with open(fp, "rb") as f:
+        compressed_json = f.read()
+
+    json_str = gzip.decompress(compressed_json)
+    json_dict = json.loads(json_str)
+
+    print(json_dict)
+
+
+if __name__ == "__main__":
+    fp = parse_args().compressed_json
+    main(fp)