diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py
index d841ec1..ef2ed86 100644
--- a/pyinfra/visitor.py
+++ b/pyinfra/visitor.py
@@ -17,7 +17,6 @@
 from pyinfra.parser.parser_composer import EitherParserComposer
 from pyinfra.parser.parsers.identity import IdentityBlobParser
 from pyinfra.parser.parsers.json import JsonBlobParser
 from pyinfra.parser.parsers.string import StringBlobParser
-from pyinfra.server.packing import string_to_bytes
 from pyinfra.storage.storage import Storage
 
@@ -176,8 +175,9 @@ class ParsingStrategy(abc.ABC):
         return self.parse_and_wrap(data)
 
 
+# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
+# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
 class DynamicParsingStrategy(ParsingStrategy):
-
     def __init__(self):
         self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
 
@@ -185,30 +185,7 @@ class DynamicParsingStrategy(ParsingStrategy):
         return self.parser(data)
 
     def parse_and_wrap(self, data):
-        """Storage items can be a blob or a blob with metadata. Standardizes to the latter.
-
-        Cases:
-            1) backend upload: data as bytes
-            2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }",
-               where value of key 'data' was encoded with bytes_to_string(...)
-
-        Returns:
-            {"data": bytes, "metadata": dict}
-
-        TODO:
-            Each analysis service should specify a custom parsing strategy for the type of data it expects to be
-            found on the storage. This class is only a temporary trial-and-error->fallback type of solution.
-        """
-
-        data = self.parse(data)
-
-        if isinstance(data, bytes):  # case 1
-            return wrap(data)
-
-        else:  # case 2
-            assert isinstance(data, dict)
-            validate(data)
-            return data
+        return self.parse(data)
 
 
 def validate(data):
@@ -243,16 +220,33 @@ class QueueVisitor:
         return data
 
     def standardize(self, data: bytes) -> Dict:
-        assert isinstance(data, bytes)
-        data = self.parsing_strategy(data)
-        return data
+        """Storage items can be a blob or a blob with metadata. Standardizes to the latter.
+
+        Cases:
+            1) backend upload: data as bytes
+            2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }",
+               where value of key 'data' was encoded with bytes_to_string(...)
+
+        Returns:
+            {"data": bytes, "metadata": dict}
+        """
+
+        if isinstance(data, bytes):  # case 1
+            return wrap(data)
+
+        else:  # case 2
+            assert isinstance(data, dict)
+            validate(data)
+            return data
 
     def load_data(self, queue_item_body):
         object_descriptor = get_object_descriptor(queue_item_body)
         logging.debug(f"Downloading {object_descriptor}...")
         data = self.download(object_descriptor)
         logging.debug(f"Downloaded {object_descriptor}.")
+        assert isinstance(data, bytes)
         data = gzip.decompress(data)
+        data = self.parsing_strategy(data)
         data = self.standardize(data)
         return data
 
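
For context, here is a minimal sketch (not part of the diff) of the behaviour this change moves into QueueVisitor.standardize: parsing happens first in load_data, then standardize either wraps raw bytes or validates an already-parsed envelope. The bodies of wrap and validate below are hypothetical stand-ins for the module-level helpers referenced in the hunks above; only their assumed contract ({"data": bytes-like, "metadata": dict}) is taken from the docstring.

import gzip
import json


def wrap(blob: bytes) -> dict:
    # Hypothetical stand-in for the module's wrap(): attach empty metadata to a raw blob.
    return {"data": blob, "metadata": {}}


def validate(data: dict) -> None:
    # Hypothetical stand-in for the module's validate(): check the expected keys are present.
    assert {"data", "metadata"} <= set(data)


def standardize(data) -> dict:
    # Mirrors QueueVisitor.standardize after the move: bytes -> wrapped, dict -> validated and returned.
    if isinstance(data, bytes):    # case 1: backend upload
        return wrap(data)
    assert isinstance(data, dict)  # case 2: some Python service's upload, already parsed upstream
    validate(data)
    return data


# Case 1: a gzipped raw blob uploaded by the backend.
blob = gzip.decompress(gzip.compress(b"payload"))
print(standardize(blob))  # {'data': b'payload', 'metadata': {}}

# Case 2: a JSON envelope produced by another Python service
# (in the real code path, DynamicParsingStrategy parses the downloaded bytes into this dict first).
envelope = json.loads('{"data": "cGF5bG9hZA==", "metadata": {"source": "svc"}}')
print(standardize(envelope))  # passes validation and is returned unchanged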