refactoring: introduced queue wrapper

This commit is contained in:
Matthias Bisping 2022-05-11 10:44:50 +02:00
parent b1a318872e
commit ccf7a7379d
2 changed files with 29 additions and 23 deletions

View File

@ -5,30 +5,27 @@ from typing import Union, Any
from funcy import flatten, compose, repeatedly
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.func import lift
logger = logging.getLogger(__name__)
class LazyBufferizer:
def __init__(self, fn, buffer_size=3):
"""Function `fn` has to return an iterable, ideally is a generator and needs to be mappable."""
self.input_queue = deque()
self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=buffer_size, null_value=[]), self.input_queue)
class Queue:
def __init__(self):
self.__queue = deque()
def submit(self, package, **kwargs) -> None:
self.input_queue.append(package)
def append(self, package) -> None:
self.__queue.append(package)
def compute_next(self):
return self.queue_processor.compute_next()
def popleft(self):
return self.__queue.popleft() if self.__queue else Nothing
class QueueProcessor:
def __init__(self, fn, queue):
def __init__(self, fn, queue: Queue):
self.fn = fn
self.input_queue = queue
self.queue = queue
self.result_stream = chain([])
def __call__(self):
@ -43,11 +40,8 @@ class QueueProcessor:
except TypeError as err:
raise TypeError("Function failed with type error. Is it mappable?") from err
def consume_queue(self):
return self.input_queue.popleft() if self.input_queue else Nothing
def stream_queue(self):
yield from takewhile(is_not_nothing, repeatedly(self.consume_queue))
yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft))
yield Nothing
def compute(self):

View File

@ -3,7 +3,8 @@ import logging
from flask import Flask, jsonify, request
from funcy import rcompose
from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.bufferizer.lazy_bufferizer import QueueProcessor, Queue
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -11,6 +12,16 @@ from pyinfra.utils.func import starlift, lift
logger = logging.getLogger()
class ServerPipeline:
def __init__(self):
"""
- dequeue
-
"""
pass
class StreamProcessor:
def __init__(self, fn):
"""Function `fn` has to return an iterable and ideally is a generator."""
@ -34,17 +45,18 @@ def make_streamable(operation, batched, batch_size):
class RestStreamProcessor:
"""Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'."""
def __init__(self, bufferizer, submit_suffix="submit", pickup_suffix="pickup"):
self.bufferizer = bufferizer
def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"):
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.queue = Queue()
self.queue_processor = QueueProcessor(bufferize(fn, buffer_size=3, null_value=[]), queue=self.queue)
def submit(self, request, **kwargs):
self.bufferizer.submit(request.json)
def submit(self, request):
self.queue.append(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pickup(self):
result = self.bufferizer.compute_next()
result = self.queue_processor()
if not valid(result):
logger.error(f"Received invalid result: {result}")
@ -67,7 +79,7 @@ def valid(result):
def set_up_processing_server(process_fn):
app = Flask(__name__)
stream = RestStreamProcessor(LazyBufferizer(process_fn), submit_suffix="submit", pickup_suffix="pickup")
stream = RestStreamProcessor(process_fn, submit_suffix="submit", pickup_suffix="pickup")
@app.route("/ready", methods=["GET"])
def ready():