This commit is contained in:
Matthias Bisping 2022-05-10 12:49:36 +02:00
parent c68e19e6e4
commit c8579a8ad0

View File

@ -14,12 +14,12 @@ logger = logging.getLogger(__name__)
class LazyBufferizer:
def __init__(self, fn, buffer_size=3):
"""Function `fn` has to return an iterable, ideally is a generator and needs to be mappable."""
self.execution_queue = chain([])
self.input_queue = chain([])
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])
def submit(self, package, **kwargs) -> None:
self.execution_queue = chain(self.execution_queue, [package])
self.input_queue = chain(self.input_queue, [package])
def compute_next(self) -> Union[Nothing, Any]:
try:
@ -31,6 +31,6 @@ class LazyBufferizer:
raise TypeError("Function failed with type error. Is it mappable?") from err
def compute(self):
items = chain(self.execution_queue, [Nothing])
items = chain(self.input_queue, [Nothing])
yield from compose(flatten, lift(self.fn))(items)
yield Nothing