diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index e8e3937..ad40312 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -1,6 +1,7 @@ import logging from collections import deque from itertools import chain, takewhile +from typing import Iterable from funcy import first, repeatedly, flatten @@ -48,7 +49,7 @@ class StreamBuffer: self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.result_stream = chain([]) - def __call__(self, item): + def __call__(self, item) -> Iterable: self.push(item) yield from takewhile(is_not_nothing, repeatedly(self.pop)) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 0bfb1ad..18ba11a 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -27,16 +27,16 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched): class LazyProcessor: """Accepts computation requests (push) and lazily produces results (pop).""" - def __init__(self, stream_buffer: FlatStreamBuffer): + def __init__(self, flat_stream_buffer: FlatStreamBuffer): self.queue = Queue() - self.stream_buffer = stream_buffer + self.flat_stream_buffer = flat_stream_buffer def push(self, item): self.queue.append(item) def pop(self): items = stream_queue(self.queue) - return first(self.stream_buffer(items)) + return first(self.flat_stream_buffer(items)) class LazyRestProcessor: @@ -73,7 +73,7 @@ def valid(result): def set_up_processing_server(package_processor): app = Flask(__name__) - stream = LazyRestProcessor(package_processor, submit_suffix="submit", pickup_suffix="pickup") + processor = LazyRestProcessor(package_processor, submit_suffix="submit", pickup_suffix="pickup") @app.route("/ready", methods=["GET"]) def ready(): @@ -83,10 +83,10 @@ def set_up_processing_server(package_processor): @app.route("/submit", methods=["POST", "PATCH"]) def submit(): - return stream.push(request) + return processor.push(request) @app.route("/pickup", methods=["GET"]) def pickup(): - return stream.pop() + return processor.pop() return app diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 2307d6d..393e653 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -38,8 +38,8 @@ def url(host, port): @pytest.fixture def server(processor_fn, buffer_size): - buffered_consumer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size) - lazy_processor = LazyProcessor(buffered_consumer) + flat_stream_buffer = FlatStreamBuffer(processor_fn, buffer_size=buffer_size) + lazy_processor = LazyProcessor(flat_stream_buffer) return set_up_processing_server(lazy_processor) @@ -54,7 +54,7 @@ def operation_conditionally_batched(operation, batched): @pytest.fixture -def operation(item_type, batched, one_to_many): +def operation(item_type, one_to_many): def upper(string: bytes, metadata): return string.decode().upper().encode(), metadata