Fixed the following bugs:

- `upper()` used `yield` instead of `return`
- metadata was not repeated when zipping with the results generator
- since the test metadata was an empty dict, the target data was always empty, because results were zipped with {}
- hence added a check that target lengths are > 0
- fixed the return value of the queued stream function dispatcher; it returned only the first item of 1 -> n results
This commit is contained in:
Matthias Bisping 2022-05-17 21:48:16 +02:00
parent 456cb4157d
commit 6cb13051eb
6 changed files with 21 additions and 7 deletions

View File

@ -1,6 +1,7 @@
from funcy import rcompose, flatten
# TODO: remove the dispatcher component from the pipeline; it no longer actually dispatches
class ClientPipeline:
def __init__(self, packer, dispatcher, receiver, interpreter):
self.pipe = rcompose(

View File

@ -1,3 +1,7 @@
from itertools import takewhile
from funcy import repeatedly, notnone
from pyinfra.server.dispatcher.dispatcher import Dispatcher
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
@ -8,7 +12,10 @@ class QueuedStreamFunctionDispatcher(Dispatcher):
def patch(self, package):
self.queued_stream_function.push(package)
return self.queued_stream_function.pop()
# TODO: this is wonky and a result of the pipeline components having shifted behaviour through previous
# refactorings. The analogous functionality for the rest pipeline is in the interpreter. Correct this
# asymmetry!
yield from takewhile(notnone, repeatedly(self.queued_stream_function.pop))
def post(self, package):
return self.patch(package)
yield from self.patch(package)

View File

@ -36,7 +36,7 @@ logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1)
logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1)
@pytest.fixture(autouse=False)
@pytest.fixture(autouse=True)
def mute_logger():
logger.setLevel(logging.CRITICAL + 1)

View File

@ -93,7 +93,7 @@ def images(n_items):
@pytest.fixture
def metadata(n_items):
return list(repeat({}, n_items))
return list(repeat({"key": "value"}, n_items))
@pytest.fixture

View File

@ -1,5 +1,6 @@
import io
import socket
from itertools import repeat
from multiprocessing import Process
from typing import Generator
@ -58,9 +59,10 @@ def operation_conditionally_batched(operation, batched):
@pytest.fixture
def operation(core_operation):
def op(data, metadata):
assert isinstance(metadata, dict)
result = core_operation(data)
if isinstance(result, Generator):
return zip(result, metadata)
return zip(result, repeat(metadata))
else:
return result, metadata
@ -72,7 +74,7 @@ def operation(core_operation):
@pytest.fixture
def core_operation(item_type, one_to_many):
def upper(string: bytes):
yield string.decode().upper().encode()
return string.decode().upper().encode()
def duplicate(string: bytes):
for _ in range(2):

View File

@ -29,10 +29,14 @@ def test_mock_pipeline():
@pytest.mark.parametrize("client_pipeline_type", ["rest", "basic"])
def test_pipeline(core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many):
def test_pipeline(
core_operation, client_pipeline, input_data_items, metadata, targets, item_type, one_to_many, n_items
):
if core_operation is Nothing:
pytest.skip(f"No operation defined for parameter combination: {item_type=}, {one_to_many=}")
output = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
if n_items > 0:
assert len(output) > 0
assert output == targets