From eb8ace4ddda6e1139bc9aa737ee1ea4fcc4b2391 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 11 May 2022 16:38:36 +0200 Subject: [PATCH] refactoring --- pyinfra/server/bufferizer/lazy_bufferizer.py | 90 +++++++++++--------- 1 file changed, 48 insertions(+), 42 deletions(-) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index a5cd89f..047b0f8 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -1,41 +1,16 @@ import logging from collections import deque -from itertools import chain +from itertools import chain, takewhile -from funcy import first +from funcy import first, repeatedly, flatten from pyinfra.server.bufferizer.buffer import bufferize -from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing +from pyinfra.server.utils import unpack logger = logging.getLogger(__name__) -class Queue: - def __init__(self): - self.__queue = deque() - - def append(self, package) -> None: - self.__queue.append(package) - - def popleft(self): - return self.__queue.popleft() if self.__queue else Nothing - - def __bool__(self): - return bool(self.__queue) - - -# class QueueStreamer: -# def __init__(self, queue: Queue): -# self.queue = queue -# -# def __call__(self): -# return self.stream_queue() -# -# def stream_queue(self): -# yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft)) -# yield Nothing - - # class OutputBuffer: # def __init__(self, fn, queue_streamer): # self.fn = fn @@ -75,6 +50,20 @@ class Queue: # return bool(self.result_queue) +class Queue: + def __init__(self): + self.__queue = deque() + + def append(self, package) -> None: + self.__queue.append(package) + + def popleft(self): + return self.__queue.popleft() if self.__queue else Nothing + + def __bool__(self): + return bool(self.__queue) + + class InputOutputBuffer: def __init__(self, fn, buffer_size=3): self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) @@ -88,7 +77,6 @@ class InputOutputBuffer: def compute(self, item): yield from self.fn(item) - yield Nothing def get_next(self): return first(chain(self.result_stream, [Nothing])) @@ -96,25 +84,43 @@ class InputOutputBuffer: class QueueBufferCoupler: def __init__(self, queue: Queue, buffer: InputOutputBuffer): - self.queue = queue + # self.queue = queue + self.queue = QueueStreamer(queue) self.buffer = buffer + self.result_stream = self.compute() def __call__(self): - return first(self.compute_next()) + return self.compute_next() def compute_next(self): - while True: - item = self.buffer.get_next() + return next(self.compute()) - if item is Nothing: + def compute(self): - item = self.queue.popleft() - self.buffer.compute_next(item) - if item is Nothing: - break + for qitem in self.queue.stream_queue(): - else: - yield item + self.buffer.compute_next(qitem) + print("qitem", unpack(qitem) if qitem is not Nothing else qitem) - yield self.buffer.get_next() + for ritem in takewhile(is_not_nothing, repeatedly(self.buffer.get_next)): + print("ritem", unpack(ritem) if ritem is not Nothing else ritem) + + yield ritem + + if qitem is Nothing: + break + + yield Nothing + + +class QueueStreamer: + def __init__(self, queue: Queue): + self.queue = queue + + def __call__(self): + return self.stream_queue() + + def stream_queue(self): + yield from repeatedly(self.queue.popleft) + yield Nothing