diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index ec001c3..b9b0657 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,4 +1,8 @@ +from functools import singledispatch +from typing import Dict, Callable + from flask import Flask, jsonify, request +from funcy import merge from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction @@ -8,12 +12,75 @@ from pyinfra.server.stream.rest import LazyRestProcessor def set_up_processing_server(server_stream_function, buffer_size): flat_stream_buffer = FlatStreamBuffer(server_stream_function, buffer_size=buffer_size) queued_stream_function = QueuedStreamFunction(flat_stream_buffer) - return __set_up_processing_server(queued_stream_function) + operation2function = {None: queued_stream_function} + return __set_up_processing_server(operation2function) -def __set_up_processing_server(queued_stream_function: QueuedStreamFunction): +# def __set_up_processing_server(queued_stream_function: QueuedStreamFunction): +# app = Flask(__name__) +# processor = LazyRestProcessor(queued_stream_function, submit_suffix="submit", pickup_suffix="pickup") +# +# @app.route("/ready", methods=["GET"]) +# def ready(): +# resp = jsonify("OK") +# resp.status_code = 200 +# return resp +# +# @app.route("/health", methods=["GET"]) +# def healthy(): +# resp = jsonify("OK") +# resp.status_code = 200 +# return resp +# +# @app.route("/submit", methods=["POST", "PATCH"]) +# def submit(): +# return processor.push(request) +# +# @app.route("/pickup", methods=["GET"]) +# def pickup(): +# return processor.pop() +# +# return app + + +def build_endpoint_suffixes(op: str): + return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)} + + +def submit_suffix(op: str): + return "submit" if not op else f"{op}_submit" + + +def pickup_suffix(op: str): + return "pickup" if not op else f"{op}_pickup" + + +@singledispatch +def __set_up_processing_server(arg): + pass + + +@__set_up_processing_server.register +def _(operation2function: dict): + return __set_up_processing_server_impl(operation2function) + + +@__set_up_processing_server.register +def _(queued_stream_function: object): + operation2function = {None: queued_stream_function} + return __set_up_processing_server_impl(operation2function) + + +def __set_up_processing_server_impl(operation2function: Dict[str, QueuedStreamFunction]): app = Flask(__name__) - processor = LazyRestProcessor(queued_stream_function, submit_suffix="submit", pickup_suffix="pickup") + + operation2processor = { + op: LazyRestProcessor(fn, **build_endpoint_suffixes(op)) for op, fn in operation2function.items() + } + + submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()} + pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()} + operation2processor = merge(submit_operation2processor, pickup_operation2processor) @app.route("/ready", methods=["GET"]) def ready(): @@ -27,12 +94,12 @@ def __set_up_processing_server(queued_stream_function: QueuedStreamFunction): resp.status_code = 200 return resp - @app.route("/submit", methods=["POST", "PATCH"]) - def submit(): - return processor.push(request) + @app.route("/", methods=["POST", "PATCH"]) + def submit(operation): + return operation2processor[operation].push(request) - @app.route("/pickup", methods=["GET"]) - def pickup(): - return processor.pop() + @app.route("/", methods=["GET"]) + def pickup(operation): + return operation2processor[operation].pop() return app