From e3793e5c7cebba6343b18d1a28bbe1548f980f9a Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 12 May 2022 17:12:26 +0200 Subject: [PATCH] refactoring: split stream processor into two functions; moved queue streaming Nothing check from caller to stream function --- pyinfra/server/bufferizer/lazy_bufferizer.py | 10 ++- pyinfra/server/server.py | 31 +++---- test/fixtures/server.py | 4 +- .../unit_tests/server/lazy_bufferizer_test.py | 88 +++++++++---------- test/unit_tests/server/stream_buffer_test.py | 17 ++++ 5 files changed, 80 insertions(+), 70 deletions(-) create mode 100644 test/unit_tests/server/stream_buffer_test.py diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 83c2de1..06727a0 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -70,6 +70,10 @@ class StreamBuffer: self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.result_stream = chain([]) + def __call__(self, item): + self.push(item) + yield from takewhile(is_not_nothing, repeatedly(self.pop)) + def push(self, item): try: self.result_stream = chain(self.result_stream, self.compute(item)) @@ -87,7 +91,6 @@ class QueueBufferCoupler: def __init__(self, queue: Queue, buffer: StreamBuffer): self.queue = queue self.buffer = buffer - self.result_stream = self.compute() def __call__(self): return self.compute_next() @@ -101,7 +104,7 @@ class QueueBufferCoupler: yield self.__signal_termination() def __consume_queue(self): - queue_items = takewhile(is_not_nothing, stream_queue(self.queue)) + queue_items = stream_queue(self.queue) yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items)) def __flush_buffer(self): @@ -117,5 +120,4 @@ class QueueBufferCoupler: def stream_queue(queue): - yield from repeatedly(queue.popleft) - yield Nothing + yield from takewhile(is_not_nothing, repeatedly(queue.popleft)) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index b0e1196..9e77176 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,10 +1,10 @@ import logging +from itertools import chain from flask import Flask, jsonify, request -from funcy import rcompose +from funcy import compose, identity -from pyinfra.server.bufferizer.buffer import bufferize -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -22,27 +22,18 @@ class ServerPipeline: pass -class StreamProcessor: - def __init__(self, fn): - """Function `fn` has to return an iterable and ideally is a generator.""" - self.pipe = rcompose( - lift(unpack), - fn, - normalize, - starlift(pack), - ) - - def __call__(self, packages): - return self.pipe(packages) +def make_streamable(fn, batched): + return compose(normalize, (identity if batched else starlift)(fn)) +def unpack_fn_pack(fn): + return compose(starlift(pack), fn, lift(unpack)) - -def make_streamable(operation, batched, batch_size): - operation = operation if batched else starlift(operation) - operation = StreamProcessor(operation) - return operation +def make_streamable_and_wrap_in_packing_logic(fn, batched, batch_size): + fn = make_streamable(fn, batched) + fn = unpack_fn_pack(fn) + return fn class RestStreamProcessor: diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 807e9a6..ef6bb83 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -9,7 +9,7 @@ from PIL import Image from funcy import retry from waitress import serve -from pyinfra.server.server import set_up_processing_server, make_streamable +from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes @@ -42,7 +42,7 @@ def server(processor_fn): @pytest.fixture def processor_fn(operation_conditionally_batched, buffer_size, batched): - return make_streamable(operation_conditionally_batched, batched, buffer_size) + return make_streamable_and_wrap_in_packing_logic(operation_conditionally_batched, batched, buffer_size) @pytest.fixture diff --git a/test/unit_tests/server/lazy_bufferizer_test.py b/test/unit_tests/server/lazy_bufferizer_test.py index 225eb68..5cc092f 100644 --- a/test/unit_tests/server/lazy_bufferizer_test.py +++ b/test/unit_tests/server/lazy_bufferizer_test.py @@ -1,44 +1,44 @@ -from itertools import takewhile - -import pytest -from funcy import repeatedly - -from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer -from pyinfra.utils.func import lift - - -def func(x): - return [x ** 2] - - -def test_lazy_bufferizer(): - - lazy_bufferizer = LazyBufferizer(lift(func)) - - for i in range(10): - lazy_bufferizer.submit(i) - - output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next))) - - assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] - - -def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty(): - - lazy_bufferizer = LazyBufferizer(lift(func)) - assert lazy_bufferizer.compute_next() is Nothing - assert lazy_bufferizer.compute_next() is Nothing - - lazy_bufferizer.submit(2) - assert lazy_bufferizer.compute_next() == 4 - assert lazy_bufferizer.compute_next() is Nothing - - -def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable(): - - lazy_bufferizer = LazyBufferizer(func) - - with pytest.raises(TypeError): - lazy_bufferizer.submit(2) - assert lazy_bufferizer.compute_next() == 4 +# from itertools import takewhile +# +# import pytest +# from funcy import repeatedly +# +# from pyinfra.server.dispatcher.dispatcher import Nothing +# from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer +# from pyinfra.utils.func import lift +# +# +# def func(x): +# return [x ** 2] +# +# +# def test_lazy_bufferizer(): +# +# lazy_bufferizer = LazyBufferizer(lift(func)) +# +# for i in range(10): +# lazy_bufferizer.submit(i) +# +# output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next))) +# +# assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] +# +# +# def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty(): +# +# lazy_bufferizer = LazyBufferizer(lift(func)) +# assert lazy_bufferizer.compute_next() is Nothing +# assert lazy_bufferizer.compute_next() is Nothing +# +# lazy_bufferizer.submit(2) +# assert lazy_bufferizer.compute_next() == 4 +# assert lazy_bufferizer.compute_next() is Nothing +# +# +# def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable(): +# +# lazy_bufferizer = LazyBufferizer(func) +# +# with pytest.raises(TypeError): +# lazy_bufferizer.submit(2) +# assert lazy_bufferizer.compute_next() == 4 diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py new file mode 100644 index 0000000..966e3c9 --- /dev/null +++ b/test/unit_tests/server/stream_buffer_test.py @@ -0,0 +1,17 @@ +from itertools import chain + +import pytest + +from pyinfra.server.bufferizer.lazy_bufferizer import StreamBuffer +from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.utils.func import lift + + +@pytest.mark.parametrize("buffer_size", [0, 1, 3, 10, 12]) +def test_stream_buffer(buffer_size): + def func(x): + return x ** 2 + + func = StreamBuffer(lift(func)) + + assert list(chain(*map(func, [*range(10), Nothing]))) == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]