From 82d7b7f8cbe3e3c46b9675c052e3d2e09663dbdf Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 5 May 2022 11:56:06 +0200 Subject: [PATCH] refactoring: simplify pickup endpoint extraction --- .../interpreter/interpreters/rest_callback.py | 13 ++----------- pyinfra/server/rest.py | 2 +- pyinfra/server/server.py | 2 +- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/pyinfra/server/interpreter/interpreters/rest_callback.py b/pyinfra/server/interpreter/interpreters/rest_callback.py index 2e6b85f..d4333ab 100644 --- a/pyinfra/server/interpreter/interpreters/rest_callback.py +++ b/pyinfra/server/interpreter/interpreters/rest_callback.py @@ -1,11 +1,9 @@ -from operator import itemgetter from typing import Iterable import requests -from funcy import takewhile, repeatedly, rcompose +from funcy import takewhile, repeatedly from pyinfra.server.interpreter.interpreter import Interpreter -from pyinfra.utils.func import lift def stream_response_payloads(endpoint): @@ -25,12 +23,5 @@ def stream_response_payloads(endpoint): class RestCallback(Interpreter): - def __init__(self, url): - self.pipeline = rcompose( - lift(itemgetter("pickup_endpoint")), - lift(lambda ep: f"{url}/{ep}"), - lift(stream_response_payloads), - ) - def __call__(self, payloads: Iterable): - yield from self.pipeline(payloads) + yield from map(stream_response_payloads, payloads) diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 6fe17dc..6fb899b 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -30,7 +30,7 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]): - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - pipe = pipeline(f"{url}/submit", RestCallback(url)) + pipe = pipeline(f"{url}/submit", RestCallback()) yield from pipe(data, metadata) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index eda6545..8eb27cc 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -26,7 +26,7 @@ def set_up_processing_server(process_fn): def submit(): nonlocal response_payload_iter response_payload_iter = chain(response_payload_iter, process_fn(request, final=request.method == "POST")) - return jsonify({"pickup_endpoint": "pickup"}) + return jsonify(f"{request.base_url.replace('/submit', '')}/pickup") @app.route("/pickup", methods=["GET"]) def pickup():