diff --git a/config.yaml b/config.yaml index a574e3e..3cd4291 100755 --- a/config.yaml +++ b/config.yaml @@ -1,10 +1,8 @@ service: logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger name: $SERVICE_NAME|research # Default service name for research service, used for prometheus metric name - - # Specifies, how to handle the `page` key of a request. "multi" will download all pages matching the list of pages - # specified in the request - response_formatter: default # TODO: write formatter for analysis tasks that pulls metadata content into root of response json + response_formatter: default # formats analysis payloads of response messages + upload_formatter: projecting # formats analysis payloads of objects uploaded to storage # Note: This is not really the right place for this. It should be configured on a per-service basis. operations: conversion: @@ -12,21 +10,21 @@ service: subdir: "" extension: ORIGIN.pdf.gz output: - subdir: "conversion" + subdir: "pages_as_images" extension: json.gz extraction: input: subdir: "" extension: ORIGIN.pdf.gz output: - subdir: "extraction" + subdir: "extracted_images" extension: json.gz table_parsing: input: - subdir: "conversion" + subdir: "pages_as_images" extension: json.gz output: - subdir: "table_new" + subdir: "table_parses" extension: json.gz default: input: diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index df7b54b..c1d671e 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -1,7 +1,7 @@ import logging from functools import lru_cache -from funcy import rcompose, omit, merge, lmap, project +from funcy import rcompose, omit, merge, lmap, project, identity from pyinfra.config import parse_disjunction_string from pyinfra.exceptions import AnalysisFailure @@ -18,7 +18,10 @@ from pyinfra.visitor import QueueVisitor from pyinfra.visitor.downloader import Downloader from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter -from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy +from pyinfra.visitor.strategies.response.aggregation import ( + AggregationStorageStrategy, + ProjectingUploadFormatter, +) logger = logging.getLogger(__name__) @@ -70,7 +73,9 @@ class ComponentFactory: @lru_cache(maxsize=None) def get_response_strategy(self, storage=None): return AggregationStorageStrategy( - storage=storage or self.get_storage(), file_descriptor_manager=self.get_file_descriptor_manager() + storage=storage or self.get_storage(), + file_descriptor_manager=self.get_file_descriptor_manager(), + upload_formatter=self.get_upload_formatter(), ) @lru_cache(maxsize=None) @@ -80,6 +85,10 @@ class ComponentFactory: operation2file_patterns=self.get_operation2file_patterns(), ) + @lru_cache(maxsize=None) + def get_upload_formatter(self): + return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter] + @lru_cache(maxsize=None) def get_response_formatter(self): return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[ diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index a8e4838..b8ac6a7 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -1,7 +1,8 @@ +import abc from collections import deque from typing import Callable -from funcy import omit, filter, first +from funcy import omit, filter, first, lpluck, identity from more_itertools import peekable from pyinfra.file_descriptor_manager import FileDescriptorManager @@ -10,17 +11,39 @@ from pyinfra.utils.encoding import pack_analysis_payload from pyinfra.visitor.strategies.response.response import ResponseStrategy -def default_merge(items): - merged = list(items) - merged = first(merged) if len(merged) == 1 else merged - return merged +class UploadFormatter(abc.ABC): + @abc.abstractmethod + def format(self, items): + raise NotImplementedError + + def __call__(self, items): + return self.format(items) + + +class ProjectingUploadFormatter(UploadFormatter): + def format(self, items): + + head = first(items) + if head["data"]: + assert len(items) == 1 + return head + else: + items = lpluck("metadata", items) + return items class AggregationStorageStrategy(ResponseStrategy): - def __init__(self, storage, file_descriptor_manager: FileDescriptorManager, merger: Callable = None): + def __init__( + self, + storage, + file_descriptor_manager: FileDescriptorManager, + merger: Callable = list, + upload_formatter: UploadFormatter = identity, + ): self.storage = storage self.file_descriptor_manager = file_descriptor_manager - self.merger = merger or default_merge + self.merger = merger + self.upload_formatter = upload_formatter self.buffer = deque() self.response_files = deque() @@ -50,7 +73,8 @@ class AggregationStorageStrategy(ResponseStrategy): self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])}) def upload_aggregated_items(self, object_descriptor): - self.upload_item(self.merge_queue_items(), object_descriptor) + items_to_upload = self.upload_formatter(self.merge_queue_items()) + self.upload_item(items_to_upload, object_descriptor) def build_response_message(self, storage_upload_info): diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index dc5c8af..6ce0647 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -6,7 +6,7 @@ from operator import itemgetter import pytest from funcy import compose, first, second, pluck, lflatten, lzip -from pyinfra.config import CONFIG, DotIndexable +from pyinfra.config import CONFIG from pyinfra.default_objects import get_component_factory from pyinfra.server.packing import unpack, pack from pyinfra.utils.encoding import compress, decompress @@ -209,6 +209,7 @@ def real_components(url): CONFIG["service"]["operations"] = TEST_CONFIG.service.operations CONFIG["service"]["response_formatter"] = TEST_CONFIG.service.response_formatter + CONFIG["service"]["upload_formatter"] = "identity" component_factory = get_component_factory(CONFIG)