This commit is contained in:
Matthias Bisping 2022-05-13 15:02:02 +02:00
parent 3b7605772e
commit 9870aa38d1
3 changed files with 11 additions and 10 deletions

View File

@ -1,6 +1,7 @@
import logging import logging
from collections import deque from collections import deque
from itertools import chain, takewhile from itertools import chain, takewhile
from typing import Iterable
from funcy import first, repeatedly, flatten from funcy import first, repeatedly, flatten
@ -48,7 +49,7 @@ class StreamBuffer:
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([]) self.result_stream = chain([])
def __call__(self, item): def __call__(self, item) -> Iterable:
self.push(item) self.push(item)
yield from takewhile(is_not_nothing, repeatedly(self.pop)) yield from takewhile(is_not_nothing, repeatedly(self.pop))

View File

@ -27,16 +27,16 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched):
class LazyProcessor: class LazyProcessor:
"""Accepts computation requests (push) and lazily produces results (pop).""" """Accepts computation requests (push) and lazily produces results (pop)."""
def __init__(self, stream_buffer: FlatStreamBuffer): def __init__(self, flat_stream_buffer: FlatStreamBuffer):
self.queue = Queue() self.queue = Queue()
self.stream_buffer = stream_buffer self.flat_stream_buffer = flat_stream_buffer
def push(self, item): def push(self, item):
self.queue.append(item) self.queue.append(item)
def pop(self): def pop(self):
items = stream_queue(self.queue) items = stream_queue(self.queue)
return first(self.stream_buffer(items)) return first(self.flat_stream_buffer(items))
class LazyRestProcessor: class LazyRestProcessor:
@ -73,7 +73,7 @@ def valid(result):
def set_up_processing_server(package_processor): def set_up_processing_server(package_processor):
app = Flask(__name__) app = Flask(__name__)
stream = LazyRestProcessor(package_processor, submit_suffix="submit", pickup_suffix="pickup") processor = LazyRestProcessor(package_processor, submit_suffix="submit", pickup_suffix="pickup")
@app.route("/ready", methods=["GET"]) @app.route("/ready", methods=["GET"])
def ready(): def ready():
@ -83,10 +83,10 @@ def set_up_processing_server(package_processor):
@app.route("/submit", methods=["POST", "PATCH"]) @app.route("/submit", methods=["POST", "PATCH"])
def submit(): def submit():
return stream.push(request) return processor.push(request)
@app.route("/pickup", methods=["GET"]) @app.route("/pickup", methods=["GET"])
def pickup(): def pickup():
return stream.pop() return processor.pop()
return app return app

View File

@ -38,8 +38,8 @@ def url(host, port):
@pytest.fixture @pytest.fixture
def server(processor_fn, buffer_size): def server(processor_fn, buffer_size):
buffered_consumer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size) flat_stream_buffer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size)
lazy_processor = LazyProcessor(buffered_consumer) lazy_processor = LazyProcessor(flat_stream_buffer)
return set_up_processing_server(lazy_processor) return set_up_processing_server(lazy_processor)
@ -54,7 +54,7 @@ def operation_conditionally_batched(operation, batched):
@pytest.fixture @pytest.fixture
def operation(item_type, batched, one_to_many): def operation(item_type, one_to_many):
def upper(string: bytes, metadata): def upper(string: bytes, metadata):
return string.decode().upper().encode(), metadata return string.decode().upper().encode(), metadata