From abc56e6d9fedc9dc160bfe25f1a375cb49d505e7 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 11 May 2022 10:07:34 +0200 Subject: [PATCH] refactoring: factored out queue processor fom lazy bufferizer --- pyinfra/server/bufferizer/lazy_bufferizer.py | 36 +++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 614378a..687a95c 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -1,11 +1,12 @@ import logging +from collections import deque from itertools import chain from typing import Union, Any from funcy import flatten, compose -from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.bufferizer.buffer import bufferize +from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.utils.func import lift logger = logging.getLogger(__name__) @@ -14,12 +15,24 @@ logger = logging.getLogger(__name__) class LazyBufferizer: def __init__(self, fn, buffer_size=3): """Function `fn` has to return an iterable, ideally is a generator and needs to be mappable.""" - self.input_queue = chain([]) - self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) - self.result_stream = chain([]) + self.input_queue = deque() + self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=buffer_size, null_value=[]), self.input_queue) def submit(self, package, **kwargs) -> None: - self.input_queue = chain(self.input_queue, [package]) + self.input_queue.append(package) + + def compute_next(self): + return self.queue_processor.compute_next() + + +class QueueProcessor: + def __init__(self, fn, queue): + self.fn = fn + self.input_queue = queue + self.result_stream = chain([]) + + def __call__(self): + return self.compute_next() def compute_next(self) -> Union[Nothing, Any]: try: @@ -30,7 +43,14 @@ class LazyBufferizer: except TypeError as err: raise TypeError("Function failed with type error. Is it mappable?") from err - def compute(self): - items = chain(self.input_queue, [Nothing]) - yield from compose(flatten, lift(self.fn))(items) + def stream_queue(self): + while True: + if self.input_queue: + yield self.input_queue.popleft() + else: + break + yield Nothing + + def compute(self): + yield from compose(flatten, lift(self.fn))(self.stream_queue()) yield Nothing