108 lines
3.9 KiB
Python
108 lines
3.9 KiB
Python
from functools import singledispatch
|
|
from typing import Dict, Callable, Union
|
|
|
|
from flask import Flask, jsonify, request
|
|
from funcy import merge
|
|
from prometheus_client import generate_latest, Summary, CollectorRegistry
|
|
|
|
from pyinfra.server.buffering.stream import FlatStreamBuffer
|
|
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
|
from pyinfra.server.stream.rest import LazyRestProcessor
|
|
|
|
|
|
@singledispatch
def set_up_processing_server(arg: Union[dict, Callable], buffer_size: int = 1):
    """Produce a processing server given a streamable function or a mapping from operations to streamable functions.

    Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a
    function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata.
    If the function doesn't produce data, data should be an empty byte string.
    If the function doesn't produce metadata, metadata should be an empty dictionary.

    Args:
        arg: streamable function or mapping of operations: str to streamable functions
        buffer_size: If your function operates on batches this parameter controls how many items are aggregated before
            your function is applied. Defaults to 1 (no batching), matching the registered implementations.

    TODO: buffer_size has to be controllable on per function basis.

    Returns:
        Processing server: flask app

    Raises:
        NotImplementedError: if no registered implementation matches the type of ``arg``.
            In practice the ``object`` registration below catches every type, so this
            base implementation is only reached if the registrations are removed.
    """
    # Fail loudly rather than silently returning None (singledispatch convention).
    raise NotImplementedError(f"No processing-server implementation registered for {type(arg)!r}")
|
|
|
|
|
|
@set_up_processing_server.register
def _(operation2stream_fn: dict, buffer_size=1):
    """Dispatch case: a mapping of operation name -> streamable function."""
    server = __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
    return server
|
|
|
|
|
|
@set_up_processing_server.register
def _(stream_fn: object, buffer_size=1):
    """Dispatch case: a single streamable function, registered under the anonymous (None) operation."""
    single_operation_mapping = {None: stream_fn}
    return __stream_fn_to_processing_server(single_operation_mapping, buffer_size)
|
|
|
|
|
|
def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
    """Wrap each streamable function in buffering and queueing layers, then build the Flask app.

    Args:
        operation2stream_fn: mapping of operation name (or None) to streamable function.
        buffer_size: number of items aggregated by the FlatStreamBuffer before the
            wrapped function is applied.

    Returns:
        Processing server: flask app
    """
    queued_functions = {}
    for operation, stream_fn in operation2stream_fn.items():
        buffered_fn = FlatStreamBuffer(stream_fn, buffer_size)
        queued_functions[operation] = QueuedStreamFunction(buffered_fn)
    return __set_up_processing_server(queued_functions)
|
|
|
|
|
|
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
    """Build the Flask app exposing submit and pickup endpoints for each operation.

    Args:
        operation2function: mapping of operation name (or None for the anonymous
            single-function case) to its QueuedStreamFunction.

    Returns:
        Configured Flask application with probe, metrics, submit, and pickup routes.
    """
    app = Flask(__name__)
    # Dedicated registry so /prometheus exposes only this server's metrics.
    registry = CollectorRegistry(auto_describe=True)

    # One REST processor per operation; the suffix kwargs give the processor its
    # public submit/pickup endpoint names (see build_endpoint_suffixes below).
    operation2processor = {
        op: LazyRestProcessor(fn, **build_endpoint_suffixes(op)) for op, fn in operation2function.items()
    }

    def make_summary_instance(op: str):
        # Strip the "_pickup" suffix so the metric is named after the operation
        # itself rather than its pickup endpoint.
        op = op.replace("_pickup", "")
        return Summary(f"redactmanager_{op}_seconds", f"Time spent on {op}.", registry=registry)

    # Re-key processors by public endpoint name: the same processor instance is
    # reachable under both its submit name and its pickup name.
    submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()}
    pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()}
    operation2processor = merge(submit_operation2processor, pickup_operation2processor)

    # Timing metrics are created for pickup endpoint names only; a GET against a
    # submit-only name would hit a KeyError in pickup() below.
    operation2metric = {op: make_summary_instance(op) for op in pickup_operation2processor}

    @app.route("/ready", methods=["GET"])
    def ready():
        # Readiness probe: unconditionally OK once the app is constructed.
        resp = jsonify("OK")
        resp.status_code = 200
        return resp

    @app.route("/health", methods=["GET"])
    def healthy():
        # Liveness probe: unconditionally OK once the app is constructed.
        resp = jsonify("OK")
        resp.status_code = 200
        return resp

    @app.route("/prometheus", methods=["GET"])
    def prometheus():
        # Expose the local registry in Prometheus text exposition format.
        return generate_latest(registry=registry)

    @app.route("/<operation>", methods=["POST", "PATCH"])
    def submit(operation):
        # Hand the raw Flask request to the operation's processor queue.
        return operation2processor[operation].push(request)

    @app.route("/<operation>", methods=["GET"])
    def pickup(operation):
        # Pop the next result, timing the whole pop under the operation's Summary.
        with operation2metric[operation].time():
            return operation2processor[operation].pop()

    return app
|
|
|
|
|
|
def build_endpoint_suffixes(op: str):
    """Return the submit/pickup endpoint-name kwargs for *op* (as expected by LazyRestProcessor)."""
    return dict(submit_suffix=submit_suffix(op), pickup_suffix=pickup_suffix(op))
|
|
|
|
|
|
def submit_suffix(op: str):
    """Endpoint name for submitting to *op*; falls back to "submit" for the anonymous (falsy) operation."""
    if op:
        return op
    return "submit"
|
|
|
|
|
|
def pickup_suffix(op: str):
    """Endpoint name for picking up results of *op*; "pickup" for the anonymous (falsy) operation."""
    if op:
        return f"{op}_pickup"
    return "pickup"
|