refactoring
This commit is contained in:
parent
e48fa85784
commit
730bdfb220
@@ -17,7 +17,6 @@ from pyinfra.parser.parser_composer import EitherParserComposer
|
||||
from pyinfra.parser.parsers.identity import IdentityBlobParser
|
||||
from pyinfra.parser.parsers.json import JsonBlobParser
|
||||
from pyinfra.parser.parsers.string import StringBlobParser
|
||||
from pyinfra.server.packing import string_to_bytes
|
||||
from pyinfra.storage.storage import Storage
|
||||
|
||||
|
||||
@@ -176,8 +175,9 @@ class ParsingStrategy(abc.ABC):
|
||||
return self.parse_and_wrap(data)
|
||||
|
||||
|
||||
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
|
||||
# on the storage. This class is only a temporary trial-and-error->fallback type of solution.
|
||||
class DynamicParsingStrategy(ParsingStrategy):
|
||||
|
||||
def __init__(self):
|
||||
self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
|
||||
|
||||
@@ -185,30 +185,7 @@ class DynamicParsingStrategy(ParsingStrategy):
|
||||
return self.parser(data)
|
||||
|
||||
def parse_and_wrap(self, data):
|
||||
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
|
||||
|
||||
Cases:
|
||||
1) backend upload: data as bytes
|
||||
2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
|
||||
where value of key 'data' was encoded with bytes_to_string(...)
|
||||
|
||||
Returns:
|
||||
{"data": bytes, "metadata": dict}
|
||||
|
||||
TODO:
|
||||
Each analysis service should specify a custom parsing strategy for the type of data it expects to be
|
||||
found on the storage. This class is only a temporary trial-and-error->fallback type of solution.
|
||||
"""
|
||||
|
||||
data = self.parse(data)
|
||||
|
||||
if isinstance(data, bytes): # case 1
|
||||
return wrap(data)
|
||||
|
||||
else: # case 2
|
||||
assert isinstance(data, dict)
|
||||
validate(data)
|
||||
return data
|
||||
return self.parse(data)
|
||||
|
||||
|
||||
def validate(data):
|
||||
@@ -243,16 +220,33 @@ class QueueVisitor:
|
||||
return data
|
||||
|
||||
def standardize(self, data: bytes) -> Dict:
|
||||
assert isinstance(data, bytes)
|
||||
data = self.parsing_strategy(data)
|
||||
return data
|
||||
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
|
||||
|
||||
Cases:
|
||||
1) backend upload: data as bytes
|
||||
2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
|
||||
where value of key 'data' was encoded with bytes_to_string(...)
|
||||
|
||||
Returns:
|
||||
{"data": bytes, "metadata": dict}
|
||||
"""
|
||||
|
||||
if isinstance(data, bytes): # case 1
|
||||
return wrap(data)
|
||||
|
||||
else: # case 2
|
||||
assert isinstance(data, dict)
|
||||
validate(data)
|
||||
return data
|
||||
|
||||
def load_data(self, queue_item_body):
|
||||
object_descriptor = get_object_descriptor(queue_item_body)
|
||||
logging.debug(f"Downloading {object_descriptor}...")
|
||||
data = self.download(object_descriptor)
|
||||
logging.debug(f"Downloaded {object_descriptor}.")
|
||||
assert isinstance(data, bytes)
|
||||
data = gzip.decompress(data)
|
||||
data = self.parsing_strategy(data)
|
||||
data = self.standardize(data)
|
||||
return data
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user