From 79350b4ce71fcd095ed6a5e1d3a598ea246fae53 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 23 Jun 2022 12:26:15 +0200 Subject: [PATCH] refactoring WIP: moving response stratgey logic into storage strategy (needs to be refactored as well, later) and file descr mngr. Here the moved code needs to be cleaned up. --- pyinfra/file_descriptor_manager.py | 41 +++++++++++++++---- .../strategies/response/aggregation.py | 11 +++-- .../visitor/strategies/response/response.py | 25 ----------- .../visitor/strategies/response/storage.py | 25 +++++++++++ pyinfra/visitor/utils.py | 5 --- 5 files changed, 66 insertions(+), 41 deletions(-) diff --git a/pyinfra/file_descriptor_manager.py b/pyinfra/file_descriptor_manager.py index 49b7d03..7108c06 100644 --- a/pyinfra/file_descriptor_manager.py +++ b/pyinfra/file_descriptor_manager.py @@ -1,8 +1,11 @@ +import json import os from _operator import itemgetter from funcy import project +from pyinfra.config import CONFIG + class FileDescriptorManager: def __init__(self, bucket_name, operation2file_patterns: dict = None): @@ -21,12 +24,18 @@ class FileDescriptorManager: def get_object_name(self, queue_item_body: dict, end): - file_descriptor = self.build_file_descriptor(queue_item_body, end=end) - file_descriptor["pages"] = [queue_item_body.get("id", 0)] + # TODO: refactor - object_name = self.__build_matcher(file_descriptor) + if end == "output": + return self.get_response_object_name(queue_item_body) + else: - return object_name + file_descriptor = self.build_file_descriptor(queue_item_body, end=end) + file_descriptor["pages"] = [queue_item_body.get("id", 0)] + + object_name = self.__build_matcher(file_descriptor) + + return object_name @staticmethod def __build_matcher(file_descriptor): @@ -65,7 +74,7 @@ class FileDescriptorManager: return self.build_matcher(queue_item_body, end="input") def build_output_matcher(self, queue_item_body): - return self.build_matcher(queue_item_body, end="output") + return self.get_response_object_name(queue_item_body) def build_matcher(self, queue_item_body, end): file_descriptor = self.build_file_descriptor(queue_item_body, end=end) @@ -74,11 +83,29 @@ class FileDescriptorManager: def get_input_object_descriptor(self, queue_item_body): return self.get_object_descriptor(queue_item_body, end="input") - def get_output_object_descriptor(self, queue_item_body): - return self.get_object_descriptor(queue_item_body, end="output") + def get_output_object_descriptor(self, storage_upload_info): + return self.get_object_descriptor(storage_upload_info, end="output") def get_object_descriptor(self, queue_item_body, end): return { "bucket_name": self.bucket_name, "object_name": self.get_object_name(queue_item_body, end=end), } + + @staticmethod + def build_storage_upload_info(analysis_payload, request_metadata): + storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} + return storage_upload_info + + @staticmethod + def get_response_object_name(body): + # TODO: refactor + + if "id" not in body: + body["id"] = 0 + + dossier_id, file_id, idnt = itemgetter("dossierId", "fileId", "id")(body) + + object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" + + return object_name diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index 8494e50..098f6a0 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -9,7 +9,6 @@ from pyinfra.file_descriptor_manager import FileDescriptorManager from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing from pyinfra.utils.encoding import pack_analysis_payload from pyinfra.visitor.strategies.response.response import ResponseStrategy -from pyinfra.visitor.utils import build_storage_upload_info def default_merge(items): @@ -37,9 +36,13 @@ class AggregationStorageStrategy(ResponseStrategy): def upload_or_aggregate(self, analysis_payload, request_metadata, last=False): """analysis_payload : {data: ..., metadata: ...}""" - - storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata) - object_descriptor = self.get_response_object_descriptor(storage_upload_info) + storage_upload_info = self.file_descriptor_manager.build_storage_upload_info(analysis_payload, request_metadata) + object_descriptor = self.file_descriptor_manager.get_output_object_descriptor(storage_upload_info) + print(111111111) + print(json.dumps(storage_upload_info, indent=2)) + print(222222222) + print(json.dumps(object_descriptor, indent=2)) + print() self.add_analysis_payload_to_buffer(analysis_payload) diff --git a/pyinfra/visitor/strategies/response/response.py b/pyinfra/visitor/strategies/response/response.py index bdc0ca2..d8ae847 100644 --- a/pyinfra/visitor/strategies/response/response.py +++ b/pyinfra/visitor/strategies/response/response.py @@ -1,7 +1,4 @@ import abc -from _operator import itemgetter - -from pyinfra.config import parse_disjunction_string, CONFIG class ResponseStrategy(abc.ABC): @@ -11,25 +8,3 @@ class ResponseStrategy(abc.ABC): def __call__(self, analysis_response: dict): return self.handle_response(analysis_response) - - def get_response_object_descriptor(self, body): - return { - "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), - "object_name": self.get_response_object_name(body), - } - - @staticmethod - def get_response_object_name(body): - # FIXME: rewrite with config.service.operations - - if "pages" not in body: - body["pages"] = [] - - if "id" not in body: - body["id"] = 0 - - dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body) - - object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" - - return object_name diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py index aa5d60b..c427867 100644 --- a/pyinfra/visitor/strategies/response/storage.py +++ b/pyinfra/visitor/strategies/response/storage.py @@ -1,5 +1,7 @@ import json +from operator import itemgetter +from pyinfra.config import parse_disjunction_string, CONFIG from pyinfra.utils.encoding import compress from pyinfra.visitor.strategies.response.response import ResponseStrategy @@ -14,3 +16,26 @@ class StorageStrategy(ResponseStrategy): body.pop("analysis_payloads") body["response_files"] = [response_object_descriptor["object_name"]] return body + + def get_response_object_descriptor(self, body): + """TODO: refactor by using FileDescriptorManager""" + return { + "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), + "object_name": self.get_response_object_name(body), + } + + @staticmethod + def get_response_object_name(body): + """TODO: refactor by using FileDescriptorManager""" + + if "pages" not in body: + body["pages"] = [] + + if "id" not in body: + body["id"] = 0 + + dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body) + + object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}" + + return object_name diff --git a/pyinfra/visitor/utils.py b/pyinfra/visitor/utils.py index 1083de5..72a7b57 100644 --- a/pyinfra/visitor/utils.py +++ b/pyinfra/visitor/utils.py @@ -7,11 +7,6 @@ from pyinfra.server.packing import string_to_bytes logger = logging.getLogger() -def build_storage_upload_info(analysis_payload, request_metadata): - storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} - return storage_upload_info - - def build_file_path(storage_upload_info, folder): return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")