refactoring: Sender, Receiver

This commit is contained in:
Matthias Bisping 2022-05-05 10:17:35 +02:00
parent 24313241a8
commit e221b00933
6 changed files with 94 additions and 73 deletions

View File

@ -0,0 +1,12 @@
from typing import Iterable
import requests
from pyinfra.server.receiver.receiver import Receiver
class RestReceiver(Receiver):
def __call__(self, responses: Iterable[requests.Response]):
for response in responses:
response.raise_for_status()
yield response.json()

View File

@ -5,7 +5,8 @@ from typing import Iterable
from funcy import identity, rcompose, flatten
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.sender.sender import RestServer, Sender
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.sender.sender import RestSender
from pyinfra.server.utils import stream_response_payloads
from pyinfra.utils.func import lift
@ -28,21 +29,21 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]):
- responses must have status code 206 for more responses coming and 204 for the last response already sent
"""
receiver = rcompose(
interpreter = rcompose(
lift(itemgetter("pickup_endpoint")),
lift(lambda ep: f"{url}/{ep}"),
lift(stream_response_payloads),
)
pipe = pipeline(f"{url}/submit", receiver)
pipe = pipeline(f"{url}/submit", interpreter)
yield from pipe(data, metadata)
def pipeline(url, receiver):
def pipeline(url, interpreter):
return rcompose(
head(url),
receiver,
interpreter,
flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten.
)
@ -54,7 +55,8 @@ def head(endpoint):
"""Sends packages of data and metadata to endpoint and returns response."""
return rcompose(
RestPacker(),
Sender(RestServer(endpoint)),
RestSender(endpoint),
RestReceiver()
)(data, metadata)
return send

View File

@ -24,7 +24,26 @@ class Server(abc.ABC):
pass
class RestServer(Server):
class Sender:
def __call__(self, packages: Iterable[dict]):
packages = peekable(packages)
for package in packages:
method = self.patch if has_next(packages) else self.post
response = method(package)
yield response
@abc.abstractmethod
def patch(self, package):
raise NotImplementedError
@abc.abstractmethod
def post(self, package):
raise NotImplementedError
class RestSender(Sender):
def __init__(self, endpoint):
self.endpoint = endpoint
@ -35,29 +54,15 @@ class RestServer(Server):
return requests.post(self.endpoint, json=package)
class Sender:
def __init__(self, server: Server):
self.server = server
def __call__(self, packages: Iterable[dict]):
packages = peekable(packages)
for package in packages:
method = self.server.patch if has_next(packages) else self.server.post
response = method(package)
yield response
class Receiver(abc.ABC):
@abc.abstractmethod
def __call__(self, responses: Iterable):
pass
class RestReceiver(abc.ABC):
@abc.abstractmethod
def __call__(self, responses: Iterable[requests.Response]):
for response in responses:
response.raise_for_status()
yield response.json()
# class Receiver(abc.ABC):
# @abc.abstractmethod
# def __call__(self, responses: Iterable):
# pass
#
#
# class RestReceiver(abc.ABC):
# @abc.abstractmethod
# def __call__(self, responses: Iterable[requests.Response]):
# for response in responses:
# response.raise_for_status()
# yield response.json()

View File

@ -1,29 +1,29 @@
from typing import Iterable
import requests
from more_itertools import peekable
from pyinfra.server.sender.sender import Sender
class Nothing:
pass
def has_next(peekable_iter):
return peekable_iter.peek(Nothing) != Nothing
class RestSender(Sender):
def __init__(self, endpoint):
self.endpoint = endpoint
def __call__(self, packages: Iterable[dict]):
packages = peekable(packages)
for package in packages:
method = requests.patch if has_next(packages) else requests.post
response = method(self.endpoint, json=package)
response.raise_for_status()
yield response.json()
# from typing import Iterable
#
# import requests
# from more_itertools import peekable
#
# from pyinfra.server.sender.sender import Sender
#
#
# class Nothing:
# pass
#
#
# def has_next(peekable_iter):
# return peekable_iter.peek(Nothing) != Nothing
#
#
# class RestSender(Sender):
# def __init__(self, endpoint):
# self.endpoint = endpoint
#
# def __call__(self, packages: Iterable[dict]):
#
# packages = peekable(packages)
#
# for package in packages:
# method = requests.patch if has_next(packages) else requests.post
# response = method(self.endpoint, json=package)
# response.raise_for_status()
# yield response.json()

View File

@ -1,10 +1,12 @@
# import pytest
#
# from pyinfra.server.sender.sender import RestServer, Sender
#
#
# @pytest.mark.parametrize("batched", [True, False])
# @pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
# def test_rest_receiver(url, packages, server_process):
# sender = Sender(RestServer(f"{url}/process"))
# assert all([r.status_code == 200 for r in receiver(sender(packages))])
import pytest
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.sender.sender import RestServer, Sender
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_rest_receiver(url, packages, server_process):
sender = Sender(RestServer(f"{url}/process"))
receiver = RestReceiver()
assert all((isinstance(r, list) for r in receiver(sender(packages))))

View File

@ -7,4 +7,4 @@ from pyinfra.server.sender.sender import RestServer, Sender
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
def test_rest_sender(url, packages, server_process):
sender = Sender(RestServer(f"{url}/process"))
assert all([r.status_code == 200 for r in sender(packages)])
assert all((r.status_code == 200 for r in sender(packages)))