refactoring
This commit is contained in:
parent
abc56e6d9f
commit
b1a318872e
@ -1,12 +1,12 @@
|
||||
import logging
|
||||
from collections import deque
|
||||
from itertools import chain
|
||||
from itertools import chain, takewhile
|
||||
from typing import Union, Any
|
||||
|
||||
from funcy import flatten, compose
|
||||
from funcy import flatten, compose, repeatedly
|
||||
|
||||
from pyinfra.server.bufferizer.buffer import bufferize
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||
from pyinfra.utils.func import lift
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -43,12 +43,11 @@ class QueueProcessor:
|
||||
except TypeError as err:
|
||||
raise TypeError("Function failed with type error. Is it mappable?") from err
|
||||
|
||||
def consume_queue(self):
|
||||
return self.input_queue.popleft() if self.input_queue else Nothing
|
||||
|
||||
def stream_queue(self):
|
||||
while True:
|
||||
if self.input_queue:
|
||||
yield self.input_queue.popleft()
|
||||
else:
|
||||
break
|
||||
yield from takewhile(is_not_nothing, repeatedly(self.consume_queue))
|
||||
yield Nothing
|
||||
|
||||
def compute(self):
|
||||
|
||||
@ -8,6 +8,10 @@ class Nothing:
|
||||
pass
|
||||
|
||||
|
||||
def is_not_nothing(x):
|
||||
return x is not Nothing
|
||||
|
||||
|
||||
def has_next(peekable_iter):
|
||||
return peekable_iter.peek(Nothing) is not Nothing
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user