pyinfra/pyinfra/visitor.py

160 lines
5.0 KiB
Python

import abc
import gzip
import json
import logging
from collections import deque
from operator import itemgetter
import random
from typing import Callable, Generator
from funcy import omit, pluck, first, lmap
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.server.packing import string_to_bytes
from pyinfra.storage.storage import Storage
def get_object_name(body):
    """Build the storage object name of the input file described by ``body``.

    Args:
        body: Mapping providing the keys ``dossierId``, ``fileId`` and
            ``targetFileExtension``.

    Returns:
        The string ``"<dossierId>/<fileId>.<targetFileExtension>"``.

    Raises:
        KeyError: If one of the three keys is missing from ``body``.
    """
    dossier = body["dossierId"]
    file_ = body["fileId"]
    extension = body["targetFileExtension"]
    return f"{dossier}/{file_}.{extension}"
def get_response_object_name(body):
    """Build the storage object name under which a response is stored.

    Args:
        body: Mapping providing the keys ``dossierId``, ``fileId`` and
            ``responseFileExtension``.

    Returns:
        The string ``"<dossierId>/<fileId>.<responseFileExtension>"``.

    Raises:
        KeyError: If one of the three keys is missing from ``body``.
    """
    dossier = body["dossierId"]
    file_ = body["fileId"]
    extension = body["responseFileExtension"]
    return f"{dossier}/{file_}.{extension}"
def get_object_descriptor(body):
    """Return the keyword arguments that locate the input object for ``body``.

    Args:
        body: Mapping accepted by ``get_object_name``.

    Returns:
        A dict with ``bucket_name`` (``CONFIG.storage.bucket`` passed through
        ``parse_disjunction_string``) and ``object_name``.
    """
    bucket = parse_disjunction_string(CONFIG.storage.bucket)
    name = get_object_name(body)
    return {"bucket_name": bucket, "object_name": name}
def get_response_object_descriptor(body):
    """Return the keyword arguments that locate the response object for ``body``.

    The object name is the result of ``get_response_object_name`` with a
    random integer between 0 and 100 (inclusive) appended directly after the
    file extension.

    Args:
        body: Mapping accepted by ``get_response_object_name``.

    Returns:
        A dict with ``bucket_name`` (``CONFIG.storage.bucket`` passed through
        ``parse_disjunction_string``) and ``object_name``.
    """
    bucket = parse_disjunction_string(CONFIG.storage.bucket)
    base_name = get_response_object_name(body)
    # TODO: this random suffix should be built by some policy
    # NOTE(review): only 101 distinct suffixes exist, so two responses for the
    # same file can end up with the same object name.
    suffix = str(random.randint(0, 100))
    return {"bucket_name": bucket, "object_name": base_name + suffix}
class ResponseStrategy(abc.ABC):
    """Abstract base for callables that decide what to do with a response body.

    Calling an instance delegates to ``handle_response``.
    """

    @abc.abstractmethod
    def handle_response(self, body):
        """Handle ``body``; concrete subclasses must override this method."""

    def __call__(self, body):
        """Pass ``body`` to ``handle_response`` and return whatever it returns."""
        outcome = self.handle_response(body)
        return outcome
class StorageStrategy(ResponseStrategy):
    """Response strategy that writes the whole response body to storage."""

    def __init__(self, storage):
        """Keep a reference to ``storage``, which must offer ``put_object``."""
        self.storage = storage

    def handle_response(self, body):
        """Upload ``body`` as gzip-compressed JSON and return it without its data.

        Args:
            body: Dict describing the response; it must be JSON-serialisable
                and contain a ``data`` key.

        Returns:
            The same ``body`` object, with the ``data`` key removed in place.

        Raises:
            KeyError: If ``body`` has no ``data`` key (raised after the upload).
        """
        descriptor = get_response_object_descriptor(body)
        compressed = gzip.compress(json.dumps(body).encode())
        self.storage.put_object(**descriptor, data=compressed)
        # The caller's dict is mutated: only the metadata is handed back.
        del body["data"]
        return body
class ForwardingStrategy(ResponseStrategy):
    """Response strategy that passes the response body through untouched."""

    def handle_response(self, body):
        """Return ``body`` as received, without storing or modifying it."""
        forwarded = body
        return forwarded
class DispatchCallback(abc.ABC):
    """Abstract callable that is asked whether a dispatch should happen.

    ``AggregationStorageStrategy`` calls its dispatch callback with the
    response metadata and uploads its buffered items when the result is truthy.
    """

    @abc.abstractmethod
    def __call__(self, payload):
        """Return a truthy value when ``payload`` should trigger a dispatch."""


class IdentifierDispatchCallback(DispatchCallback):
    """Dispatch callback that fires whenever the file identifier changes.

    The identifier is ``"<fileId>:<dossierId>"`` built from the metadata.
    """

    def __init__(self):
        # Identifier seen on the most recent call; None until the first call.
        self.identifier = None

    def has_new_identifier(self, metadata):
        """Tell whether ``metadata`` belongs to a different file than the last call.

        Args:
            metadata: Mapping providing string values for ``fileId`` and
                ``dossierId``.

        Returns:
            ``False`` on the very first call and whenever the identifier equals
            the one from the previous call; ``True`` when it differs.

        Raises:
            KeyError: If ``fileId`` or ``dossierId`` is missing.
        """
        identifier = ":".join(itemgetter("fileId", "dossierId")(metadata))
        # Always remember the latest identifier. Previously only the first one
        # was stored, so after a single change every later call reported "new",
        # even for repeated payloads of the same file.
        previous, self.identifier = self.identifier, identifier
        return previous is not None and previous != identifier

    def __call__(self, payload):
        """Delegate to ``has_new_identifier``."""
        return self.has_new_identifier(payload)
class AggregationStorageStrategy(ResponseStrategy):
    """Response strategy that uploads string items and buffers everything else.

    Non-string items are collected in a deque; when the dispatch callback
    returns a truthy value the buffer is merged, uploaded and emptied.
    """

    def __init__(self, storage, merger: Callable = None, dispatch_callback: DispatchCallback = None):
        """Set up the strategy.

        Args:
            storage: Object offering ``put_object``.
            merger: Callable applied to the buffer before upload; ``list`` is
                used when none (or a falsy value) is given.
            dispatch_callback: Callable deciding when the buffer is uploaded;
                a fresh ``IdentifierDispatchCallback`` is used when none (or a
                falsy value) is given.
        """
        self.storage = storage
        self.merger = merger if merger else list
        self.dispatch_callback = dispatch_callback if dispatch_callback else IdentifierDispatchCallback()
        self.buffer = deque()

    def put_object(self, data, metadata):
        """Upload ``data`` as gzip-compressed JSON to the response object for ``metadata``."""
        target = get_response_object_descriptor(metadata)
        compressed = gzip.compress(json.dumps(data).encode())
        self.storage.put_object(**target, data=compressed)

    def merge_queue_items(self):
        """Apply the merger to the buffer, empty the buffer and return the merged result."""
        merged = self.merger(self.buffer)
        self.buffer.clear()
        return merged

    def upload_queue_items(self, metadata):
        """Merge the buffered items and upload the result for ``metadata``."""
        self.put_object(self.merge_queue_items(), metadata)

    def upload_or_aggregate(self, data, metadata):
        """Upload ``data`` right away if it is a string, otherwise buffer it.

        NOTE(review): the original indentation of this method was not
        available; the dispatch check is assumed to belong to the buffering
        branch only - confirm against the upstream file.
        """
        if isinstance(data, str):
            self.put_object(data, metadata)
            return
        self.buffer.append(data)
        if self.dispatch_callback(metadata):
            self.upload_queue_items(metadata)

    def handle_response(self, payload, final=False):
        """Process every item in ``payload["data"]`` and return the metadata.

        Args:
            payload: Dict with a ``data`` iterable; all other keys are treated
                as metadata.
            final: Accepted but not used by this implementation.

        Returns:
            A copy of ``payload`` without the ``data`` key.
        """
        metadata = omit(payload, ["data"])
        for item in payload["data"]:
            self.upload_or_aggregate(item, metadata)
        return metadata
class QueueVisitor:
    """Callable that loads a file from storage, processes it and hands off the result.

    For each request body the visitor downloads and decompresses the
    referenced object, passes it to ``callback``, JSON-encodes every item of
    the callback result and gives the combined body to ``response_strategy``.
    """

    def __init__(self, storage: Storage, callback: Callable, response_strategy):
        """Store the collaborators.

        Args:
            storage: Object offering ``get_object``.
            callback: Callable receiving ``{"data": ..., "metadata": ...}`` and
                returning an iterable of JSON-serialisable items.
            response_strategy: Callable receiving the result body.
        """
        self.storage = storage
        self.callback = callback
        self.response_strategy = response_strategy

    def load_data(self, body):
        """Download the object described by ``body`` and gunzip it.

        Returns:
            The decompressed bytes.

        Raises:
            DataLoadingFailure: If downloading or decompressing fails; the
                original exception is chained as the cause.
        """
        descriptor = get_object_descriptor(body)
        try:
            logging.debug(f"Downloading {descriptor}...")
            compressed = self.storage.get_object(**descriptor)
            logging.debug(f"Downloaded {descriptor}.")
            return gzip.decompress(compressed)
        except Exception as err:
            logging.warning(f"Loading data from storage failed for {descriptor}.")
            raise DataLoadingFailure() from err

    def process_data(self, data, body):
        """Invoke the callback with ``data`` and with ``body`` as its metadata."""
        request = {"data": data, "metadata": body}
        return self.callback(request)

    def load_and_process(self, body):
        """Load the data for ``body``, process it and build the result body.

        Returns:
            A dict with ``data`` set to the list of JSON-encoded result items,
            followed by all entries of ``body`` (which take precedence on a
            key clash).
        """
        raw = self.load_data(body)
        processed = self.process_data(raw, body)
        encoded = [json.dumps(item) for item in processed]
        return {"data": encoded, **body}

    def __call__(self, body):
        """Run the full pipeline for ``body`` and return the strategy's result."""
        return self.response_strategy(self.load_and_process(body))