pyinfra/test/server.py
Matthias Bisping 67c4bac4b7 sync
2022-04-27 17:45:48 +02:00

38 lines
950 B
Python

from flask import Flask, jsonify, request
from more_itertools import peekable
def set_up_processing_server(process_fn):
    """Build a Flask app that exposes ``process_fn`` over HTTP.

    Routes registered on the returned app:

    * ``GET /ready``    -- liveness probe; responds ``"OK"`` with status 200.
    * ``POST /process`` -- calls ``process_fn(request)`` and returns the
      result as JSON in a single response.
    * ``POST /submit``  -- calls ``process_fn(request)``, keeps an iterator
      over the result and returns ``{"pickup_endpoint": "pickup"}``.
    * ``GET /pickup``   -- returns the next item of the most recently
      submitted result as JSON (200); 204 once all items have been handed
      out; 404 if nothing has been submitted yet.

    The pending result lives in a single slot inside this closure, so it is
    shared by every client of the app and each ``/submit`` replaces the
    previous one.

    Args:
        process_fn: Callable invoked with the current Flask ``request``.
            For ``/process`` its return value must be JSON-serialisable;
            for ``/submit`` it must be iterable, with JSON-serialisable
            items.

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)

    # Iterator over the result of the latest /submit; None until the first one.
    response_payload_iter = None

    @app.route("/ready", methods=["GET"])
    def ready():
        resp = jsonify("OK")
        resp.status_code = 200
        return resp

    @app.route("/process", methods=["POST"])
    def process():
        return jsonify(process_fn(request))

    @app.route("/submit", methods=["POST"])
    def submit():
        nonlocal response_payload_iter
        # NOTE(review): peekable does not consume anything here, so if
        # process_fn is lazy (e.g. a generator) its body runs during later
        # /pickup requests -- confirm it does not read `request` lazily.
        response_payload_iter = peekable(iter(process_fn(request)))
        return jsonify({"pickup_endpoint": "pickup"})

    @app.route("/pickup", methods=["GET"])
    def pickup():
        if response_payload_iter is None:
            # Nothing was ever submitted; next(None) would raise TypeError.
            resp = jsonify({"error": "nothing submitted yet"})
            resp.status_code = 404
            return resp

        # A peekable is falsy once exhausted, which lets us avoid an
        # unhandled StopIteration turning into a 500.
        if not response_payload_iter:
            return "", 204

        response_payload = next(response_payload_iter)
        app.logger.debug("pickup payload: %s", response_payload)
        resp = jsonify(response_payload)
        resp.status_code = 200
        return resp

    return app