replaced trial and error logic for parsing blobs in visitor with parser composer instance
This commit is contained in:
parent
6e5af4092e
commit
e48fa85784
@ -13,6 +13,10 @@ from more_itertools import peekable
|
|||||||
|
|
||||||
from pyinfra.config import CONFIG, parse_disjunction_string
|
from pyinfra.config import CONFIG, parse_disjunction_string
|
||||||
from pyinfra.exceptions import DataLoadingFailure
|
from pyinfra.exceptions import DataLoadingFailure
|
||||||
|
from pyinfra.parser.parser_composer import EitherParserComposer
|
||||||
|
from pyinfra.parser.parsers.identity import IdentityBlobParser
|
||||||
|
from pyinfra.parser.parsers.json import JsonBlobParser
|
||||||
|
from pyinfra.parser.parsers.string import StringBlobParser
|
||||||
from pyinfra.server.packing import string_to_bytes
|
from pyinfra.server.packing import string_to_bytes
|
||||||
from pyinfra.storage.storage import Storage
|
from pyinfra.storage.storage import Storage
|
||||||
|
|
||||||
@ -173,29 +177,12 @@ class ParsingStrategy(abc.ABC):
|
|||||||
|
|
||||||
|
|
||||||
class DynamicParsingStrategy(ParsingStrategy):
|
class DynamicParsingStrategy(ParsingStrategy):
|
||||||
@staticmethod
|
|
||||||
def attempt_parse_json(data):
|
|
||||||
data = json.loads(data)
|
|
||||||
if "data" in data:
|
|
||||||
data["data"] = string_to_bytes(data["data"])
|
|
||||||
return data
|
|
||||||
|
|
||||||
@staticmethod
|
def __init__(self):
|
||||||
def fallback(data):
|
self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
|
||||||
return data
|
|
||||||
|
|
||||||
def parse(self, data: bytes) -> Union[bytes, dict]:
|
def parse(self, data: bytes) -> Union[bytes, dict]:
|
||||||
try:
|
return self.parser(data)
|
||||||
data = data.decode()
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
return self.fallback(data)
|
|
||||||
|
|
||||||
try:
|
|
||||||
return self.attempt_parse_json(data)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return self.fallback(data)
|
|
||||||
|
|
||||||
def parse_and_wrap(self, data):
|
def parse_and_wrap(self, data):
|
||||||
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
|
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
|
||||||
@ -255,7 +242,7 @@ class QueueVisitor:
|
|||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def standardize(self, data: bytes, queue_item_body) -> Dict:
|
def standardize(self, data: bytes) -> Dict:
|
||||||
assert isinstance(data, bytes)
|
assert isinstance(data, bytes)
|
||||||
data = self.parsing_strategy(data)
|
data = self.parsing_strategy(data)
|
||||||
return data
|
return data
|
||||||
@ -266,7 +253,7 @@ class QueueVisitor:
|
|||||||
data = self.download(object_descriptor)
|
data = self.download(object_descriptor)
|
||||||
logging.debug(f"Downloaded {object_descriptor}.")
|
logging.debug(f"Downloaded {object_descriptor}.")
|
||||||
data = gzip.decompress(data)
|
data = gzip.decompress(data)
|
||||||
data = self.standardize(data, queue_item_body)
|
data = self.standardize(data)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def process_storage_item(self, data_metadata_pack):
|
def process_storage_item(self, data_metadata_pack):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user