non-rest pipeline works again

This commit is contained in:
Matthias Bisping 2022-05-17 10:27:32 +02:00
parent e5a4e7e994
commit 9c262e7138
9 changed files with 41 additions and 26 deletions

View File

@ -2,10 +2,10 @@ from funcy import rcompose, flatten
class ClientPipeline:
def __init__(self, packer, sender, receiver, interpreter):
def __init__(self, packer, dispatcher, receiver, interpreter):
self.pipe = rcompose(
packer,
sender,
dispatcher,
receiver,
interpreter,
flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten.

View File

@ -18,7 +18,6 @@ def has_next(peekable_iter):
class Dispatcher:
def __call__(self, packages: Iterable[dict]):
yield from self.dispatch_methods(packages)
def dispatch_methods(self, packages):

View File

@ -1,12 +1,14 @@
from pyinfra.server.dispatcher.dispatcher import Dispatcher
class ForwardingDispatcher(Dispatcher):
def __init__(self, fn):
self.fn = fn
class QueuedStreamFunctionDispatcher(Dispatcher):
def __init__(self, queue):
self.queue = queue
def patch(self, package):
return self.fn(package)
self.queue.push(package)
return self.queue.pop()
def post(self, package):
return self.fn(package)
self.queue.push(package)
return self.queue.pop()

View File

@ -6,7 +6,7 @@ from funcy import takewhile, repeatedly, mapcat
from pyinfra.server.interpreter.interpreter import Interpreter
def stream_response_payloads(endpoint):
def stream_responses(endpoint):
def receive():
response = requests.get(endpoint)
return response
@ -20,4 +20,4 @@ def stream_response_payloads(endpoint):
class RestPickupStreamer(Interpreter):
def __call__(self, payloads: Iterable):
yield from mapcat(stream_response_payloads, payloads)
yield from mapcat(stream_responses, payloads)

View File

@ -1,9 +1,11 @@
from typing import Iterable
from pyinfra.server.receiver.receiver import Receiver
from funcy import notnone
class IdentityReceiver(Receiver):
class QueuedStreamFunctionReceiver(Receiver):
def __call__(self, responses: Iterable):
for response in responses:
for response in filter(notnone, responses):
yield response

View File

@ -1,10 +1,17 @@
from flask import Flask, jsonify, request
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
def set_up_processing_server(queued_stream_function: QueuedStreamFunction):
def set_up_processing_server(server_stream_function, buffer_size):
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
return __set_up_processing_server(queued_stream_function)
def __set_up_processing_server(queued_stream_function: QueuedStreamFunction):
app = Flask(__name__)
processor = LazyRestProcessor(queued_stream_function, submit_suffix="submit", pickup_suffix="pickup")

View File

@ -10,7 +10,6 @@ from pyinfra.server.normalization import normalize_item
from pyinfra.server.packing import pack, unpack
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
from test.utils.server import string_to_bytes
@pytest.fixture

View File

@ -7,15 +7,13 @@ import fitz
import pytest
import requests
from PIL import Image
from funcy import retry, first, compose, identity
from funcy import retry
from waitress import serve
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.server import (
set_up_processing_server,
)
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
@ -44,9 +42,7 @@ def url(host, port):
@pytest.fixture
def server(server_stream_function, buffer_size):
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
return set_up_processing_server(queued_stream_function)
return set_up_processing_server(server_stream_function, buffer_size)
@pytest.fixture
@ -67,6 +63,7 @@ def operation(core_operation):
return zip(result, metadata)
else:
return result, metadata
if core_operation is Nothing:
return Nothing
return op

View File

@ -1,16 +1,18 @@
import pytest
from funcy import rcompose, lmap
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.dispatcher.dispatchers.forwarding import ForwardingDispatcher
from pyinfra.server.dispatcher.dispatchers.forwarding import QueuedStreamFunctionDispatcher
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.receiver.receivers.identity import IdentityReceiver
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.packing import unpack
from pyinfra.server.receiver.receivers.identity import QueuedStreamFunctionReceiver
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.utils.func import lift
@ -29,7 +31,7 @@ def test_mock_pipeline():
"client_pipeline_type",
[
"rest",
# "basic",
"basic",
],
)
def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many):
@ -77,9 +79,16 @@ def rest_client_pipeline(server_process, endpoint, rest_interpreter):
@pytest.fixture
def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function):
def basic_client_pipeline(endpoint, rest_interpreter, server_stream_function, buffer_size):
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
return ClientPipeline(
RestPacker(), ForwardingDispatcher(server_stream_function), IdentityReceiver(), IdentityInterpreter()
RestPacker(),
QueuedStreamFunctionDispatcher(queued_stream_function),
QueuedStreamFunctionReceiver(),
IdentityInterpreter(),
)