diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 45d7c87..c41c1b6 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,11 +1,10 @@ import logging -from functools import partial from flask import Flask, jsonify, request -from funcy import rcompose, compose, first, chunks, flatten +from funcy import rcompose -from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.bufferizer.lazy_bufferizer import LazyBufferizer +from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift @@ -32,20 +31,20 @@ def make_streamable(operation, batched, batch_size): return operation -class RestStreamProcessor(LazyBufferizer): +class RestStreamProcessor: """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" - def __init__(self, fn, submit_suffix="submit", pickup_suffix="pickup"): - super(RestStreamProcessor, self).__init__(fn=fn) + def __init__(self, bufferizer, submit_suffix="submit", pickup_suffix="pickup"): + self.bufferizer = bufferizer self.submit_suffix = submit_suffix self.pickup_suffix = pickup_suffix def submit(self, request, **kwargs): - super(RestStreamProcessor, self).submit(request.json) + self.bufferizer.submit(request.json) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pickup(self): - result = self.compute_next() + result = self.bufferizer.compute_next() if not valid(result): logger.error(f"Received invalid result: {result}") @@ -68,7 +67,7 @@ def valid(result): def set_up_processing_server(process_fn): app = Flask(__name__) - stream = RestStreamProcessor(process_fn, submit_suffix="submit", pickup_suffix="pickup") + stream = RestStreamProcessor(LazyBufferizer(process_fn), submit_suffix="submit", pickup_suffix="pickup") @app.route("/ready", methods=["GET"]) def ready():