# Source: pyinfra/pyinfra/visitor.py (363 lines, 12 KiB, Python)
import abc
import gzip
import hashlib
import json
import logging
from collections import deque
from copy import deepcopy
from operator import itemgetter
from typing import Callable, Dict, Union
from funcy import omit, filter, lflatten
from more_itertools import peekable
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.exceptions import DataLoadingFailure, InvalidMessage
from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.server.packing import string_to_bytes
from pyinfra.storage.storage import Storage
# Module-level logger shared by all strategies defined in this module.
logger = logging.getLogger(__name__)
class ResponseStrategy(abc.ABC):
    """Base class for strategies that post-process an analysis response body."""

    @abc.abstractmethod
    def handle_response(self, analysis_response: dict):
        """Consume a response body and return the (possibly transformed) result."""

    def __call__(self, analysis_response: dict):
        return self.handle_response(analysis_response)

    def get_response_object_descriptor(self, body):
        """Build the storage descriptor (bucket name + object name) for *body*."""
        return {
            "bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
            "object_name": self.get_response_object_name(body),
        }

    @staticmethod
    def get_response_object_name(body):
        """Derive the storage object name '<dossierId>/<fileId>/id:<id>.<ext>'.

        Fills missing 'pages'/'id' keys with defaults, in place, so the
        caller's dict gains those keys as a side effect.
        """
        body.setdefault("pages", [])
        body.setdefault("id", 0)
        dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body)
        return f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
class StorageStrategy(ResponseStrategy):
    """Uploads the full response body to object storage and returns a slimmed body.

    The stored object is the gzipped JSON of the complete body; the returned
    body has its 'data' key removed and a 'responseFile' pointer added.
    """

    def __init__(self, storage):
        self.storage = storage

    def handle_response(self, body):
        descriptor = self.get_response_object_descriptor(body)
        compressed = gzip.compress(json.dumps(body).encode())
        self.storage.put_object(**descriptor, data=compressed)
        # Replace the bulky payload with a reference to the uploaded object.
        body.pop("data")
        body["responseFile"] = descriptor["object_name"]
        return body
class ForwardingStrategy(ResponseStrategy):
    """Pass-through strategy: returns the analysis response unchanged."""

    def handle_response(self, analysis_response):
        return analysis_response
class DispatchCallback(abc.ABC):
    """Callable hook deciding whether buffered results should be dispatched."""

    @abc.abstractmethod
    def __call__(self, payload):
        pass
class IdentifierDispatchCallback(DispatchCallback):
    """Signals dispatch when the 'fileId:dossierId' identifier differs from the first one seen."""

    def __init__(self):
        self.identifier = None

    def has_new_identifier(self, metadata):
        current = ":".join(itemgetter("fileId", "dossierId")(metadata))
        if not self.identifier:
            self.identifier = current
        # NOTE(review): the stored identifier is never replaced after its first
        # assignment, so once a different identifier appears every subsequent
        # call keeps returning True — confirm this is the intended semantics.
        return current != self.identifier

    def __call__(self, metadata):
        return self.has_new_identifier(metadata)
class AggregationStorageStrategy(ResponseStrategy):
    """Buffers per-item analysis payloads and uploads them in merged batches.

    Payloads that carry inline 'data' are uploaded immediately; payloads
    without data are queued and flushed as one merged object when the
    dispatch callback fires or when the last payload is reached.
    """

    def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None):
        self.storage = storage
        # The merger turns the buffered deque into the object that is uploaded.
        self.merger = merger or list
        self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
        self.buffer = deque()

    def put_object(self, data: bytes, storage_upload_info):
        """Gzip *data*, upload it, and return the metadata plus a 'responseFile' pointer."""
        object_descriptor = self.get_response_object_descriptor(storage_upload_info)
        self.storage.put_object(**object_descriptor, data=gzip.compress(data))
        return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}

    def merge_queue_items(self):
        """Drain the buffer: return the merged content and empty the deque."""
        merged_buffer_content = self.merger(self.buffer)
        self.buffer.clear()
        return merged_buffer_content

    def upload_queue_items(self, storage_upload_info):
        data = json.dumps(self.merge_queue_items()).encode()
        return self.put_object(data, storage_upload_info)

    def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
        """analysis_payload : {data: ..., metadata: ...}"""
        storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata)
        # 'id' has been folded into storage_upload_info; drop it from the payload
        # metadata. Fix: use a default so a payload without an 'id' key (which
        # build_storage_upload_info tolerates via .get("id", 0)) no longer
        # raises KeyError here.
        analysis_payload["metadata"].pop("id", None)
        if analysis_payload["data"]:
            # Inline data: upload this payload on its own, bypassing the buffer.
            return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info)
        else:
            self.buffer.append(analysis_payload)
            if last or self.dispatch_callback(storage_upload_info):
                return self.upload_queue_items(storage_upload_info)
            else:
                # Nothing uploaded yet; sentinel is filtered out in handle_response.
                return Nothing

    def handle_response(self, analysis_response, final=False):
        """Generator yielding the upload result for each payload in 'data'.

        Buffered-only payloads yield nothing (the Nothing sentinel is filtered
        out). *final* is currently unused.
        """
        def upload_or_aggregate(analysis_payload):
            # peek(False) is falsy only when no items remain -> flush on the last item.
            return self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))

        request_metadata = omit(analysis_response, ["data"])
        result_data = peekable(analysis_response["data"])
        yield from filter(is_not_nothing, map(upload_or_aggregate, result_data))
def build_storage_upload_info(analysis_payload, request_metadata):
    """Combine the request metadata with the payload's id and a derived fileId path."""
    info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
    folder = info.get("operation", CONFIG.service.response_folder)
    info["fileId"] = build_file_path(info, folder)
    return info
def build_file_path(storage_upload_info, folder):
    """Append *folder* to the fileId, or return the fileId alone when folder is falsy."""
    base = f"{storage_upload_info['fileId']}"
    return f"{base}/{folder}" if folder else base
class InvalidStorageItemFormat(ValueError):
    """Raised when a storage item is not a mapping with 'data' and 'metadata' keys."""
    pass
class ParsingStrategy(abc.ABC):
    """Interface for turning raw storage bytes into usable payloads."""

    @abc.abstractmethod
    def parse(self, data: bytes):
        pass

    @abc.abstractmethod
    def parse_and_wrap(self, data: bytes):
        pass

    def __call__(self, data: bytes):
        # Instances are used as plain callables (see QueueVisitor.load_data).
        return self.parse_and_wrap(data)
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
class DynamicParsingStrategy(ParsingStrategy):
    """Fallback parser composing JSON, string and identity blob parsers.

    The composed parsers are presumably tried in the listed order — verify
    against EitherParserComposer.
    """

    def __init__(self):
        self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())

    def parse(self, data: bytes) -> Union[bytes, dict]:
        return self.parser(data)

    def parse_and_wrap(self, data):
        # No extra wrapping here; standardization happens later in the pipeline.
        return self.parse(data)
def validate(data):
    """Raise InvalidStorageItemFormat unless *data* has both 'data' and 'metadata' keys."""
    if "data" not in data or "metadata" not in data:
        raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
def wrap(data):
    """Wrap a bare blob in the canonical {'data': ..., 'metadata': {}} envelope."""
    return dict(data=data, metadata={})
class QueueVisitor:
    """Downloads a queue item's storage data, runs the callback, and hands the
    bundled result to the response strategy.

    Pipeline: download (download_strategy) -> parse (parsing_strategy) ->
    standardize -> callback per item -> response_strategy on the bundle.
    """

    def __init__(
        self,
        storage: Storage,
        callback: Callable,
        response_strategy: ResponseStrategy,
        parsing_strategy: ParsingStrategy = None,
        download_strategy=None,
    ):
        self.storage = storage
        self.callback = callback
        self.download_strategy = download_strategy or get_download_strategy()
        self.response_strategy = response_strategy
        self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()

    def load_data(self, queue_item_body):
        """Lazily yield standardized {'data', 'metadata'} items for the queue item."""
        blobs = self.download_strategy(self.storage, queue_item_body)
        return (standardize(self.parsing_strategy(blob)) for blob in blobs)

    def process_storage_item(self, data_metadata_pack):
        return self.callback(data_metadata_pack)

    def load_item_from_storage_and_process_with_callback(self, queue_item_body):
        """Bundles the result from processing a storage item with the body of the corresponding queue item."""
        def run_callback(storage_item):
            # On key collisions, queue_item_body's values win over the storage item's.
            analysis_input = {**storage_item, **queue_item_body}
            return self.process_storage_item(analysis_input)

        storage_items = self.load_data(queue_item_body)
        results = lflatten(map(run_callback, storage_items))
        return {"data": results, **queue_item_body}

    def __call__(self, queue_item_body):
        analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
        return self.response_strategy(analysis_result_body)
def standardize(data) -> Dict:
    """Storage items can be a blob or a blob with metadata. Standardizes to the latter.

    Cases:
    1) backend upload: data as bytes
    2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
       where value of key 'data' was encoded with bytes_to_string(...)

    Returns:
        {"data": bytes, "metadata": dict}

    Raises:
        InvalidStorageItemFormat: for dicts missing 'data'/'metadata' keys, and
            for data that is neither bytes, dict, nor str.
    """
    if isinstance(data, bytes):
        # Bare blob without metadata: wrap in the canonical envelope.
        return wrap(data)
    if isinstance(data, dict):
        validate(data)
        return data
    # Fallback / used for testing with simple data.
    logger.warning("Encountered storage data in unexpected format.")
    if not isinstance(data, str):
        # Explicit raise instead of `assert isinstance(...)`: asserts are
        # stripped under `python -O` and would let junk pass through silently.
        raise InvalidStorageItemFormat(f"Expected bytes, dict or str storage item, got {type(data)!r}.")
    return wrap(string_to_bytes(data))
def get_download_strategy(download_strategy_type=None):
    """Return a download strategy instance for *download_strategy_type*.

    Falls back to CONFIG.service.download_strategy when no type is given, and
    to SingleDownloadStrategy for unknown types.
    """
    strategy_classes = {
        "single": SingleDownloadStrategy,
        "multi": MultiDownloadStrategy,
    }
    key = download_strategy_type or CONFIG.service.download_strategy
    # Instantiate lazily: the original built every strategy (plus a third
    # fallback instance) eagerly, making MultiDownloadStrategy read CONFIG in
    # its __init__ even when it was never used.
    return strategy_classes.get(key, SingleDownloadStrategy)()
class DownloadStrategy(abc.ABC):
    """Template for fetching (gzipped) objects from storage for a queue item."""

    def _load_data(self, storage, queue_item_body):
        """Download and decompress the object described by *queue_item_body*.

        Returns a single-element list so callers can iterate uniformly.

        Raises:
            DataLoadingFailure: when the storage fetch fails.
        """
        object_descriptor = self.get_object_descriptor(queue_item_body)
        # Use the module logger (not the root `logging` module) for consistency
        # with the rest of this file, with lazy %-style arguments.
        logger.debug("Downloading %s...", object_descriptor)
        data = self.__download(storage, object_descriptor)
        logger.debug("Downloaded %s.", object_descriptor)
        assert isinstance(data, bytes)
        data = gzip.decompress(data)
        return [data]

    @staticmethod
    def __download(storage, object_descriptor):
        try:
            data = storage.get_object(**object_descriptor)
        except Exception as err:
            logger.warning("Loading data from storage failed for %s.", object_descriptor)
            raise DataLoadingFailure from err
        return data

    @staticmethod
    @abc.abstractmethod
    def get_object_name(body: dict):
        raise NotImplementedError

    def get_object_descriptor(self, body):
        """Bucket + object-name descriptor matching Storage.get_object's kwargs."""
        return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": self.get_object_name(body)}
class SingleDownloadStrategy(DownloadStrategy):
    """Downloads the single object '<dossierId>/<fileId>.<target_file_extension>'."""

    def download(self, storage, queue_item_body):
        return self._load_data(storage, queue_item_body)

    @staticmethod
    def get_object_name(body: dict):
        # The body is only read here, so the deepcopy the original made (and
        # had marked with a "still necessary?" TODO) was dead weight — dropped.
        dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
        return f"{dossier_id}/{file_id}.{CONFIG.service.target_file_extension}"

    def __call__(self, storage, queue_item_body):
        return self.download(storage, queue_item_body)
class MultiDownloadStrategy(DownloadStrategy):
    """Downloads every stored object whose name carries one of the requested page ids."""

    def __init__(self):
        # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket
        self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket)

    def download(self, storage: Storage, queue_item_body):
        """Yield decompressed objects for all pages listed in the queue item."""
        pages = "|".join(map(str, queue_item_body["pages"]))
        # Anchor the id with the '.' that always follows it in object names
        # ('.../id:<n>.<extension>', see get_object_name below), so that e.g.
        # requesting page 1 no longer also matches 'id:12'.
        matches_page = r".*id:(" + pages + r")\..*"
        object_names = storage.get_all_object_names(self.bucket_name)
        # `filter` here is funcy's, which treats a string predicate as a regex
        # (re.search semantics).
        object_names = filter(matches_page, object_names)
        objects = (storage.get_object(self.bucket_name, objn) for objn in object_names)
        return map(gzip.decompress, objects)

    @staticmethod
    def get_object_name(body: dict):
        """Build '<dossierId>/<fileId>/<pages|images>/id:<id>.<extension>'.

        Raises:
            InvalidMessage: when neither 'pages' nor 'images' is present in *body*.
        """
        folder_key = next((key for key in ("pages", "images") if key in body), None)
        if folder_key is None:
            # The original guard tested the already-formatted '/<...>/' string,
            # which is always truthy, so this error could never fire; it now
            # triggers on the missing key as intended. (Also fixes the German
            # 'oder' typo in the message and drops a read-only deepcopy.)
            raise InvalidMessage("Expected a folder like 'images' or 'pages' to be specified in message.")
        folder = f"/{folder_key}/"
        idnt = f"id:{body.get('id', 0)}"
        dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
        return f"{dossier_id}/{file_id}{folder}{idnt}.{CONFIG.service.target_file_extension}"

    def __call__(self, storage, queue_item_body):
        return self.download(storage, queue_item_body)