refactoring: simplyfing lazy processor to queued function WIP

This commit is contained in:
Matthias Bisping 2022-05-13 16:59:50 +02:00
parent 2434e0ea55
commit 40777ae609
3 changed files with 21 additions and 21 deletions

View File

@ -25,32 +25,31 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched):
return fn
class LazyProcessor:
"""Accepts computation requests (push) and lazily produces results (pop)."""
def __init__(self, flat_stream_buffer: FlatStreamBuffer):
class QueuedFunction:
def __init__(self, fn):
self.queue = Queue()
self.flat_stream_buffer = flat_stream_buffer
self.fn = fn
def push(self, item):
self.queue.append(item)
def pop(self):
items = stream_queue(self.queue)
return first(self.flat_stream_buffer(items))
return self.fn(items)
class LazyRestProcessor:
def __init__(self, lazy_processor: LazyProcessor, submit_suffix="submit", pickup_suffix="pickup"):
def __init__(self, queued_function: QueuedFunction, submit_suffix="submit", pickup_suffix="pickup"):
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.lazy_processor = lazy_processor
self.queued_function = queued_function
def push(self, request):
self.lazy_processor.push(request.json)
self.queued_function.push(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pop(self):
result = self.lazy_processor.pop() or Nothing
result = self.queued_function.pop() or Nothing
if not valid(result):
logger.error(f"Received invalid result: {result}")
@ -71,9 +70,9 @@ def valid(result):
return isinstance(result, dict) or result is Nothing
def set_up_processing_server(lazy_processor: LazyProcessor):
def set_up_processing_server(queued_function: QueuedFunction):
app = Flask(__name__)
processor = LazyRestProcessor(lazy_processor, submit_suffix="submit", pickup_suffix="pickup")
processor = LazyRestProcessor(queued_function, submit_suffix="submit", pickup_suffix="pickup")
@app.route("/ready", methods=["GET"])
def ready():

View File

@ -6,11 +6,11 @@ import fitz
import pytest
import requests
from PIL import Image
from funcy import retry
from funcy import retry, first, compose
from waitress import serve
from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer
from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, LazyProcessor
from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, QueuedFunction
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
@ -39,8 +39,8 @@ def url(host, port):
@pytest.fixture
def server(processor_fn, buffer_size):
flat_stream_buffer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size)
lazy_processor = LazyProcessor(flat_stream_buffer)
return set_up_processing_server(lazy_processor)
queued_function = QueuedFunction(compose(first, flat_stream_buffer))
return set_up_processing_server(queued_function)
@pytest.fixture

View File

@ -3,14 +3,15 @@ from funcy import repeatedly, takewhile, notnone, lmap, lmapcat
from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer, StreamBuffer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.server import LazyProcessor
from pyinfra.server.server import QueuedFunction
from pyinfra.utils.func import lift, foreach
@pytest.fixture
def func():
def func(one_to_many):
def fn(x):
return x ** 2
y = x ** 2
return y if not one_to_many else (y, y)
return fn
@ -38,9 +39,9 @@ def test_flat_stream_buffer(func, inputs, outputs, buffer_size):
assert list(flat_stream_buffer([])) == []
def test_lazy_processor(func, inputs, outputs):
stream_buffer = FlatStreamBuffer(lift(func))
lazy_processor = LazyProcessor(stream_buffer)
def test_queued_function(func, inputs, outputs):
lazy_processor = QueuedFunction(lift(func))
foreach(lazy_processor.push, inputs)