refactoring
This commit is contained in:
parent
eb8ace4ddd
commit
da2572b8be
@ -2,11 +2,10 @@ import logging
|
|||||||
from collections import deque
|
from collections import deque
|
||||||
from itertools import chain, takewhile
|
from itertools import chain, takewhile
|
||||||
|
|
||||||
from funcy import first, repeatedly, flatten
|
from funcy import first, repeatedly
|
||||||
|
|
||||||
from pyinfra.server.bufferizer.buffer import bufferize
|
from pyinfra.server.bufferizer.buffer import bufferize
|
||||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||||
from pyinfra.server.utils import unpack
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -84,8 +83,7 @@ class InputOutputBuffer:
|
|||||||
|
|
||||||
class QueueBufferCoupler:
|
class QueueBufferCoupler:
|
||||||
def __init__(self, queue: Queue, buffer: InputOutputBuffer):
|
def __init__(self, queue: Queue, buffer: InputOutputBuffer):
|
||||||
# self.queue = queue
|
self.queue_streamer = QueueStreamer(queue)
|
||||||
self.queue = QueueStreamer(queue)
|
|
||||||
self.buffer = buffer
|
self.buffer = buffer
|
||||||
self.result_stream = self.compute()
|
self.result_stream = self.compute()
|
||||||
|
|
||||||
@ -96,22 +94,24 @@ class QueueBufferCoupler:
|
|||||||
return next(self.compute())
|
return next(self.compute())
|
||||||
|
|
||||||
def compute(self):
|
def compute(self):
|
||||||
|
yield from self.__consume_queue()
|
||||||
|
yield from self.__flush_buffer()
|
||||||
|
yield self.__signal_termination()
|
||||||
|
|
||||||
for qitem in self.queue.stream_queue():
|
def __consume_queue(self):
|
||||||
|
queue_items = takewhile(is_not_nothing, self.queue_streamer())
|
||||||
|
yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items))
|
||||||
|
|
||||||
self.buffer.compute_next(qitem)
|
def __flush_buffer(self):
|
||||||
print("qitem", unpack(qitem) if qitem is not Nothing else qitem)
|
yield from self.__add_to_buffer_and_consume(Nothing)
|
||||||
|
|
||||||
for ritem in takewhile(is_not_nothing, repeatedly(self.buffer.get_next)):
|
def __add_to_buffer_and_consume(self, queue_item):
|
||||||
|
self.buffer.compute_next(queue_item)
|
||||||
|
yield from takewhile(is_not_nothing, repeatedly(self.buffer.get_next))
|
||||||
|
|
||||||
print("ritem", unpack(ritem) if ritem is not Nothing else ritem)
|
@staticmethod
|
||||||
|
def __signal_termination():
|
||||||
yield ritem
|
return Nothing
|
||||||
|
|
||||||
if qitem is Nothing:
|
|
||||||
break
|
|
||||||
|
|
||||||
yield Nothing
|
|
||||||
|
|
||||||
|
|
||||||
class QueueStreamer:
|
class QueueStreamer:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user