From 26573eeda37ca7a63c8405e157a31978470011bb Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Fri, 3 Jun 2022 14:49:19 +0200 Subject: [PATCH] introduced parsing strategy for storage blobs as part of the queue visitor --- pyinfra/visitor.py | 104 ++++++++++++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 30 deletions(-) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index 2c42b79..1705c61 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -6,7 +6,7 @@ import logging import time from collections import deque from operator import itemgetter -from typing import Callable, Dict +from typing import Callable, Dict, Union from funcy import omit from more_itertools import peekable @@ -159,23 +159,40 @@ class InvalidStorageItemFormat(ValueError): pass -class QueueVisitor: - def __init__(self, storage: Storage, callback: Callable, response_strategy): - self.storage = storage - self.callback = callback - self.response_strategy = response_strategy +class ParsingStrategy(abc.ABC): + @abc.abstractmethod + def parse(self, data: bytes): + pass - def download(self, object_descriptor): - try: - data = self.storage.get_object(**object_descriptor) - except Exception as err: - logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure() from err + @abc.abstractmethod + def parse_and_wrap(self, data: bytes): + pass - return data + def __call__(self, data: bytes): + return self.parse_and_wrap(data) + + +class DynamicParsingStrategy(ParsingStrategy): + @staticmethod + def attempt_parse_json(data): + return json.loads(data.decode()) @staticmethod - def standardize(data: bytes, queue_item_body) -> Dict: + def fallback(data): + return data + + def parse(self, data: bytes) -> Union[bytes, dict]: + try: + data = self.attempt_parse_json(data) + if "data" in data: + data["data"] = string_to_bytes(data["data"]) + return data + except json.JSONDecodeError: + pass + + return self.fallback(data) + + def parse_and_wrap(self, data): """Storage items can be a blob or a blob with metadata. Standardizes to the latter. Cases: @@ -187,30 +204,57 @@ class QueueVisitor: {"data": bytes, "metadata": dict} TODO: - This is really kinda wonky. + Each analysis service should specify a custom parsing strategy for the type of data it expects to be + found on the storage. This class is only a temporary trial-and-error->fallback type of solution. """ - def validate(data): - if not ("data" in data and "metadata" in data): - raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") + data = self.parse(data) - def wrap(data): - return {"data": data, "metadata": {}} - - assert isinstance(data, bytes) - - data = data.decode() - - data = json.loads(data) - - if not isinstance(data, dict): # case 1 - return wrap(string_to_bytes(data)) + if isinstance(data, bytes): # case 1 + return wrap(data) else: # case 2 + assert isinstance(data, dict) validate(data) - data["data"] = string_to_bytes(data["data"]) return data + +def validate(data): + if not ("data" in data and "metadata" in data): + raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.") + + +def wrap(data): + return {"data": data, "metadata": {}} + + +class QueueVisitor: + def __init__( + self, + storage: Storage, + callback: Callable, + response_strategy, + parsing_strategy: ParsingStrategy = None, + ): + self.storage = storage + self.callback = callback + self.response_strategy = response_strategy + self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() + + def download(self, object_descriptor): + try: + data = self.storage.get_object(**object_descriptor) + except Exception as err: + logging.warning(f"Loading data from storage failed for {object_descriptor}.") + raise DataLoadingFailure() from err + + return data + + def standardize(self, data: bytes, queue_item_body) -> Dict: + assert isinstance(data, bytes) + data = self.parsing_strategy(data) + return data + def load_data(self, queue_item_body): object_descriptor = get_object_descriptor(queue_item_body) logging.debug(f"Downloading {object_descriptor}...")