file pattern matching WIP: using of different operations to select file pattern config section -- semi-hardcoded multi input file operation. next: refactor fixtures in order to de-hardcode

This commit is contained in:
Matthias Bisping 2022-06-22 11:18:55 +02:00
parent 5728961b26
commit af88e2d7d4
10 changed files with 122 additions and 58 deletions

View File

@ -80,11 +80,15 @@ class ComponentFactory:
def get_operation2file_patterns(self):
return self.config.service.operations
@lru_cache(maxsize=None)
def get_file_descriptor_manager(self):
return FileDescriptorManager(self.get_operation2file_patterns())
@lru_cache(maxsize=None)
def get_download_strategy(self, download_strategy_type=None):
download_strategies = {
"single": SingleDownloadStrategy(),
"multi": MultiDownloadStrategy(FileDescriptorManager(self.get_operation2file_patterns())),
"multi": MultiDownloadStrategy(self.get_file_descriptor_manager()),
}
return download_strategies.get(
download_strategy_type or self.config.service.download_strategy, SingleDownloadStrategy()

View File

@ -1,5 +1,5 @@
from functools import singledispatch
from typing import Dict
from typing import Dict, Union, Callable
from flask import Flask, jsonify, request
from funcy import merge
@ -10,41 +10,74 @@ from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
def set_up_processing_server(server_stream_function, buffer_size):
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
return __set_up_processing_server(queued_stream_function)
def build_endpoint_suffixes(op: str):
return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
def submit_suffix(op: str):
return "submit" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"
@singledispatch
def __set_up_processing_server(arg):
def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
"""Produces a processing server given a streamable function or a mapping from operations to streamable functions.
Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a
function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata.
If the function doesn't produce data, data should be an empty byte string.
If the function doesn't produce metadata, metadata should be an empty dictionary.
Args:
arg: streamable function or mapping of operations: str to streamable functions
buffer_size: If your function operates on batches this parameter controls how many items are aggregated before
your function is applied.
TODO: buffer_size has to be controllable on per function basis.
Returns:
Processing server: flask app
"""
pass
@__set_up_processing_server.register
def _(operation2function: dict):
return __set_up_processing_server_impl(operation2function)
@set_up_processing_server.register
def _(operation2stream_fn: dict, buffer_size=1):
return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
@__set_up_processing_server.register
def _(queued_stream_function: object):
operation2function = {None: queued_stream_function}
return __set_up_processing_server_impl(operation2function)
@set_up_processing_server.register
def _(stream_fn: object, buffer_size=1):
operation2stream_fn = {None: stream_fn}
return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]):
def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
operation2stream_fn = {
op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items()
}
return __set_up_processing_server(operation2stream_fn)
#
#
# def set_up_processing_server(server_stream_function, buffer_size):
# flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
# queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
# return __set_up_processing_server(queued_stream_function)
#
#
# @singledispatch
# def __set_up_processing_server(arg):
# pass
#
#
# @__set_up_processing_server.register
# def _(operation2function: dict):
# return __set_up_processing_server_impl(operation2function)
#
#
# @__set_up_processing_server.register
# def _(queued_stream_function: object):
# operation2function = {None: queued_stream_function}
# return __set_up_processing_server_impl(operation2function)
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
registry = CollectorRegistry(auto_describe=True)
@ -88,3 +121,15 @@ def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFu
return operation2processor[operation].pop()
return app
def build_endpoint_suffixes(op: str):
return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
def submit_suffix(op: str):
return "submit" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"

View File

@ -21,7 +21,7 @@ class FileDescriptorManager:
file_descriptor = self.build_file_descriptor(queue_item_body)
file_descriptor["pages"] = [queue_item_body.get("id", 0)]
object_name = self.build_matcher(file_descriptor)
object_name = self.__build_matcher(file_descriptor)
return object_name
@ -39,8 +39,10 @@ class FileDescriptorManager:
return file_descriptor
def build_matcher(self, queue_item_body):
file_descriptor = self.build_file_descriptor(queue_item_body)
return self.__build_matcher(file_descriptor)
def __build_matcher(self, file_descriptor):
dossier_id, file_id, subdir, pages, extension = itemgetter(
"dossierId", "fileId", "subdir", "pages", "extension"
@ -57,6 +59,12 @@ class FileDescriptorManager:
return matcher
def get_object_descriptor(self, queue_item_body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": self.get_object_name(queue_item_body),
}
class MultiDownloadStrategy:
def __init__(self, file_descriptor_manager: FileDescriptorManager):
@ -83,9 +91,3 @@ class MultiDownloadStrategy:
objects = self.download_and_decompress_object(storage, page_object_names)
return objects
def get_object_descriptor(self, body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": self.file_descriptor_manager.get_object_name(body),
}

View File

@ -4,8 +4,6 @@ from typing import Dict
from pyinfra.config import CONFIG
from pyinfra.exceptions import InvalidStorageItemFormat
from pyinfra.server.packing import string_to_bytes
from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
logger = logging.getLogger()

View File

@ -2,7 +2,6 @@ from typing import Callable
from funcy import lflatten, compose
from pyinfra.server.debugging import inspect
from pyinfra.storage.storage import Storage
from pyinfra.utils.func import lift
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter

View File

@ -1,9 +1,9 @@
service:
response_formatter: identity
operations:
default:
multi_inp_op:
input:
subdir: pages
subdir: op_inp_files
extension: IN.gz
storage:
@ -32,4 +32,4 @@ webserver:
mock_analysis_endpoint: "http://127.0.0.1:5000"
use_docker_fixture: 0
logging: 0
logging: 1

View File

@ -141,7 +141,10 @@ def images_to_bytes(images):
@pytest.fixture
def metadata(n_items):
def metadata(n_items, many_to_n):
"""storage metadata
TODO: rename
"""
return list(repeat({"key": "value"}, times=n_items))

View File

@ -46,8 +46,13 @@ def url(host, port):
@pytest.fixture
def server(server_stream_function, buffer_size):
return set_up_processing_server(server_stream_function, buffer_size)
def server(server_stream_function, buffer_size, operation_name):
return set_up_processing_server({operation_name: server_stream_function}, buffer_size)
@pytest.fixture
def operation_name(many_to_n):
return "multi_inp_op" if many_to_n else "single_inp_op"
@pytest.fixture

View File

@ -87,7 +87,7 @@ from test.utils.input import pair_data_with_queue_message
)
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
storage, queue_manager, consumer, download_strategy = components
storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components
assert queue_manager.input_queue.to_list() == []
assert queue_manager.output_queue.to_list() == []
@ -98,11 +98,11 @@ def test_serving(server_process, bucket_name, components, targets, data_message_
if many_to_n:
upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
storage, queue_manager, data_message_pairs, download_strategy
storage, queue_manager, data_message_pairs, file_descriptor_manager
)
else:
upload_data_to_storage_and_publish_requests_to_queue(
storage, queue_manager, data_message_pairs, download_strategy
storage, queue_manager, data_message_pairs, file_descriptor_manager
)
consumer.consume_and_publish(n=int(many_to_n) or n_items)
@ -125,20 +125,24 @@ def data_message_pairs(data_metadata_packs):
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs, download_strategy):
def upload_data_to_storage_and_publish_requests_to_queue(
storage, queue_manager, data_message_pairs, file_descriptor_manager
):
for data, message in data_message_pairs:
upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy)
upload_data_to_storage_and_publish_request_to_queue(
storage, queue_manager, data, message, file_descriptor_manager
)
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy):
storage.put_object(**download_strategy.get_object_descriptor(message), data=compress(data))
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager):
storage.put_object(**file_descriptor_manager.get_object_descriptor(message), data=compress(data))
queue_manager.publish_request(message)
# TODO: refactor body; too long and scripty
def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
storage, queue_manager, data_message_pairs, download_strategy
storage, queue_manager, data_message_pairs, file_descriptor_manager
):
assert data_message_pairs
@ -146,7 +150,7 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
pages = ref_message["pages"]
for data, page in zip(map(first, data_message_pairs), pages):
object_descriptor = download_strategy.get_object_descriptor(ref_message)
object_descriptor = file_descriptor_manager.get_object_descriptor(ref_message)
object_descriptor["object_name"] = build_filepath(object_descriptor, page)
storage.put_object(**object_descriptor, data=compress(data))
@ -179,13 +183,13 @@ def components(components_type, real_components, test_components, bucket_name):
else:
raise ValueError(f"Unknown component type '{components_type}'.")
storage, queue_manager, consumer, download_strategy = components
storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components
queue_manager.clear()
storage.make_bucket(bucket_name)
storage.clear_bucket(bucket_name)
yield storage, queue_manager, consumer, download_strategy
yield storage, queue_manager, consumer, download_strategy, file_descriptor_manager
queue_manager.clear()
# storage.clear_bucket(bucket_name)
@ -216,11 +220,12 @@ def real_components(url, many_to_n):
consumer = component_factory.get_consumer(callback)
queue_manager = component_factory.get_queue_manager()
storage = component_factory.get_storage()
file_descriptor_manager = component_factory.get_file_descriptor_manager()
download_strategy = component_factory.get_download_strategy("multi" if many_to_n else "single")
consumer.visitor.download_strategy = download_strategy
return storage, queue_manager, consumer, download_strategy
return storage, queue_manager, consumer, download_strategy, file_descriptor_manager
@pytest.fixture
@ -229,6 +234,7 @@ def test_components(url, queue_manager, storage, many_to_n):
component_factory = ComponentFactory(CONFIG)
download_strategy = component_factory.get_download_strategy("multi" if many_to_n else "single")
file_descriptor_manager = component_factory.get_file_descriptor_manager()
visitor = QueueVisitor(
storage=storage,
@ -238,4 +244,4 @@ def test_components(url, queue_manager, storage, many_to_n):
)
consumer = Consumer(visitor, queue_manager)
return storage, queue_manager, consumer, download_strategy
return storage, queue_manager, consumer, download_strategy, file_descriptor_manager

View File

@ -1,13 +1,15 @@
from typing import Iterable
# TODO: make into fixture
def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3]
"pages": [0, 2, 3],
"operation": "multi_inp_op", # TODO: de-hardcode
}
yield d, body