diff --git a/config.yaml b/config.yaml index d56fbe2..a15aee5 100755 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,8 @@ service: logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger + response: + save: True # file-to-storage upload + extension: $RESPONSE_FILE_EXTENSION|".NER_ENTITIES.json.gz" # {.OBJECTS.json.gz | .NER_ENTITIES.json.gz} probing_webserver: host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address @@ -33,7 +36,7 @@ storage: backend: $STORAGE_BACKEND|s3 # The type of storage to use {s3, azure} bucket: $STORAGE_BUCKET|"pyinfra-test-bucket" # The bucket / container to pull files specified in queue requests from - target_file_extension: $TARGET_FILE_EXTENSION|pdf # Defines type of file to pull from storage + target_file_extension: $TARGET_FILE_EXTENSION|".TEXT.json.gz" # {.TEXT.json.gz | .ORIGIN.pdf.gz} Defines type of file to pull from storage s3: endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000" diff --git a/pyinfra/callback.py b/pyinfra/callback.py index ee6b4f6..5684bec 100644 --- a/pyinfra/callback.py +++ b/pyinfra/callback.py @@ -1,9 +1,13 @@ import json import logging +import tempfile from time import sleep +from pyinfra.config import CONFIG from pyinfra.exceptions import AnalysisFailure, DataLoadingFailure from pyinfra.rabbitmq import make_connection, make_channel, declare_queue +from pyinfra.storage.storages import get_storage +from pyinfra.utils.file import upload_compressed_response def make_retry_callback(republish, max_attempts): @@ -56,8 +60,17 @@ def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_nam def callback(channel, method, _, body): - result = json_wrapped_body_processor(body) - channel.basic_publish(exchange="", routing_key=output_queue_name, body=result) + dossier_id, file_id, result = json_wrapped_body_processor(body) + + if not CONFIG.service.response.save: + channel.basic_publish(exchange="", routing_key=output_queue_name, body=result) + else: + upload_compressed_response( + get_storage(CONFIG.storage.backend), CONFIG.storage.bucket, dossier_id, file_id, result + ) + result = json.dumps({"dossierId": dossier_id, "fileId": file_id}) + channel.basic_publish(exchange="", routing_key=output_queue_name, body=result) + channel.basic_ack(delivery_tag=method.delivery_tag) return callback diff --git a/pyinfra/core.py b/pyinfra/core.py index fe85a9d..9a6c82f 100644 --- a/pyinfra/core.py +++ b/pyinfra/core.py @@ -1,18 +1,21 @@ import gzip +import json import logging from operator import itemgetter import requests -from pyinfra.callback import json_wrap +from pyinfra.config import CONFIG from pyinfra.exceptions import DataLoadingFailure, AnalysisFailure, ProcessingFailure -from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name +from pyinfra.utils.file import combine_dossier_id_and_file_id_and_extension def make_storage_data_loader(storage, bucket_name): def get_object_name(payload: dict) -> str: dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) - object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id) + object_name = combine_dossier_id_and_file_id_and_extension( + dossier_id, file_id, CONFIG.storage.target_file_extension + ) return object_name def download(payload): @@ -52,13 +55,14 @@ def make_analyzer(analysis_endpoint): def make_payload_processor(load_data, analyze_file): - @json_wrap def process(payload: dict): logging.info(f"Processing {payload}...") try: + payload = json.loads(payload) + dossier_id, file_id = itemgetter("dossierId", "fileId")(payload) data = load_data(payload) predictions = analyze_file(data) - return predictions + return dossier_id, file_id, json.dumps(predictions) except (DataLoadingFailure, AnalysisFailure) as err: logging.warning(f"Processing of {payload} failed.") raise ProcessingFailure() from err diff --git a/pyinfra/storage/storages.py b/pyinfra/storage/storages.py index 7b56940..2e58186 100644 --- a/pyinfra/storage/storages.py +++ b/pyinfra/storage/storages.py @@ -1,3 +1,4 @@ +from pyinfra.exceptions import UnknownStorageBackend from pyinfra.storage.adapters.azure import AzureStorageAdapter from pyinfra.storage.adapters.s3 import S3StorageAdapter from pyinfra.storage.clients.azure import get_azure_client @@ -11,3 +12,15 @@ def get_azure_storage(config=None): def get_s3_storage(config=None): return Storage(S3StorageAdapter(get_s3_client(config))) + + +def get_storage(storage_backend): + + if storage_backend == "s3": + storage = get_s3_storage() + elif storage_backend == "azure": + storage = get_azure_storage() + else: + raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.") + + return storage diff --git a/pyinfra/utils/file.py b/pyinfra/utils/file.py index 216f6ab..a2cbbd2 100644 --- a/pyinfra/utils/file.py +++ b/pyinfra/utils/file.py @@ -1,17 +1,16 @@ """Defines utilities for different operations on files.""" +import gzip import os from pyinfra.config import CONFIG -def produce_compressed_storage_pdf_object_name(path_no_ext, ext=None): - if not ext: - ext = CONFIG.storage.target_file_extension - return f"{path_no_ext}.ORIGIN.{ext}.gz" +def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension): + return f"{dossier_id}/{file_id}{extension}" -def dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id): - path_no_ext = os.path.join(dossier_id, file_id) - pdf_object_name = produce_compressed_storage_pdf_object_name(path_no_ext) - return pdf_object_name +def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: + data = gzip.compress(result.encode()) + path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) + storage.put_object(bucket_name, path_gz, data) diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index d141b09..c1bbc65 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -7,7 +7,7 @@ from tqdm import tqdm from pyinfra.config import CONFIG from pyinfra.storage.storages import get_s3_storage -from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name +from pyinfra.utils.file import combine_dossier_id_and_file_id_and_extension def parse_args(): @@ -29,7 +29,9 @@ def parse_args(): def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: - path_gz = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, Path(path).stem) + path_gz = combine_dossier_id_and_file_id_and_extension( + dossier_id, Path(path).stem, CONFIG.storage.target_file_extension + ) with open(path, "rb") as f: data = gzip.compress(f.read()) diff --git a/src/serve.py b/src/serve.py index dca4d7d..a712f06 100644 --- a/src/serve.py +++ b/src/serve.py @@ -11,22 +11,8 @@ from pyinfra.callback import ( from pyinfra.config import CONFIG from pyinfra.consume import consume, ConsumerError from pyinfra.core import make_payload_processor, make_storage_data_loader, make_analyzer -from pyinfra.exceptions import UnknownStorageBackend from pyinfra.flask import run_probing_webserver, set_up_probing_webserver -from pyinfra.storage.storages import get_azure_storage, get_s3_storage - - -def get_storage(): - - storage_backend = CONFIG.storage.backend - if storage_backend == "s3": - storage = get_s3_storage() - elif storage_backend == "azure": - storage = get_azure_storage() - else: - raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.") - - return storage +from pyinfra.storage.storages import get_storage def republish(channel, body, n_current_attempts): @@ -40,7 +26,7 @@ def republish(channel, body, n_current_attempts): def make_callback(): - load_data = make_storage_data_loader(get_storage(), CONFIG.storage.bucket) + load_data = make_storage_data_loader(get_storage(CONFIG.storage.backend), CONFIG.storage.bucket) analyze_file = make_analyzer(CONFIG.rabbitmq.callback.analysis_endpoint) json_wrapped_body_processor = make_payload_processor(load_data, analyze_file)