refactoring: de-hardcorded operation by refactoring queue message metadata fixture

This commit is contained in:
Matthias Bisping 2022-06-22 12:00:42 +02:00
parent af88e2d7d4
commit 23876501dc
6 changed files with 33 additions and 52 deletions

View File

@ -5,6 +5,10 @@ service:
input:
subdir: op_inp_files
extension: IN.gz
single_inp_op:
input:
subdir: ""
extension: IN.gz
storage:
minio:

View File

@ -1,22 +1,8 @@
from _operator import itemgetter
import pytest
from pyinfra.queue.consumer import Consumer
from test.utils.input import pair_data_with_queue_message
@pytest.fixture(scope="session")
def consumer(queue_manager, callback):
return Consumer(callback, queue_manager)
@pytest.fixture(scope="session")
def access_callback():
return itemgetter("fileId")
@pytest.fixture()
def items():
numbers = [f"{i}".encode() for i in range(3)]
return pair_data_with_queue_message(numbers)

View File

@ -11,7 +11,6 @@ from pyinfra.server.normalization import normalize_item
from pyinfra.server.packing import pack, unpack
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
from test.utils.input import pair_data_with_queue_message
from test.utils.pdf import pdf_stream
@ -74,7 +73,7 @@ def strings_to_bytes(strings):
@pytest.fixture
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test):
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test, queue_message_metadata):
"""TODO: this has become super wonky"""
metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)]
@ -87,7 +86,7 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si
response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata)))))
queue_message_keys = ["id"] * (not server_side_test) + [
*second(first(pair_data_with_queue_message([b""]))).keys()
*first(queue_message_metadata).keys()
]
response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata)
expected = lzip(response_data, response_metadata)
@ -99,8 +98,8 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si
@pytest.fixture
def endpoint(url):
return f"{url}/submit"
def endpoint(url, operation_name):
return f"{url}/{operation_name}"
@pytest.fixture(params=["rest", "basic"])
@ -148,6 +147,19 @@ def metadata(n_items, many_to_n):
return list(repeat({"key": "value"}, times=n_items))
@pytest.fixture
def queue_message_metadata(n_items, operation_name):
def metadata(i):
return {
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3],
"operation": operation_name,
}
return lmap(metadata, range(n_items))
@pytest.fixture
def packages(input_data_items, metadata):
return lstarlift(pack)(zip(input_data_items, metadata))

View File

@ -4,7 +4,7 @@ from itertools import starmap, repeat, chain
from operator import itemgetter
import pytest
from funcy import compose, first, second, pluck, lflatten
from funcy import compose, first, second, pluck, lflatten, lzip
from pyinfra.config import CONFIG
from pyinfra.default_objects import ComponentFactory, get_component_factory
@ -13,7 +13,6 @@ from pyinfra.server.packing import unpack, pack
from pyinfra.utils.encoding import compress, decompress
from pyinfra.visitor import QueueVisitor
from test.config import CONFIG as TEST_CONFIG
from test.utils.input import pair_data_with_queue_message
@pytest.mark.parametrize(
@ -119,9 +118,8 @@ def data_metadata_packs(input_data_items, metadata):
@pytest.fixture
def data_message_pairs(data_metadata_packs):
data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
return data_message_pairs
def data_message_pairs(data_metadata_packs, queue_message_metadata):
return lzip(data_metadata_packs, queue_message_metadata)
# TODO: refactor; too many params

View File

@ -1,5 +1,5 @@
import pytest
from funcy import rcompose, compose, project, second, merge
from funcy import rcompose, compose, project, second, merge, lpluck
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
@ -15,14 +15,13 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from pyinfra.utils.func import lift, llift
from test.utils.input import pair_data_with_queue_message
def test_mock_pipeline():
data = [1, 2, 3]
f, g, h, u = map(lift, [lambda x: x**2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
pipeline = ClientPipeline(f, g, h, u)
@ -31,23 +30,21 @@ def test_mock_pipeline():
@pytest.mark.parametrize("client_pipeline_type", ["basic", "rest"])
@pytest.mark.parametrize("server_side_test", [True])
def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, n_items):
def test_pipeline(
core_operation, client_pipeline, input_data_items, metadata, queue_message_metadata, targets, n_items
):
assert core_operation is not Nothing
metadata = add_queue_metadata_to_server_side_metadata(metadata)
metadata = [
merge(storage_mdt, project(queue_mdt, ["operation", "pages"]))
for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata)
]
output = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
assert n_items == 0 or len(output) > 0
assert output == targets
def add_queue_metadata_to_server_side_metadata(metadata):
return [
merge(project(second(mdt), [*mdt_o.keys(), "pages"]), mdt_o)
for mdt, mdt_o in zip(pair_data_with_queue_message(metadata), metadata)
]
@pytest.mark.parametrize("item_type", ["string"])
@pytest.mark.parametrize("n_items", [1])
def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size):

View File

@ -1,16 +0,0 @@
from typing import Iterable
# TODO: make into fixture
def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3],
"operation": "multi_inp_op", # TODO: de-hardcode
}
yield d, body
return list(inner())