renaming; adjusted and added tests for lazy bufferize (formerlay on-demand processor)

This commit is contained in:
Matthias Bisping 2022-05-10 12:11:32 +02:00
parent 949413af4a
commit 83ce7692e6
7 changed files with 64 additions and 31 deletions

View File

@ -1,18 +1,21 @@
import logging
from itertools import chain
from typing import Union, Any
from funcy import flatten, compose, compact
from funcy import flatten, compose
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.utils.buffer import bufferize
from pyinfra.server.processor.buffer import bufferize
from pyinfra.utils.func import lift
logger = logging.getLogger(__name__)
class OnDemandProcessor:
def __init__(self, fn):
"""Function `fn` has to return an iterable and ideally is a generator."""
class LazyBufferizer:
def __init__(self, fn, buffer_size=3):
"""Function `fn` has to return an iterable, ideally is a generator and needs to be mappable."""
self.execution_queue = chain([])
self.fn = bufferize(fn, null_value=[])
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])
def submit(self, package, **kwargs) -> None:
@ -23,7 +26,9 @@ class OnDemandProcessor:
return next(self.result_stream)
except StopIteration:
self.result_stream = chain(self.result_stream, self.compute())
return self.compute_next()
return self.compute_next() # Produces stream of `Nothing` if execution queue is empty
except TypeError as err:
raise TypeError("Function failed with type error. Is it mappable?") from err
def compute(self):
items = chain(self.execution_queue, [Nothing])

View File

@ -5,7 +5,7 @@ from flask import Flask, jsonify, request
from funcy import rcompose, compose, first, chunks, flatten
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.processor import OnDemandProcessor
from pyinfra.server.processor.lazy_bufferizer import LazyBufferizer
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -32,7 +32,7 @@ def make_streamable(operation, batched, batch_size):
return operation
class RestStreamProcessor(OnDemandProcessor):
class RestStreamProcessor(LazyBufferizer):
"""Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'."""
def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"):

View File

@ -31,7 +31,11 @@ pytest_plugins = [
]
@pytest.fixture(autouse=True)
logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1)
logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1)
@pytest.fixture(autouse=False)
def mute_logger():
logger.setLevel(logging.CRITICAL + 1)

View File

@ -1,7 +1,7 @@
from funcy import compose, lmapcat, compact, flatten
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.utils.buffer import bufferize
from pyinfra.server.processor.buffer import bufferize
def test_buffer():

View File

@ -0,0 +1,44 @@
from itertools import takewhile
import pytest
from funcy import repeatedly
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.lazy_bufferizer import LazyBufferizer
from pyinfra.utils.func import lift
def func(x):
return [x ** 2]
def test_lazy_bufferizer():
lazy_bufferizer = LazyBufferizer(lift(func))
for i in range(10):
lazy_bufferizer.submit(i)
output = list(takewhile(lambda r: r != Nothing, repeatedly(lazy_bufferizer.compute_next)))
assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
def test_lazy_bufferizer_returns_nothing_when_input_queue_is_empty():
lazy_bufferizer = LazyBufferizer(lift(func))
assert lazy_bufferizer.compute_next() is Nothing
assert lazy_bufferizer.compute_next() is Nothing
lazy_bufferizer.submit(2)
assert lazy_bufferizer.compute_next() == 4
assert lazy_bufferizer.compute_next() is Nothing
def test_lazy_bufferizer_raises_when_wrapped_function_is_not_mappable():
lazy_bufferizer = LazyBufferizer(func)
with pytest.raises(TypeError):
lazy_bufferizer.submit(2)
assert lazy_bufferizer.compute_next() == 4

View File

@ -1,20 +0,0 @@
from itertools import takewhile
from funcy import repeatedly
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.processor import OnDemandProcessor
def test_processor():
def func(x):
return [x ** 2]
processor = OnDemandProcessor(func)
for i in range(10):
processor.submit(i)
output = list(takewhile(lambda r: r != Nothing, repeatedly(processor.compute_next)))
assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]