pyinfra/pyinfra/server/server.py
2022-06-20 08:19:51 +02:00

91 lines
2.9 KiB
Python

from functools import singledispatch
from typing import Dict
from flask import Flask, jsonify, request
from funcy import merge
from prometheus_client import generate_latest, Summary, CollectorRegistry
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
def set_up_processing_server(server_stream_function, buffer_size):
flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size)
queued_stream_function = QueuedStreamFunction(flat_stream_buffer)
return __set_up_processing_server(queued_stream_function)
def build_endpoint_suffixes(op: str):
return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
def submit_suffix(op: str):
return "submit" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"
@singledispatch
def __set_up_processing_server(arg):
pass
@__set_up_processing_server.register
def _(operation2function: dict):
return __set_up_processing_server_impl(operation2function)
@__set_up_processing_server.register
def _(queued_stream_function: object):
operation2function = {None: queued_stream_function}
return __set_up_processing_server_impl(operation2function)
def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
registry = CollectorRegistry(auto_describe=True)
operation2processor = {
op: LazyRestProcessor(fn, **build_endpoint_suffixes(op)) for op, fn in operation2function.items()
}
def make_summary_instance(op):
op = op.replace("_pickup", "")
return Summary(f"redactmanager_{op}_seconds", f"Time spent on {op}.", registry=registry)
submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()}
pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()}
operation2processor = merge(submit_operation2processor, pickup_operation2processor)
operation2metric = {op: make_summary_instance(op) for op in pickup_operation2processor}
@app.route("/ready", methods=["GET"])
def ready():
resp = jsonify("OK")
resp.status_code = 200
return resp
@app.route("/health", methods=["GET"])
def healthy():
resp = jsonify("OK")
resp.status_code = 200
return resp
@app.route("/prometheus", methods=["GET"])
def prometheus():
return generate_latest(registry=registry)
@app.route("/<operation>", methods=["POST", "PATCH"])
def submit(operation):
return operation2processor[operation].push(request)
@app.route("/<operation>", methods=["GET"])
def pickup(operation):
with operation2metric[operation.replace("_pickup", "")].time():
return operation2processor[operation].pop()
return app