Merge branch '2.0.0-refactoring' of ssh://git.iqser.com:2222/rr/pyinfra into 2.0.0-input-output-file-pattern-for-download-strategy

This commit is contained in:
Matthias Bisping 2022-06-22 16:04:24 +02:00
commit 4a0bd216a4
3 changed files with 38 additions and 30 deletions

View File

@@ -1,5 +1,5 @@
from functools import singledispatch
from typing import Dict, Union, Callable
from typing import Dict, Callable, Union
from flask import Flask, jsonify, request
from funcy import merge
@@ -10,8 +10,6 @@ from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
@singledispatch
def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
"""Produces a processing server given a streamable function or a mapping from operations to streamable functions.
@@ -51,32 +49,6 @@ def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
return __set_up_processing_server(operation2stream_fn)
#
#
# def set_up_processing_server(server_stream_function, buffer_size):
# flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
# queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
# return __set_up_processing_server(queued_stream_function)
#
#
# @singledispatch
# def __set_up_processing_server(arg):
# pass
#
#
# @__set_up_processing_server.register
# def _(operation2function: dict):
# return __set_up_processing_server_impl(operation2function)
#
#
# @__set_up_processing_server.register
# def _(queued_stream_function: object):
# operation2function = {None: queued_stream_function}
# return __set_up_processing_server_impl(operation2function)
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
registry = CollectorRegistry(auto_describe=True)

View File

@@ -13,7 +13,7 @@ def parse_args():
parser.add_argument(
"--analysis_container",
"-a",
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
required=True,
)
args = parser.parse_args()
@@ -61,6 +61,13 @@ def build_message_bodies(analyse_container_type, bucket_name):
"pages": [1, 2, 3],
}
)
if analyse_container_type == "table_parsing":
message_dict.update(
{
"operation": "table_parsing",
"pages": [1, 2, 3],
}
)
if analyse_container_type == "extraction":
message_dict.update(
{"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
@@ -89,6 +96,9 @@ def main(args):
channel = make_channel(connection)
declare_queue(channel, CONFIG.rabbitmq.queues.input)
declare_queue(channel, CONFIG.rabbitmq.queues.output)
if args.analysis_container == "table_parsing":
CONFIG["service"]["target_file_extension"] = "json.gz"
# CONFIG["service"]["download_strategy"] = "multi"
for body in build_message_bodies(args.analysis_container, args.bucket_name):
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)

View File

@@ -0,0 +1,26 @@
import argparse
import gzip
import json
from pyinfra.server.packing import bytes_to_string
def parse_args(argv=None):
    """Parse command-line arguments for the gzip-JSON inspection script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            argparse falls back to ``sys.argv[1:]``, so existing callers are
            unaffected; passing a list makes the parser testable.

    Returns:
        argparse.Namespace with a ``compressed_json`` attribute holding the
        path to the gzip-compressed JSON file (required).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--compressed_json", "-j", required=True)
    return parser.parse_args(argv)
def main(fp):
    """Decompress a gzipped JSON file, print the decoded object, and return it.

    Args:
        fp: Path to a gzip-compressed JSON file.

    Returns:
        The decoded JSON value (typically a dict). Previously this function
        returned ``None``; returning the value is backward-compatible and lets
        callers reuse the result instead of re-parsing stdout.
    """
    with open(fp, "rb") as f:
        compressed_json = f.read()
    # gzip.decompress yields bytes; json.loads accepts UTF-8 bytes directly.
    json_dict = json.loads(gzip.decompress(compressed_json))
    print(json_dict)
    return json_dict
if __name__ == "__main__":
    # Script entry point: decode and print the file given via --compressed_json.
    main(parse_args().compressed_json)