pyinfra/test/unit_tests/pipeline_test.py
2022-05-05 15:47:25 +02:00

48 lines
1.5 KiB
Python

import pytest
from funcy import rcompose, lmap
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.pipeline import Pipeline
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.utils import unpack
from pyinfra.utils.func import lift
def test_mock_pipeline():
data = [1, 2, 3]
f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
pipeline = Pipeline(f, g, h, u)
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
def test_pipeline(pipeline, input_data_items, metadata, target_data_items):
output = lmap(unpack, pipeline(input_data_items, metadata))
assert output == target_data_items
@pytest.fixture
def pipeline(rest_pipeline):
return rest_pipeline
@pytest.fixture
def rest_pipeline(server_process, endpoint, rest_interpreter):
return Pipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter)
@pytest.fixture
def rest_interpreter(analysis_type):
return {
"eager": IdentityInterpreter(),
"lazy": rcompose(RestPickupStreamer(), RestReceiver()),
}[analysis_type]