From d4ffd75e26eaba912cd0d48604e2fad561e5ee39 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 5 May 2022 11:45:52 +0200 Subject: [PATCH] added rest callback interpreter --- pyinfra/server/interpreter/__init__.py | 0 pyinfra/server/interpreter/interpreter.py | 8 +++++ .../interpreter/interpreters/__init__.py | 0 .../interpreter/interpreters/rest_callback.py | 36 +++++++++++++++++++ pyinfra/server/rest.py | 9 ++--- 5 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 pyinfra/server/interpreter/__init__.py create mode 100644 pyinfra/server/interpreter/interpreter.py create mode 100644 pyinfra/server/interpreter/interpreters/__init__.py create mode 100644 pyinfra/server/interpreter/interpreters/rest_callback.py diff --git a/pyinfra/server/interpreter/__init__.py b/pyinfra/server/interpreter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/server/interpreter/interpreter.py b/pyinfra/server/interpreter/interpreter.py new file mode 100644 index 0000000..a131cc1 --- /dev/null +++ b/pyinfra/server/interpreter/interpreter.py @@ -0,0 +1,8 @@ +import abc +from typing import Iterable + + +class Interpreter(abc.ABC): + @abc.abstractmethod + def __call__(self, payloads: Iterable): + pass diff --git a/pyinfra/server/interpreter/interpreters/__init__.py b/pyinfra/server/interpreter/interpreters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/server/interpreter/interpreters/rest_callback.py b/pyinfra/server/interpreter/interpreters/rest_callback.py new file mode 100644 index 0000000..2e6b85f --- /dev/null +++ b/pyinfra/server/interpreter/interpreters/rest_callback.py @@ -0,0 +1,36 @@ +from operator import itemgetter +from typing import Iterable + +import requests +from funcy import takewhile, repeatedly, rcompose + +from pyinfra.server.interpreter.interpreter import Interpreter +from pyinfra.utils.func import lift + + +def stream_response_payloads(endpoint): + def receive(): + response = requests.get(endpoint) + return response + + def more_is_coming(response): + return response.status_code == 206 + + def load_payload(response): + return response.json() + + response_stream = takewhile(more_is_coming, repeatedly(receive)) + payloads = map(load_payload, response_stream) + yield from payloads + + +class RestCallback(Interpreter): + def __init__(self, url): + self.pipeline = rcompose( + lift(itemgetter("pickup_endpoint")), + lift(lambda ep: f"{url}/{ep}"), + lift(stream_response_payloads), + ) + + def __call__(self, payloads: Iterable): + yield from self.pipeline(payloads) diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index 0aa5e53..6fe17dc 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -4,6 +4,7 @@ from typing import Iterable from funcy import identity, rcompose, flatten +from pyinfra.server.interpreter.interpreters.rest_callback import RestCallback from pyinfra.server.packer.packers.rest import RestPacker from pyinfra.server.receiver.receivers.rest import RestReceiver from pyinfra.server.dispatcher.senders.rest import RestDispatcher @@ -29,13 +30,7 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]): - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - interpreter = rcompose( - lift(itemgetter("pickup_endpoint")), - lift(lambda ep: f"{url}/{ep}"), - lift(stream_response_payloads), - ) - - pipe = pipeline(f"{url}/submit", interpreter) + pipe = pipeline(f"{url}/submit", RestCallback(url)) yield from pipe(data, metadata)