refactoring: factored out queue processor fom lazy bufferizer

This commit is contained in:
Matthias Bisping 2022-05-11 10:07:34 +02:00
parent c8579a8ad0
commit abc56e6d9f

View File

@ -1,11 +1,12 @@
import logging
from collections import deque
from itertools import chain
from typing import Union, Any
from funcy import flatten, compose
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.utils.func import lift
logger = logging.getLogger(__name__)
@ -14,12 +15,24 @@ logger = logging.getLogger(__name__)
class LazyBufferizer:
def __init__(self, fn, buffer_size=3):
"""Function `fn` has to return an iterable, ideally is a generator and needs to be mappable."""
self.input_queue = chain([])
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])
self.input_queue = deque()
self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=buffer_size, null_value=[]), self.input_queue)
def submit(self, package, **kwargs) -> None:
self.input_queue = chain(self.input_queue, [package])
self.input_queue.append(package)
def compute_next(self):
return self.queue_processor.compute_next()
class QueueProcessor:
def __init__(self, fn, queue):
self.fn = fn
self.input_queue = queue
self.result_stream = chain([])
def __call__(self):
return self.compute_next()
def compute_next(self) -> Union[Nothing, Any]:
try:
@ -30,7 +43,14 @@ class LazyBufferizer:
except TypeError as err:
raise TypeError("Function failed with type error. Is it mappable?") from err
def compute(self):
items = chain(self.input_queue, [Nothing])
yield from compose(flatten, lift(self.fn))(items)
def stream_queue(self):
while True:
if self.input_queue:
yield self.input_queue.popleft()
else:
break
yield Nothing
def compute(self):
yield from compose(flatten, lift(self.fn))(self.stream_queue())
yield Nothing