refactoring; removed operation / response folder from output path

This commit is contained in:
Matthias Bisping 2022-06-23 12:00:48 +02:00
parent 8e6cbdaf23
commit 7e48c66f0c
4 changed files with 37 additions and 26 deletions

View File

@ -5,6 +5,7 @@ from funcy import rcompose, omit, merge, lmap, project
from pyinfra.config import parse_disjunction_string
from pyinfra.exceptions import AnalysisFailure
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.server.client_pipeline import ClientPipeline
@ -17,7 +18,6 @@ from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
@ -38,6 +38,19 @@ class ComponentFactory:
callback = callback or self.get_callback()
return Consumer(self.get_visitor(callback), self.get_queue_manager())
@lru_cache(maxsize=None)
def get_callback(self, analysis_base_url=None):
analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint
callback = Callback(analysis_base_url)
def wrapped(body):
body_repr = project(body, ["dossierId", "fileId", "pages", "images", "operation"])
logger.info(f"Processing {body_repr}...")
return callback(body)
return wrapped
@lru_cache(maxsize=None)
def get_visitor(self, callback):
return QueueVisitor(
@ -57,21 +70,17 @@ class ComponentFactory:
return storages.get_storage(self.config.storage.backend)
@lru_cache(maxsize=None)
def get_callback(self, analysis_base_url=None):
analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint
callback = Callback(analysis_base_url)
def wrapped(body):
body_repr = project(body, ["dossierId", "fileId", "pages", "images", "operation"])
logger.info(f"Processing {body_repr}...")
return callback(body)
return wrapped
def get_response_strategy(self, storage=None):
return AggregationStorageStrategy(
storage=storage or self.get_storage(), file_descriptor_manager=self.get_file_descriptor_manager()
)
@lru_cache(maxsize=None)
def get_response_strategy(self, storage=None):
return AggregationStorageStrategy(storage or self.get_storage())
def get_file_descriptor_manager(self):
return FileDescriptorManager(
bucket_name=parse_disjunction_string(self.config.storage.bucket),
operation2file_patterns=self.get_operation2file_patterns(),
)
@lru_cache(maxsize=None)
def get_response_formatter(self):
@ -102,13 +111,6 @@ class ComponentFactory:
def get_multi_download_strategy(self):
return MultiDownloadStrategy(self.get_file_descriptor_manager())
@lru_cache(maxsize=None)
def get_file_descriptor_manager(self):
return FileDescriptorManager(
bucket_name=parse_disjunction_string(self.config.storage.bucket),
operation2file_patterns=self.get_operation2file_patterns(),
)
class Callback:
def __init__(self, base_url):

View File

@ -1,9 +1,11 @@
import json
from collections import deque
from typing import Callable
from funcy import omit, filter, first
from more_itertools import peekable
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.encoding import pack_analysis_payload
from pyinfra.visitor.strategies.response.response import ResponseStrategy
@ -17,9 +19,11 @@ def default_merge(items):
class AggregationStorageStrategy(ResponseStrategy):
def __init__(self, storage, merger: Callable = None):
def __init__(self, storage, file_descriptor_manager: FileDescriptorManager, merger: Callable = None):
self.storage = storage
self.file_descriptor_manager = file_descriptor_manager
self.merger = merger or default_merge
self.buffer = deque()
self.response_files = deque()

View File

@ -1,7 +1,6 @@
import logging
from typing import Dict
from pyinfra.config import CONFIG
from pyinfra.exceptions import InvalidStorageItemFormat
from pyinfra.server.packing import string_to_bytes
@ -10,9 +9,6 @@ logger = logging.getLogger()
def build_storage_upload_info(analysis_payload, request_metadata):
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
storage_upload_info["fileId"] = build_file_path(
storage_upload_info, storage_upload_info.get("operation", CONFIG.service.response_folder)
)
return storage_upload_info

View File

@ -5,14 +5,23 @@ service:
input:
subdir: op_inp_files
extension: IN.gz
output:
subdir: op_outp_files
extension: OUT.gz
single_inp_op:
input:
subdir: ""
extension: IN.gz
output:
subdir: ""
extension: OUT.gz
default:
input:
subdir: ""
extension: IN.gz
output:
subdir: ""
extension: OUT.gz
storage:
minio: