diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py index ecaa06a..00d3920 100644 --- a/pyinfra/exceptions.py +++ b/pyinfra/exceptions.py @@ -36,3 +36,6 @@ class IntentionalTestException(RuntimeError): class UnexpectedItemType(ValueError): pass + +class NoBufferCapacity(ValueError): + pass \ No newline at end of file diff --git a/pyinfra/server/buffering/buffer.py b/pyinfra/server/buffering/bufferize.py similarity index 86% rename from pyinfra/server/buffering/buffer.py rename to pyinfra/server/buffering/bufferize.py index 3fe212b..0b152e0 100644 --- a/pyinfra/server/buffering/buffer.py +++ b/pyinfra/server/buffering/bufferize.py @@ -3,6 +3,7 @@ from collections import deque from funcy import repeatedly, identity +from pyinfra.exceptions import NoBufferCapacity from pyinfra.server.dispatcher.dispatcher import Nothing logger = logging.getLogger(__name__) @@ -19,8 +20,6 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None): return response_payload or null_value def buffer_full(current_buffer_size): - # TODO: this assert does not hold for receiver test, unclear why - # assert current_buffer_size <= buffer_size if current_buffer_size > buffer_size: logger.warning(f"Overfull buffer. size: {current_buffer_size}; intended capacity: {buffer_size}") @@ -30,6 +29,9 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None): current_buffer_size = len(buffer) return (final or buffer_full(current_buffer_size)) * current_buffer_size + if not buffer_size > 0: + raise NoBufferCapacity("Buffer size must be greater than zero.") + buffer = deque() return buffered_fn diff --git a/pyinfra/server/buffering/stream.py b/pyinfra/server/buffering/stream.py index bd63c5b..6a53c61 100644 --- a/pyinfra/server/buffering/stream.py +++ b/pyinfra/server/buffering/stream.py @@ -3,7 +3,7 @@ from typing import Iterable from funcy import first, repeatedly, flatten -from pyinfra.server.buffering.buffer import bufferize +from pyinfra.server.buffering.bufferize import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing diff --git a/pyinfra/server/exceptions.py b/pyinfra/server/exceptions.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/server/rest.py b/pyinfra/server/rest.py index d659a92..4be6374 100644 --- a/pyinfra/server/rest.py +++ b/pyinfra/server/rest.py @@ -1,39 +1,39 @@ -from typing import Iterable - -from funcy import rcompose - -from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher -from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter -from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer -from pyinfra.server.packer.packers.rest import RestPacker -from pyinfra.server.client_pipeline import ClientPipeline -from pyinfra.server.receiver.receivers.rest import RestReceiver - - -def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): - """Posts `data` to `url` and aggregates responses for each element of `data`.""" - pipeline = get_eager_pipeline(endpoint) - yield from pipeline(data, metadata) - - -def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): - """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint. - - Requires: - - responses must provide return pickup_endpoint as JSON payload - - responses must have status code 206 for more responses coming and 204 for the last response already sent - """ - pipeline = get_lazy_pipeline(endpoint) - yield from pipeline(data, metadata) - - -def get_eager_pipeline(endpoint): - return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter()) - - -def get_lazy_pipeline(endpoint): - return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver())) - - -def pipeline_head(url): - return RestPacker(), RestDispatcher(url), RestReceiver() +# from typing import Iterable +# +# from funcy import rcompose +# +# from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher +# from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter +# from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer +# from pyinfra.server.packer.packers.rest import RestPacker +# from pyinfra.server.client_pipeline import ClientPipeline +# from pyinfra.server.receiver.receivers.rest import RestReceiver +# +# +# def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): +# """Posts `data` to `url` and aggregates responses for each element of `data`.""" +# pipeline = get_eager_pipeline(endpoint) +# yield from pipeline(data, metadata) +# +# +# def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]): +# """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint. +# +# Requires: +# - responses must provide return pickup_endpoint as JSON payload +# - responses must have status code 206 for more responses coming and 204 for the last response already sent +# """ +# pipeline = get_lazy_pipeline(endpoint) +# yield from pipeline(data, metadata) +# +# +# def get_eager_pipeline(endpoint): +# return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter()) +# +# +# def get_lazy_pipeline(endpoint): +# return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver())) +# +# +# def pipeline_head(url): +# return RestPacker(), RestDispatcher(url), RestReceiver() diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py index e96d2b0..05dd596 100644 --- a/pyinfra/server/utils.py +++ b/pyinfra/server/utils.py @@ -44,10 +44,6 @@ def dispatch_methods(data): return *repeat(requests.patch, ilen(data) - 1), requests.post -# def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]: -# return compose(starlift(pack), normalize, star(operation), unpack) - - def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]: return compose(starlift(pack), normalize, operation, lift(unpack)) diff --git a/test/exploration_tests/pickup_endpoint_test.py b/test/exploration_tests/pickup_endpoint_test.py index 8e9bf56..1169679 100644 --- a/test/exploration_tests/pickup_endpoint_test.py +++ b/test/exploration_tests/pickup_endpoint_test.py @@ -1,12 +1,12 @@ -import pytest -from funcy import lmap - -from pyinfra.server.rest import process_lazily -from pyinfra.server.utils import unpack - - -@pytest.mark.parametrize("batched", [True, False]) -@pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) -def test_pickup_endpoint(url, input_data_items, metadata, operation, targets, server_process): - output = lmap(unpack, process_lazily(f"{url}/submit", input_data_items, metadata)) - assert output == targets +# import pytest +# from funcy import lmap +# +# from pyinfra.server.rest import process_lazily +# from pyinfra.server.utils import unpack +# +# +# @pytest.mark.parametrize("batched", [True, False]) +# @pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) +# def test_pickup_endpoint(url, input_data_items, metadata, operation, targets, server_process): +# output = lmap(unpack, process_lazily(f"{url}/submit", input_data_items, metadata)) +# assert output == targets diff --git a/test/unit_tests/server/buffer_test.py b/test/unit_tests/server/buffer_test.py index 674e537..828308f 100644 --- a/test/unit_tests/server/buffer_test.py +++ b/test/unit_tests/server/buffer_test.py @@ -1,10 +1,12 @@ -from funcy import compose, lmapcat, compact, flatten +import pytest +from funcy import compose, lmapcat, compact, flatten, identity +from pyinfra.exceptions import NoBufferCapacity +from pyinfra.server.buffering.bufferize import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.buffering.buffer import bufferize -def test_buffer(): +def test_buffer(buffer_size): def buffer_mean(xs): return [sum(xs) / len(xs)] if xs else [] @@ -16,7 +18,7 @@ def test_buffer(): return reversed(list(xs)) reverse_buffer = bufferize(reverse_buffer, buffer_size=3) - ys = flatten(compact(map(reverse_buffer, (*range(10), Nothing)))) + ys = flatten(compact(map(reverse_buffer, (*range(10), Nothing)))) assert list(ys) == [2, 1, 0, 5, 4, 3, 8, 7, 6, 9] def buffer_sum(xs): @@ -25,3 +27,8 @@ def test_buffer(): buffer_sum = bufferize(buffer_sum, buffer_size=2) ys = flatten(compact(map(buffer_sum, (*range(10), Nothing)))) assert list(ys) == [0, 1, 0, 5, 0, 9, 0, 13, 0, 17, 0] + + +def test_buffer_overflow(buffer_size): + with pytest.raises(NoBufferCapacity): + bufferize(identity, buffer_size=0) diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py index 3df908a..2a39190 100644 --- a/test/unit_tests/server/stream_buffer_test.py +++ b/test/unit_tests/server/stream_buffer_test.py @@ -68,6 +68,6 @@ def outputs(inputs, func): return lmap(func, inputs) -@pytest.fixture(params=[0, 1, 3, 10, 12]) +@pytest.fixture(params=[1, 3, 10, 12]) def buffer_size(request): return request.param