refactoring: pipeline

This commit is contained in:
Matthias Bisping 2022-05-05 15:15:05 +02:00
parent 7a3bb9334b
commit b62652957a
5 changed files with 67 additions and 18 deletions

View File

@ -10,26 +10,30 @@ from pyinfra.server.pipeline import Pipeline
from pyinfra.server.receiver.receivers.rest import RestReceiver
def process_eagerly(url, data: Iterable[bytes], metadata: Iterable[dict]):
def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
"""Posts `data` to `url` and aggregates responses for each element of `data`."""
pipeline = Pipeline(*pipeline_head(url), IdentityInterpreter())
pipeline = get_eager_pipeline(endpoint)
yield from pipeline(data, metadata)
def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]):
def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
"""Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
Requires:
- responses must provide return pickup_endpoint as JSON payload
- responses must have status code 206 for more responses coming and 204 for the last response already sent
"""
pipeline = Pipeline(*pipeline_head(url), rcompose(RestPickupStreamer(), RestReceiver()))
pipeline = get_lazy_pipeline(endpoint)
yield from pipeline(data, metadata)
def get_eager_pipeline(endpoint):
return Pipeline(*pipeline_head(endpoint), IdentityInterpreter())
def get_lazy_pipeline(endpoint):
return Pipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
def pipeline_head(url):
return RestPacker(), RestDispatcher(url), RestReceiver()

View File

@ -7,6 +7,6 @@ from pyinfra.server.utils import unpack
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
def test_sending_partial_request(url, input_data_items, metadata, operation, target_data_items, server_process):
def test_sending_partial_request(url, input_data_items, metadata, target_data_items, server_process):
output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata))
assert output == target_data_items

View File

@ -37,6 +37,14 @@ def target_data_items(input_data_items, item_type, operation, metadata):
return expected
@pytest.fixture
def endpoint(url, analysis_type):
return {
"eager": f"{url}/process",
"lazy": f"{url}/submit",
}[analysis_type]
@pytest.fixture(params=[0, 1, 5, 10])
def n_items(request):
return request.param
@ -47,6 +55,16 @@ def n_pages(request):
return request.param
@pytest.fixture(params=["eager", "lazy"])
def analysis_type(request):
return request.param
@pytest.fixture(params=[1, 5, 90])
def buffer_size(request):
return request.param
def array_to_image(array) -> Image.Image:
return Image.fromarray(np.uint8(array * 255), mode="RGB")
@ -55,11 +73,6 @@ def input_batch(n_items):
return np.random.random_sample(size=(n_items, 3, 30, 30))
@pytest.fixture(params=[1, 5, 90])
def buffer_size(request):
return request.param
def images(n_items):
return lmap(compose(image_to_bytes, array_to_image), input_batch(n_items))

View File

@ -1,6 +1,13 @@
from funcy import rcompose
import pytest
from funcy import rcompose, lmap
from pyinfra.server.dispatcher.senders.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.pipeline import Pipeline
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.utils import unpack
from pyinfra.utils.func import lift
@ -13,3 +20,28 @@ def test_mock_pipeline():
pipeline = Pipeline(f, g, h, u)
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
def test_pipeline(pipeline, input_data_items, metadata, target_data_items):
output = lmap(unpack, pipeline(input_data_items, metadata))
assert output == target_data_items
@pytest.fixture
def pipeline(rest_pipeline):
return rest_pipeline
@pytest.fixture
def rest_pipeline(server_process, endpoint, rest_interpreter):
return Pipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter)
@pytest.fixture
def rest_interpreter(analysis_type):
return {
"eager": IdentityInterpreter(),
"lazy": rcompose(RestPickupStreamer(), RestReceiver()),
}[analysis_type]

View File

@ -6,7 +6,7 @@ from pyinfra.server.dispatcher.senders.rest import RestDispatcher
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_rest_receiver(url, packages, server_process):
sender = RestDispatcher(f"{url}/process")
def test_rest_rest_receiver(url, packages, server_process):
dispatcher = RestDispatcher(f"{url}/process")
receiver = RestReceiver()
assert all((isinstance(r, list) for r in receiver(sender(packages))))
assert all((isinstance(r, list) for r in receiver(dispatcher(packages))))