refactored visitor module: split into various submodules
This commit is contained in:
parent
4aef3316a3
commit
f7e4953a4e
@ -13,7 +13,8 @@ from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStre
|
||||
from pyinfra.server.packer.packers.rest import RestPacker
|
||||
from pyinfra.server.receiver.receivers.rest import RestReceiver
|
||||
from pyinfra.storage import storages
|
||||
from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.ERROR)
|
||||
|
||||
@ -44,3 +44,7 @@ class NoBufferCapacity(ValueError):
|
||||
|
||||
class InvalidMessage(ValueError):
    """Raised when a queue message is missing required fields
    (e.g. neither 'pages' nor 'images' is present)."""

    pass
|
||||
|
||||
|
||||
class InvalidStorageItemFormat(ValueError):
    """Raised when a storage item is not a mapping with 'data' and 'metadata' keys."""

    pass
|
||||
|
||||
@ -1,361 +0,0 @@
|
||||
import abc
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
from collections import deque
|
||||
from copy import deepcopy
|
||||
from operator import itemgetter
|
||||
from typing import Callable, Dict, Union
|
||||
|
||||
from funcy import omit, filter, lflatten
|
||||
from more_itertools import peekable
|
||||
|
||||
from pyinfra.config import CONFIG, parse_disjunction_string
|
||||
from pyinfra.exceptions import DataLoadingFailure, InvalidMessage
|
||||
from pyinfra.parser.parser_composer import EitherParserComposer
|
||||
from pyinfra.parser.parsers.identity import IdentityBlobParser
|
||||
from pyinfra.parser.parsers.json import JsonBlobParser
|
||||
from pyinfra.parser.parsers.string import StringBlobParser
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||
from pyinfra.server.packing import string_to_bytes
|
||||
from pyinfra.storage.storage import Storage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResponseStrategy(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def handle_response(self, analysis_response: dict):
|
||||
pass
|
||||
|
||||
def __call__(self, analysis_response: dict):
|
||||
return self.handle_response(analysis_response)
|
||||
|
||||
def get_response_object_descriptor(self, body):
|
||||
return {
|
||||
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
|
||||
"object_name": self.get_response_object_name(body),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_response_object_name(body):
|
||||
|
||||
if "pages" not in body:
|
||||
body["pages"] = []
|
||||
|
||||
if "id" not in body:
|
||||
body["id"] = 0
|
||||
|
||||
dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
|
||||
class StorageStrategy(ResponseStrategy):
|
||||
def __init__(self, storage):
|
||||
self.storage = storage
|
||||
|
||||
def handle_response(self, body: dict):
|
||||
response_object_descriptor = self.get_response_object_descriptor(body)
|
||||
self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode()))
|
||||
body.pop("data")
|
||||
body["responseFile"] = response_object_descriptor["object_name"]
|
||||
return body
|
||||
|
||||
|
||||
class ForwardingStrategy(ResponseStrategy):
|
||||
def handle_response(self, analysis_response):
|
||||
return analysis_response
|
||||
|
||||
|
||||
class DispatchCallback(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def __call__(self, payload):
|
||||
pass
|
||||
|
||||
|
||||
class IdentifierDispatchCallback(DispatchCallback):
|
||||
def __init__(self):
|
||||
self.identifier = None
|
||||
|
||||
def has_new_identifier(self, metadata):
|
||||
|
||||
identifier = ":".join(itemgetter("fileId", "dossierId")(metadata))
|
||||
|
||||
if not self.identifier:
|
||||
self.identifier = identifier
|
||||
|
||||
return identifier != self.identifier
|
||||
|
||||
def __call__(self, metadata):
|
||||
|
||||
return self.has_new_identifier(metadata)
|
||||
|
||||
|
||||
class AggregationStorageStrategy(ResponseStrategy):
|
||||
def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None):
|
||||
self.storage = storage
|
||||
self.merger = merger or list
|
||||
self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
|
||||
self.buffer = deque()
|
||||
|
||||
def put_object(self, data: bytes, storage_upload_info):
|
||||
object_descriptor = self.get_response_object_descriptor(storage_upload_info)
|
||||
self.storage.put_object(**object_descriptor, data=gzip.compress(data))
|
||||
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
|
||||
|
||||
def merge_queue_items(self):
|
||||
merged_buffer_content = self.merger(self.buffer)
|
||||
self.buffer.clear()
|
||||
return merged_buffer_content
|
||||
|
||||
def upload_queue_items(self, storage_upload_info):
|
||||
data = json.dumps(self.merge_queue_items()).encode()
|
||||
return self.put_object(data, storage_upload_info)
|
||||
|
||||
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
|
||||
"""analysis_payload : {data: ..., metadata: ...}"""
|
||||
|
||||
storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata)
|
||||
analysis_payload["metadata"].pop("id")
|
||||
|
||||
if analysis_payload["data"]:
|
||||
return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info)
|
||||
|
||||
else:
|
||||
self.buffer.append(analysis_payload)
|
||||
if last or self.dispatch_callback(storage_upload_info):
|
||||
return self.upload_queue_items(storage_upload_info)
|
||||
else:
|
||||
return Nothing
|
||||
|
||||
def handle_response(self, analysis_response, final=False):
|
||||
def upload_or_aggregate(analysis_payload):
|
||||
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))
|
||||
|
||||
request_metadata = omit(analysis_response, ["data"])
|
||||
result_data = peekable(analysis_response["data"])
|
||||
|
||||
yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))
|
||||
|
||||
|
||||
def build_storage_upload_info(analysis_payload, request_metadata):
|
||||
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
|
||||
storage_upload_info["fileId"] = build_file_path(
|
||||
storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder)
|
||||
)
|
||||
return storage_upload_info
|
||||
|
||||
|
||||
def build_file_path(storage_upload_info, folder):
|
||||
return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")
|
||||
|
||||
|
||||
class InvalidStorageItemFormat(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class ParsingStrategy(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def parse(self, data: bytes):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def parse_and_wrap(self, data: bytes):
|
||||
pass
|
||||
|
||||
def __call__(self, data: bytes):
|
||||
return self.parse_and_wrap(data)
|
||||
|
||||
|
||||
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
|
||||
# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
|
||||
class DynamicParsingStrategy(ParsingStrategy):
|
||||
def __init__(self):
|
||||
self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
|
||||
|
||||
def parse(self, data: bytes) -> Union[bytes, dict]:
|
||||
return self.parser(data)
|
||||
|
||||
def parse_and_wrap(self, data):
|
||||
return self.parse(data)
|
||||
|
||||
|
||||
def validate(data):
|
||||
if not ("data" in data and "metadata" in data):
|
||||
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
|
||||
|
||||
|
||||
def wrap(data):
|
||||
return {"data": data, "metadata": {}}
|
||||
|
||||
|
||||
class QueueVisitor:
|
||||
def __init__(
|
||||
self,
|
||||
storage: Storage,
|
||||
callback: Callable,
|
||||
response_strategy: ResponseStrategy,
|
||||
parsing_strategy: ParsingStrategy = None,
|
||||
download_strategy=None,
|
||||
):
|
||||
self.storage = storage
|
||||
self.callback = callback
|
||||
self.download_strategy = download_strategy or get_download_strategy()
|
||||
self.response_strategy = response_strategy
|
||||
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
|
||||
|
||||
def load_data(self, queue_item_body):
|
||||
data = self.download_strategy(self.storage, queue_item_body)
|
||||
data = map(self.parsing_strategy, data)
|
||||
data = map(standardize, data)
|
||||
return data
|
||||
|
||||
def process_storage_item(self, data_metadata_pack):
|
||||
return self.callback(data_metadata_pack)
|
||||
|
||||
def load_item_from_storage_and_process_with_callback(self, queue_item_body):
|
||||
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
|
||||
|
||||
def process_storage_item(storage_item):
|
||||
analysis_input = {**storage_item, **queue_item_body}
|
||||
return self.process_storage_item(analysis_input)
|
||||
|
||||
storage_items = self.load_data(queue_item_body)
|
||||
results = lflatten(map(process_storage_item, storage_items))
|
||||
return {"data": results, **queue_item_body}
|
||||
|
||||
def __call__(self, queue_item_body):
|
||||
analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
|
||||
return self.response_strategy(analysis_result_body)
|
||||
|
||||
|
||||
def standardize(data) -> Dict:
|
||||
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
|
||||
|
||||
Cases:
|
||||
1) backend upload: data as bytes
|
||||
2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
|
||||
where value of key 'data' was encoded with bytes_to_string(...)
|
||||
|
||||
Returns:
|
||||
{"data": bytes, "metadata": dict}
|
||||
"""
|
||||
|
||||
def is_blob_without_metadata(data):
|
||||
return isinstance(data, bytes)
|
||||
|
||||
def is_blob_with_metadata(data: Dict):
|
||||
return isinstance(data, dict)
|
||||
|
||||
if is_blob_without_metadata(data):
|
||||
return wrap(data)
|
||||
|
||||
elif is_blob_with_metadata(data):
|
||||
validate(data)
|
||||
return data
|
||||
|
||||
else: # Fallback / used for testing with simple data
|
||||
logger.warning("Encountered storage data in unexpected format.")
|
||||
assert isinstance(data, str)
|
||||
return wrap(string_to_bytes(data))
|
||||
|
||||
|
||||
def get_download_strategy(download_strategy_type=None):
|
||||
download_strategies = {
|
||||
"single": SingleDownloadStrategy(),
|
||||
"multi": MultiDownloadStrategy(),
|
||||
}
|
||||
return download_strategies.get(download_strategy_type or CONFIG.service.download_strategy, SingleDownloadStrategy())
|
||||
|
||||
|
||||
class DownloadStrategy(abc.ABC):
|
||||
def _load_data(self, storage, queue_item_body):
|
||||
object_descriptor = self.get_object_descriptor(queue_item_body)
|
||||
logging.debug(f"Downloading {object_descriptor}...")
|
||||
data = self.__download(storage, object_descriptor)
|
||||
logging.debug(f"Downloaded {object_descriptor}.")
|
||||
assert isinstance(data, bytes)
|
||||
data = gzip.decompress(data)
|
||||
return [data]
|
||||
|
||||
@staticmethod
|
||||
def __download(storage, object_descriptor):
|
||||
try:
|
||||
data = storage.get_object(**object_descriptor)
|
||||
except Exception as err:
|
||||
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
|
||||
raise DataLoadingFailure from err
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def get_object_name(body: dict):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_object_descriptor(self, body):
|
||||
return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": self.get_object_name(body)}
|
||||
|
||||
|
||||
class SingleDownloadStrategy(DownloadStrategy):
|
||||
def download(self, storage, queue_item_body):
|
||||
return self._load_data(storage, queue_item_body)
|
||||
|
||||
@staticmethod
|
||||
def get_object_name(body: dict):
|
||||
|
||||
# TODO: deepcopy still necessary?
|
||||
body = deepcopy(body)
|
||||
|
||||
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
def __call__(self, storage, queue_item_body):
|
||||
return self.download(storage, queue_item_body)
|
||||
|
||||
|
||||
class MultiDownloadStrategy(DownloadStrategy):
|
||||
def __init__(self):
|
||||
# TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket
|
||||
self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket)
|
||||
|
||||
def download(self, storage: Storage, queue_item_body):
|
||||
pages = "|".join(map(str, queue_item_body["pages"]))
|
||||
matches_page = r".*id:(" + pages + r").*"
|
||||
|
||||
object_names = storage.get_all_object_names(self.bucket_name)
|
||||
object_names = filter(matches_page, object_names)
|
||||
objects = (storage.get_object(self.bucket_name, objn) for objn in object_names)
|
||||
objects = map(gzip.decompress, objects)
|
||||
|
||||
return objects
|
||||
|
||||
@staticmethod
|
||||
def get_object_name(body: dict):
|
||||
|
||||
def get_key(key):
|
||||
return key if key in body else False
|
||||
|
||||
# TODO: deepcopy still necessary?
|
||||
body = deepcopy(body)
|
||||
|
||||
folder = f"/{get_key('pages') or get_key('images')}/"
|
||||
if not folder:
|
||||
raise InvalidMessage("Expected a folder like 'images' oder 'pages' to be specified in message.")
|
||||
|
||||
idnt = f"id:{body.get('id', 0)}"
|
||||
|
||||
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
def __call__(self, storage, queue_item_body):
|
||||
return self.download(storage, queue_item_body)
|
||||
1
pyinfra/visitor/__init__.py
Normal file
1
pyinfra/visitor/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .visitor import QueueVisitor
|
||||
0
pyinfra/visitor/dispatch/__init__.py
Normal file
0
pyinfra/visitor/dispatch/__init__.py
Normal file
7
pyinfra/visitor/dispatch/dispatch.py
Normal file
7
pyinfra/visitor/dispatch/dispatch.py
Normal file
@ -0,0 +1,7 @@
|
||||
import abc
|
||||
|
||||
|
||||
class DispatchCallback(abc.ABC):
    """Callable interface deciding when buffered payloads should be dispatched.

    Implementations receive a payload (metadata mapping) and return a truthy
    value when a flush/dispatch should happen.
    """

    @abc.abstractmethod
    def __call__(self, payload):
        pass
|
||||
21
pyinfra/visitor/dispatch/identifier_dispatch.py
Normal file
21
pyinfra/visitor/dispatch/identifier_dispatch.py
Normal file
@ -0,0 +1,21 @@
|
||||
from _operator import itemgetter
|
||||
|
||||
from pyinfra.visitor.dispatch.dispatch import DispatchCallback
|
||||
|
||||
|
||||
class IdentifierDispatchCallback(DispatchCallback):
    """Fires when the "fileId:dossierId" identifier in the metadata changes.

    The first identifier seen is latched; every subsequent call reports
    whether the incoming metadata names a different file/dossier pair.
    """

    def __init__(self):
        # Nothing observed yet; the first call latches an identifier.
        self.identifier = None

    def has_new_identifier(self, metadata):
        """Return True iff *metadata* carries a different fileId/dossierId pair."""
        current = ":".join((metadata["fileId"], metadata["dossierId"]))

        if not self.identifier:
            self.identifier = current

        return current != self.identifier

    def __call__(self, metadata):
        return self.has_new_identifier(metadata)
|
||||
0
pyinfra/visitor/strategies/__init__.py
Normal file
0
pyinfra/visitor/strategies/__init__.py
Normal file
0
pyinfra/visitor/strategies/download/__init__.py
Normal file
0
pyinfra/visitor/strategies/download/__init__.py
Normal file
38
pyinfra/visitor/strategies/download/download.py
Normal file
38
pyinfra/visitor/strategies/download/download.py
Normal file
@ -0,0 +1,38 @@
|
||||
import abc
|
||||
import gzip
|
||||
import logging
|
||||
|
||||
from pyinfra.config import parse_disjunction_string, CONFIG
|
||||
from pyinfra.exceptions import DataLoadingFailure
|
||||
|
||||
|
||||
class DownloadStrategy(abc.ABC):
    """Template for fetching a gzip-compressed object from storage.

    Subclasses supply get_object_name(); this base handles the descriptor
    construction, the download, and decompression.
    """

    def _load_data(self, storage, queue_item_body):
        """Download the object addressed by *queue_item_body* and return [decompressed bytes]."""
        descriptor = self.get_object_descriptor(queue_item_body)

        logging.debug(f"Downloading {descriptor}...")
        payload = self.__download(storage, descriptor)
        logging.debug(f"Downloaded {descriptor}.")

        assert isinstance(payload, bytes)
        return [gzip.decompress(payload)]

    @staticmethod
    def __download(storage, object_descriptor):
        """Fetch one object; wrap any storage error in DataLoadingFailure."""
        try:
            return storage.get_object(**object_descriptor)
        except Exception as err:
            logging.warning(f"Loading data from storage failed for {object_descriptor}.")
            raise DataLoadingFailure from err

    @staticmethod
    @abc.abstractmethod
    def get_object_name(body: dict):
        raise NotImplementedError

    def get_object_descriptor(self, body):
        """Return the bucket/object pair identifying the item to download."""
        return {
            "bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
            "object_name": self.get_object_name(body),
        }
|
||||
50
pyinfra/visitor/strategies/download/multi.py
Normal file
50
pyinfra/visitor/strategies/download/multi.py
Normal file
@ -0,0 +1,50 @@
|
||||
import gzip
|
||||
from _operator import itemgetter
|
||||
from copy import deepcopy
|
||||
|
||||
from funcy import filter
|
||||
|
||||
from pyinfra.config import parse_disjunction_string, CONFIG
|
||||
from pyinfra.exceptions import InvalidMessage
|
||||
from pyinfra.storage.storage import Storage
|
||||
from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
|
||||
|
||||
class MultiDownloadStrategy(DownloadStrategy):
    """Downloads every stored object whose name matches one of the requested page ids."""

    def __init__(self):
        # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket
        self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket)

    def download(self, storage: Storage, queue_item_body):
        """Lazily yield the gzip-decompressed contents of all objects for the requested pages."""
        pages = "|".join(map(str, queue_item_body["pages"]))
        matches_page = r".*id:(" + pages + r").*"

        object_names = storage.get_all_object_names(self.bucket_name)
        # funcy.filter accepts a regex string as the predicate.
        object_names = filter(matches_page, object_names)
        objects = (storage.get_object(self.bucket_name, objn) for objn in object_names)
        objects = map(gzip.decompress, objects)

        return objects

    @staticmethod
    def get_object_name(body: dict):
        """Build the object name <dossierId>/<fileId>/<folder>/id:<id>.<target extension>.

        Raises:
            InvalidMessage: if neither 'pages' nor 'images' is present in *body*.
        """
        # TODO: deepcopy still necessary?
        body = deepcopy(body)

        # Bug fix: the guard must run BEFORE interpolation. Previously
        # folder = f"/{... or ...}/" was always truthy (at worst "/False/"),
        # so the InvalidMessage branch was unreachable and missing keys
        # silently produced a "/False/" path segment.
        folder_key = next((key for key in ("pages", "images") if key in body), None)
        if folder_key is None:
            raise InvalidMessage("Expected a folder like 'images' or 'pages' to be specified in message.")
        folder = f"/{folder_key}/"

        idnt = f"id:{body.get('id', 0)}"

        dossier_id, file_id = itemgetter("dossierId", "fileId")(body)

        return f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}"

    def __call__(self, storage, queue_item_body):
        return self.download(storage, queue_item_body)
|
||||
25
pyinfra/visitor/strategies/download/single.py
Normal file
25
pyinfra/visitor/strategies/download/single.py
Normal file
@ -0,0 +1,25 @@
|
||||
from _operator import itemgetter
|
||||
from copy import deepcopy
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
|
||||
|
||||
class SingleDownloadStrategy(DownloadStrategy):
    """Fetches exactly one object, addressed by dossierId/fileId."""

    def download(self, storage, queue_item_body):
        return self._load_data(storage, queue_item_body)

    @staticmethod
    def get_object_name(body: dict):
        """Build the object name <dossierId>/<fileId>.<target extension>."""
        # TODO: deepcopy still necessary?
        body = deepcopy(body)

        dossier_id = body["dossierId"]
        file_id = body["fileId"]

        return f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}"

    def __call__(self, storage, queue_item_body):
        return self.download(storage, queue_item_body)
|
||||
0
pyinfra/visitor/strategies/parsing/__init__.py
Normal file
0
pyinfra/visitor/strategies/parsing/__init__.py
Normal file
20
pyinfra/visitor/strategies/parsing/dynamic.py
Normal file
20
pyinfra/visitor/strategies/parsing/dynamic.py
Normal file
@ -0,0 +1,20 @@
|
||||
from typing import Union
|
||||
|
||||
from pyinfra.parser.parser_composer import EitherParserComposer
|
||||
from pyinfra.parser.parsers.identity import IdentityBlobParser
|
||||
from pyinfra.parser.parsers.json import JsonBlobParser
|
||||
from pyinfra.parser.parsers.string import StringBlobParser
|
||||
from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy
|
||||
|
||||
|
||||
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
class DynamicParsingStrategy(ParsingStrategy):
    """Parses storage blobs by delegating to a composed chain of parsers
    (JSON, then string, then raw-bytes identity — presumably first-success
    wins; see EitherParserComposer)."""

    def __init__(self):
        self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())

    def parse(self, data: bytes) -> Union[bytes, dict]:
        return self.parser(data)

    def parse_and_wrap(self, data):
        # This strategy applies no extra wrapping; parse_and_wrap is parse.
        return self.parse(data)
|
||||
14
pyinfra/visitor/strategies/parsing/parsing.py
Normal file
14
pyinfra/visitor/strategies/parsing/parsing.py
Normal file
@ -0,0 +1,14 @@
|
||||
import abc
|
||||
|
||||
|
||||
class ParsingStrategy(abc.ABC):
    """Interface for turning raw storage bytes into usable data."""

    @abc.abstractmethod
    def parse(self, data: bytes):
        """Parse *data*; the concrete return type is strategy-specific."""
        pass

    @abc.abstractmethod
    def parse_and_wrap(self, data: bytes):
        """Parse *data* and return it in the form downstream processing expects."""
        pass

    def __call__(self, data: bytes):
        # Calling a strategy instance is shorthand for parse_and_wrap.
        return self.parse_and_wrap(data)
|
||||
0
pyinfra/visitor/strategies/response/__init__.py
Normal file
0
pyinfra/visitor/strategies/response/__init__.py
Normal file
60
pyinfra/visitor/strategies/response/aggregation.py
Normal file
60
pyinfra/visitor/strategies/response/aggregation.py
Normal file
@ -0,0 +1,60 @@
|
||||
import gzip
|
||||
import json
|
||||
from collections import deque
|
||||
from typing import Callable
|
||||
|
||||
from funcy import omit, filter
|
||||
from more_itertools import peekable
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||
from pyinfra.visitor.dispatch.dispatch import DispatchCallback
|
||||
from pyinfra.visitor.dispatch.identifier_dispatch import IdentifierDispatchCallback
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
from pyinfra.visitor.utils import build_storage_upload_info
|
||||
|
||||
|
||||
class AggregationStorageStrategy(ResponseStrategy):
    """Uploads analysis payloads to storage, aggregating empty ones.

    Payloads that carry data are uploaded immediately; payloads without data
    are buffered and flushed as one merged object when the dispatch callback
    fires (by default: the fileId/dossierId identifier changes) or the input
    is exhausted.
    """

    def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None):
        self.storage = storage
        # How buffered payloads are combined before upload; defaults to a plain list.
        self.merger = merger or list
        self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
        self.buffer = deque()

    def put_object(self, data: bytes, storage_upload_info):
        """Gzip *data*, upload it, and return the metadata enriched with 'responseFile'."""
        object_descriptor = self.get_response_object_descriptor(storage_upload_info)
        self.storage.put_object(**object_descriptor, data=gzip.compress(data))
        return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}

    def merge_queue_items(self):
        """Drain the buffer and return its merged content."""
        merged_buffer_content = self.merger(self.buffer)
        self.buffer.clear()
        return merged_buffer_content

    def upload_queue_items(self, storage_upload_info):
        """Upload the merged buffer content as one JSON object."""
        data = json.dumps(self.merge_queue_items()).encode()
        return self.put_object(data, storage_upload_info)

    def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
        """analysis_payload : {data: ..., metadata: ...}

        Returns the upload result dict, or Nothing when the payload was only buffered.
        """
        storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata)
        # Bug fix: tolerate payload metadata without an 'id' key —
        # build_storage_upload_info already defaults it via .get("id", 0),
        # so a bare pop("id") would raise KeyError for such payloads.
        analysis_payload["metadata"].pop("id", None)

        if analysis_payload["data"]:
            return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info)

        else:
            self.buffer.append(analysis_payload)
            if last or self.dispatch_callback(storage_upload_info):
                return self.upload_queue_items(storage_upload_info)
            else:
                return Nothing

    def handle_response(self, analysis_response, final=False):
        """Generator: upload-or-aggregate every payload in analysis_response['data'].

        Yields only actual upload results (Nothing placeholders are filtered out).
        peekable() lets each step know whether it is processing the last payload.
        """
        def upload_or_aggregate(analysis_payload):
            return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))

        request_metadata = omit(analysis_response, ["data"])
        result_data = peekable(analysis_response["data"])

        yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))
|
||||
6
pyinfra/visitor/strategies/response/forwarding.py
Normal file
6
pyinfra/visitor/strategies/response/forwarding.py
Normal file
@ -0,0 +1,6 @@
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
|
||||
|
||||
class ForwardingStrategy(ResponseStrategy):
    """Response strategy that passes the analysis response through unchanged."""

    def handle_response(self, analysis_response):
        return analysis_response
|
||||
34
pyinfra/visitor/strategies/response/response.py
Normal file
34
pyinfra/visitor/strategies/response/response.py
Normal file
@ -0,0 +1,34 @@
|
||||
import abc
|
||||
from _operator import itemgetter
|
||||
|
||||
from pyinfra.config import parse_disjunction_string, CONFIG
|
||||
|
||||
|
||||
class ResponseStrategy(abc.ABC):
    """Base class for handling an analysis response body."""

    @abc.abstractmethod
    def handle_response(self, analysis_response: dict):
        pass

    def __call__(self, analysis_response: dict):
        # Calling a strategy instance is shorthand for handle_response.
        return self.handle_response(analysis_response)

    def get_response_object_descriptor(self, body):
        """Return the bucket/object pair under which the response for *body* is stored."""
        return {
            "bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
            "object_name": self.get_response_object_name(body),
        }

    @staticmethod
    def get_response_object_name(body):
        """Build the response object name <dossierId>/<fileId>/id:<id>.<response extension>.

        NOTE: mutates *body* in place — missing 'pages'/'id' keys are defaulted.
        """
        body.setdefault("pages", [])
        body.setdefault("id", 0)

        dossier_id = body["dossierId"]
        file_id = body["fileId"]
        idnt = body["id"]

        return f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
16
pyinfra/visitor/strategies/response/storage.py
Normal file
16
pyinfra/visitor/strategies/response/storage.py
Normal file
@ -0,0 +1,16 @@
|
||||
import gzip
|
||||
import json
|
||||
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
|
||||
|
||||
class StorageStrategy(ResponseStrategy):
    """Uploads the full response body to storage, then returns a slimmed body."""

    def __init__(self, storage):
        self.storage = storage

    def handle_response(self, body: dict):
        """Upload gzip'd JSON of *body*; strip 'data' and record 'responseFile' in its place."""
        descriptor = self.get_response_object_descriptor(body)
        compressed = gzip.compress(json.dumps(body).encode())
        self.storage.put_object(**descriptor, data=compressed)

        body.pop("data")
        body["responseFile"] = descriptor["object_name"]
        return body
|
||||
70
pyinfra/visitor/utils.py
Normal file
70
pyinfra/visitor/utils.py
Normal file
@ -0,0 +1,70 @@
|
||||
import logging
|
||||
from typing import Dict
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.exceptions import InvalidStorageItemFormat
|
||||
from pyinfra.server.packing import string_to_bytes
|
||||
from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy
|
||||
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def build_storage_upload_info(analysis_payload, request_metadata):
    """Combine request metadata with the payload's id and a folder-qualified fileId.

    The 'id' comes from the payload metadata (default 0); 'fileId' is extended
    with the 'operation' folder, falling back to CONFIG.service.response_folder.
    """
    info = dict(request_metadata)
    info["id"] = analysis_payload["metadata"].get("id", 0)
    folder = info.get("operation", CONFIG.service.response_folder)
    info["fileId"] = build_file_path(info, folder)
    return info
|
||||
|
||||
|
||||
def build_file_path(storage_upload_info, folder):
    """Return the fileId, with "/<folder>" appended when *folder* is non-empty."""
    base = str(storage_upload_info["fileId"])
    return f"{base}/{folder}" if folder else base
|
||||
|
||||
|
||||
def validate(data):
    """Raise InvalidStorageItemFormat unless *data* has both 'data' and 'metadata' keys."""
    if "data" not in data or "metadata" not in data:
        raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
|
||||
|
||||
|
||||
def wrap(data):
    """Wrap a bare blob in the standard {'data': ..., 'metadata': {}} envelope."""
    return dict(data=data, metadata={})
|
||||
|
||||
|
||||
def standardize(data) -> Dict:
    """Storage items can be a blob or a blob with metadata. Standardizes to the latter.

    Cases:
        1) backend upload: data as bytes
        2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
           where value of key 'data' was encoded with bytes_to_string(...)

    Returns:
        {"data": bytes, "metadata": dict}

    Raises:
        InvalidStorageItemFormat: if a dict item lacks 'data'/'metadata' keys,
            or the item is neither bytes, dict, nor str.
    """

    def is_blob_without_metadata(data):
        return isinstance(data, bytes)

    def is_blob_with_metadata(data: Dict):
        return isinstance(data, dict)

    if is_blob_without_metadata(data):
        return wrap(data)

    elif is_blob_with_metadata(data):
        validate(data)
        return data

    else:  # Fallback / used for testing with simple data
        logger.warning("Encountered storage data in unexpected format.")
        if not isinstance(data, str):
            # Bug fix: this was a bare `assert`, which is silently stripped
            # under `python -O`; raise an explicit, catchable error instead.
            raise InvalidStorageItemFormat(
                f"Cannot standardize storage item of type {type(data).__name__}."
            )
        return wrap(string_to_bytes(data))
|
||||
|
||||
|
||||
def get_download_strategy(download_strategy_type=None):
    """Return a download strategy instance for *download_strategy_type*.

    Falls back to CONFIG.service.download_strategy when no type is given,
    and to SingleDownloadStrategy for unknown types.
    """
    strategy_classes = {
        "single": SingleDownloadStrategy,
        "multi": MultiDownloadStrategy,
    }
    requested = download_strategy_type or CONFIG.service.download_strategy
    # Instantiate lazily: the previous version eagerly built every strategy
    # (plus a spare fallback instance) on each call, needlessly touching
    # CONFIG in MultiDownloadStrategy.__init__ even when unused.
    return strategy_classes.get(requested, SingleDownloadStrategy)()
|
||||
50
pyinfra/visitor/visitor.py
Normal file
50
pyinfra/visitor/visitor.py
Normal file
@ -0,0 +1,50 @@
|
||||
from typing import Callable
|
||||
|
||||
from funcy import lflatten
|
||||
|
||||
from pyinfra.storage.storage import Storage
|
||||
from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
from pyinfra.visitor.strategies.parsing.dynamic import DynamicParsingStrategy
|
||||
from pyinfra.visitor.strategies.parsing.parsing import ParsingStrategy
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
from pyinfra.visitor.utils import standardize, get_download_strategy
|
||||
|
||||
|
||||
class QueueVisitor:
    """Drives one queue message through download -> parse -> standardize -> callback -> response."""

    def __init__(
        self,
        storage: Storage,
        callback: Callable,
        response_strategy: ResponseStrategy,
        parsing_strategy: ParsingStrategy = None,
        download_strategy: DownloadStrategy = None,
    ):
        self.storage = storage
        self.callback = callback
        self.response_strategy = response_strategy
        # Sensible defaults: trial-and-error parsing, CONFIG-selected download.
        self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
        self.download_strategy = download_strategy or get_download_strategy()

    def load_data(self, queue_item_body):
        """Lazily download, parse, and standardize the storage items for *queue_item_body*."""
        raw_items = self.download_strategy(self.storage, queue_item_body)
        parsed_items = map(self.parsing_strategy, raw_items)
        return map(standardize, parsed_items)

    def process_storage_item(self, data_metadata_pack):
        return self.callback(data_metadata_pack)

    def load_item_from_storage_and_process_with_callback(self, queue_item_body):
        """Bundles the result from processing a storage item with the body of the corresponding queue item."""

        def run_callback(storage_item):
            # On key clashes the queue-message fields win over storage-item fields.
            analysis_input = {**storage_item, **queue_item_body}
            return self.process_storage_item(analysis_input)

        storage_items = self.load_data(queue_item_body)
        results = lflatten(map(run_callback, storage_items))
        return {"data": results, **queue_item_body}

    def __call__(self, queue_item_body):
        analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
        return self.response_strategy(analysis_result_body)
|
||||
@ -21,7 +21,9 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter
|
||||
from pyinfra.storage.clients.azure import get_azure_client
|
||||
from pyinfra.storage.clients.s3 import get_s3_client
|
||||
from pyinfra.storage.storage import Storage
|
||||
from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
|
||||
from pyinfra.visitor.strategies.response.storage import StorageStrategy
|
||||
from test.config import CONFIG
|
||||
from test.queue.queue_manager_mock import QueueManagerMock
|
||||
from test.storage.adapter_mock import StorageAdapterMock
|
||||
|
||||
@ -16,7 +16,8 @@ from pyinfra.default_objects import (
|
||||
)
|
||||
from pyinfra.queue.consumer import Consumer
|
||||
from pyinfra.server.packing import unpack, pack
|
||||
from pyinfra.visitor import QueueVisitor, get_download_strategy
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.utils import get_download_strategy
|
||||
from test.utils.input import pair_data_with_queue_message
|
||||
|
||||
|
||||
|
||||
@ -4,7 +4,7 @@ import json
|
||||
import pytest
|
||||
|
||||
from pyinfra.utils.encoding import pack_for_upload
|
||||
from pyinfra.visitor import SingleDownloadStrategy
|
||||
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user