refactoring: simplify pickup endpoint extraction

This commit is contained in:
Matthias Bisping 2022-05-05 11:56:06 +02:00
parent d4ffd75e26
commit 82d7b7f8cb
3 changed files with 4 additions and 13 deletions

View File

@ -1,11 +1,9 @@
from operator import itemgetter
from typing import Iterable
import requests
from funcy import takewhile, repeatedly, rcompose
from funcy import takewhile, repeatedly
from pyinfra.server.interpreter.interpreter import Interpreter
from pyinfra.utils.func import lift
def stream_response_payloads(endpoint):
@ -25,12 +23,5 @@ def stream_response_payloads(endpoint):
class RestCallback(Interpreter):
def __init__(self, url):
self.pipeline = rcompose(
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
)
def __call__(self, payloads: Iterable):
yield from self.pipeline(payloads)
yield from map(stream_response_payloads, payloads)

View File

@ -30,7 +30,7 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]):
- responses must have status code 206 for more responses coming and 204 for the last response already sent
"""
pipe = pipeline(f"{url}/submit", RestCallback(url))
pipe = pipeline(f"{url}/submit", RestCallback())
yield from pipe(data, metadata)

View File

@ -26,7 +26,7 @@ def set_up_processing_server(process_fn):
def submit():
nonlocal response_payload_iter
response_payload_iter = chain(response_payload_iter, process_fn(request, final=request.method == "POST"))
return jsonify({"pickup_endpoint": "pickup"})
return jsonify(f"{request.base_url.replace('/submit', '')}/pickup")
@app.route("/pickup", methods=["GET"])
def pickup():