refactoring
This commit is contained in:
parent
da2572b8be
commit
e151d2005b
@ -83,7 +83,7 @@ class InputOutputBuffer:
|
||||
|
||||
class QueueBufferCoupler:
|
||||
def __init__(self, queue: Queue, buffer: InputOutputBuffer):
|
||||
self.queue_streamer = QueueStreamer(queue)
|
||||
self.queue = queue
|
||||
self.buffer = buffer
|
||||
self.result_stream = self.compute()
|
||||
|
||||
@ -99,7 +99,7 @@ class QueueBufferCoupler:
|
||||
yield self.__signal_termination()
|
||||
|
||||
def __consume_queue(self):
|
||||
queue_items = takewhile(is_not_nothing, self.queue_streamer())
|
||||
queue_items = takewhile(is_not_nothing, stream_queue(self.queue))
|
||||
yield from chain.from_iterable(map(self.__add_to_buffer_and_consume, queue_items))
|
||||
|
||||
def __flush_buffer(self):
|
||||
@ -114,13 +114,6 @@ class QueueBufferCoupler:
|
||||
return Nothing
|
||||
|
||||
|
||||
class QueueStreamer:
|
||||
def __init__(self, queue: Queue):
|
||||
self.queue = queue
|
||||
|
||||
def __call__(self):
|
||||
return self.stream_queue()
|
||||
|
||||
def stream_queue(self):
|
||||
yield from repeatedly(self.queue.popleft)
|
||||
yield Nothing
|
||||
def stream_queue(queue):
|
||||
yield from repeatedly(queue.popleft)
|
||||
yield Nothing
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user