From e221b009330e53f5b25368d0186a894e125abf87 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 5 May 2022 10:17:35 +0200 Subject: [PATCH] refactoring: Sender, Receiver --- pyinfra/server/receiver/receivers/rest.py | 12 +++++ pyinfra/server/rest.py | 14 +++--- pyinfra/server/sender/sender.py | 59 ++++++++++++----------- pyinfra/server/sender/senders/rest.py | 58 +++++++++++----------- test/unit_tests/rest/receiver_test.py | 22 +++++---- test/unit_tests/rest/sender_test.py | 2 +- 6 files changed, 94 insertions(+), 73 deletions(-) create mode 100644 pyinfra/server/receiver/receivers/rest.py diff --git a/pyinfra/server/receiver/receivers/rest.py b/pyinfra/server/receiver/receivers/rest.py new file mode 100644 index 0000000..d2ce17a --- /dev/null +++ b/pyinfra/server/receiver/receivers/rest.py @@ -0,0 +1,12 @@ +from typing import Iterable + +import requests + +from pyinfra.server.receiver.receiver import Receiver + + +class RestReceiver(Receiver): + def __call__(self, responses: Iterable[requests.Response]): + for response in responses: + response.raise_for_status() + yield response.json() diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index be45e17..367b07d 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -5,7 +5,8 @@ from typing import Iterable from funcy import identity, rcompose, flatten from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.sender.sender import RestServer, Sender +from pyinfra.server.receiver.receivers.rest import RestReceiver +from pyinfra.server.sender.sender import RestSender from pyinfra.server.utils import stream_response_payloads from pyinfra.utils.func import lift @@ -28,21 +29,21 @@ def process_lazily(url, data: Iterable[bytes], metadata: Iterable[dict]): - responses must have status code 206 for more responses coming and 204 for the last response already sent """ - receiver = rcompose( + interpreter = rcompose( lift(itemgetter("pickup_endpoint")), lift(lambda ep: f"{url}/{ep}"), lift(stream_response_payloads), ) - pipe = pipeline(f"{url}/submit", receiver) + pipe = pipeline(f"{url}/submit", interpreter) yield from pipe(data, metadata) -def pipeline(url, receiver): +def pipeline(url, interpreter): return rcompose( head(url), - receiver, + interpreter, flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten. ) @@ -54,7 +55,8 @@ def head(endpoint): """Sends packages of data and metadata to endpoint and returns response.""" return rcompose( RestPacker(), - Sender(RestServer(endpoint)), + RestSender(endpoint), + RestReceiver() )(data, metadata) return send diff --git a/pyinfra/server/sender/sender.py b/pyinfra/server/sender/sender.py index 8fba62d..09ca62a 100644 --- a/pyinfra/server/sender/sender.py +++ b/pyinfra/server/sender/sender.py @@ -24,7 +24,26 @@ class Server(abc.ABC): pass -class RestServer(Server): +class Sender: + def __call__(self, packages: Iterable[dict]): + + packages = peekable(packages) + + for package in packages: + method = self.patch if has_next(packages) else self.post + response = method(package) + yield response + + @abc.abstractmethod + def patch(self, package): + raise NotImplementedError + + @abc.abstractmethod + def post(self, package): + raise NotImplementedError + + +class RestSender(Sender): def __init__(self, endpoint): self.endpoint = endpoint @@ -35,29 +54,15 @@ class RestServer(Server): return requests.post(self.endpoint, json=package) -class Sender: - def __init__(self, server: Server): - self.server = server - - def __call__(self, packages: Iterable[dict]): - - packages = peekable(packages) - - for package in packages: - method = self.server.patch if has_next(packages) else self.server.post - response = method(package) - yield response - - -class Receiver(abc.ABC): - @abc.abstractmethod - def __call__(self, responses: Iterable): - pass - - -class RestReceiver(abc.ABC): - @abc.abstractmethod - def __call__(self, responses: Iterable[requests.Response]): - for response in responses: - response.raise_for_status() - yield response.json() +# class Receiver(abc.ABC): +# @abc.abstractmethod +# def __call__(self, responses: Iterable): +# pass +# +# +# class RestReceiver(abc.ABC): +# @abc.abstractmethod +# def __call__(self, responses: Iterable[requests.Response]): +# for response in responses: +# response.raise_for_status() +# yield response.json() diff --git a/pyinfra/server/sender/senders/rest.py b/pyinfra/server/sender/senders/rest.py index ed14e32..b31aa77 100644 --- a/pyinfra/server/sender/senders/rest.py +++ b/pyinfra/server/sender/senders/rest.py @@ -1,29 +1,29 @@ -from typing import Iterable - -import requests -from more_itertools import peekable - -from pyinfra.server.sender.sender import Sender - - -class Nothing: - pass - - -def has_next(peekable_iter): - return peekable_iter.peek(Nothing) != Nothing - - -class RestSender(Sender): - def __init__(self, endpoint): - self.endpoint = endpoint - - def __call__(self, packages: Iterable[dict]): - - packages = peekable(packages) - - for package in packages: - method = requests.patch if has_next(packages) else requests.post - response = method(self.endpoint, json=package) - response.raise_for_status() - yield response.json() +# from typing import Iterable +# +# import requests +# from more_itertools import peekable +# +# from pyinfra.server.sender.sender import Sender +# +# +# class Nothing: +# pass +# +# +# def has_next(peekable_iter): +# return peekable_iter.peek(Nothing) != Nothing +# +# +# class RestSender(Sender): +# def __init__(self, endpoint): +# self.endpoint = endpoint +# +# def __call__(self, packages: Iterable[dict]): +# +# packages = peekable(packages) +# +# for package in packages: +# method = requests.patch if has_next(packages) else requests.post +# response = method(self.endpoint, json=package) +# response.raise_for_status() +# yield response.json() diff --git a/test/unit_tests/rest/receiver_test.py b/test/unit_tests/rest/receiver_test.py index 12b8c72..45264ea 100644 --- a/test/unit_tests/rest/receiver_test.py +++ b/test/unit_tests/rest/receiver_test.py @@ -1,10 +1,12 @@ -# import pytest -# -# from pyinfra.server.sender.sender import RestServer, Sender -# -# -# @pytest.mark.parametrize("batched", [True, False]) -# @pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) -# def test_rest_receiver(url, packages, server_process): -# sender = Sender(RestServer(f"{url}/process")) -# assert all([r.status_code == 200 for r in receiver(sender(packages))]) +import pytest + +from pyinfra.server.receiver.receivers.rest import RestReceiver +from pyinfra.server.sender.sender import RestServer, Sender + + +@pytest.mark.parametrize("batched", [True, False]) +@pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) +def test_rest_receiver(url, packages, server_process): + sender = Sender(RestServer(f"{url}/process")) + receiver = RestReceiver() + assert all((isinstance(r, list) for r in receiver(sender(packages)))) diff --git a/test/unit_tests/rest/sender_test.py b/test/unit_tests/rest/sender_test.py index dc3a56c..89f616f 100644 --- a/test/unit_tests/rest/sender_test.py +++ b/test/unit_tests/rest/sender_test.py @@ -7,4 +7,4 @@ from pyinfra.server.sender.sender import RestServer, Sender @pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) def test_rest_sender(url, packages, server_process): sender = Sender(RestServer(f"{url}/process")) - assert all([r.status_code == 200 for r in sender(packages)]) + assert all((r.status_code == 200 for r in sender(packages)))