refactoring: split queue processor into output buffer and queue streamer
This commit is contained in:
parent
ccf7a7379d
commit
1eb4dbc657
@ -22,10 +22,22 @@ class Queue:
|
||||
return self.__queue.popleft() if self.__queue else Nothing
|
||||
|
||||
|
||||
class QueueProcessor:
|
||||
def __init__(self, fn, queue: Queue):
|
||||
self.fn = fn
|
||||
class QueueStreamer:
|
||||
def __init__(self, queue: Queue):
|
||||
self.queue = queue
|
||||
|
||||
def __call__(self):
|
||||
return self.stream_queue()
|
||||
|
||||
def stream_queue(self):
|
||||
yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft))
|
||||
yield Nothing
|
||||
|
||||
|
||||
class OutputBuffer:
|
||||
def __init__(self, fn, queue_streamer):
|
||||
self.fn = fn
|
||||
self.queue_streamer = queue_streamer
|
||||
self.result_stream = chain([])
|
||||
|
||||
def __call__(self):
|
||||
@ -40,10 +52,6 @@ class QueueProcessor:
|
||||
except TypeError as err:
|
||||
raise TypeError("Function failed with type error. Is it mappable?") from err
|
||||
|
||||
def stream_queue(self):
|
||||
yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft))
|
||||
yield Nothing
|
||||
|
||||
def compute(self):
|
||||
yield from compose(flatten, lift(self.fn))(self.stream_queue())
|
||||
yield from compose(flatten, lift(self.fn))(self.queue_streamer())
|
||||
yield Nothing
|
||||
|
||||
@ -4,7 +4,7 @@ from flask import Flask, jsonify, request
|
||||
from funcy import rcompose
|
||||
|
||||
from pyinfra.server.bufferizer.buffer import bufferize
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import QueueProcessor, Queue
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import QueueStreamer, Queue, OutputBuffer
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.utils import unpack, normalize, pack
|
||||
from pyinfra.utils.func import starlift, lift
|
||||
@ -49,7 +49,9 @@ class RestStreamProcessor:
|
||||
self.submit_suffix = submit_suffix
|
||||
self.pickup_suffix = pickup_suffix
|
||||
self.queue = Queue()
|
||||
self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=3, null_value=[]), queue=self.queue)
|
||||
self.queue_processor = OutputBuffer(
|
||||
bufferize(fn, buffer_size=3, null_value=[]), queue_streamer=QueueStreamer(self.queue)
|
||||
)
|
||||
|
||||
def submit(self, request):
|
||||
self.queue.append(request.json)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user