refactoring rest stream processor

This commit is contained in:
Matthias Bisping 2022-05-10 12:40:19 +02:00
parent de0deaa2f4
commit c68e19e6e4

View File

@ -1,11 +1,10 @@
import logging import logging
from functools import partial
from flask import Flask, jsonify, request from flask import Flask, jsonify, request
from funcy import rcompose, compose, first, chunks, flatten from funcy import rcompose
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift from pyinfra.utils.func import starlift, lift
@ -32,20 +31,20 @@ def make_streamable(operation, batched, batch_size):
return operation return operation
class RestStreamProcessor(LazyBufferizer): class RestStreamProcessor:
"""Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'."""
def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"): def __init__(self, bufferizer, submit_suffix="submit", pickup_suffix="pickup"):
super(RestStreamProcessor, self).__init__(fn=fn) self.bufferizer = bufferizer
self.submit_suffix = submit_suffix self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix self.pickup_suffix = pickup_suffix
def submit(self, request, **kwargs): def submit(self, request, **kwargs):
super(RestStreamProcessor, self).submit(request.json) self.bufferizer.submit(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pickup(self): def pickup(self):
result = self.compute_next() result = self.bufferizer.compute_next()
if not valid(result): if not valid(result):
logger.error(f"Received invalid result: {result}") logger.error(f"Received invalid result: {result}")
@ -68,7 +67,7 @@ def valid(result):
def set_up_processing_server(process_fn): def set_up_processing_server(process_fn):
app = Flask(__name__) app = Flask(__name__)
stream = RestStreamProcessor(process_fn, submit_suffix="submit", pickup_suffix="pickup") stream = RestStreamProcessor(LazyBufferizer(process_fn), submit_suffix="submit", pickup_suffix="pickup")
@app.route("/ready", methods=["GET"]) @app.route("/ready", methods=["GET"])
def ready(): def ready():