From c092e7bcabb54ca5e16a3adf814a314903f5694c Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Sun, 8 May 2022 18:36:15 +0200 Subject: [PATCH] refactoring; server pipeline WIP --- .../{pipeline.py => client_pipeline.py} | 2 +- pyinfra/server/processor/processor.py | 2 +- pyinfra/server/rest.py | 6 ++-- pyinfra/server/server.py | 35 +++++++++++++++---- pyinfra/server/utils.py | 4 +-- pyinfra/utils/buffer.py | 3 +- test/conftest.py | 2 +- test/unit_tests/server/pipeline_test.py | 16 ++++----- 8 files changed, 45 insertions(+), 25 deletions(-) rename pyinfra/server/{pipeline.py => client_pipeline.py} (95%) diff --git a/pyinfra/server/pipeline.py b/pyinfra/server/client_pipeline.py similarity index 95% rename from pyinfra/server/pipeline.py rename to pyinfra/server/client_pipeline.py index e824504..c53d4e5 100644 --- a/pyinfra/server/pipeline.py +++ b/pyinfra/server/client_pipeline.py @@ -1,7 +1,7 @@ from funcy import rcompose, flatten -class Pipeline: +class ClientPipeline: def __init__(self, packer, sender, receiver, interpreter): self.pipe = rcompose( packer, diff --git a/pyinfra/server/processor/processor.py b/pyinfra/server/processor/processor.py index 80cd220..0c030ad 100644 --- a/pyinfra/server/processor/processor.py +++ b/pyinfra/server/processor/processor.py @@ -6,7 +6,7 @@ from pyinfra.server.dispatcher.dispatcher import Nothing class OnDemandProcessor: def __init__(self, fn): - """Function has to return an iterable and ideally is a generator.""" + """Function `fn` has to return an iterable and ideally is a generator.""" self.execution_queue = chain([]) self.fn = fn diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 6c246ab..d659a92 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -6,7 +6,7 @@ from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.pipeline import Pipeline +from pyinfra.server.client_pipeline import ClientPipeline from pyinfra.server.receiver.receivers.rest import RestReceiver @@ -28,11 +28,11 @@ def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): def get_eager_pipeline(endpoint): - return Pipeline(*pipeline_head(endpoint), IdentityInterpreter()) + return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter()) def get_lazy_pipeline(endpoint): - return Pipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver())) + return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver())) def pipeline_head(url): diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index b46b569..e46d611 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,28 +1,49 @@ import logging -import flask from flask import Flask, jsonify, request -from funcy import compose, flatten +from funcy import compose, flatten, rcompose from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.processor.processor import OnDemandProcessor -from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack +from pyinfra.server.utils import unpack_batchop_pack, unpack, normalize, pack from pyinfra.utils.buffer import bufferize -from pyinfra.utils.func import lift +from pyinfra.utils.func import starlift, star, lift logger = logging.getLogger() +class ServerPipeline: + def __init__(self, fn): + """Function `fn` has to return an iterable and ideally is a generator.""" + self.pipe = rcompose( + lift(unpack), + fn, + normalize, + starlift(pack), + ) + + def __call__(self, packages): + return self.pipe(packages) + + def make_streamable(operation, buffer_size, batched): - wrapper = unpack_batchop_pack if batched else compose(lift, unpack_op_pack) - operation = wrapper(operation) - operation = bufferize(operation, buffer_size=buffer_size) + operation = operation if batched else starlift(operation) + operation = ServerPipeline(operation) + operation = BufferedProcessor(operation, buffer_size=buffer_size) operation = compose(flatten, operation) return operation +class BufferedProcessor: + def __init__(self, fn, buffer_size): + self.fn = bufferize(fn, buffer_size=buffer_size) + + def __call__(self, item, final): + return self.fn(item, final=final) + + class RestOndDemandProcessor(OnDemandProcessor): def __init__(self, fn): super(RestOndDemandProcessor, self).__init__(fn=fn) diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py index 5322436..6caefa7 100644 --- a/pyinfra/server/utils.py +++ b/pyinfra/server/utils.py @@ -44,8 +44,8 @@ def dispatch_methods(data): return *repeat(requests.patch, ilen(data) - 1), requests.post -def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: - return compose(starlift(pack), normalize, star(operation), unpack) +# def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: +# return compose(starlift(pack), normalize, star(operation), unpack) def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: diff --git a/pyinfra/utils/buffer.py b/pyinfra/utils/buffer.py index 4603d34..67c2242 100644 --- a/pyinfra/utils/buffer.py +++ b/pyinfra/utils/buffer.py @@ -1,6 +1,5 @@ import logging from collections import deque -from typing import Any from funcy import repeatedly @@ -8,7 +7,7 @@ logger = logging.getLogger(__name__) def bufferize(fn, buffer_size=3, persist_fn=lambda x: x): - def buffered_fn(item: Any, final=False): + def buffered_fn(item, final=False): buffer.append(persist_fn(item)) response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, final))) return response_payload diff --git a/test/conftest.py b/test/conftest.py index 1000563..d1c9da6 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -31,7 +31,7 @@ pytest_plugins = [ ] -@pytest.fixture(autouse=False) +@pytest.fixture(autouse=True) def mute_logger(): logger.setLevel(logging.CRITICAL + 1) diff --git a/test/unit_tests/server/pipeline_test.py b/test/unit_tests/server/pipeline_test.py index b87b5fb..a07a1c1 100644 --- a/test/unit_tests/server/pipeline_test.py +++ b/test/unit_tests/server/pipeline_test.py @@ -4,7 +4,7 @@ from funcy import rcompose, lmap from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.pipeline import Pipeline +from pyinfra.server.client_pipeline import ClientPipeline from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.utils import unpack from pyinfra.utils.func import lift @@ -16,25 +16,25 @@ def test_mock_pipeline(): f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x]) - pipeline = Pipeline(f, g, h, u) + pipeline = ClientPipeline(f, g, h, u) assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data)) -def test_pipeline(pipeline, input_data_items, metadata, target_data_items): - output = pipeline(input_data_items, metadata) +def test_pipeline(client_pipeline, input_data_items, metadata, target_data_items): + output = client_pipeline(input_data_items, metadata) output = lmap(unpack, output) assert output == target_data_items @pytest.fixture -def pipeline(rest_pipeline): - return rest_pipeline +def client_pipeline(rest_client_pipeline): + return rest_client_pipeline @pytest.fixture -def rest_pipeline(server_process, endpoint, rest_interpreter): - return Pipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter) +def rest_client_pipeline(server_process, endpoint, rest_interpreter): + return ClientPipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter) @pytest.fixture