refactoring WIP: moving response stratgey logic into storage strategy (needs to be refactored as well, later) and file descr mngr. Here the moved code needs to be cleaned up.
This commit is contained in:
parent
7e48c66f0c
commit
79350b4ce7
@ -1,8 +1,11 @@
|
||||
import json
|
||||
import os
|
||||
from _operator import itemgetter
|
||||
|
||||
from funcy import project
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
|
||||
|
||||
class FileDescriptorManager:
|
||||
def __init__(self, bucket_name, operation2file_patterns: dict = None):
|
||||
@ -21,12 +24,18 @@ class FileDescriptorManager:
|
||||
|
||||
def get_object_name(self, queue_item_body: dict, end):
|
||||
|
||||
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
|
||||
file_descriptor["pages"] = [queue_item_body.get("id", 0)]
|
||||
# TODO: refactor
|
||||
|
||||
object_name = self.__build_matcher(file_descriptor)
|
||||
if end == "output":
|
||||
return self.get_response_object_name(queue_item_body)
|
||||
else:
|
||||
|
||||
return object_name
|
||||
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
|
||||
file_descriptor["pages"] = [queue_item_body.get("id", 0)]
|
||||
|
||||
object_name = self.__build_matcher(file_descriptor)
|
||||
|
||||
return object_name
|
||||
|
||||
@staticmethod
|
||||
def __build_matcher(file_descriptor):
|
||||
@ -65,7 +74,7 @@ class FileDescriptorManager:
|
||||
return self.build_matcher(queue_item_body, end="input")
|
||||
|
||||
def build_output_matcher(self, queue_item_body):
|
||||
return self.build_matcher(queue_item_body, end="output")
|
||||
return self.get_response_object_name(queue_item_body)
|
||||
|
||||
def build_matcher(self, queue_item_body, end):
|
||||
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
|
||||
@ -74,11 +83,29 @@ class FileDescriptorManager:
|
||||
def get_input_object_descriptor(self, queue_item_body):
|
||||
return self.get_object_descriptor(queue_item_body, end="input")
|
||||
|
||||
def get_output_object_descriptor(self, queue_item_body):
|
||||
return self.get_object_descriptor(queue_item_body, end="output")
|
||||
def get_output_object_descriptor(self, storage_upload_info):
|
||||
return self.get_object_descriptor(storage_upload_info, end="output")
|
||||
|
||||
def get_object_descriptor(self, queue_item_body, end):
|
||||
return {
|
||||
"bucket_name": self.bucket_name,
|
||||
"object_name": self.get_object_name(queue_item_body, end=end),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def build_storage_upload_info(analysis_payload, request_metadata):
|
||||
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
|
||||
return storage_upload_info
|
||||
|
||||
@staticmethod
|
||||
def get_response_object_name(body):
|
||||
# TODO: refactor
|
||||
|
||||
if "id" not in body:
|
||||
body["id"] = 0
|
||||
|
||||
dossier_id, file_id, idnt = itemgetter("dossierId", "fileId", "id")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
@ -9,7 +9,6 @@ from pyinfra.file_descriptor_manager import FileDescriptorManager
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||
from pyinfra.utils.encoding import pack_analysis_payload
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
from pyinfra.visitor.utils import build_storage_upload_info
|
||||
|
||||
|
||||
def default_merge(items):
|
||||
@ -37,9 +36,13 @@ class AggregationStorageStrategy(ResponseStrategy):
|
||||
|
||||
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
|
||||
"""analysis_payload : {data: ..., metadata: ...}"""
|
||||
|
||||
storage_upload_info = build_storage_upload_info(analysis_payload, request_metadata)
|
||||
object_descriptor = self.get_response_object_descriptor(storage_upload_info)
|
||||
storage_upload_info = self.file_descriptor_manager.build_storage_upload_info(analysis_payload, request_metadata)
|
||||
object_descriptor = self.file_descriptor_manager.get_output_object_descriptor(storage_upload_info)
|
||||
print(111111111)
|
||||
print(json.dumps(storage_upload_info, indent=2))
|
||||
print(222222222)
|
||||
print(json.dumps(object_descriptor, indent=2))
|
||||
print()
|
||||
|
||||
self.add_analysis_payload_to_buffer(analysis_payload)
|
||||
|
||||
|
||||
@ -1,7 +1,4 @@
|
||||
import abc
|
||||
from _operator import itemgetter
|
||||
|
||||
from pyinfra.config import parse_disjunction_string, CONFIG
|
||||
|
||||
|
||||
class ResponseStrategy(abc.ABC):
|
||||
@ -11,25 +8,3 @@ class ResponseStrategy(abc.ABC):
|
||||
|
||||
def __call__(self, analysis_response: dict):
|
||||
return self.handle_response(analysis_response)
|
||||
|
||||
def get_response_object_descriptor(self, body):
|
||||
return {
|
||||
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
|
||||
"object_name": self.get_response_object_name(body),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_response_object_name(body):
|
||||
# FIXME: rewrite with config.service.operations
|
||||
|
||||
if "pages" not in body:
|
||||
body["pages"] = []
|
||||
|
||||
if "id" not in body:
|
||||
body["id"] = 0
|
||||
|
||||
dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
import json
|
||||
from operator import itemgetter
|
||||
|
||||
from pyinfra.config import parse_disjunction_string, CONFIG
|
||||
from pyinfra.utils.encoding import compress
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
|
||||
@ -14,3 +16,26 @@ class StorageStrategy(ResponseStrategy):
|
||||
body.pop("analysis_payloads")
|
||||
body["response_files"] = [response_object_descriptor["object_name"]]
|
||||
return body
|
||||
|
||||
def get_response_object_descriptor(self, body):
|
||||
"""TODO: refactor by using FileDescriptorManager"""
|
||||
return {
|
||||
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
|
||||
"object_name": self.get_response_object_name(body),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_response_object_name(body):
|
||||
"""TODO: refactor by using FileDescriptorManager"""
|
||||
|
||||
if "pages" not in body:
|
||||
body["pages"] = []
|
||||
|
||||
if "id" not in body:
|
||||
body["id"] = 0
|
||||
|
||||
dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
@ -7,11 +7,6 @@ from pyinfra.server.packing import string_to_bytes
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def build_storage_upload_info(analysis_payload, request_metadata):
|
||||
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
|
||||
return storage_upload_info
|
||||
|
||||
|
||||
def build_file_path(storage_upload_info, folder):
|
||||
return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user