diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 687a95c..9fdb976 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -1,12 +1,12 @@ import logging from collections import deque -from itertools import chain +from itertools import chain, takewhile from typing import Union, Any -from funcy import flatten, compose +from funcy import flatten, compose, repeatedly from pyinfra.server.bufferizer.buffer import bufferize -from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing from pyinfra.utils.func import lift logger = logging.getLogger(__name__) @@ -43,12 +43,11 @@ class QueueProcessor: except TypeError as err: raise TypeError("Function failed with type error. Is it mappable?") from err + def consume_queue(self): + return self.input_queue.popleft() if self.input_queue else Nothing + def stream_queue(self): - while True: - if self.input_queue: - yield self.input_queue.popleft() - else: - break + yield from takewhile(is_not_nothing, repeatedly(self.consume_queue)) yield Nothing def compute(self): diff --git a/pyinfra/server/dispatcher/dispatcher.py b/pyinfra/server/dispatcher/dispatcher.py index 3f71804..a9c5ba1 100644 --- a/pyinfra/server/dispatcher/dispatcher.py +++ b/pyinfra/server/dispatcher/dispatcher.py @@ -8,6 +8,10 @@ class Nothing: pass +def is_not_nothing(x): + return x is not Nothing + + def has_next(peekable_iter): return peekable_iter.peek(Nothing) is not Nothing