Pull request #40: 2.0.0 documentation

Merge in RR/pyinfra from 2.0.0-documentation to 2.0.0

Squashed commit of the following:

commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 13:05:26 2022 +0200

    removed obsolete imports

commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:47:12 2022 +0200

    enable docker-compose fixture

commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:46:53 2022 +0200

    renaming

commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 30 12:47:57 2022 +0200

    refactoring: added cached pipeline factory

commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:47:08 2022 +0200

    renaming

commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:25:03 2022 +0200

    further refactored server setup code: moving and decomplecting

commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 12:53:09 2022 +0200

    refactored server setup code: factored out and decoupled operation registry and prometheus summary registry

commit da2dce762bdd6889165fbb320dc9ee8a0bd089b2
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jun 28 19:40:04 2022 +0200

    adjusted test target

commit 70df7911b9b92f4b72afd7d4b33ca2bbf136295e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jun 28 19:32:38 2022 +0200

    minor refactoring

commit 0937b63dc000346559bde353381304b273244109
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 13:59:59 2022 +0200

    support for empty operation suffix

commit 5e56917970962a2e69bbd66a324bdb4618c040bd
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 12:52:36 2022 +0200

    minor refactoring

commit 40665a7815ae5927b3877bda14fb77deef37d667
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 10:57:04 2022 +0200

    optimization: prefix filtering via storage API for get_all_object_names

commit af0892a899d09023eb0e61eecb63e03dc2fd3b60
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 10:55:47 2022 +0200

    topological sorting of definitions by caller hierarchy
This commit is contained in:
Julius Unverfehrt 2022-07-11 09:09:44 +02:00
parent 94254e1681
commit 01bfb1d668
22 changed files with 230 additions and 125 deletions

View File

@ -3,6 +3,7 @@ import logging
from funcy import merge, omit, lmap
from pyinfra.exceptions import AnalysisFailure
from pyinfra.pipeline_factory import CachedPipelineFactory
logger = logging.getLogger(__name__)
@ -12,30 +13,18 @@ class Callback:
endpoint.
"""
def __init__(self, base_url, pipeline_factory):
self.base_url = base_url
def __init__(self, pipeline_factory: CachedPipelineFactory):
self.pipeline_factory = pipeline_factory
self.endpoint2pipeline = {}
def __make_endpoint(self, operation):
return f"{self.base_url}/{operation}"
def __get_pipeline(self, endpoint):
if endpoint in self.endpoint2pipeline:
pipeline = self.endpoint2pipeline[endpoint]
else:
pipeline = self.pipeline_factory(endpoint)
self.endpoint2pipeline[endpoint] = pipeline
return pipeline
return self.pipeline_factory.get_pipeline(endpoint)
@staticmethod
def __run_pipeline(pipeline, body):
def __run_pipeline(pipeline, analysis_input: dict):
"""
TODO: Since data and metadata are passed as singletons, there is no buffering and hence no batching happening
within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate
passing non-singletons, to only ack a message, once a response is pulled from the output queue of the
passing non-singletons, to only ack a message, once a response is pulled from the output queue of the
pipeline. Probably the pipeline return value needs to contain the queue message frame (or so), in order for
the queue manager to tell which message to ack.
@ -43,19 +32,19 @@ class Callback:
operates on singletons ([data], [metadata]).
"""
def combine_storage_item_metadata_with_queue_message_metadata(body):
return merge(body["metadata"], omit(body, ["data", "metadata"]))
def combine_storage_item_metadata_with_queue_message_metadata(analysis_input):
return merge(analysis_input["metadata"], omit(analysis_input, ["data", "metadata"]))
def remove_queue_message_metadata(result):
metadata = omit(result["metadata"], queue_message_keys(body))
return {**result, "metadata": metadata}
def remove_queue_message_metadata(analysis_result):
metadata = omit(analysis_result["metadata"], queue_message_keys(analysis_input))
return {**analysis_result, "metadata": metadata}
def queue_message_keys(body):
return {*body.keys()}.difference({"data", "metadata"})
def queue_message_keys(analysis_input):
return {*analysis_input.keys()}.difference({"data", "metadata"})
try:
data = body["data"]
metadata = combine_storage_item_metadata_with_queue_message_metadata(body)
data = analysis_input["data"]
metadata = combine_storage_item_metadata_with_queue_message_metadata(analysis_input)
analysis_response_stream = pipeline([data], [metadata])
analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream)
return analysis_response_stream
@ -64,13 +53,13 @@ class Callback:
logger.error(err)
raise AnalysisFailure from err
def __call__(self, body: dict):
operation = body.get("operation", "submit")
endpoint = self.__make_endpoint(operation)
pipeline = self.__get_pipeline(endpoint)
def __call__(self, analysis_input: dict):
"""data_metadata_pack: {'dossierId': ..., 'fileId': ..., 'pages': ..., 'operation': ...}"""
operation = analysis_input.get("operation", "")
pipeline = self.__get_pipeline(operation)
try:
logging.debug(f"Requesting analysis from {endpoint}...")
return self.__run_pipeline(pipeline, body)
logging.debug(f"Requesting analysis for operation '{operation}'...")
return self.__run_pipeline(pipeline, analysis_input)
except AnalysisFailure:
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
logging.warning(f"Exception caught when calling analysis endpoint for operation '{operation}'.")

View File

@ -6,6 +6,7 @@ from funcy import project, identity, rcompose
from pyinfra.callback import Callback
from pyinfra.config import parse_disjunction_string
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.pipeline_factory import CachedPipelineFactory
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.server.client_pipeline import ClientPipeline
@ -36,7 +37,7 @@ class ComponentFactory:
def get_callback(self, analysis_base_url=None):
analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint
callback = Callback(analysis_base_url, self.get_pipeline)
callback = Callback(CachedPipelineFactory(base_url=analysis_base_url, pipeline_factory=self.get_pipeline))
def wrapped(body):
body_repr = project(body, ["dossierId", "fileId", "operation"])
@ -60,6 +61,13 @@ class ComponentFactory:
def get_queue_manager(self):
return PikaQueueManager(self.config.rabbitmq.queues.input, self.config.rabbitmq.queues.output)
@staticmethod
@lru_cache(maxsize=None)
def get_pipeline(endpoint):
return ClientPipeline(
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
)
@lru_cache(maxsize=None)
def get_storage(self):
return storages.get_storage(self.config.storage.backend)
@ -100,10 +108,3 @@ class ComponentFactory:
bucket_name=parse_disjunction_string(self.config.storage.bucket),
file_descriptor_manager=self.get_file_descriptor_manager(),
)
@staticmethod
@lru_cache(maxsize=None)
def get_pipeline(endpoint):
return ClientPipeline(
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
)

View File

@ -87,3 +87,12 @@ class FileDescriptorManager:
def build_storage_upload_info(analysis_payload, request_metadata):
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
return storage_upload_info
def get_path_prefix(self, queue_item_body):
prefix = "/".join(
itemgetter("dossierId", "fileId")(
self.build_file_descriptor(queue_item_body, end="input"),
)
)
return prefix

View File

@ -0,0 +1,18 @@
class CachedPipelineFactory:
def __init__(self, base_url, pipeline_factory):
self.base_url = base_url
self.operation2pipeline = {}
self.pipeline_factory = pipeline_factory
def get_pipeline(self, operation: str):
pipeline = self.operation2pipeline.get(operation, None) or self.__register_pipeline(operation)
return pipeline
def __register_pipeline(self, operation):
endpoint = self.__make_endpoint(operation)
pipeline = self.pipeline_factory(endpoint)
self.operation2pipeline[operation] = pipeline
return pipeline
def __make_endpoint(self, operation):
return f"{self.base_url}/{operation}"

View File

@ -1,7 +1,7 @@
from itertools import chain, takewhile
from typing import Iterable
from funcy import first, repeatedly, flatten
from funcy import first, repeatedly, mapcat
from pyinfra.server.buffering.bufferize import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
@ -16,7 +16,7 @@ class FlatStreamBuffer:
def __call__(self, items):
items = chain(items, [Nothing])
yield from flatten(map(self.stream_buffer, items))
yield from mapcat(self.stream_buffer, items)
class StreamBuffer:

View File

@ -0,0 +1,39 @@
from functools import lru_cache
from funcy import identity
from prometheus_client import CollectorRegistry, Summary
from pyinfra.server.operation_dispatcher import OperationDispatcher
class OperationDispatcherMonitoringDecorator:
def __init__(self, operation_dispatcher: OperationDispatcher, naming_policy=identity):
self.operation_dispatcher = operation_dispatcher
self.operation2metric = {}
self.naming_policy = naming_policy
@property
@lru_cache(maxsize=None)
def registry(self):
return CollectorRegistry(auto_describe=True)
def make_summary_instance(self, op: str):
return Summary(f"{self.naming_policy(op)}_seconds", f"Time spent on {op}.", registry=self.registry)
def submit(self, operation, request):
return self.operation_dispatcher.submit(operation, request)
def pickup(self, operation):
with self.get_monitor(operation):
return self.operation_dispatcher.pickup(operation)
def get_monitor(self, operation):
monitor = self.operation2metric.get(operation, None) or self.register_operation(operation)
return monitor.time()
def register_operation(self, operation):
summary = self.make_summary_instance(operation)
self.operation2metric[operation] = summary
return summary

View File

@ -0,0 +1,33 @@
from itertools import starmap, tee
from typing import Dict
from funcy import juxt, zipdict, cat
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
class OperationDispatcher:
def __init__(self, operation2function: Dict[str, QueuedStreamFunction]):
submit_suffixes, pickup_suffixes = zip(*map(juxt(submit_suffix, pickup_suffix), operation2function))
processors = starmap(LazyRestProcessor, zip(operation2function.values(), submit_suffixes, pickup_suffixes))
self.operation2processor = zipdict(submit_suffixes + pickup_suffixes, cat(tee(processors)))
@classmethod
@property
def pickup_suffix(cls):
return pickup_suffix("")
def submit(self, operation, request):
return self.operation2processor[operation].push(request)
def pickup(self, operation):
return self.operation2processor[operation].pop()
def submit_suffix(op: str):
return "" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"

View File

@ -2,13 +2,13 @@ from functools import singledispatch
from typing import Dict, Callable, Union
from flask import Flask, jsonify, request
from funcy import merge
from prometheus_client import generate_latest, Summary, CollectorRegistry
from prometheus_client import generate_latest
from pyinfra.config import CONFIG
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.monitoring import OperationDispatcherMonitoringDecorator
from pyinfra.server.operation_dispatcher import OperationDispatcher
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
@singledispatch
@ -52,58 +52,49 @@ def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
registry = CollectorRegistry(auto_describe=True)
operation2processor = {
op: LazyRestProcessor(fn, **build_endpoint_suffixes(op)) for op, fn in operation2function.items()
}
dispatcher = OperationDispatcherMonitoringDecorator(
OperationDispatcher(operation2function),
naming_policy=naming_policy,
)
def make_summary_instance(op: str):
op = op.replace("_pickup", "") if op != "pickup" else "default"
service_name = CONFIG.service.name
return Summary(f"redactmanager_{service_name}_{op}_seconds", f"Time spent on {op}.", registry=registry)
submit_operation2processor = {submit_suffix(op): prc for op, prc in operation2processor.items()}
pickup_operation2processor = {pickup_suffix(op): prc for op, prc in operation2processor.items()}
operation2processor = merge(submit_operation2processor, pickup_operation2processor)
operation2metric = {op: make_summary_instance(op) for op in pickup_operation2processor}
def ok():
resp = jsonify("OK")
resp.status_code = 200
return resp
@app.route("/ready", methods=["GET"])
def ready():
resp = jsonify("OK")
resp.status_code = 200
return resp
return ok()
@app.route("/health", methods=["GET"])
def healthy():
resp = jsonify("OK")
resp.status_code = 200
return resp
return ok()
@app.route("/prometheus", methods=["GET"])
def prometheus():
return generate_latest(registry=registry)
return generate_latest(registry=dispatcher.registry)
@app.route("/<operation>", methods=["POST", "PATCH"])
def submit(operation):
return operation2processor[operation].push(request)
return dispatcher.submit(operation, request)
@app.route("/", methods=["POST", "PATCH"])
def submit_default():
return dispatcher.submit("", request)
@app.route("/<operation>", methods=["GET"])
def pickup(operation):
with operation2metric[operation].time():
return operation2processor[operation].pop()
return dispatcher.pickup(operation)
return app
def build_endpoint_suffixes(op: str):
return {"submit_suffix": submit_suffix(op), "pickup_suffix": pickup_suffix(op)}
def naming_policy(op_name: str):
pop_suffix = OperationDispatcher.pickup_suffix
prefix = f"redactmanager_{CONFIG.service.name}"
op_display_name = op_name.replace(f"_{pop_suffix}", "") if op_name != pop_suffix else "default"
complete_display_name = f"{prefix}_{op_display_name}"
def submit_suffix(op: str):
return "submit" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"
return complete_display_name

View File

@ -4,13 +4,18 @@ from pyinfra.server.buffering.queue import stream_queue, Queue
class QueuedStreamFunction:
def __init__(self, fn):
def __init__(self, stream_function):
"""Combines a stream function with a queue.
Args:
stream_function: Needs to operate on iterables.
"""
self.queue = Queue()
self.fn = fn
self.stream_function = stream_function
def push(self, item):
self.queue.append(item)
def pop(self):
items = stream_queue(self.queue)
return first(self.fn(items))
return first(self.stream_function(items))

View File

@ -1,6 +1,7 @@
import logging
from flask import jsonify
from funcy import drop
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
@ -16,7 +17,7 @@ class LazyRestProcessor:
def push(self, request):
self.queued_stream_function.push(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
return jsonify(replace_suffix(request.base_url, self.submit_suffix, self.pickup_suffix))
def pop(self):
result = self.queued_stream_function.pop() or Nothing
@ -38,3 +39,11 @@ class LazyRestProcessor:
def valid(result):
return isinstance(result, dict) or result is Nothing
def replace_suffix(strn, suf, repl):
return remove_last_n(strn, len(suf)) + repl
def remove_last_n(strn, n):
return "".join(reversed(list(drop(n, reversed(strn)))))

View File

@ -30,5 +30,5 @@ class StorageAdapter(ABC):
raise NotImplementedError
@abstractmethod
def get_all_object_names(self, bucket_name):
def get_all_object_names(self, bucket_name, prefix=None):
raise NotImplementedError

View File

@ -58,7 +58,7 @@ class AzureStorageAdapter(StorageAdapter):
blobs = container_client.list_blobs()
container_client.delete_blobs(*blobs)
def get_all_object_names(self, bucket_name):
def get_all_object_names(self, bucket_name, prefix=None):
container_client = self.__provide_container_client(bucket_name)
blobs = container_client.list_blobs()
blobs = container_client.list_blobs(name_starts_with=prefix)
return map(attrgetter("name"), blobs)

View File

@ -53,6 +53,6 @@ class S3StorageAdapter(StorageAdapter):
for obj in objects:
self.__client.remove_object(bucket_name, obj.object_name)
def get_all_object_names(self, bucket_name):
objs = self.__client.list_objects(bucket_name, recursive=True)
def get_all_object_names(self, bucket_name, prefix=None):
objs = self.__client.list_objects(bucket_name, recursive=True, prefix=prefix)
return map(attrgetter("object_name"), objs)

View File

@ -40,5 +40,5 @@ class Storage:
def clear_bucket(self, bucket_name):
return self.__adapter.clear_bucket(bucket_name)
def get_all_object_names(self, bucket_name):
return self.__adapter.get_all_object_names(bucket_name)
def get_all_object_names(self, bucket_name, prefix=None):
return self.__adapter.get_all_object_names(bucket_name, prefix=prefix)

View File

@ -1,3 +1,4 @@
import logging
from functools import partial
from funcy import compose
@ -7,6 +8,8 @@ from pyinfra.storage.storage import Storage
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift
logger = logging.getLogger(__name__)
class Downloader:
def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
@ -17,19 +20,36 @@ class Downloader:
def __call__(self, queue_item_body):
return self.download(queue_item_body)
def get_names_of_objects_by_pattern(self, file_pattern: str):
matches_pattern = flift(file_pattern)
page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name)
return page_object_names
def download(self, queue_item_body):
names_of_relevant_objects = self.get_names_of_objects_by_pattern(queue_item_body)
objects = self.download_and_decompress_object(names_of_relevant_objects)
return objects
def get_names_of_objects_by_pattern(self, queue_item_body):
logger.debug(f"Filtering objects in bucket {self.bucket_name} by pattern...")
names_of_relevant_objects = compose(
self.get_pattern_filter(queue_item_body),
self.get_names_of_all_associated_objects,
)(queue_item_body)
logger.debug(f"Completed filtering.")
return names_of_relevant_objects
def download_and_decompress_object(self, object_names):
download = partial(self.storage.get_object, self.bucket_name)
return map(compose(decompress, download), object_names)
def download(self, queue_item_body):
def get_names_of_all_associated_objects(self, queue_item_body):
prefix = self.file_descriptor_manager.get_path_prefix(queue_item_body)
# TODO: performance tests for the following situations:
# 1) dossier with very many files
# 2) prefix matches very many files, independent of dossier cardinality
yield from self.storage.get_all_object_names(self.bucket_name, prefix=prefix)
def get_pattern_filter(self, queue_item_body):
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
page_object_names = self.get_names_of_objects_by_pattern(file_pattern)
objects = self.download_and_decompress_object(page_object_names)
return objects
matches_pattern = flift(file_pattern)
return matches_pattern

View File

@ -8,19 +8,12 @@ service:
output:
subdir: op_outp_files
extension: OUT.gz
single_inp_op:
input:
subdir: ""
extension: IN.gz
output:
subdir: op_outp_files
extension: OUT.gz
default:
input:
subdir: ""
extension: IN.gz
output:
subdir: ""
subdir: op_outp_files
extension: OUT.gz
storage:

View File

@ -1,8 +1,6 @@
import json
from pyinfra.config import CONFIG as MAIN_CONFIG
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
MAIN_CONFIG["retry"]["delay"] = 0.1
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)

View File

@ -4,7 +4,7 @@ from itertools import starmap, repeat
import numpy as np
import pytest
from PIL import Image
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip, merge
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.normalization import normalize_item
@ -85,9 +85,7 @@ def targets(data_message_pairs, input_data_items, operation, metadata, server_si
try:
response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata)))))
queue_message_keys = ["id"] * (not server_side_test) + [
*first(queue_message_metadata).keys()
]
queue_message_keys = ["id"] * (not server_side_test) + [*first(queue_message_metadata).keys()]
response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata)
expected = lzip(response_data, response_metadata)
@ -150,12 +148,14 @@ def metadata(n_items, many_to_n):
@pytest.fixture
def queue_message_metadata(n_items, operation_name):
def metadata(i):
return {
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3],
"operation": operation_name,
}
return merge(
{
"dossierId": "folder",
"fileId": f"file{i}",
"pages": [0, 2, 3],
},
({"operation": operation_name} if operation_name else {}),
)
return lmap(metadata, range(n_items))

View File

@ -52,7 +52,7 @@ def server(server_stream_function, buffer_size, operation_name):
@pytest.fixture
def operation_name(many_to_n):
return "multi_inp_op" if many_to_n else "single_inp_op"
return "multi_inp_op" if many_to_n else ""
@pytest.fixture

View File

@ -26,5 +26,5 @@ class StorageAdapterMock(StorageAdapter):
def clear_bucket(self, bucket_name):
return self.__client.clear_bucket(bucket_name)
def get_all_object_names(self, bucket_name):
def get_all_object_names(self, bucket_name, prefix=None):
return self.__client.get_all_object_names(bucket_name)

View File

@ -1,5 +1,5 @@
import pytest
from funcy import rcompose, compose, project, second, merge, lpluck
from funcy import rcompose, compose, project, merge
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.client_pipeline import ClientPipeline
@ -40,9 +40,9 @@ def test_pipeline(
for storage_mdt, queue_mdt in zip(metadata, queue_message_metadata)
]
output = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
assert n_items == 0 or len(output) > 0
assert output == targets
outputs = compose(llift(unpack), client_pipeline)(input_data_items, metadata)
assert n_items == 0 or len(outputs) > 0
assert outputs == targets
@pytest.mark.parametrize("item_type", ["string"])

View File

@ -35,7 +35,7 @@ def test_stream_buffer_catches_type_error(func, inputs, outputs):
def test_flat_stream_buffer(func, inputs, outputs, buffer_size):
flat_stream_buffer = FlatStreamBuffer(lift(func), buffer_size=buffer_size)
assert list(flat_stream_buffer(inputs)) == lflatten(outputs)
assert list(flat_stream_buffer(inputs)) == outputs
assert list(flat_stream_buffer([])) == []