pyinfra/test/unit_tests/server/pipeline_test.py
Julius Unverfehrt a1bfec765c Pull request #43: Image prediction v2 support
Merge in RR/pyinfra from image-prediction-v2-support to 2.0.0

Squashed commit of the following:

commit 37c536324e847357e86dd9b72d1e07ad792ed90f
Merge: 77d1db8 01bfb1d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:53:56 2022 +0200

    Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into image-prediction-v2-support

commit 77d1db8e8630de8822c124eb39f4cd817ed1d3e1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:07:41 2022 +0200

    add operation assignment via config if operation is not defined by caller

commit 36c8ca48a8c6151f713c093a23de110901ba6b02
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:33:34 2022 +0200

    refactor nothing part 2

commit f6cd0ef986802554dd544b9b7a24073d3b3f05b5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:28:49 2022 +0200

    refactor nothing

commit 1e70d49531e89613c70903be49290b94ee014f65
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:42:12 2022 +0200

    enable docker-compose fixture

commit 9fee32cecdd120cfac3e065fb8ad2b4f37b49226
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:40:35 2022 +0200

    added 'multi' key to actual operation configurations

commit 4287f6d9878dd361489b8490eafd06f81df472ce
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:56:12 2022 +0200

    removed debug prints

commit 23a533e8f99222c7e598fb0864f65e9aa3508a3b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:31:50 2022 +0200

    completed correcting / cleaning upload and download logic with regard to operations and ids. next: remove debug code

commit 33246d1ff94989d2ea70242c7ae2e58afa4d35c1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 14:37:17 2022 +0200

    corrected / cleaned upload and download logic with regard to operations and ids

commit 7f2b4e882022c6843cb2f80df202caa495c54ee9
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 18:41:07 2022 +0200

    partially decomplected file descriptor manager from concrete and non-generic descriptor code

commit 40b892da17670dae3b8eba1700877c1dcf219852
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:53:46 2022 +0200

    typo

commit ec4fa8e6f4551ff1f8d4f78c484b7a260f274898
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:52:41 2022 +0200

    typo

commit 701b43403c328161fd96a73ce388a66035cca348
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 17:26:53 2022 +0200

    made adjustments for image classification with pyinfra 2.x; added related fixmes

commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 13:05:26 2022 +0200

    removed obsolete imports

commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:47:12 2022 +0200

    enable docker-compose fixture

commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:46:53 2022 +0200

    renaming

commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 30 12:47:57 2022 +0200

    refactoring: added cached pipeline factory

commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:47:08 2022 +0200

    renaming

commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:25:03 2022 +0200

    further refactored server setup code: moving and decomplecting

commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 12:53:09 2022 +0200

    refactored server setup code: factored out and decoupled operation registry and prometheus summary registry

... and 6 more commits
2022-07-11 14:17:59 +02:00

105 lines
3.6 KiB
Python

import pytest
from funcy import rcompose, compose, project, merge
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.nothing import Nothing
from pyinfra.server.dispatcher.dispatchers.queue import QueuedStreamFunctionDispatcher
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.packing import unpack
from pyinfra.server.receiver.receivers.identity import QueuedStreamFunctionReceiver
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from pyinfra.utils.func import lift, llift
def test_mock_pipeline():
    """Check that a ClientPipeline of four lifted functions equals their left-to-right composition."""
    data = [1, 2, 3]
    # Four element-wise stages: square, add two, halve, identity.
    elementwise_stages = (
        lambda x: x ** 2,
        lambda x: x + 2,
        lambda x: x / 2,
        lambda x: x,
    )
    stages = [lift(stage) for stage in elementwise_stages]

    pipeline = ClientPipeline(*stages)

    actual = list(pipeline(data))
    expected = list(rcompose(*stages)(data))
    assert actual == expected
@pytest.mark.parametrize("client_pipeline_type", ["basic", "rest"])
@pytest.mark.parametrize("server_side_test", [True])
def test_pipeline(
    core_operation, client_pipeline, input_data_items, metadata, queue_message_metadata, targets, n_items
):
    """Run the selected client pipeline on the input items and compare the unpacked outputs to `targets`.

    Parametrized over the "basic" and "rest" client pipeline types. Each storage
    metadata entry is merged with the "operation" and "pages" keys of the
    corresponding queue message metadata before being passed to the pipeline.
    """
    assert core_operation is not Nothing

    # Only these keys of the queue message metadata are forwarded to the pipeline.
    forwarded_keys = ["operation", "pages"]
    merged_metadata = []
    for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata):
        merged_metadata.append(merge(storage_mdt, project(queue_mdt, forwarded_keys)))

    unpack_all = llift(unpack)
    outputs = unpack_all(client_pipeline(input_data_items, merged_metadata))

    # With at least one input item the pipeline must produce something.
    if n_items != 0:
        assert len(outputs) > 0
    assert outputs == targets
@pytest.mark.parametrize("item_type", ["string"])
@pytest.mark.parametrize("n_items", [1])
def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size):
    """Check that calling the pipeline does no work until its output is consumed.

    A probe function records whether it has been executed; it must remain
    untouched after the pipeline call and be flagged once the output has been
    iterated to exhaustion.
    """
    probe = {"executed": False}

    def lazy_test_fn(*args, **kwargs):
        # Flip the flag so the test can observe when the function actually runs.
        probe["executed"] = True
        return b"null", {}

    client_pipeline = get_basic_client_pipeline(
        make_streamable_and_wrap_in_packing_logic(lazy_test_fn, batched=False),
        buffer_size=buffer_size,
    )

    output = client_pipeline(input_data_items, metadata)
    assert not probe["executed"]

    # Drain the output; this is what should trigger the computation.
    for _ in output:
        pass
    assert probe["executed"]
@pytest.fixture
def client_pipeline(rest_client_pipeline, basic_client_pipeline, client_pipeline_type):
    """Return the client pipeline selected by `client_pipeline_type`.

    Args:
        rest_client_pipeline: pipeline returned for the type "rest".
        basic_client_pipeline: pipeline returned for the type "basic".
        client_pipeline_type: either "rest" or "basic".

    Raises:
        ValueError: if `client_pipeline_type` is neither "rest" nor "basic".
            Previously this case silently yielded None, which only failed later
            with an unrelated error when the test tried to call the pipeline.
    """
    if client_pipeline_type == "rest":
        return rest_client_pipeline
    if client_pipeline_type == "basic":
        return basic_client_pipeline
    raise ValueError(f"Unknown client_pipeline_type: {client_pipeline_type!r}")
@pytest.fixture
def rest_client_pipeline(server_process, endpoint, rest_interpreter):
    """Build a ClientPipeline that dispatches via REST to `endpoint`.

    A webserver must be listening on `endpoint`. `server_process` is not used in
    the body; it is presumably requested so the server is running -- confirm
    against the fixture definition.
    """
    packer = RestPacker()
    dispatcher = RestDispatcher(endpoint)
    receiver = RestReceiver()
    return ClientPipeline(packer, dispatcher, receiver, rest_interpreter)
@pytest.fixture
def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function, buffer_size):
    """Build the basic (queue-based) client pipeline around `server_stream_function`.

    `endpoint` and `rest_interpreter` are requested but not used in the body.
    """
    pipeline = get_basic_client_pipeline(server_stream_function, buffer_size)
    return pipeline
def get_basic_client_pipeline(stream_function, buffer_size=3):
    """Assemble a ClientPipeline that feeds `stream_function` through a queued dispatcher.

    Args:
        stream_function: function wrapped in a FlatStreamBuffer and then a
            QueuedStreamFunction.
        buffer_size: buffer size passed to the FlatStreamBuffer (default 3).

    Returns:
        A ClientPipeline made of a RestPacker, a QueuedStreamFunctionDispatcher,
        a QueuedStreamFunctionReceiver and an IdentityInterpreter.
    """
    packer = RestPacker()

    # Wrap the stream function layer by layer, innermost first.
    buffered = FlatStreamBuffer(stream_function, buffer_size=buffer_size)
    queued = QueuedStreamFunction(buffered)
    dispatcher = QueuedStreamFunctionDispatcher(queued)

    receiver = QueuedStreamFunctionReceiver()
    interpreter = IdentityInterpreter()
    return ClientPipeline(packer, dispatcher, receiver, interpreter)
@pytest.fixture
def rest_interpreter():
    """Return an interpreter that applies a RestPickupStreamer first, then a RestReceiver."""
    pickup_streamer = RestPickupStreamer()
    receiver = RestReceiver()
    return rcompose(pickup_streamer, receiver)