no buffer capacity test; commented out probably dead codde -- removing next
This commit is contained in:
parent
96bf831b00
commit
1074f44b30
@ -36,3 +36,6 @@ class IntentionalTestException(RuntimeError):
|
||||
|
||||
class UnexpectedItemType(ValueError):
|
||||
pass
|
||||
|
||||
class NoBufferCapacity(ValueError):
|
||||
pass
|
||||
@ -3,6 +3,7 @@ from collections import deque
|
||||
|
||||
from funcy import repeatedly, identity
|
||||
|
||||
from pyinfra.exceptions import NoBufferCapacity
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -19,8 +20,6 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None):
|
||||
return response_payload or null_value
|
||||
|
||||
def buffer_full(current_buffer_size):
|
||||
# TODO: this assert does not hold for receiver test, unclear why
|
||||
# assert current_buffer_size <= buffer_size
|
||||
if current_buffer_size > buffer_size:
|
||||
logger.warning(f"Overfull buffer. size: {current_buffer_size}; intended capacity: {buffer_size}")
|
||||
|
||||
@ -30,6 +29,9 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None):
|
||||
current_buffer_size = len(buffer)
|
||||
return (final or buffer_full(current_buffer_size)) * current_buffer_size
|
||||
|
||||
if not buffer_size > 0:
|
||||
raise NoBufferCapacity("Buffer size must be greater than zero.")
|
||||
|
||||
buffer = deque()
|
||||
|
||||
return buffered_fn
|
||||
@ -3,7 +3,7 @@ from typing import Iterable
|
||||
|
||||
from funcy import first, repeatedly, flatten
|
||||
|
||||
from pyinfra.server.buffering.buffer import bufferize
|
||||
from pyinfra.server.buffering.bufferize import bufferize
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||
|
||||
|
||||
|
||||
0
pyinfra/server/exceptions.py
Normal file
0
pyinfra/server/exceptions.py
Normal file
@ -1,39 +1,39 @@
|
||||
from typing import Iterable
|
||||
|
||||
from funcy import rcompose
|
||||
|
||||
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
|
||||
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
|
||||
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
|
||||
from pyinfra.server.packer.packers.rest import RestPacker
|
||||
from pyinfra.server.client_pipeline import ClientPipeline
|
||||
from pyinfra.server.receiver.receivers.rest import RestReceiver
|
||||
|
||||
|
||||
def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
|
||||
"""Posts `data` to `url` and aggregates responses for each element of `data`."""
|
||||
pipeline = get_eager_pipeline(endpoint)
|
||||
yield from pipeline(data, metadata)
|
||||
|
||||
|
||||
def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
|
||||
"""Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
|
||||
|
||||
Requires:
|
||||
- responses must provide return pickup_endpoint as JSON payload
|
||||
- responses must have status code 206 for more responses coming and 204 for the last response already sent
|
||||
"""
|
||||
pipeline = get_lazy_pipeline(endpoint)
|
||||
yield from pipeline(data, metadata)
|
||||
|
||||
|
||||
def get_eager_pipeline(endpoint):
|
||||
return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter())
|
||||
|
||||
|
||||
def get_lazy_pipeline(endpoint):
|
||||
return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
|
||||
|
||||
|
||||
def pipeline_head(url):
|
||||
return RestPacker(), RestDispatcher(url), RestReceiver()
|
||||
# from typing import Iterable
|
||||
#
|
||||
# from funcy import rcompose
|
||||
#
|
||||
# from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
|
||||
# from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
|
||||
# from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
|
||||
# from pyinfra.server.packer.packers.rest import RestPacker
|
||||
# from pyinfra.server.client_pipeline import ClientPipeline
|
||||
# from pyinfra.server.receiver.receivers.rest import RestReceiver
|
||||
#
|
||||
#
|
||||
# def process_eagerly(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
|
||||
# """Posts `data` to `url` and aggregates responses for each element of `data`."""
|
||||
# pipeline = get_eager_pipeline(endpoint)
|
||||
# yield from pipeline(data, metadata)
|
||||
#
|
||||
#
|
||||
# def process_lazily(endpoint, data: Iterable[bytes], metadata: Iterable[dict]):
|
||||
# """Posts `data` to `url` and aggregates responses for each element of `data` by querying a pickup endpoint.
|
||||
#
|
||||
# Requires:
|
||||
# - responses must provide return pickup_endpoint as JSON payload
|
||||
# - responses must have status code 206 for more responses coming and 204 for the last response already sent
|
||||
# """
|
||||
# pipeline = get_lazy_pipeline(endpoint)
|
||||
# yield from pipeline(data, metadata)
|
||||
#
|
||||
#
|
||||
# def get_eager_pipeline(endpoint):
|
||||
# return ClientPipeline(*pipeline_head(endpoint), IdentityInterpreter())
|
||||
#
|
||||
#
|
||||
# def get_lazy_pipeline(endpoint):
|
||||
# return ClientPipeline(*pipeline_head(endpoint), rcompose(RestPickupStreamer(), RestReceiver()))
|
||||
#
|
||||
#
|
||||
# def pipeline_head(url):
|
||||
# return RestPacker(), RestDispatcher(url), RestReceiver()
|
||||
|
||||
@ -44,10 +44,6 @@ def dispatch_methods(data):
|
||||
return *repeat(requests.patch, ilen(data) - 1), requests.post
|
||||
|
||||
|
||||
# def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
|
||||
# return compose(starlift(pack), normalize, star(operation), unpack)
|
||||
|
||||
|
||||
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
|
||||
return compose(starlift(pack), normalize, operation, lift(unpack))
|
||||
|
||||
|
||||
@ -1,12 +1,12 @@
|
||||
import pytest
|
||||
from funcy import lmap
|
||||
|
||||
from pyinfra.server.rest import process_lazily
|
||||
from pyinfra.server.utils import unpack
|
||||
|
||||
|
||||
@pytest.mark.parametrize("batched", [True, False])
|
||||
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
|
||||
def test_pickup_endpoint(url, input_data_items, metadata, operation, targets, server_process):
|
||||
output = lmap(unpack, process_lazily(f"{url}/submit", input_data_items, metadata))
|
||||
assert output == targets
|
||||
# import pytest
|
||||
# from funcy import lmap
|
||||
#
|
||||
# from pyinfra.server.rest import process_lazily
|
||||
# from pyinfra.server.utils import unpack
|
||||
#
|
||||
#
|
||||
# @pytest.mark.parametrize("batched", [True, False])
|
||||
# @pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
|
||||
# def test_pickup_endpoint(url, input_data_items, metadata, operation, targets, server_process):
|
||||
# output = lmap(unpack, process_lazily(f"{url}/submit", input_data_items, metadata))
|
||||
# assert output == targets
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
from funcy import compose, lmapcat, compact, flatten
|
||||
import pytest
|
||||
from funcy import compose, lmapcat, compact, flatten, identity
|
||||
|
||||
from pyinfra.exceptions import NoBufferCapacity
|
||||
from pyinfra.server.buffering.bufferize import bufferize
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.buffering.buffer import bufferize
|
||||
|
||||
|
||||
def test_buffer():
|
||||
def test_buffer(buffer_size):
|
||||
def buffer_mean(xs):
|
||||
return [sum(xs) / len(xs)] if xs else []
|
||||
|
||||
@ -16,7 +18,7 @@ def test_buffer():
|
||||
return reversed(list(xs))
|
||||
|
||||
reverse_buffer = bufferize(reverse_buffer, buffer_size=3)
|
||||
ys = flatten(compact(map(reverse_buffer, (*range(10), Nothing))))
|
||||
ys = flatten(compact(map(reverse_buffer, (*range(10), Nothing))))
|
||||
assert list(ys) == [2, 1, 0, 5, 4, 3, 8, 7, 6, 9]
|
||||
|
||||
def buffer_sum(xs):
|
||||
@ -25,3 +27,8 @@ def test_buffer():
|
||||
buffer_sum = bufferize(buffer_sum, buffer_size=2)
|
||||
ys = flatten(compact(map(buffer_sum, (*range(10), Nothing))))
|
||||
assert list(ys) == [0, 1, 0, 5, 0, 9, 0, 13, 0, 17, 0]
|
||||
|
||||
|
||||
def test_buffer_overflow(buffer_size):
|
||||
with pytest.raises(NoBufferCapacity):
|
||||
bufferize(identity, buffer_size=0)
|
||||
|
||||
@ -68,6 +68,6 @@ def outputs(inputs, func):
|
||||
return lmap(func, inputs)
|
||||
|
||||
|
||||
@pytest.fixture(params=[0, 1, 3, 10, 12])
|
||||
@pytest.fixture(params=[1, 3, 10, 12])
|
||||
def buffer_size(request):
|
||||
return request.param
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user