Pull request #13: Add storage save response

Merge in RR/pyinfra from add-storage-save-response to master

Squashed commit of the following:

commit cb96b7943c388f020d11e97e7e1f8402efa2dd12
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Mon Feb 28 16:06:29 2022 +0100

    improved config

commit 607e938f16a39215c81c88bb76377bcc90282011
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Mon Feb 28 15:57:21 2022 +0100

    response save on storage logic added

commit 7b66bcaafe32e80a64ad09ef424c28fe1f73cd23
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Mon Feb 28 15:22:19 2022 +0100

    tidy up & black

commit 15a5687bac5bac599d162c86f5cec2cd945c5039
Author: Julius Unverfehrt <Julius.Unverfehrt@iqser.com>
Date:   Mon Feb 28 15:20:50 2022 +0100

    response save on storage logic added WIP
This commit is contained in:
Julius Unverfehrt 2022-02-28 16:12:57 +01:00
parent 6990840f64
commit 55be1c345b
7 changed files with 54 additions and 34 deletions

View File

@@ -1,5 +1,8 @@
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger
response:
save: True # file-to-storage upload
extension: $RESPONSE_FILE_EXTENSION|".NER_ENTITIES.json.gz" # {.OBJECTS.json.gz | .NER_ENTITIES.json.gz}
probing_webserver:
host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address
@@ -33,7 +36,7 @@ storage:
backend: $STORAGE_BACKEND|s3 # The type of storage to use {s3, azure}
bucket: $STORAGE_BUCKET|"pyinfra-test-bucket" # The bucket / container to pull files specified in queue requests from
target_file_extension: $TARGET_FILE_EXTENSION|pdf # Defines type of file to pull from storage
target_file_extension: $TARGET_FILE_EXTENSION|".TEXT.json.gz" # {.TEXT.json.gz | .ORIGIN.pdf.gz} Defines type of file to pull from storage
s3:
endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000"

View File

@@ -1,9 +1,13 @@
import json
import logging
import tempfile
from time import sleep
from pyinfra.config import CONFIG
from pyinfra.exceptions import AnalysisFailure, DataLoadingFailure
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
from pyinfra.storage.storages import get_storage
from pyinfra.utils.file import upload_compressed_response
def make_retry_callback(republish, max_attempts):
@@ -56,8 +60,17 @@ def make_callback_for_output_queue(json_wrapped_body_processor, output_queue_nam
# NOTE(review): this is a diff rendering without +/- markers; the two lines
# right after the def are the PRE-change handler (process + publish directly).
def callback(channel, method, _, body):
result = json_wrapped_body_processor(body)
channel.basic_publish(exchange="", routing_key=output_queue_name, body=result)
# New behavior: the processor now also returns the ids needed for a storage upload.
dossier_id, file_id, result = json_wrapped_body_processor(body)
# When response saving is disabled, publish the full result to the output queue as before.
if not CONFIG.service.response.save:
channel.basic_publish(exchange="", routing_key=output_queue_name, body=result)
else:
# Otherwise persist the result to object storage under the configured bucket/backend ...
upload_compressed_response(
get_storage(CONFIG.storage.backend), CONFIG.storage.bucket, dossier_id, file_id, result
)
# ... and publish only a lightweight reference (the ids) instead of the payload.
result = json.dumps({"dossierId": dossier_id, "fileId": file_id})
channel.basic_publish(exchange="", routing_key=output_queue_name, body=result)
# Ack only after publish (and upload, if enabled) completed without raising.
channel.basic_ack(delivery_tag=method.delivery_tag)
return callback

View File

@@ -1,18 +1,21 @@
import gzip
import json
import logging
from operator import itemgetter
import requests
from pyinfra.callback import json_wrap
from pyinfra.config import CONFIG
from pyinfra.exceptions import DataLoadingFailure, AnalysisFailure, ProcessingFailure
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
from pyinfra.utils.file import combine_dossier_id_and_file_id_and_extension
def make_storage_data_loader(storage, bucket_name):
def get_object_name(payload: dict) -> str:
# Extract both ids from the request payload in one pass.
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
# NOTE(review): diff residue — the next line is the PRE-change (PDF-specific) naming.
object_name = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id)
# New behavior: the extension now comes from config instead of being hard-coded to PDF.
object_name = combine_dossier_id_and_file_id_and_extension(
dossier_id, file_id, CONFIG.storage.target_file_extension
)
return object_name
def download(payload):
@@ -52,13 +55,14 @@ def make_analyzer(analysis_endpoint):
def make_payload_processor(load_data, analyze_file):
@json_wrap
def process(payload: dict):
# NOTE(review): payload arrives as a JSON string here (json.loads below), so the
# `dict` annotation looks stale after this change — confirm against json_wrap.
logging.info(f"Processing {payload}...")
try:
payload = json.loads(payload)
# Pull the ids out so the caller can reference the result in storage.
dossier_id, file_id = itemgetter("dossierId", "fileId")(payload)
data = load_data(payload)
predictions = analyze_file(data)
# NOTE(review): diff residue — the next line is the PRE-change return.
return predictions
# New behavior: return ids alongside the serialized predictions.
return dossier_id, file_id, json.dumps(predictions)
except (DataLoadingFailure, AnalysisFailure) as err:
# Wrap both load and analysis failures in the generic processing error,
# preserving the original cause for debugging.
logging.warning(f"Processing of {payload} failed.")
raise ProcessingFailure() from err

View File

@@ -1,3 +1,4 @@
from pyinfra.exceptions import UnknownStorageBackend
from pyinfra.storage.adapters.azure import AzureStorageAdapter
from pyinfra.storage.adapters.s3 import S3StorageAdapter
from pyinfra.storage.clients.azure import get_azure_client
@@ -11,3 +12,15 @@ def get_azure_storage(config=None):
def get_s3_storage(config=None):
"""Build a Storage backed by an S3 adapter; *config* is passed through to the client factory (None uses defaults)."""
return Storage(S3StorageAdapter(get_s3_client(config)))
def get_storage(storage_backend):
    """Return a Storage instance for *storage_backend*.

    Args:
        storage_backend: backend identifier, one of ``"s3"`` or ``"azure"``.

    Raises:
        UnknownStorageBackend: if *storage_backend* is not a supported backend.
    """
    # Table-driven dispatch keeps the supported backends in one place.
    factories = {"s3": get_s3_storage, "azure": get_azure_storage}
    make_storage = factories.get(storage_backend)
    if make_storage is None:
        raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.")
    return make_storage()

View File

@@ -1,17 +1,16 @@
"""Defines utilities for different operations on files."""
import gzip
import os
from pyinfra.config import CONFIG
def produce_compressed_storage_pdf_object_name(path_no_ext, ext=None):
    """Build the object name ``<path>.ORIGIN.<ext>.gz`` for a compressed original file.

    Args:
        path_no_ext: storage path without any extension.
        ext: file extension; falls back to the configured target extension when falsy.
    """
    chosen_ext = ext if ext else CONFIG.storage.target_file_extension
    return ".".join([path_no_ext, "ORIGIN", chosen_ext, "gz"])
def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension):
    """Join dossier id and file id into a storage path and append *extension*.

    Note: *extension* is concatenated verbatim, so it must carry its own
    leading dot (e.g. ``".TEXT.json.gz"``).
    """
    return "/".join([dossier_id, f"{file_id}{extension}"])
def dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, file_id):
    """Resolve the compressed-original object name for a dossier/file pair."""
    # Join the two ids into a path, then delegate the naming convention
    # (".ORIGIN.<ext>.gz" suffixing) to the shared helper.
    return produce_compressed_storage_pdf_object_name(os.path.join(dossier_id, file_id))
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
    """Gzip the serialized *result* and store it under the configured response extension.

    Args:
        storage: storage adapter exposing ``put_object(bucket, name, data)``.
        bucket_name: target bucket / container.
        dossier_id: dossier the result belongs to.
        file_id: file the result was produced for.
        result: JSON string to persist.
    """
    object_name = combine_dossier_id_and_file_id_and_extension(
        dossier_id, file_id, CONFIG.service.response.extension
    )
    # Compress before upload; downstream readers expect gzip-wrapped JSON.
    compressed = gzip.compress(result.encode())
    storage.put_object(bucket_name, object_name, compressed)

View File

@@ -7,7 +7,7 @@ from tqdm import tqdm
from pyinfra.config import CONFIG
from pyinfra.storage.storages import get_s3_storage
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name
from pyinfra.utils.file import combine_dossier_id_and_file_id_and_extension
def parse_args():
@@ -29,7 +29,9 @@ def parse_args():
def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
path_gz = dossier_id_and_file_id_to_compressed_storage_pdf_object_name(dossier_id, Path(path).stem)
path_gz = combine_dossier_id_and_file_id_and_extension(
dossier_id, Path(path).stem, CONFIG.storage.target_file_extension
)
with open(path, "rb") as f:
data = gzip.compress(f.read())

View File

@@ -11,22 +11,8 @@ from pyinfra.callback import (
from pyinfra.config import CONFIG
from pyinfra.consume import consume, ConsumerError
from pyinfra.core import make_payload_processor, make_storage_data_loader, make_analyzer
from pyinfra.exceptions import UnknownStorageBackend
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.storage.storages import get_azure_storage, get_s3_storage
# NOTE(review): per the surrounding diff this zero-argument variant is removed
# in favor of a parameterized get_storage(storage_backend) in storage/storages.
def get_storage():
# Backend selection comes from global config rather than a parameter.
storage_backend = CONFIG.storage.backend
if storage_backend == "s3":
storage = get_s3_storage()
elif storage_backend == "azure":
storage = get_azure_storage()
else:
# Fail loudly on misconfiguration instead of silently defaulting.
raise UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.")
return storage
from pyinfra.storage.storages import get_storage
def republish(channel, body, n_current_attempts):
@@ -40,7 +26,7 @@ def republish(channel, body, n_current_attempts):
def make_callback():
load_data = make_storage_data_loader(get_storage(), CONFIG.storage.bucket)
load_data = make_storage_data_loader(get_storage(CONFIG.storage.backend), CONFIG.storage.bucket)
analyze_file = make_analyzer(CONFIG.rabbitmq.callback.analysis_endpoint)
json_wrapped_body_processor = make_payload_processor(load_data, analyze_file)