refactoring: split stream processor into two functions; moved queue streaming Nothing check from caller to stream function
This commit is contained in:
parent
1a04dfb426
commit
e3793e5c7c
@ -70,6 +70,10 @@ class StreamBuffer:
|
||||
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
|
||||
self.result_stream = chain([])
|
||||
|
||||
def __call__(self, item):
|
||||
self.push(item)
|
||||
yield from takewhile(is_not_nothing, repeatedly(self.pop))
|
||||
|
||||
def push(self, item):
|
||||
try:
|
||||
self.result_stream = chain(self.result_stream, self.compute(item))
|
||||
@ -87,7 +91,6 @@ class QueueBufferCoupler:
|
||||
def __init__(self, queue: Queue, buffer: StreamBuffer):
|
||||
self.queue = queue
|
||||
self.buffer = buffer
|
||||
self.result_stream = self.compute()
|
||||
|
||||
def __call__(self):
|
||||
return self.compute_next()
|
||||
@ -101,7 +104,7 @@ class QueueBufferCoupler:
|
||||
yield self.__signal_termination()
|
||||
|
||||
def __consume_queue(self):
|
||||
queue_items = takewhile(is_not_nothing, stream_queue(self.queue))
|
||||
queue_items = stream_queue(self.queue)
|
||||
yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items))
|
||||
|
||||
def __flush_buffer(self):
|
||||
@ -117,5 +120,4 @@ class QueueBufferCoupler:
|
||||
|
||||
|
||||
def stream_queue(queue):
|
||||
yield from repeatedly(queue.popleft)
|
||||
yield Nothing
|
||||
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
import logging
|
||||
from itertools import chain
|
||||
|
||||
from flask import Flask, jsonify, request
|
||||
from funcy import rcompose
|
||||
from funcy import compose, identity
|
||||
|
||||
from pyinfra.server.bufferizer.buffer import bufferize
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.utils import unpack, normalize, pack
|
||||
from pyinfra.utils.func import starlift, lift
|
||||
@ -22,27 +22,18 @@ class ServerPipeline:
|
||||
pass
|
||||
|
||||
|
||||
class StreamProcessor:
|
||||
def __init__(self, fn):
|
||||
"""Function `fn` has to return an iterable and ideally is a generator."""
|
||||
self.pipe = rcompose(
|
||||
lift(unpack),
|
||||
fn,
|
||||
normalize,
|
||||
starlift(pack),
|
||||
)
|
||||
|
||||
def __call__(self, packages):
|
||||
return self.pipe(packages)
|
||||
def make_streamable(fn, batched):
|
||||
return compose(normalize, (identity if batched else starlift)(fn))
|
||||
|
||||
|
||||
def unpack_fn_pack(fn):
|
||||
return compose(starlift(pack), fn, lift(unpack))
|
||||
|
||||
|
||||
|
||||
def make_streamable(operation, batched, batch_size):
|
||||
operation = operation if batched else starlift(operation)
|
||||
operation = StreamProcessor(operation)
|
||||
return operation
|
||||
def make_streamable_and_wrap_in_packing_logic(fn, batched, batch_size):
|
||||
fn = make_streamable(fn, batched)
|
||||
fn = unpack_fn_pack(fn)
|
||||
return fn
|
||||
|
||||
|
||||
class RestStreamProcessor:
|
||||
|
||||
4
test/fixtures/server.py
vendored
4
test/fixtures/server.py
vendored
@ -9,7 +9,7 @@ from PIL import Image
|
||||
from funcy import retry
|
||||
from waitress import serve
|
||||
|
||||
from pyinfra.server.server import set_up_processing_server, make_streamable
|
||||
from pyinfra.server.server import set_up_processing_server, make_streamable_and_wrap_in_packing_logic
|
||||
from pyinfra.utils.func import starlift
|
||||
from test.utils.image import image_to_bytes
|
||||
|
||||
@ -42,7 +42,7 @@ def server(processor_fn):
|
||||
|
||||
@pytest.fixture
|
||||
def processor_fn(operation_conditionally_batched, buffer_size, batched):
|
||||
return make_streamable(operation_conditionally_batched, batched, buffer_size)
|
||||
return make_streamable_and_wrap_in_packing_logic(operation_conditionally_batched, batched, buffer_size)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@ -1,44 +1,44 @@
|
||||
from itertools import takewhile
|
||||
|
||||
import pytest
|
||||
from funcy import repeatedly
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer
|
||||
from pyinfra.utils.func import lift
|
||||
|
||||
|
||||
def func(x):
|
||||
return [x ** 2]
|
||||
|
||||
|
||||
def test_lazy_bufferizer():
|
||||
|
||||
lazy_bufferizer = LazyBufferizer(lift(func))
|
||||
|
||||
for i in range(10):
|
||||
lazy_bufferizer.submit(i)
|
||||
|
||||
output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next)))
|
||||
|
||||
assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
|
||||
|
||||
|
||||
def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty():
|
||||
|
||||
lazy_bufferizer = LazyBufferizer(lift(func))
|
||||
assert lazy_bufferizer.compute_next() is Nothing
|
||||
assert lazy_bufferizer.compute_next() is Nothing
|
||||
|
||||
lazy_bufferizer.submit(2)
|
||||
assert lazy_bufferizer.compute_next() == 4
|
||||
assert lazy_bufferizer.compute_next() is Nothing
|
||||
|
||||
|
||||
def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable():
|
||||
|
||||
lazy_bufferizer = LazyBufferizer(func)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
lazy_bufferizer.submit(2)
|
||||
assert lazy_bufferizer.compute_next() == 4
|
||||
# from itertools import takewhile
|
||||
#
|
||||
# import pytest
|
||||
# from funcy import repeatedly
|
||||
#
|
||||
# from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
# from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer
|
||||
# from pyinfra.utils.func import lift
|
||||
#
|
||||
#
|
||||
# def func(x):
|
||||
# return [x ** 2]
|
||||
#
|
||||
#
|
||||
# def test_lazy_bufferizer():
|
||||
#
|
||||
# lazy_bufferizer = LazyBufferizer(lift(func))
|
||||
#
|
||||
# for i in range(10):
|
||||
# lazy_bufferizer.submit(i)
|
||||
#
|
||||
# output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next)))
|
||||
#
|
||||
# assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
|
||||
#
|
||||
#
|
||||
# def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty():
|
||||
#
|
||||
# lazy_bufferizer = LazyBufferizer(lift(func))
|
||||
# assert lazy_bufferizer.compute_next() is Nothing
|
||||
# assert lazy_bufferizer.compute_next() is Nothing
|
||||
#
|
||||
# lazy_bufferizer.submit(2)
|
||||
# assert lazy_bufferizer.compute_next() == 4
|
||||
# assert lazy_bufferizer.compute_next() is Nothing
|
||||
#
|
||||
#
|
||||
# def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable():
|
||||
#
|
||||
# lazy_bufferizer = LazyBufferizer(func)
|
||||
#
|
||||
# with pytest.raises(TypeError):
|
||||
# lazy_bufferizer.submit(2)
|
||||
# assert lazy_bufferizer.compute_next() == 4
|
||||
|
||||
17
test/unit_tests/server/stream_buffer_test.py
Normal file
17
test/unit_tests/server/stream_buffer_test.py
Normal file
@ -0,0 +1,17 @@
|
||||
from itertools import chain
|
||||
|
||||
import pytest
|
||||
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import StreamBuffer
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.utils.func import lift
|
||||
|
||||
|
||||
@pytest.mark.parametrize("buffer_size", [0, 1, 3, 10, 12])
|
||||
def test_stream_buffer(buffer_size):
|
||||
def func(x):
|
||||
return x ** 2
|
||||
|
||||
func = StreamBuffer(lift(func))
|
||||
|
||||
assert list(chain(*map(func, [*range(10), Nothing]))) == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
|
||||
Loading…
x
Reference in New Issue
Block a user