refactoring; server pipeline WIP

This commit is contained in:
Matthias Bisping 2022-05-08 18:36:15 +02:00
parent 1d09337378
commit c092e7bcab
8 changed files with 45 additions and 25 deletions

View File

@ -1,7 +1,7 @@
from funcy import rcompose, flatten
class Pipeline:
class ClientPipeline:
def __init__(self, packer, sender, receiver, interpreter):
self.pipe = rcompose(
packer,

View File

@ -6,7 +6,7 @@ from pyinfra.server.dispatcher.dispatcher import Nothing
class OnDemandProcessor:
def __init__(self, fn):
"""Function has to return an iterable and ideally is a generator."""
"""Function `fn` has to return an iterable and ideally is a generator."""
self.execution_queue = chain([])
self.fn = fn

View File

@ -6,7 +6,7 @@ from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.pipeline import Pipeline
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.receiver.receivers.rest import RestReceiver
@ -28,11 +28,11 @@ def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
def get_eager_pipeline(endpoint):
return Pipeline(*pipeline_head(endpoint), IdentityInterpreter())
return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter())
def get_lazy_pipeline(endpoint):
return Pipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
def pipeline_head(url):

View File

@ -1,28 +1,49 @@
import logging
import flask
from flask import Flask, jsonify, request
from funcy import compose, flatten
from funcy import compose, flatten, rcompose
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.processor import OnDemandProcessor
from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack
from pyinfra.server.utils import unpack_batchop_pack, unpack, normalize, pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import lift
from pyinfra.utils.func import starlift, star, lift
logger = logging.getLogger()
class ServerPipeline:
def __init__(self, fn):
"""Function `fn` has to return an iterable and ideally is a generator."""
self.pipe = rcompose(
lift(unpack),
fn,
normalize,
starlift(pack),
)
def __call__(self, packages):
return self.pipe(packages)
def make_streamable(operation, buffer_size, batched):
wrapper = unpack_batchop_pack if batched else compose(lift, unpack_op_pack)
operation = wrapper(operation)
operation = bufferize(operation, buffer_size=buffer_size)
operation = operation if batched else starlift(operation)
operation = ServerPipeline(operation)
operation = BufferedProcessor(operation, buffer_size=buffer_size)
operation = compose(flatten, operation)
return operation
class BufferedProcessor:
def __init__(self, fn, buffer_size):
self.fn = bufferize(fn, buffer_size=buffer_size)
def __call__(self, item, final):
return self.fn(item, final=final)
class RestOndDemandProcessor(OnDemandProcessor):
def __init__(self, fn):
super(RestOndDemandProcessor, self).__init__(fn=fn)

View File

@ -44,8 +44,8 @@ def dispatch_methods(data):
return *repeat(requests.patch, ilen(data) - 1), requests.post
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
return compose(starlift(pack), normalize, star(operation), unpack)
# def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
# return compose(starlift(pack), normalize, star(operation), unpack)
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:

View File

@ -1,6 +1,5 @@
import logging
from collections import deque
from typing import Any
from funcy import repeatedly
@ -8,7 +7,7 @@ logger = logging.getLogger(__name__)
def bufferize(fn, buffer_size=3, persist_fn=lambda x: x):
def buffered_fn(item: Any, final=False):
def buffered_fn(item, final=False):
buffer.append(persist_fn(item))
response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, final)))
return response_payload

View File

@ -31,7 +31,7 @@ pytest_plugins = [
]
@pytest.fixture(autouse=False)
@pytest.fixture(autouse=True)
def mute_logger():
logger.setLevel(logging.CRITICAL + 1)

View File

@ -4,7 +4,7 @@ from funcy import rcompose, lmap
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.pipeline import Pipeline
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.utils import unpack
from pyinfra.utils.func import lift
@ -16,25 +16,25 @@ def test_mock_pipeline():
f, g, h, u = map(lift, [lambda x: x ** 2, lambda x: x + 2, lambda x: x / 2, lambda x: x])
pipeline = Pipeline(f, g, h, u)
pipeline = ClientPipeline(f, g, h, u)
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
def test_pipeline(pipeline, input_data_items, metadata, target_data_items):
output = pipeline(input_data_items, metadata)
def test_pipeline(client_pipeline, input_data_items, metadata, target_data_items):
output = client_pipeline(input_data_items, metadata)
output = lmap(unpack, output)
assert output == target_data_items
@pytest.fixture
def pipeline(rest_pipeline):
return rest_pipeline
def client_pipeline(rest_client_pipeline):
return rest_client_pipeline
@pytest.fixture
def rest_pipeline(server_process, endpoint, rest_interpreter):
return Pipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter)
def rest_client_pipeline(server_process, endpoint, rest_interpreter):
return ClientPipeline(RestPacker(), RestDispatcher(endpoint), RestReceiver(), rest_interpreter)
@pytest.fixture