operations section in config cleaned up; added upload formatter

This commit is contained in:
Matthias Bisping 2022-06-23 20:04:10 +02:00
parent c85800aefc
commit 6c024e1a78
4 changed files with 52 additions and 20 deletions

View File

@ -1,10 +1,8 @@
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger
name: $SERVICE_NAME|research # Default service name for research service, used for prometheus metric name
# Specifies how to handle the `page` key of a request. "multi" will download all pages matching the list of pages
# specified in the request.
response_formatter: default # TODO: write formatter for analysis tasks that pulls metadata content into root of response json
response_formatter: default # formats analysis payloads of response messages
upload_formatter: projecting # formats analysis payloads of objects uploaded to storage
# Note: This is not really the right place for this. It should be configured on a per-service basis.
operations:
conversion:
@ -12,21 +10,21 @@ service:
subdir: ""
extension: ORIGIN.pdf.gz
output:
subdir: "conversion"
subdir: "pages_as_images"
extension: json.gz
extraction:
input:
subdir: ""
extension: ORIGIN.pdf.gz
output:
subdir: "extraction"
subdir: "extracted_images"
extension: json.gz
table_parsing:
input:
subdir: "conversion"
subdir: "pages_as_images"
extension: json.gz
output:
subdir: "table_new"
subdir: "table_parses"
extension: json.gz
default:
input:

View File

@ -1,7 +1,7 @@
import logging
from functools import lru_cache
from funcy import rcompose, omit, merge, lmap, project
from funcy import rcompose, omit, merge, lmap, project, identity
from pyinfra.config import parse_disjunction_string
from pyinfra.exceptions import AnalysisFailure
@ -18,7 +18,10 @@ from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.downloader import Downloader
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
from pyinfra.visitor.strategies.response.aggregation import (
AggregationStorageStrategy,
ProjectingUploadFormatter,
)
logger = logging.getLogger(__name__)
@ -70,7 +73,9 @@ class ComponentFactory:
@lru_cache(maxsize=None)
def get_response_strategy(self, storage=None):
return AggregationStorageStrategy(
storage=storage or self.get_storage(), file_descriptor_manager=self.get_file_descriptor_manager()
storage=storage or self.get_storage(),
file_descriptor_manager=self.get_file_descriptor_manager(),
upload_formatter=self.get_upload_formatter(),
)
@lru_cache(maxsize=None)
@ -80,6 +85,10 @@ class ComponentFactory:
operation2file_patterns=self.get_operation2file_patterns(),
)
@lru_cache(maxsize=None)
def get_upload_formatter(self):
    """Return the upload formatter named by `config.service.upload_formatter`.

    Recognised names are "identity" and "projecting"; any other value raises
    KeyError from the lookup.
    """
    # Build the dispatch table first, then resolve the configured name against it.
    formatters_by_name = {
        "identity": identity,
        "projecting": ProjectingUploadFormatter(),
    }
    configured_name = self.config.service.upload_formatter
    return formatters_by_name[configured_name]
@lru_cache(maxsize=None)
def get_response_formatter(self):
return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[

View File

@ -1,7 +1,8 @@
import abc
from collections import deque
from typing import Callable
from funcy import omit, filter, first
from funcy import omit, filter, first, lpluck, identity
from more_itertools import peekable
from pyinfra.file_descriptor_manager import FileDescriptorManager
@ -10,17 +11,39 @@ from pyinfra.utils.encoding import pack_analysis_payload
from pyinfra.visitor.strategies.response.response import ResponseStrategy
def default_merge(items):
    """Materialise `items` and unwrap the result when it holds exactly one element.

    Returns the sole element for a single-item input; otherwise returns the
    items as a list (an empty input gives an empty list).
    """
    collected = list(items)
    if len(collected) == 1:
        # A lone item is handed back bare rather than wrapped in a list.
        return first(collected)
    return collected
class UploadFormatter(abc.ABC):
    """Abstract base for objects that format items via `format`.

    Instances are callable: calling one is the same as calling `format`.
    """

    @abc.abstractmethod
    def format(self, items):
        """Return the formatted representation of `items`; subclasses must override."""
        raise NotImplementedError

    def __call__(self, items):
        """Delegate to `format` so the formatter can be used like a plain function."""
        formatted = self.format(items)
        return formatted
class ProjectingUploadFormatter(UploadFormatter):
    """Upload formatter that keeps either the single data-carrying item or only the metadata."""

    def format(self, items):
        """Project `items` onto the part that is returned for upload.

        Args:
            items: Sized collection of mappings, each with "data" and
                "metadata" keys.

        Returns:
            An empty list when `items` is empty; the first item itself when
            its "data" value is truthy; otherwise a list with the "metadata"
            value of every item.

        Raises:
            AssertionError: If the first item carries data but `items` holds
                more than one element.
        """
        if not items:
            # `first` would return None here and the "data" lookup would fail
            # with a TypeError; no items means no metadata to project.
            return []

        head = first(items)
        if head["data"]:
            # Raised explicitly rather than via `assert` so the check is not
            # stripped when Python runs with -O.
            if len(items) != 1:
                raise AssertionError("expected exactly one item when the first item carries data")
            return head

        return lpluck("metadata", items)
class AggregationStorageStrategy(ResponseStrategy):
def __init__(self, storage, file_descriptor_manager: FileDescriptorManager, merger: Callable = None):
def __init__(
self,
storage,
file_descriptor_manager: FileDescriptorManager,
merger: Callable = list,
upload_formatter: UploadFormatter = identity,
):
self.storage = storage
self.file_descriptor_manager = file_descriptor_manager
self.merger = merger or default_merge
self.merger = merger
self.upload_formatter = upload_formatter
self.buffer = deque()
self.response_files = deque()
@ -50,7 +73,8 @@ class AggregationStorageStrategy(ResponseStrategy):
self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])})
def upload_aggregated_items(self, object_descriptor):
self.upload_item(self.merge_queue_items(), object_descriptor)
items_to_upload = self.upload_formatter(self.merge_queue_items())
self.upload_item(items_to_upload, object_descriptor)
def build_response_message(self, storage_upload_info):

View File

@ -6,7 +6,7 @@ from operator import itemgetter
import pytest
from funcy import compose, first, second, pluck, lflatten, lzip
from pyinfra.config import CONFIG, DotIndexable
from pyinfra.config import CONFIG
from pyinfra.default_objects import get_component_factory
from pyinfra.server.packing import unpack, pack
from pyinfra.utils.encoding import compress, decompress
@ -209,6 +209,7 @@ def real_components(url):
CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
CONFIG["service"]["response_formatter"] = TEST_CONFIG.service.response_formatter
CONFIG["service"]["upload_formatter"] = "identity"
component_factory = get_component_factory(CONFIG)