refactoring; renaming

This commit is contained in:
Matthias Bisping 2022-05-12 19:46:29 +02:00
parent 1552cd10cc
commit bfdce62ccf
3 changed files with 36 additions and 32 deletions

View File

@@ -32,6 +32,7 @@ def make_buffered_consumer(fn, buffer_size=3):
"""Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the
function needs to be called m - n times with `Nothing` to return the remaining values.
"""
fn = StreamBuffer(fn, buffer_size=buffer_size)
def consume(items):

View File

@@ -4,7 +4,7 @@ from itertools import chain
from flask import Flask, jsonify, request
from funcy import compose, identity
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, make_buffered_consumer
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@@ -12,16 +12,6 @@ from pyinfra.utils.func import starlift, lift
logger = logging.getLogger()
class ServerPipeline:
    """Placeholder for a server-side pipeline.

    Currently stateless: construction performs no work. Kept so callers can
    already instantiate the type while the pipeline logic is being built out.
    """

    def __init__(self):
        # Intentionally empty — no configuration or state yet.
        pass
def make_streamable(fn, batched):
    """Return *fn* with its output normalized.

    If *batched* is falsy, *fn* is first lifted with ``starlift`` so it can be
    applied per item; a batched *fn* is used as-is (``identity``-equivalent).
    """
    lifted = fn if batched else starlift(fn)
    return compose(normalize, lifted)
@@ -30,28 +20,38 @@ def unpack_fn_pack(fn):
return compose(starlift(pack), fn, lift(unpack))
def make_streamable_and_wrap_in_packing_logic(fn, batched, batch_size):
def make_streamable_and_wrap_in_packing_logic(fn, batched):
    """Build the full server-side transform for *fn*.

    First makes *fn* streamable (normalized, lifted per *batched*), then wraps
    it so inputs are unpacked before the call and results packed after.
    """
    streamable = make_streamable(fn, batched)
    return unpack_fn_pack(streamable)
class RestStreamProcessor:
"""Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'."""
def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"):
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
class LazyProcessor:
def __init__(self, buffered_consumer):
self.queue = Queue()
self.buffered_consumer = make_buffered_consumer(fn)
self.buffered_consumer = buffered_consumer
def submit(self, request):
self.queue.append(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def push(self, item):
    """Enqueue *item* on the internal queue for later consumption via pop()."""
    self.queue.append(item)
def pickup(self):
def pop(self):
    """Drain the queued items through the buffered consumer.

    A terminating ``Nothing`` sentinel is appended so the consumer can flush
    any values still held in its buffer.
    """
    pending = chain(stream_queue(self.queue), [Nothing])
    return self.buffered_consumer(pending)
class LazyRestProcessor:
def __init__(self, lazy_processor, submit_suffix="submit", pickup_suffix="pickup"):
    """Adapt *lazy_processor* for REST use.

    *submit_suffix*/*pickup_suffix* are the URL path fragments swapped to turn
    a submit URL into the matching pickup URL.
    """
    self.lazy_processor = lazy_processor
    self.submit_suffix = submit_suffix
    self.pickup_suffix = pickup_suffix
def push(self, request):
    """Forward the request's JSON payload to the wrapped processor.

    Returns (as a JSON response) the pickup URL, derived by swapping the
    submit suffix for the pickup suffix in the request's base URL.
    """
    self.lazy_processor.push(request.json)
    return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pop(self):
result = self.lazy_processor.pop()
if not valid(result):
logger.error(f"Received invalid result: {result}")
@@ -72,9 +72,9 @@ def valid(result):
return isinstance(result, dict) or result is Nothing
def set_up_processing_server(process_fn):
def set_up_processing_server(package_processor):
app = Flask(__name__)
stream = RestStreamProcessor(process_fn, submit_suffix="submit", pickup_suffix="pickup")
stream = LazyRestProcessor(package_processor, submit_suffix="submit", pickup_suffix="pickup")
@app.route("/ready", methods=["GET"])
def ready():
@@ -84,10 +84,10 @@ def set_up_processing_server(process_fn):
@app.route("/submit", methods=["POST", "PATCH"])
def submit():
return stream.submit(request)
return stream.push(request)
@app.route("/pickup", methods=["GET"])
def pickup():
return stream.pickup()
return stream.pop()
return app

View File

@@ -9,7 +9,8 @@ from PIL import Image
from funcy import retry
from waitress import serve
from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic
from pyinfra.server.bufferizer.lazy_bufferizer import make_buffered_consumer
from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic, LazyProcessor
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
@@ -36,13 +37,15 @@ def url(host, port):
@pytest.fixture
def server(processor_fn):
return set_up_processing_server(processor_fn)
def server(processor_fn, buffer_size):
    """App under test: *processor_fn* behind a buffered consumer of *buffer_size*,
    wrapped in a LazyProcessor and mounted on the processing server."""
    consumer = make_buffered_consumer(processor_fn, buffer_size=buffer_size)
    processor = LazyProcessor(consumer)
    return set_up_processing_server(processor)
@pytest.fixture
def processor_fn(operation_conditionally_batched, buffer_size, batched):
return make_streamable_and_wrap_in_packing_logic(operation_conditionally_batched, batched, buffer_size)
def processor_fn(operation_conditionally_batched, batched):
    # Fixture: the operation made streamable (per *batched*) and wrapped in
    # the unpack/pack handling expected by the server.
    return make_streamable_and_wrap_in_packing_logic(operation_conditionally_batched, batched)
@pytest.fixture