From f7e4953a4e5932d957434108e2882f75e057a673 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Sat, 18 Jun 2022 22:24:18 +0200 Subject: [PATCH] refactored visitor module: split into various submodules --- pyinfra/default_objects.py | 3 +- pyinfra/exceptions.py | 4 + pyinfra/visitor.py | 361 ------------------ pyinfra/visitor/__init__.py | 1 + pyinfra/visitor/dispatch/__init__.py | 0 pyinfra/visitor/dispatch/dispatch.py | 7 + .../visitor/dispatch/identifier_dispatch.py | 21 + pyinfra/visitor/strategies/__init__.py | 0 .../visitor/strategies/download/__init__.py | 0 .../visitor/strategies/download/download.py | 38 ++ pyinfra/visitor/strategies/download/multi.py | 50 +++ pyinfra/visitor/strategies/download/single.py | 25 ++ .../visitor/strategies/parsing/__init__.py | 0 pyinfra/visitor/strategies/parsing/dynamic.py | 20 + pyinfra/visitor/strategies/parsing/parsing.py | 14 + .../visitor/strategies/response/__init__.py | 0 .../strategies/response/aggregation.py | 60 +++ .../visitor/strategies/response/forwarding.py | 6 + .../visitor/strategies/response/response.py | 34 ++ .../visitor/strategies/response/storage.py | 16 + pyinfra/visitor/utils.py | 70 ++++ pyinfra/visitor/visitor.py | 50 +++ test/conftest.py | 4 +- test/integration_tests/serve_test.py | 3 +- test/unit_tests/queue_visitor_test.py | 2 +- 25 files changed, 424 insertions(+), 365 deletions(-) delete mode 100644 pyinfra/visitor.py create mode 100644 pyinfra/visitor/__init__.py create mode 100644 pyinfra/visitor/dispatch/__init__.py create mode 100644 pyinfra/visitor/dispatch/dispatch.py create mode 100644 pyinfra/visitor/dispatch/identifier_dispatch.py create mode 100644 pyinfra/visitor/strategies/__init__.py create mode 100644 pyinfra/visitor/strategies/download/__init__.py create mode 100644 pyinfra/visitor/strategies/download/download.py create mode 100644 pyinfra/visitor/strategies/download/multi.py create mode 100644 pyinfra/visitor/strategies/download/single.py create mode 100644 pyinfra/visitor/strategies/parsing/__init__.py create mode 100644 pyinfra/visitor/strategies/parsing/dynamic.py create mode 100644 pyinfra/visitor/strategies/parsing/parsing.py create mode 100644 pyinfra/visitor/strategies/response/__init__.py create mode 100644 pyinfra/visitor/strategies/response/aggregation.py create mode 100644 pyinfra/visitor/strategies/response/forwarding.py create mode 100644 pyinfra/visitor/strategies/response/response.py create mode 100644 pyinfra/visitor/strategies/response/storage.py create mode 100644 pyinfra/visitor/utils.py create mode 100644 pyinfra/visitor/visitor.py diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 9026b9f..4ade2b6 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -13,7 +13,8 @@ from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStre from pyinfra.server.packer.packers.rest import RestPacker from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.storage import storages -from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy +from pyinfra.visitor import QueueVisitor +from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy logger = logging.getLogger(__name__) logger.setLevel(logging.ERROR) diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py index 0fbbe54..5857415 100644 --- a/pyinfra/exceptions.py +++ b/pyinfra/exceptions.py @@ -44,3 +44,7 @@ class NoBufferCapacity(ValueError): class InvalidMessage(ValueError): pass + + +class InvalidStorageItemFormat(ValueError): + pass diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py deleted file mode 100644 index 554abcc..0000000 --- a/pyinfra/visitor.py +++ /dev/null @@ -1,361 +0,0 @@ -import abc -import gzip -import json -import logging -from collections import deque -from copy import deepcopy -from operator import itemgetter -from typing import Callable, Dict, Union - -from funcy import omit, filter, lflatten -from more_itertools import peekable - -from pyinfra.config import CONFIG, parse_disjunction_string -from pyinfra.exceptions import DataLoadingFailure, InvalidMessage -from pyinfra.parser.parser_composer import EitherParserComposer -from pyinfra.parser.parsers.identity import IdentityBlobParser -from pyinfra.parser.parsers.json import JsonBlobParser -from pyinfra.parser.parsers.string import StringBlobParser -from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing -from pyinfra.server.packing import string_to_bytes -from pyinfra.storage.storage import Storage - -logger = logging.getLogger(__name__) - - -class ResponseStrategy(abc.ABC): - @abc.abstractmethod - def handle_response(self, analysis_response: dict): - pass - - def __call__(self, analysis_response: dict): - return self.handle_response(analysis_response) - - def get_response_object_descriptor(self, body): - return { - "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), - "object_name": self.get_response_object_name(body), - } - - @staticmethod - def get_response_object_name(body): - - if "pages" not in body: - body["pages"] = [] - - if "id" not in body: - body["id"] = 0 - - dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body) - - object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" - - return object_name - - -class StorageStrategy(ResponseStrategy): - def __init__(self, storage): - self.storage = storage - - def handle_response(self, body: dict): - response_object_descriptor = self.get_response_object_descriptor(body) - self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode())) - body.pop("data") - body["responseFile"] = response_object_descriptor["object_name"] - return body - - -class ForwardingStrategy(ResponseStrategy): - def handle_response(self, analysis_response): - return analysis_response - - -class DispatchCallback(abc.ABC): - @abc.abstractmethod - def __call__(self, payload): - pass - - -class IdentifierDispatchCallback(DispatchCallback): - def __init__(self): - self.identifier = None - - def has_new_identifier(self, metadata): - - identifier = ":".join(itemgetter("fileId", "dossierId")(metadata)) - - if not self.identifier: - self.identifier = identifier - - return identifier != self.identifier - - def __call__(self, metadata): - - return self.has_new_identifier(metadata) - - -class AggregationStorageStrategy(ResponseStrategy): - def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None): - self.storage = storage - self.merger = merger or list - self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback() - self.buffer = deque() - - def put_object(self, data: bytes, storage_upload_info): - object_descriptor = self.get_response_object_descriptor(storage_upload_info) - self.storage.put_object(**object_descriptor, data=gzip.compress(data)) - return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} - - def merge_queue_items(self): - merged_buffer_content = self.merger(self.buffer) - self.buffer.clear() - return merged_buffer_content - - def upload_queue_items(self, storage_upload_info): - data = json.dumps(self.merge_queue_items()).encode() - return self.put_object(data, storage_upload_info) - - def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): - """analysis_payload : {data: ..., metadata: ...}""" - - storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata) - analysis_payload["metadata"].pop("id") - - if analysis_payload["data"]: - return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info) - - else: - self.buffer.append(analysis_payload) - if last or self.dispatch_callback(storage_upload_info): - return self.upload_queue_items(storage_upload_info) - else: - return Nothing - - def handle_response(self, analysis_response, final=False): - def upload_or_aggregate(analysis_payload): - return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) - - request_metadata = omit(analysis_response, ["data"]) - result_data = peekable(analysis_response["data"]) - - yield from filter(is_not_nothing, map(upload_or_aggregate, result_data)) - - -def build_storage_upload_info(analysis_payload, request_metadata): - storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} - storage_upload_info["fileId"] = build_file_path( - storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder) - ) - return storage_upload_info - - -def build_file_path(storage_upload_info, folder): - return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "") - - -class InvalidStorageItemFormat(ValueError): - pass - - -class ParsingStrategy(abc.ABC): - @abc.abstractmethod - def parse(self, data: bytes): - pass - - @abc.abstractmethod - def parse_and_wrap(self, data: bytes): - pass - - def __call__(self, data: bytes): - return self.parse_and_wrap(data) - - -# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found -# on the storage. This class is only a temporary trial-and-error->fallback type of solution. -class DynamicParsingStrategy(ParsingStrategy): - def __init__(self): - self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser()) - - def parse(self, data: bytes) -> Union[bytes, dict]: - return self.parser(data) - - def parse_and_wrap(self, data): - return self.parse(data) - - -def validate(data): - if not ("data" in data and "metadata" in data): - raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") - - -def wrap(data): - return {"data": data, "metadata": {}} - - -class QueueVisitor: - def __init__( - self, - storage: Storage, - callback: Callable, - response_strategy: ResponseStrategy, - parsing_strategy: ParsingStrategy = None, - download_strategy=None, - ): - self.storage = storage - self.callback = callback - self.download_strategy = download_strategy or get_download_strategy() - self.response_strategy = response_strategy - self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() - - def load_data(self, queue_item_body): - data = self.download_strategy(self.storage, queue_item_body) - data = map(self.parsing_strategy, data) - data = map(standardize, data) - return data - - def process_storage_item(self, data_metadata_pack): - return self.callback(data_metadata_pack) - - def load_item_from_storage_and_process_with_callback(self, queue_item_body): - """Bundles the result from processing a storage item with the body of the corresponding queue item.""" - - def process_storage_item(storage_item): - analysis_input = {**storage_item, **queue_item_body} - return self.process_storage_item(analysis_input) - - storage_items = self.load_data(queue_item_body) - results = lflatten(map(process_storage_item, storage_items)) - return {"data": results, **queue_item_body} - - def __call__(self, queue_item_body): - analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) - return self.response_strategy(analysis_result_body) - - -def standardize(data) -> Dict: - """Storage items can be a blob or a blob with metadata. Standardizes to the latter. - - Cases: - 1) backend upload: data as bytes - 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", - where value of key 'data' was encoded with bytes_to_string(...) - - Returns: - {"data": bytes, "metadata": dict} - """ - - def is_blob_without_metadata(data): - return isinstance(data, bytes) - - def is_blob_with_metadata(data: Dict): - return isinstance(data, dict) - - if is_blob_without_metadata(data): - return wrap(data) - - elif is_blob_with_metadata(data): - validate(data) - return data - - else: # Fallback / used for testing with simple data - logger.warning("Encountered storage data in unexpected format.") - assert isinstance(data, str) - return wrap(string_to_bytes(data)) - - -def get_download_strategy(download_strategy_type=None): - download_strategies = { - "single": SingleDownloadStrategy(), - "multi": MultiDownloadStrategy(), - } - return download_strategies.get(download_strategy_type or CONFIG.service.download_strategy, SingleDownloadStrategy()) - - -class DownloadStrategy(abc.ABC): - def _load_data(self, storage, queue_item_body): - object_descriptor = self.get_object_descriptor(queue_item_body) - logging.debug(f"Downloading {object_descriptor}...") - data = self.__download(storage, object_descriptor) - logging.debug(f"Downloaded {object_descriptor}.") - assert isinstance(data, bytes) - data = gzip.decompress(data) - return [data] - - @staticmethod - def __download(storage, object_descriptor): - try: - data = storage.get_object(**object_descriptor) - except Exception as err: - logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure from err - - return data - - @staticmethod - @abc.abstractmethod - def get_object_name(body: dict): - raise NotImplementedError - - def get_object_descriptor(self, body): - return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": self.get_object_name(body)} - - -class SingleDownloadStrategy(DownloadStrategy): - def download(self, storage, queue_item_body): - return self._load_data(storage, queue_item_body) - - @staticmethod - def get_object_name(body: dict): - - # TODO: deepcopy still necessary? - body = deepcopy(body) - - dossier_id, file_id = itemgetter("dossierId", "fileId")(body) - - object_name = f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}" - - return object_name - - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) - - -class MultiDownloadStrategy(DownloadStrategy): - def __init__(self): - # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket - self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket) - - def download(self, storage: Storage, queue_item_body): - pages = "|".join(map(str, queue_item_body["pages"])) - matches_page = r".*id:(" + pages + r").*" - - object_names = storage.get_all_object_names(self.bucket_name) - object_names = filter(matches_page, object_names) - objects = (storage.get_object(self.bucket_name, objn) for objn in object_names) - objects = map(gzip.decompress, objects) - - return objects - - @staticmethod - def get_object_name(body: dict): - - def get_key(key): - return key if key in body else False - - # TODO: deepcopy still necessary? - body = deepcopy(body) - - folder = f"/{get_key('pages') or get_key('images')}/" - if not folder: - raise InvalidMessage("Expected a folder like 'images' oder 'pages' to be specified in message.") - - idnt = f"id:{body.get('id', 0)}" - - dossier_id, file_id = itemgetter("dossierId", "fileId")(body) - - object_name = f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}" - - return object_name - - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) diff --git a/pyinfra/visitor/__init__.py b/pyinfra/visitor/__init__.py new file mode 100644 index 0000000..e8fbecb --- /dev/null +++ b/pyinfra/visitor/__init__.py @@ -0,0 +1 @@ +from .visitor import QueueVisitor \ No newline at end of file diff --git a/pyinfra/visitor/dispatch/__init__.py b/pyinfra/visitor/dispatch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/dispatch/dispatch.py b/pyinfra/visitor/dispatch/dispatch.py new file mode 100644 index 0000000..55ce1b9 --- /dev/null +++ b/pyinfra/visitor/dispatch/dispatch.py @@ -0,0 +1,7 @@ +import abc + + +class DispatchCallback(abc.ABC): + @abc.abstractmethod + def __call__(self, payload): + pass diff --git a/pyinfra/visitor/dispatch/identifier_dispatch.py b/pyinfra/visitor/dispatch/identifier_dispatch.py new file mode 100644 index 0000000..17842ee --- /dev/null +++ b/pyinfra/visitor/dispatch/identifier_dispatch.py @@ -0,0 +1,21 @@ +from _operator import itemgetter + +from pyinfra.visitor.dispatch.dispatch import DispatchCallback + + +class IdentifierDispatchCallback(DispatchCallback): + def __init__(self): + self.identifier = None + + def has_new_identifier(self, metadata): + + identifier = ":".join(itemgetter("fileId", "dossierId")(metadata)) + + if not self.identifier: + self.identifier = identifier + + return identifier != self.identifier + + def __call__(self, metadata): + + return self.has_new_identifier(metadata) diff --git a/pyinfra/visitor/strategies/__init__.py b/pyinfra/visitor/strategies/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/download/__init__.py b/pyinfra/visitor/strategies/download/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/download/download.py b/pyinfra/visitor/strategies/download/download.py new file mode 100644 index 0000000..574671f --- /dev/null +++ b/pyinfra/visitor/strategies/download/download.py @@ -0,0 +1,38 @@ +import abc +import gzip +import logging + +from pyinfra.config import parse_disjunction_string, CONFIG +from pyinfra.exceptions import DataLoadingFailure + + +class DownloadStrategy(abc.ABC): + def _load_data(self, storage, queue_item_body): + object_descriptor = self.get_object_descriptor(queue_item_body) + logging.debug(f"Downloading {object_descriptor}...") + data = self.__download(storage, object_descriptor) + logging.debug(f"Downloaded {object_descriptor}.") + assert isinstance(data, bytes) + data = gzip.decompress(data) + return [data] + + @staticmethod + def __download(storage, object_descriptor): + try: + data = storage.get_object(**object_descriptor) + except Exception as err: + logging.warning(f"Loading data from storage failed for {object_descriptor}.") + raise DataLoadingFailure from err + + return data + + @staticmethod + @abc.abstractmethod + def get_object_name(body: dict): + raise NotImplementedError + + def get_object_descriptor(self, body): + return { + "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), + "object_name": self.get_object_name(body), + } diff --git a/pyinfra/visitor/strategies/download/multi.py b/pyinfra/visitor/strategies/download/multi.py new file mode 100644 index 0000000..82b00c9 --- /dev/null +++ b/pyinfra/visitor/strategies/download/multi.py @@ -0,0 +1,50 @@ +import gzip +from _operator import itemgetter +from copy import deepcopy + +from funcy import filter + +from pyinfra.config import parse_disjunction_string, CONFIG +from pyinfra.exceptions import InvalidMessage +from pyinfra.storage.storage import Storage +from pyinfra.visitor.strategies.download.download import DownloadStrategy + + +class MultiDownloadStrategy(DownloadStrategy): + def __init__(self): + # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket + self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket) + + def download(self, storage: Storage, queue_item_body): + pages = "|".join(map(str, queue_item_body["pages"])) + matches_page = r".*id:(" + pages + r").*" + + object_names = storage.get_all_object_names(self.bucket_name) + object_names = filter(matches_page, object_names) + objects = (storage.get_object(self.bucket_name, objn) for objn in object_names) + objects = map(gzip.decompress, objects) + + return objects + + @staticmethod + def get_object_name(body: dict): + def get_key(key): + return key if key in body else False + + # TODO: deepcopy still necessary? + body = deepcopy(body) + + folder = f"/{get_key('pages') or get_key('images')}/" + if not folder: + raise InvalidMessage("Expected a folder like 'images' oder 'pages' to be specified in message.") + + idnt = f"id:{body.get('id', 0)}" + + dossier_id, file_id = itemgetter("dossierId", "fileId")(body) + + object_name = f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}" + + return object_name + + def __call__(self, storage, queue_item_body): + return self.download(storage, queue_item_body) diff --git a/pyinfra/visitor/strategies/download/single.py b/pyinfra/visitor/strategies/download/single.py new file mode 100644 index 0000000..42d8073 --- /dev/null +++ b/pyinfra/visitor/strategies/download/single.py @@ -0,0 +1,25 @@ +from _operator import itemgetter +from copy import deepcopy + +from pyinfra.config import CONFIG +from pyinfra.visitor.strategies.download.download import DownloadStrategy + + +class SingleDownloadStrategy(DownloadStrategy): + def download(self, storage, queue_item_body): + return self._load_data(storage, queue_item_body) + + @staticmethod + def get_object_name(body: dict): + + # TODO: deepcopy still necessary? + body = deepcopy(body) + + dossier_id, file_id = itemgetter("dossierId", "fileId")(body) + + object_name = f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}" + + return object_name + + def __call__(self, storage, queue_item_body): + return self.download(storage, queue_item_body) diff --git a/pyinfra/visitor/strategies/parsing/__init__.py b/pyinfra/visitor/strategies/parsing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/parsing/dynamic.py b/pyinfra/visitor/strategies/parsing/dynamic.py new file mode 100644 index 0000000..e2a2167 --- /dev/null +++ b/pyinfra/visitor/strategies/parsing/dynamic.py @@ -0,0 +1,20 @@ +from typing import Union + +from pyinfra.parser.parser_composer import EitherParserComposer +from pyinfra.parser.parsers.identity import IdentityBlobParser +from pyinfra.parser.parsers.json import JsonBlobParser +from pyinfra.parser.parsers.string import StringBlobParser +from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy + + +# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found +# on the storage. This class is only a temporary trial-and-error->fallback type of solution. +class DynamicParsingStrategy(ParsingStrategy): + def __init__(self): + self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser()) + + def parse(self, data: bytes) -> Union[bytes, dict]: + return self.parser(data) + + def parse_and_wrap(self, data): + return self.parse(data) diff --git a/pyinfra/visitor/strategies/parsing/parsing.py b/pyinfra/visitor/strategies/parsing/parsing.py new file mode 100644 index 0000000..836c402 --- /dev/null +++ b/pyinfra/visitor/strategies/parsing/parsing.py @@ -0,0 +1,14 @@ +import abc + + +class ParsingStrategy(abc.ABC): + @abc.abstractmethod + def parse(self, data: bytes): + pass + + @abc.abstractmethod + def parse_and_wrap(self, data: bytes): + pass + + def __call__(self, data: bytes): + return self.parse_and_wrap(data) diff --git a/pyinfra/visitor/strategies/response/__init__.py b/pyinfra/visitor/strategies/response/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py new file mode 100644 index 0000000..28a6c9d --- /dev/null +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -0,0 +1,60 @@ +import gzip +import json +from collections import deque +from typing import Callable + +from funcy import omit, filter +from more_itertools import peekable + +from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing +from pyinfra.visitor.dispatch.dispatch import DispatchCallback +from pyinfra.visitor.dispatch.identifier_dispatch import IdentifierDispatchCallback +from pyinfra.visitor.strategies.response.response import ResponseStrategy +from pyinfra.visitor.utils import build_storage_upload_info + + +class AggregationStorageStrategy(ResponseStrategy): + def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None): + self.storage = storage + self.merger = merger or list + self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback() + self.buffer = deque() + + def put_object(self, data: bytes, storage_upload_info): + object_descriptor = self.get_response_object_descriptor(storage_upload_info) + self.storage.put_object(**object_descriptor, data=gzip.compress(data)) + return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} + + def merge_queue_items(self): + merged_buffer_content = self.merger(self.buffer) + self.buffer.clear() + return merged_buffer_content + + def upload_queue_items(self, storage_upload_info): + data = json.dumps(self.merge_queue_items()).encode() + return self.put_object(data, storage_upload_info) + + def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): + """analysis_payload : {data: ..., metadata: ...}""" + + storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata) + analysis_payload["metadata"].pop("id") + + if analysis_payload["data"]: + return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info) + + else: + self.buffer.append(analysis_payload) + if last or self.dispatch_callback(storage_upload_info): + return self.upload_queue_items(storage_upload_info) + else: + return Nothing + + def handle_response(self, analysis_response, final=False): + def upload_or_aggregate(analysis_payload): + return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) + + request_metadata = omit(analysis_response, ["data"]) + result_data = peekable(analysis_response["data"]) + + yield from filter(is_not_nothing, map(upload_or_aggregate, result_data)) diff --git a/pyinfra/visitor/strategies/response/forwarding.py b/pyinfra/visitor/strategies/response/forwarding.py new file mode 100644 index 0000000..c71604d --- /dev/null +++ b/pyinfra/visitor/strategies/response/forwarding.py @@ -0,0 +1,6 @@ +from pyinfra.visitor.strategies.response.response import ResponseStrategy + + +class ForwardingStrategy(ResponseStrategy): + def handle_response(self, analysis_response): + return analysis_response diff --git a/pyinfra/visitor/strategies/response/response.py b/pyinfra/visitor/strategies/response/response.py new file mode 100644 index 0000000..001207a --- /dev/null +++ b/pyinfra/visitor/strategies/response/response.py @@ -0,0 +1,34 @@ +import abc +from _operator import itemgetter + +from pyinfra.config import parse_disjunction_string, CONFIG + + +class ResponseStrategy(abc.ABC): + @abc.abstractmethod + def handle_response(self, analysis_response: dict): + pass + + def __call__(self, analysis_response: dict): + return self.handle_response(analysis_response) + + def get_response_object_descriptor(self, body): + return { + "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), + "object_name": self.get_response_object_name(body), + } + + @staticmethod + def get_response_object_name(body): + + if "pages" not in body: + body["pages"] = [] + + if "id" not in body: + body["id"] = 0 + + dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body) + + object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" + + return object_name diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py new file mode 100644 index 0000000..0749c38 --- /dev/null +++ b/pyinfra/visitor/strategies/response/storage.py @@ -0,0 +1,16 @@ +import gzip +import json + +from pyinfra.visitor.strategies.response.response import ResponseStrategy + + +class StorageStrategy(ResponseStrategy): + def __init__(self, storage): + self.storage = storage + + def handle_response(self, body: dict): + response_object_descriptor = self.get_response_object_descriptor(body) + self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode())) + body.pop("data") + body["responseFile"] = response_object_descriptor["object_name"] + return body diff --git a/pyinfra/visitor/utils.py b/pyinfra/visitor/utils.py new file mode 100644 index 0000000..242704b --- /dev/null +++ b/pyinfra/visitor/utils.py @@ -0,0 +1,70 @@ +import logging +from typing import Dict + +from pyinfra.config import CONFIG +from pyinfra.exceptions import InvalidStorageItemFormat +from pyinfra.server.packing import string_to_bytes +from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy +from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy + +logger = logging.getLogger() + + +def build_storage_upload_info(analysis_payload, request_metadata): + storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} + storage_upload_info["fileId"] = build_file_path( + storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder) + ) + return storage_upload_info + + +def build_file_path(storage_upload_info, folder): + return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "") + + +def validate(data): + if not ("data" in data and "metadata" in data): + raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") + + +def wrap(data): + return {"data": data, "metadata": {}} + + +def standardize(data) -> Dict: + """Storage items can be a blob or a blob with metadata. Standardizes to the latter. + + Cases: + 1) backend upload: data as bytes + 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", + where value of key 'data' was encoded with bytes_to_string(...) + + Returns: + {"data": bytes, "metadata": dict} + """ + + def is_blob_without_metadata(data): + return isinstance(data, bytes) + + def is_blob_with_metadata(data: Dict): + return isinstance(data, dict) + + if is_blob_without_metadata(data): + return wrap(data) + + elif is_blob_with_metadata(data): + validate(data) + return data + + else: # Fallback / used for testing with simple data + logger.warning("Encountered storage data in unexpected format.") + assert isinstance(data, str) + return wrap(string_to_bytes(data)) + + +def get_download_strategy(download_strategy_type=None): + download_strategies = { + "single": SingleDownloadStrategy(), + "multi": MultiDownloadStrategy(), + } + return download_strategies.get(download_strategy_type or CONFIG.service.download_strategy, SingleDownloadStrategy()) diff --git a/pyinfra/visitor/visitor.py b/pyinfra/visitor/visitor.py new file mode 100644 index 0000000..df36235 --- /dev/null +++ b/pyinfra/visitor/visitor.py @@ -0,0 +1,50 @@ +from typing import Callable + +from funcy import lflatten + +from pyinfra.storage.storage import Storage +from pyinfra.visitor.strategies.download.download import DownloadStrategy +from pyinfra.visitor.strategies.parsing.dynamic import DynamicParsingStrategy +from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy +from pyinfra.visitor.strategies.response.response import ResponseStrategy +from pyinfra.visitor.utils import standardize, get_download_strategy + + +class QueueVisitor: + def __init__( + self, + storage: Storage, + callback: Callable, + response_strategy: ResponseStrategy, + parsing_strategy: ParsingStrategy = None, + download_strategy: DownloadStrategy = None, + ): + self.storage = storage + self.callback = callback + self.response_strategy = response_strategy + self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() + self.download_strategy = download_strategy or get_download_strategy() + + def load_data(self, queue_item_body): + data = self.download_strategy(self.storage, queue_item_body) + data = map(self.parsing_strategy, data) + data = map(standardize, data) + return data + + def process_storage_item(self, data_metadata_pack): + return self.callback(data_metadata_pack) + + def load_item_from_storage_and_process_with_callback(self, queue_item_body): + """Bundles the result from processing a storage item with the body of the corresponding queue item.""" + + def process_storage_item(storage_item): + analysis_input = {**storage_item, **queue_item_body} + return self.process_storage_item(analysis_input) + + storage_items = self.load_data(queue_item_body) + results = lflatten(map(process_storage_item, storage_items)) + return {"data": results, **queue_item_body} + + def __call__(self, queue_item_body): + analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) + return self.response_strategy(analysis_result_body) diff --git a/test/conftest.py b/test/conftest.py index 69b86e4..416b372 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -21,7 +21,9 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter from pyinfra.storage.clients.azure import get_azure_client from pyinfra.storage.clients.s3 import get_s3_client from pyinfra.storage.storage import Storage -from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor +from pyinfra.visitor import QueueVisitor +from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy +from pyinfra.visitor.strategies.response.storage import StorageStrategy from test.config import CONFIG from test.queue.queue_manager_mock import QueueManagerMock from test.storage.adapter_mock import StorageAdapterMock diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 21c0e34..f6ad94f 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -16,7 +16,8 @@ from pyinfra.default_objects import ( ) from pyinfra.queue.consumer import Consumer from pyinfra.server.packing import unpack, pack -from pyinfra.visitor import QueueVisitor, get_download_strategy +from pyinfra.visitor import QueueVisitor +from pyinfra.visitor.utils import get_download_strategy from test.utils.input import pair_data_with_queue_message diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index aec7c96..8485a54 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -4,7 +4,7 @@ import json import pytest from pyinfra.utils.encoding import pack_for_upload -from pyinfra.visitor import SingleDownloadStrategy +from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy @pytest.fixture()