introduced parsing strategy for storage blobs as part of the queue visitor

This commit is contained in:
Matthias Bisping 2022-06-03 14:49:19 +02:00
parent 7730950b50
commit 26573eeda3

View File

@ -6,7 +6,7 @@ import logging
import time
from collections import deque
from operator import itemgetter
from typing import Callable, Dict
from typing import Callable, Dict, Union
from funcy import omit
from more_itertools import peekable
@ -159,23 +159,40 @@ class InvalidStorageItemFormat(ValueError):
pass
class QueueVisitor:
def __init__(self, storage: Storage, callback: Callable, response_strategy):
self.storage = storage
self.callback = callback
self.response_strategy = response_strategy
class ParsingStrategy(abc.ABC):
@abc.abstractmethod
def parse(self, data: bytes):
pass
def download(self, object_descriptor):
try:
data = self.storage.get_object(**object_descriptor)
except Exception as err:
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
raise DataLoadingFailure() from err
@abc.abstractmethod
def parse_and_wrap(self, data: bytes):
pass
return data
def __call__(self, data: bytes):
return self.parse_and_wrap(data)
class DynamicParsingStrategy(ParsingStrategy):
@staticmethod
def attempt_parse_json(data):
return json.loads(data.decode())
@staticmethod
def standardize(data: bytes, queue_item_body) -> Dict:
def fallback(data):
return data
def parse(self, data: bytes) -> Union[bytes, dict]:
try:
data = self.attempt_parse_json(data)
if "data" in data:
data["data"] = string_to_bytes(data["data"])
return data
except json.JSONDecodeError:
pass
return self.fallback(data)
def parse_and_wrap(self, data):
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
Cases:
@ -187,30 +204,57 @@ class QueueVisitor:
{"data": bytes, "metadata": dict}
TODO:
This is really kinda wonky.
Each analysis service should specify a custom parsing strategy for the type of data it expects to be
found on the storage. This class is only a temporary trial-and-error->fallback type of solution.
"""
def validate(data):
if not ("data" in data and "metadata" in data):
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
data = self.parse(data)
def wrap(data):
return {"data": data, "metadata": {}}
assert isinstance(data, bytes)
data = data.decode()
data = json.loads(data)
if not isinstance(data, dict): # case 1
return wrap(string_to_bytes(data))
if isinstance(data, bytes): # case 1
return wrap(data)
else: # case 2
assert isinstance(data, dict)
validate(data)
data["data"] = string_to_bytes(data["data"])
return data
def validate(data):
if not ("data" in data and "metadata" in data):
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {data}.")
def wrap(data):
return {"data": data, "metadata": {}}
class QueueVisitor:
def __init__(
self,
storage: Storage,
callback: Callable,
response_strategy,
parsing_strategy: ParsingStrategy = None,
):
self.storage = storage
self.callback = callback
self.response_strategy = response_strategy
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
def download(self, object_descriptor):
try:
data = self.storage.get_object(**object_descriptor)
except Exception as err:
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
raise DataLoadingFailure() from err
return data
def standardize(self, data: bytes, queue_item_body) -> Dict:
assert isinstance(data, bytes)
data = self.parsing_strategy(data)
return data
def load_data(self, queue_item_body):
object_descriptor = get_object_descriptor(queue_item_body)
logging.debug(f"Downloading {object_descriptor}...")