From 7e48c66f0c378b25a433a4034eefdc8a0957e775 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 23 Jun 2022 12:00:48 +0200 Subject: [PATCH] refactoring; removed operation / response folder from output path --- pyinfra/default_objects.py | 44 ++++++++++--------- .../strategies/response/aggregation.py | 6 ++- pyinfra/visitor/utils.py | 4 -- test/config.yaml | 9 ++++ 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 76cb7b3..ad699dd 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -5,6 +5,7 @@ from funcy import rcompose, omit, merge, lmap, project from pyinfra.config import parse_disjunction_string from pyinfra.exceptions import AnalysisFailure +from pyinfra.file_descriptor_manager import FileDescriptorManager from pyinfra.queue.consumer import Consumer from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager from pyinfra.server.client_pipeline import ClientPipeline @@ -17,7 +18,6 @@ from pyinfra.visitor import QueueVisitor from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy -from pyinfra.file_descriptor_manager import FileDescriptorManager from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy @@ -38,6 +38,19 @@ class ComponentFactory: callback = callback or self.get_callback() return Consumer(self.get_visitor(callback), self.get_queue_manager()) + @lru_cache(maxsize=None) + def get_callback(self, analysis_base_url=None): + analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint + + callback = Callback(analysis_base_url) + + def wrapped(body): + body_repr = project(body, ["dossierId", "fileId", "pages", "images", "operation"]) + logger.info(f"Processing {body_repr}...") + return callback(body) + + return wrapped + @lru_cache(maxsize=None) def get_visitor(self, callback): return QueueVisitor( @@ -57,21 +70,17 @@ class ComponentFactory: return storages.get_storage(self.config.storage.backend) @lru_cache(maxsize=None) - def get_callback(self, analysis_base_url=None): - analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint - - callback = Callback(analysis_base_url) - - def wrapped(body): - body_repr = project(body, ["dossierId", "fileId", "pages", "images", "operation"]) - logger.info(f"Processing {body_repr}...") - return callback(body) - - return wrapped + def get_response_strategy(self, storage=None): + return AggregationStorageStrategy( + storage=storage or self.get_storage(), file_descriptor_manager=self.get_file_descriptor_manager() + ) @lru_cache(maxsize=None) - def get_response_strategy(self, storage=None): - return AggregationStorageStrategy(storage or self.get_storage()) + def get_file_descriptor_manager(self): + return FileDescriptorManager( + bucket_name=parse_disjunction_string(self.config.storage.bucket), + operation2file_patterns=self.get_operation2file_patterns(), + ) @lru_cache(maxsize=None) def get_response_formatter(self): @@ -102,13 +111,6 @@ class ComponentFactory: def get_multi_download_strategy(self): return MultiDownloadStrategy(self.get_file_descriptor_manager()) - @lru_cache(maxsize=None) - def get_file_descriptor_manager(self): - return FileDescriptorManager( - bucket_name=parse_disjunction_string(self.config.storage.bucket), - operation2file_patterns=self.get_operation2file_patterns(), - ) - class Callback: def __init__(self, base_url): diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index c0f96af..8494e50 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -1,9 +1,11 @@ +import json from collections import deque from typing import Callable from funcy import omit, filter, first from more_itertools import peekable +from pyinfra.file_descriptor_manager import FileDescriptorManager from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing from pyinfra.utils.encoding import pack_analysis_payload from pyinfra.visitor.strategies.response.response import ResponseStrategy @@ -17,9 +19,11 @@ def default_merge(items): class AggregationStorageStrategy(ResponseStrategy): - def __init__(self, storage, merger: Callable = None): + def __init__(self, storage, file_descriptor_manager: FileDescriptorManager, merger: Callable = None): self.storage = storage + self.file_descriptor_manager = file_descriptor_manager self.merger = merger or default_merge + self.buffer = deque() self.response_files = deque() diff --git a/pyinfra/visitor/utils.py b/pyinfra/visitor/utils.py index d39525e..1083de5 100644 --- a/pyinfra/visitor/utils.py +++ b/pyinfra/visitor/utils.py @@ -1,7 +1,6 @@ import logging from typing import Dict -from pyinfra.config import CONFIG from pyinfra.exceptions import InvalidStorageItemFormat from pyinfra.server.packing import string_to_bytes @@ -10,9 +9,6 @@ logger = logging.getLogger() def build_storage_upload_info(analysis_payload, request_metadata): storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)} - storage_upload_info["fileId"] = build_file_path( - storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder) - ) return storage_upload_info diff --git a/test/config.yaml b/test/config.yaml index 69d72a1..228f397 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -5,14 +5,23 @@ service: input: subdir: op_inp_files extension: IN.gz + output: + subdir: op_outp_files + extension: OUT.gz single_inp_op: input: subdir: "" extension: IN.gz + output: + subdir: "" + extension: OUT.gz default: input: subdir: "" extension: IN.gz + output: + subdir: "" + extension: OUT.gz storage: minio: