Fixed a bug in the operation wrapper: one of its return cases returned a bare tuple instead of a singleton iterable containing that tuple.

Matthias Bisping 2022-06-13 15:36:17 +02:00
parent 8a64e5d868
commit 14ab23b2cc
3 changed files with 17 additions and 20 deletions
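For context, a minimal illustrative sketch of the pattern this commit fixes (the names below are simplified stand-ins, not the project's actual code): inside a generator function, `return value` only ends iteration and never emits anything, so the one-to-one branch of the wrapper has to yield its result tuple before both branches produce an iterable of (data, metadata) pairs.

    from collections.abc import Generator

    def wrap(core_operation):
        """Wrap an operation so callers always get an iterable of (data, metadata) tuples."""
        def op(data, metadata):
            result = core_operation(data, metadata)
            if isinstance(result, Generator):
                # One-to-many operation: forward every produced pair.
                for data, metadata in result:
                    yield data, metadata
            else:
                # One-to-one operation: the buggy version used
                # `return data, metadata` here, which inside a generator
                # function just stops iteration and silently drops the pair.
                # Yielding it produces the intended singleton iterable.
                data, metadata = result
                yield data, metadata
        return op

With the yield in place, list(op(data, metadata)) evaluates to [(data, metadata)] for a plain-tuple operation and to the full list of pairs for a generator-based one.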

View File

@@ -4,7 +4,7 @@ from itertools import starmap, repeat
 import numpy as np
 import pytest
 from PIL import Image
-from funcy import lmap, compose, flatten, lflatten, omit, join, pluck, lpluck, second, project, first, lzip
+from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip
 from pyinfra.server.dispatcher.dispatcher import Nothing
 from pyinfra.server.normalization import normalize_item
@@ -84,10 +84,7 @@ def strings_to_bytes(strings):
 @pytest.fixture
 def targets(data_message_pairs, input_data_items, operation, metadata):
     """TODO: this has become super wonky"""
-    print(data_message_pairs)
-    klaus = lmap(second, data_message_pairs)
-    print(klaus)
-    metadata = [{**m1, **m2} for m1, m2 in zip(klaus, metadata)]
+    metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)]
     if operation is Nothing:
         return Nothing
@@ -98,21 +95,15 @@ def targets(data_message_pairs, input_data_items, operation, metadata):
         response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata)))))
         queue_message_keys = second(first(pair_data_with_queue_message([b""]))).keys()
-        print(queue_message_keys)
         response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata)
         expected = lzip(response_data, response_metadata)
     except ValueError:
-        print()
-        print(input_data_items)
-        print(metadata)
         expected = []
-    print("expected", expected)
     return expected
 @pytest.fixture
 def endpoint(url):
     return f"{url}/submit"
@@ -156,8 +147,8 @@ def images_to_bytes(images):
 @pytest.fixture
-def metadata():
-    return repeat({"key": "value"})
+def metadata(n_items):
+    return list(repeat({"key": "value"}, times=n_items))
 @pytest.fixture

View File

@@ -63,14 +63,12 @@ def operation(core_operation):
     def op(data, metadata):
         assert isinstance(metadata, dict)
         result = core_operation(data, metadata)
-        print("result", result, type(result))
         if isinstance(result, Generator):
             for data, metadata in result:
-                print(5555555555555555555555555555555)
                 yield data, omit(metadata, ["pages", "operation"])
         else:
             data, metadata = result
-            return data, omit(metadata, ["pages", "operation"])
+            yield data, omit(metadata, ["pages", "operation"])
     if core_operation is Nothing:
         return Nothing
@@ -87,11 +85,8 @@ def core_operation(item_type, one_to_many, analysis_task):
         return string.decode().upper().encode(), metadata
     def extract(string: bytes, metadata):
-        print()
-        print("metadata", metadata)
         for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items():
             metadata["id"] = i
-            print("XYZ", metadata)
             yield c.encode(), metadata
     def rotate(im: bytes, metadata):

View File

@@ -1,5 +1,5 @@
 import pytest
-from funcy import rcompose, compose
+from funcy import rcompose, compose, project, second, merge
 from pyinfra.server.buffering.stream import FlatStreamBuffer
 from pyinfra.server.client_pipeline import ClientPipeline
@@ -15,6 +15,7 @@ from pyinfra.server.receiver.receivers.rest import RestReceiver
 from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
 from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
 from pyinfra.utils.func import lift, llift
+from test.utils.input import pair_data_with_queue_message
 def test_mock_pipeline():
@@ -31,11 +32,21 @@ def test_mock_pipeline():
 @pytest.mark.parametrize("client_pipeline_type", ["rest", "basic"])
 def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, n_items):
     assert core_operation is not Nothing
+    metadata = add_queue_metadata_to_server_side_metadata(metadata)
     output = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
     assert n_items == 0 or len(output) > 0
     assert output == targets
+def add_queue_metadata_to_server_side_metadata(metadata):
+    return [
+        merge(project(second(mdt), [*mdt_o.keys(), "pages"]), mdt_o)
+        for mdt, mdt_o in zip(pair_data_with_queue_message(metadata), metadata)
+    ]
 @pytest.mark.parametrize("item_type", ["string"])
 @pytest.mark.parametrize("n_items", [1])
 def test_pipeline_is_lazy(input_data_items, metadata, basic_client_pipeline, buffer_size):