download strategy WIP
This commit is contained in:
parent
3a3c497383
commit
249c6203b2
@ -122,8 +122,6 @@ class AggregationStorageStrategy(ResponseStrategy):
|
||||
|
||||
def put_object(self, data: bytes, storage_upload_info):
    """Gzip-compress ``data``, upload it, and report the stored object's name.

    Args:
        data: Raw payload; it is gzip-compressed before being handed to
            ``self.storage.put_object``.
        storage_upload_info: Mapping passed to
            ``get_response_object_descriptor`` to derive the target object
            descriptor. Its entries are copied into the returned dict.

    Returns:
        A new dict containing every entry of ``storage_upload_info`` plus the
        key ``"responseFile"`` set to the descriptor's ``"object_name"``.
    """
    object_descriptor = get_response_object_descriptor(storage_upload_info)
    # TODO: object_descriptor needs suffix (intent of this note is unclear -- confirm with the author).
    self.storage.put_object(**object_descriptor, data=gzip.compress(data))
    return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
|
||||
|
||||
@ -214,62 +212,20 @@ class QueueVisitor:
|
||||
self,
|
||||
storage: Storage,
|
||||
callback: Callable,
|
||||
response_strategy,
|
||||
response_strategy: ResponseStrategy,
|
||||
parsing_strategy: ParsingStrategy = None,
|
||||
download_strategy=None,
|
||||
):
|
||||
self.storage = storage
|
||||
self.callback = callback
|
||||
self.download_strategy = download_strategy or get_download_strategy()
|
||||
self.response_strategy = response_strategy
|
||||
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
|
||||
|
||||
def download(self, object_descriptor):
    """Fetch the object identified by ``object_descriptor`` from ``self.storage``.

    Args:
        object_descriptor: Mapping expanded into the keyword arguments of
            ``self.storage.get_object``.

    Returns:
        Whatever ``self.storage.get_object`` returns.

    Raises:
        DataLoadingFailure: If the storage call raises any ``Exception``; the
            original error is chained as the cause.
    """
    try:
        return self.storage.get_object(**object_descriptor)
    except Exception as err:
        logging.warning(f"Loading data from storage failed for {object_descriptor}.")
        raise DataLoadingFailure from err
|
||||
|
||||
def standardize(self, data) -> Dict:
    """Storage items can be a blob or a blob with metadata. Standardizes to the latter.

    Cases:
        1) backend upload: data as bytes
        2) Some Python service's upload: data as bytes of a json string
           "{'data': <str>, 'metadata': <dict>}", where value of key 'data'
           was encoded with bytes_to_string(...)

    Args:
        data: ``bytes`` (blob without metadata), ``dict`` (blob with
            metadata) or, as a fallback used for testing, ``str``.

    Returns:
        {"data": bytes, "metadata": dict}
        (presumably the shape produced by ``wrap`` -- confirm against its
        definition; a ``dict`` input is returned as-is after ``validate``).

    Raises:
        AssertionError: If ``data`` is neither ``bytes``, ``dict`` nor ``str``.
    """
    if isinstance(data, bytes):  # blob without metadata
        return wrap(data)

    if isinstance(data, dict):  # blob with metadata
        validate(data)
        return data

    # Fallback / used for testing with simple data
    logger.warning("Encountered storage data in unexpected format.")
    # Raised explicitly (instead of a bare `assert`) so the check is not
    # stripped when Python runs with -O; the exception type is unchanged.
    if not isinstance(data, str):
        raise AssertionError(f"Expected bytes, dict or str, got {type(data).__name__}.")
    return wrap(string_to_bytes(data))
|
||||
|
||||
def load_data(self, queue_item_body):
    """Download, parse and standardize the storage item referenced by a queue item.

    The download is delegated entirely to ``self.download_strategy``. The
    previous body additionally downloaded and decompressed the object itself
    and then discarded that result, and standardized the data twice; both
    redundancies are removed here.

    Args:
        queue_item_body: Queue message body, passed to the download strategy
            together with ``self.storage``.

    Returns:
        The result of ``standardize`` applied to the parsed download.
    """
    data = self.download_strategy(self.storage, queue_item_body)
    data = self.parsing_strategy(data)
    return standardize(data)
|
||||
|
||||
def process_storage_item(self, data_metadata_pack):
|
||||
@ -286,3 +242,86 @@ class QueueVisitor:
|
||||
def __call__(self, queue_item_body):
    """Process one queue item and hand the result to the response strategy.

    Args:
        queue_item_body: Queue message body, forwarded to
            ``self.load_item_from_storage_and_process_with_callback``.

    Returns:
        Whatever ``self.response_strategy`` returns for the analysis result.
    """
    analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
    return self.response_strategy(analysis_result_body)
|
||||
|
||||
|
||||
def standardize(data) -> Dict:
    """Storage items can be a blob or a blob with metadata. Standardizes to the latter.

    Cases:
        1) backend upload: data as bytes
        2) Some Python service's upload: data as bytes of a json string
           "{'data': <str>, 'metadata': <dict>}", where value of key 'data'
           was encoded with bytes_to_string(...)

    Args:
        data: ``bytes`` (blob without metadata), ``dict`` (blob with
            metadata) or, as a fallback used for testing, ``str``.

    Returns:
        {"data": bytes, "metadata": dict}
        (presumably the shape produced by ``wrap`` -- confirm against its
        definition; a ``dict`` input is returned as-is after ``validate``).

    Raises:
        AssertionError: If ``data`` is neither ``bytes``, ``dict`` nor ``str``.
    """
    if isinstance(data, bytes):  # blob without metadata
        return wrap(data)

    if isinstance(data, dict):  # blob with metadata
        validate(data)
        return data

    # Fallback / used for testing with simple data
    logger.warning("Encountered storage data in unexpected format.")
    # Raised explicitly (instead of a bare `assert`) so the check is not
    # stripped when Python runs with -O; the exception type is unchanged.
    if not isinstance(data, str):
        raise AssertionError(f"Expected bytes, dict or str, got {type(data).__name__}.")
    return wrap(string_to_bytes(data))
|
||||
|
||||
|
||||
def get_download_strategy():
    """Return a download strategy instance selected by ``CONFIG.download_strategy``.

    Any configured value that is not a key of the registry falls back to
    ``SingleDownloadStrategy``.

    Returns:
        A freshly constructed strategy instance.
    """
    # Map names to classes (not instances) so that only the selected strategy
    # is constructed on each call.
    download_strategies = {
        "single": SingleDownloadStrategy,
        # "multi": MultiDownloadStrategy,
    }
    strategy_class = download_strategies.get(CONFIG.download_strategy, SingleDownloadStrategy)
    return strategy_class()
|
||||
|
||||
|
||||
class DownloadStrategy(abc.ABC):
    """Base class for strategies that fetch a queue item's payload from storage.

    Strategies are invoked as ``strategy(storage, queue_item_body)``, so
    concrete subclasses must implement ``__call__``. ``_load_data`` provides
    the shared download-and-decompress step they can build on.
    """

    @abc.abstractmethod
    def __call__(self, storage, queue_item_body):
        """Download and return the data referenced by ``queue_item_body`` from ``storage``."""

    def _load_data(self, storage, queue_item_body):
        """Download the object referenced by ``queue_item_body`` and gunzip it.

        Args:
            storage: Object providing ``get_object(**object_descriptor)``.
            queue_item_body: Queue message body from which
                ``get_object_descriptor`` derives the object descriptor.

        Returns:
            The gzip-decompressed payload as bytes.

        Raises:
            DataLoadingFailure: If the storage call fails.
            AssertionError: If the storage did not return ``bytes``.
        """
        object_descriptor = get_object_descriptor(queue_item_body)
        logging.debug(f"Downloading {object_descriptor}...")
        data = self.__download(storage, object_descriptor)
        logging.debug(f"Downloaded {object_descriptor}.")
        assert isinstance(data, bytes)
        return gzip.decompress(data)

    @staticmethod
    def __download(storage, object_descriptor):
        """Call ``storage.get_object``; any ``Exception`` is re-raised as ``DataLoadingFailure``."""
        try:
            return storage.get_object(**object_descriptor)
        except Exception as err:
            logging.warning(f"Loading data from storage failed for {object_descriptor}.")
            raise DataLoadingFailure from err
|
||||
|
||||
|
||||
class SingleDownloadStrategy(DownloadStrategy):
    """Download strategy that delegates to the inherited ``_load_data``."""

    def download(self, storage, object_descriptor):
        """Load and decompress the data for one queue item.

        NOTE(review): despite its name, ``object_descriptor`` is forwarded as
        the ``queue_item_body`` argument of ``_load_data`` (``__call__``
        passes the queue item body here). The parameter name is kept only for
        interface compatibility.

        Args:
            storage: Storage object handed through to ``_load_data``.
            object_descriptor: The queue item body (see note above).

        Returns:
            Whatever ``_load_data`` returns.
        """
        return self._load_data(storage, object_descriptor)

    def __call__(self, storage, queue_item_body):
        """Make the strategy callable; equivalent to ``download(storage, queue_item_body)``."""
        return self.download(storage, queue_item_body)
|
||||
|
||||
|
||||
# class MultiDownloadStrategy(DownloadStrategy):
|
||||
#
|
||||
# def download(self, object_descriptor):
|
||||
# try:
|
||||
# data = self.storage.get_object(**object_descriptor)
|
||||
# except Exception as err:
|
||||
# logging.warning(f"Loading data from storage failed for {object_descriptor}.")
|
||||
# raise DataLoadingFailure from err
|
||||
#
|
||||
# return data
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user