101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
import pytest
|
|
from funcy import rcompose, lmap
|
|
|
|
from pyinfra.server.client_pipeline import ClientPipeline
|
|
from pyinfra.server.dispatcher.dispatcher import Nothing
|
|
from pyinfra.server.dispatcher.dispatchers.forwarding import ForwardingDispatcher
|
|
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
|
|
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
|
|
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
|
|
from pyinfra.server.packer.packers.rest import RestPacker
|
|
from pyinfra.server.receiver.receivers.identity import IdentityReceiver
|
|
from pyinfra.server.receiver.receivers.rest import RestReceiver
|
|
from pyinfra.server.utils import unpack
|
|
from pyinfra.utils.func import lift
|
|
from test.fixtures.server import InvalidParameterCombination
|
|
|
|
|
|
def test_mock_pipeline():
|
|
|
|
data = [1, 2, 3]
|
|
|
|
f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
|
|
|
|
pipeline = ClientPipeline(f, g, h, u)
|
|
|
|
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
|
|
|
|
|
|
def pass_invalid_combination(fn):
|
|
def inner(*args, **kwargs):
|
|
try:
|
|
return fn(*args, **kwargs)
|
|
except InvalidParameterCombination:
|
|
pass
|
|
|
|
return inner
|
|
|
|
|
|
# @pass_invalid_combination
|
|
@pytest.mark.parametrize(
|
|
"client_pipeline_type",
|
|
[
|
|
"rest",
|
|
# "basic",
|
|
],
|
|
)
|
|
def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many):
|
|
if targets is Nothing:
|
|
pytest.skip(f"invalid parameter combination: {item_type=}, {one_to_many=}")
|
|
output = client_pipeline(input_data_items, metadata)
|
|
output = lmap(unpack, output)
|
|
assert output == targets
|
|
|
|
|
|
# @pytest.mark.parametrize("item_type", ["string"])
|
|
# @pytest.mark.parametrize("n_items", [1])
|
|
# def test_pipeline_is_lazy(input_data_items, metadata):
|
|
# def lazy_test_fn(*args, **kwargs):
|
|
# probe["executed"] = True
|
|
# return b"null", {}
|
|
#
|
|
# probe = {"executed": False}
|
|
# processor_fn = make_streamable(lazy_test_fn, buffer_size=3, batched=False)
|
|
#
|
|
# client_pipeline = ClientPipeline(
|
|
# RestPacker(), ForwardingDispatcher(processor_fn), IdentityReceiver(), IdentityInterpreter()
|
|
# )
|
|
# output = client_pipeline(input_data_items, metadata)
|
|
#
|
|
# assert not probe["executed"]
|
|
#
|
|
# list(output)
|
|
#
|
|
# assert probe["executed"]
|
|
|
|
|
|
@pytest.fixture
|
|
def client_pipeline(rest_client_pipeline, basic_client_pipeline, client_pipeline_type):
|
|
if client_pipeline_type == "rest":
|
|
return rest_client_pipeline
|
|
elif client_pipeline_type == "basic":
|
|
return basic_client_pipeline
|
|
|
|
|
|
@pytest.fixture
|
|
def rest_client_pipeline(server_process, endpoint, rest_interpreter):
|
|
"""Requires a webserver to listen on `endpoint`"""
|
|
return ClientPipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter)
|
|
|
|
|
|
@pytest.fixture
|
|
def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function):
|
|
return ClientPipeline(
|
|
RestPacker(), ForwardingDispatcher(server_stream_function), IdentityReceiver(), IdentityInterpreter()
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def rest_interpreter():
|
|
return rcompose(RestPickupStreamer(), RestReceiver())
|