refactoring

This commit is contained in:
Matthias Bisping 2022-05-11 16:38:36 +02:00
parent 096068367f
commit eb8ace4ddd

View File

@ -1,41 +1,16 @@
import logging import logging
from collections import deque from collections import deque
from itertools import chain from itertools import chain, takewhile
from funcy import first from funcy import first, repeatedly, flatten
from pyinfra.server.bufferizer.buffer import bufferize from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.server.utils import unpack
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Queue:
def __init__(self):
self.__queue = deque()
def append(self, package) -> None:
self.__queue.append(package)
def popleft(self):
return self.__queue.popleft() if self.__queue else Nothing
def __bool__(self):
return bool(self.__queue)
# class QueueStreamer:
# def __init__(self, queue: Queue):
# self.queue = queue
#
# def __call__(self):
# return self.stream_queue()
#
# def stream_queue(self):
# yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft))
# yield Nothing
# class OutputBuffer: # class OutputBuffer:
# def __init__(self, fn, queue_streamer): # def __init__(self, fn, queue_streamer):
# self.fn = fn # self.fn = fn
@ -75,6 +50,20 @@ class Queue:
# return bool(self.result_queue) # return bool(self.result_queue)
class Queue:
def __init__(self):
self.__queue = deque()
def append(self, package) -> None:
self.__queue.append(package)
def popleft(self):
return self.__queue.popleft() if self.__queue else Nothing
def __bool__(self):
return bool(self.__queue)
class InputOutputBuffer: class InputOutputBuffer:
def __init__(self, fn, buffer_size=3): def __init__(self, fn, buffer_size=3):
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
@ -88,7 +77,6 @@ class InputOutputBuffer:
def compute(self, item): def compute(self, item):
yield from self.fn(item) yield from self.fn(item)
yield Nothing
def get_next(self): def get_next(self):
return first(chain(self.result_stream, [Nothing])) return first(chain(self.result_stream, [Nothing]))
@ -96,25 +84,43 @@ class InputOutputBuffer:
class QueueBufferCoupler: class QueueBufferCoupler:
def __init__(self, queue: Queue, buffer: InputOutputBuffer): def __init__(self, queue: Queue, buffer: InputOutputBuffer):
self.queue = queue # self.queue = queue
self.queue = QueueStreamer(queue)
self.buffer = buffer self.buffer = buffer
self.result_stream = self.compute()
def __call__(self): def __call__(self):
return first(self.compute_next()) return self.compute_next()
def compute_next(self): def compute_next(self):
while True: return next(self.compute())
item = self.buffer.get_next()
if item is Nothing: def compute(self):
item = self.queue.popleft() for qitem in self.queue.stream_queue():
self.buffer.compute_next(item)
if item is Nothing:
break
else: self.buffer.compute_next(qitem)
yield item print("qitem", unpack(qitem) if qitem is not Nothing else qitem)
yield self.buffer.get_next() for ritem in takewhile(is_not_nothing, repeatedly(self.buffer.get_next)):
print("ritem", unpack(ritem) if ritem is not Nothing else ritem)
yield ritem
if qitem is Nothing:
break
yield Nothing
class QueueStreamer:
def __init__(self, queue: Queue):
self.queue = queue
def __call__(self):
return self.stream_queue()
def stream_queue(self):
yield from repeatedly(self.queue.popleft)
yield Nothing