From 249c6203b2272e9c675e13ee8e2e9e8ba2288723 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Fri, 10 Jun 2022 13:26:43 +0200 Subject: [PATCH] download strategy WIP --- pyinfra/visitor.py | 137 +++++++++++++++++++++++++++++---------------- 1 file changed, 88 insertions(+), 49 deletions(-) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index fda3dbb..2beb3ce 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -122,8 +122,6 @@ class AggregationStorageStrategy(ResponseStrategy): def put_object(self, data: bytes, storage_upload_info): object_descriptor = get_response_object_descriptor(storage_upload_info) - # TODO: object_descriptor needs suffix - # Note: what did I mean with that? self.storage.put_object(**object_descriptor, data=gzip.compress(data)) return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} @@ -214,62 +212,20 @@ class QueueVisitor: self, storage: Storage, callback: Callable, - response_strategy, + response_strategy: ResponseStrategy, parsing_strategy: ParsingStrategy = None, + download_strategy=None, ): self.storage = storage self.callback = callback + self.download_strategy = download_strategy or get_download_strategy() self.response_strategy = response_strategy self.parsing_strategy = parsing_strategy or DynamicParsingStrategy() - def download(self, object_descriptor): - try: - data = self.storage.get_object(**object_descriptor) - except Exception as err: - logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure from err - - return data - - def standardize(self, data) -> Dict: - """Storage items can be a blob or a blob with metadata. Standardizes to the latter. - - Cases: - 1) backend upload: data as bytes - 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", - where value of key 'data' was encoded with bytes_to_string(...) - - Returns: - {"data": bytes, "metadata": dict} - """ - - def is_blob_without_metadata(data): - return isinstance(data, bytes) - - def is_blob_with_metadata(data: Dict): - return isinstance(data, dict) - - if is_blob_without_metadata(data): - return wrap(data) - - elif is_blob_with_metadata(data): - validate(data) - return data - - else: # Fallback / used for testing with simple data - logger.warning("Encountered storage data in unexpected format.") - assert isinstance(data, str) - return wrap(string_to_bytes(data)) - def load_data(self, queue_item_body): - object_descriptor = get_object_descriptor(queue_item_body) - logging.debug(f"Downloading {object_descriptor}...") - data = self.download(object_descriptor) - logging.debug(f"Downloaded {object_descriptor}.") - assert isinstance(data, bytes) - data = gzip.decompress(data) + data = self.download_strategy(self.storage, queue_item_body) data = self.parsing_strategy(data) - data = self.standardize(data) + data = standardize(data) return data def process_storage_item(self, data_metadata_pack): @@ -286,3 +242,86 @@ class QueueVisitor: def __call__(self, queue_item_body): analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body) return self.response_strategy(analysis_result_body) + + +def standardize(data) -> Dict: + """Storage items can be a blob or a blob with metadata. Standardizes to the latter. + + Cases: + 1) backend upload: data as bytes + 2) Some Python service's upload: data as bytes of a json string "{'data': , 'metadata': }", + where value of key 'data' was encoded with bytes_to_string(...) + + Returns: + {"data": bytes, "metadata": dict} + """ + + def is_blob_without_metadata(data): + return isinstance(data, bytes) + + def is_blob_with_metadata(data: Dict): + return isinstance(data, dict) + + if is_blob_without_metadata(data): + return wrap(data) + + elif is_blob_with_metadata(data): + validate(data) + return data + + else: # Fallback / used for testing with simple data + logger.warning("Encountered storage data in unexpected format.") + assert isinstance(data, str) + return wrap(string_to_bytes(data)) + + +def get_download_strategy(): + download_strategies = { + "single": SingleDownloadStrategy(), + # "multi": MultiDownloadStratey(), + } + return download_strategies.get(CONFIG.download_strategy, SingleDownloadStrategy()) + + +class DownloadStrategy(abc.ABC): + def _load_data(self, storage, queue_item_body): + object_descriptor = get_object_descriptor(queue_item_body) + logging.debug(f"Downloading {object_descriptor}...") + data = self.__download(storage, object_descriptor) + logging.debug(f"Downloaded {object_descriptor}.") + assert isinstance(data, bytes) + data = gzip.decompress(data) + return data + + @staticmethod + def __download(storage, object_descriptor): + try: + data = storage.get_object(**object_descriptor) + except Exception as err: + logging.warning(f"Loading data from storage failed for {object_descriptor}.") + raise DataLoadingFailure from err + + return data + + # def __call__(self, storage, queue_item_body): + # return self._load_data(storage, queue_item_body) + + +class SingleDownloadStrategy(DownloadStrategy): + def download(self, storage, object_descriptor): + return self._load_data(storage, object_descriptor) + + def __call__(self, storage, queue_item_body): + return self.download(storage, queue_item_body) + + +# class MultiDownloadStratey(DownloadStratey): +# +# def download(self, object_descriptor): +# try: +# data = self.storage.get_object(**object_descriptor) +# except Exception as err: +# logging.warning(f"Loading data from storage failed for {object_descriptor}.") +# raise DataLoadingFailure from err +# +# return data