added rest callback interpreter

This commit is contained in:
Matthias Bisping 2022-05-05 11:45:52 +02:00
parent 7a1db32c3b
commit d4ffd75e26
5 changed files with 46 additions and 7 deletions

View File

View File

@ -0,0 +1,8 @@
import abc
from typing import Iterable
class Interpreter(abc.ABC):
@abc.abstractmethod
def __call__(self, payloads: Iterable):
pass

View File

@ -0,0 +1,36 @@
from operator import itemgetter
from typing import Iterable
import requests
from funcy import takewhile, repeatedly, rcompose
from pyinfra.server.interpreter.interpreter import Interpreter
from pyinfra.utils.func import lift
def stream_response_payloads(endpoint):
def receive():
response = requests.get(endpoint)
return response
def more_is_coming(response):
return response.status_code == 206
def load_payload(response):
return response.json()
response_stream = takewhile(more_is_coming, repeatedly(receive))
payloads = map(load_payload, response_stream)
yield from payloads
class RestCallback(Interpreter):
def __init__(self, url):
self.pipeline = rcompose(
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
)
def __call__(self, payloads: Iterable):
yield from self.pipeline(payloads)

View File

@ -4,6 +4,7 @@ from typing import Iterable
from funcy import identity, rcompose, flatten
from pyinfra.server.interpreter.interpreters.rest_callback import RestCallback
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.dispatcher.senders.rest import RestDispatcher
@ -29,13 +30,7 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]):
- responses must have status code 206 for more responses coming and 204 for the last response already sent
"""
interpreter = rcompose(
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
)
pipe = pipeline(f"{url}/submit", interpreter)
pipe = pipeline(f"{url}/submit", RestCallback(url))
yield from pipe(data, metadata)