target data fixture and test for flat stream buffer on different data

This commit is contained in:
Matthias Bisping 2022-05-16 13:21:31 +02:00
parent d12124b2d5
commit 5d2b71d647
4 changed files with 27 additions and 17 deletions

View File

@ -3,12 +3,13 @@ from itertools import starmap, repeat
import numpy as np
import pytest
from PIL import Image
from funcy import lmap, compose, flatten
from funcy import lmap, compose, flatten, lflatten
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import pack, unpack, normalize_item
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
from test.utils.server import string_to_bytes
@pytest.fixture
@ -32,9 +33,22 @@ def input_data_items(item_type, n_items, pdf):
@pytest.fixture
def targets(input_data_items, item_type, operation, metadata):
def target_data_items(input_data_items, core_operation):
if core_operation is Nothing:
return Nothing
op = compose(normalize_item, core_operation)
expected = lflatten(map(op, input_data_items))
return expected
@pytest.fixture
def targets(input_data_items, operation, metadata):
if operation is Nothing:
return Nothing
op = compose(lift(star(pack)), normalize_item, operation)
expected = lmap(unpack, flatten(starmap(op, zip(input_data_items, metadata))))
return expected

View File

@ -72,10 +72,6 @@ def operation(core_operation):
return op
class InvalidParameterCombination(ValueError):
pass
@pytest.fixture
def core_operation(item_type, one_to_many):
def upper(string: bytes):

View File

@ -12,7 +12,6 @@ from pyinfra.server.receiver.receivers.identity import IdentityReceiver
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.server.utils import unpack
from pyinfra.utils.func import lift
from test.fixtures.server import InvalidParameterCombination
def test_mock_pipeline():
@ -26,16 +25,6 @@ def test_mock_pipeline():
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
def pass_invalid_combination(fn):
def inner(*args, **kwargs):
try:
return fn(*args, **kwargs)
except InvalidParameterCombination:
pass
return inner
@pytest.mark.parametrize(
"client_pipeline_type",
[

View File

@ -39,6 +39,17 @@ def test_flat_stream_buffer(func, inputs, outputs, buffer_size):
assert list(flat_stream_buffer([])) == []
def test_flat_stream_buffer_on_different_data(core_operation, input_data_items, target_data_items, buffer_size, item_type, one_to_many):
if core_operation is Nothing:
pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}")
flat_stream_buffer = FlatStreamBuffer(lift(core_operation), buffer_size=buffer_size)
assert list(flat_stream_buffer(input_data_items)) == target_data_items
assert list(flat_stream_buffer([])) == []
def test_queued_stream_function(func, inputs, outputs):
queued_stream_function = QueuedStreamFunction(lift(func))