diff --git a/pyinfra/server/client_pipeline.py b/pyinfra/server/client_pipeline.py index 7a39dc4..4ebde5a 100644 --- a/pyinfra/server/client_pipeline.py +++ b/pyinfra/server/client_pipeline.py @@ -2,10 +2,10 @@ from funcy import rcompose, flatten class ClientPipeline: - def __init__(self, packer, sender, receiver, interpreter): + def __init__(self, packer, dispatcher, receiver, interpreter): self.pipe = rcompose( packer, - sender, + dispatcher, receiver, interpreter, flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. diff --git a/pyinfra/server/dispatcher/dispatcher.py b/pyinfra/server/dispatcher/dispatcher.py index 9aa7a44..62b5fc6 100644 --- a/pyinfra/server/dispatcher/dispatcher.py +++ b/pyinfra/server/dispatcher/dispatcher.py @@ -18,7 +18,6 @@ def has_next(peekable_iter): class Dispatcher: def __call__(self, packages: Iterable[dict]): - yield from self.dispatch_methods(packages) def dispatch_methods(self, packages): diff --git a/pyinfra/server/dispatcher/dispatchers/forwarding.py b/pyinfra/server/dispatcher/dispatchers/forwarding.py index 245db8e..68306e3 100644 --- a/pyinfra/server/dispatcher/dispatchers/forwarding.py +++ b/pyinfra/server/dispatcher/dispatchers/forwarding.py @@ -1,12 +1,14 @@ from pyinfra.server.dispatcher.dispatcher import Dispatcher -class ForwardingDispatcher(Dispatcher): - def __init__(self, fn): - self.fn = fn +class QueuedStreamFunctionDispatcher(Dispatcher): + def __init__(self, queue): + self.queue = queue def patch(self, package): - return self.fn(package) + self.queue.push(package) + return self.queue.pop() def post(self, package): - return self.fn(package) + self.queue.push(package) + return self.queue.pop() diff --git a/pyinfra/server/interpreter/interpreters/rest_callback.py b/pyinfra/server/interpreter/interpreters/rest_callback.py index 718fe97..27b5f50 100644 --- a/pyinfra/server/interpreter/interpreters/rest_callback.py +++ b/pyinfra/server/interpreter/interpreters/rest_callback.py @@ -6,7 +6,7 @@ from funcy import takewhile, repeatedly, mapcat from pyinfra.server.interpreter.interpreter import Interpreter -def stream_response_payloads(endpoint): +def stream_responses(endpoint): def receive(): response = requests.get(endpoint) return response @@ -20,4 +20,4 @@ def stream_response_payloads(endpoint): class RestPickupStreamer(Interpreter): def __call__(self, payloads: Iterable): - yield from mapcat(stream_response_payloads, payloads) + yield from mapcat(stream_responses, payloads) diff --git a/pyinfra/server/receiver/receivers/identity.py b/pyinfra/server/receiver/receivers/identity.py index 5abb22c..19ec76b 100644 --- a/pyinfra/server/receiver/receivers/identity.py +++ b/pyinfra/server/receiver/receivers/identity.py @@ -1,9 +1,11 @@ from typing import Iterable from pyinfra.server.receiver.receiver import Receiver +from funcy import notnone -class IdentityReceiver(Receiver): +class QueuedStreamFunctionReceiver(Receiver): + def __call__(self, responses: Iterable): - for response in responses: + for response in filter(notnone, responses): yield response diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 05973ee..8f6e4ef 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,10 +1,17 @@ from flask import Flask, jsonify, request +from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.stream.rest import LazyRestProcessor -def set_up_processing_server(queued_stream_function: QueuedStreamFunction): +def set_up_processing_server(server_stream_function, buffer_size): + flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) + queued_stream_function = QueuedStreamFunction(flat_stream_buffer) + return __set_up_processing_server(queued_stream_function) + + +def __set_up_processing_server(queued_stream_function: QueuedStreamFunction): app = Flask(__name__) processor = LazyRestProcessor(queued_stream_function, submit_suffix="submit", pickup_suffix="pickup") diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 209ebd9..2024889 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -10,7 +10,6 @@ from pyinfra.server.normalization import normalize_item from pyinfra.server.packing import pack, unpack from pyinfra.utils.func import star, lift, lstarlift from test.utils.image import image_to_bytes -from test.utils.server import string_to_bytes @pytest.fixture diff --git a/test/fixtures/server.py b/test/fixtures/server.py index efd2b47..65ccb7a 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -7,15 +7,13 @@ import fitz import pytest import requests from PIL import Image -from funcy import retry, first, compose, identity +from funcy import retry from waitress import serve -from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.server import ( set_up_processing_server, ) -from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes @@ -44,9 +42,7 @@ def url(host, port): @pytest.fixture def server(server_stream_function, buffer_size): - flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) - queued_stream_function = QueuedStreamFunction(flat_stream_buffer) - return set_up_processing_server(queued_stream_function) + return set_up_processing_server(server_stream_function, buffer_size) @pytest.fixture @@ -67,6 +63,7 @@ def operation(core_operation): return zip(result, metadata) else: return result, metadata + if core_operation is Nothing: return Nothing return op diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index 5505223..0a3bcc9 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -1,16 +1,18 @@ import pytest from funcy import rcompose, lmap +from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.client_pipeline import ClientPipeline from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.dispatcher.dispatchers.forwarding import ForwardingDispatcher +from pyinfra.server.dispatcher.dispatchers.forwarding import QueuedStreamFunctionDispatcher from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.receiver.receivers.identity import IdentityReceiver -from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.packing import unpack +from pyinfra.server.receiver.receivers.identity import QueuedStreamFunctionReceiver +from pyinfra.server.receiver.receivers.rest import RestReceiver +from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.utils.func import lift @@ -29,7 +31,7 @@ def test_mock_pipeline(): "client_pipeline_type", [ "rest", - # "basic", + "basic", ], ) def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many): @@ -77,9 +79,16 @@ def rest_client_pipeline(server_process, endpoint, rest_interpreter): @pytest.fixture -def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function): +def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function, buffer_size): + + flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) + queued_stream_function = QueuedStreamFunction(flat_stream_buffer) + return ClientPipeline( - RestPacker(), ForwardingDispatcher(server_stream_function), IdentityReceiver(), IdentityInterpreter() + RestPacker(), + QueuedStreamFunctionDispatcher(queued_stream_function), + QueuedStreamFunctionReceiver(), + IdentityInterpreter(), )