pyinfra/test/unit_tests/consumer_test.py

119 lines
4.3 KiB
Python

# NOTE: Something is messed up in the test setups. These tests fail when run together with other tests. Do not know yet
# which ones and why.
# import logging
# from operator import itemgetter
#
# import pytest
# from funcy import lmapcat
#
# from pyinfra.exceptions import ProcessingFailure
# from pyinfra.utils.encoding import pack_for_upload
# from pyinfra.visitor import get_object_descriptor, ForwardingStrategy
#
# logger = logging.getLogger()
#
#
# class TestConsumer:
# @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
# def test_consuming_empty_input_queue_does_not_put_anything_on_output_queue(self, consumer, queue_manager):
# queue_manager.clear()
# consumer.consume()
# assert queue_manager.output_queue.empty()
#
# @pytest.mark.parametrize(
# "queue_manager_name",
# [
# "pika", # NOTE: pika must come first. Test fails IFF pika is in second place, for whatever reason.
# "mock",
# ],
# scope="session",
# )
# def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
# self, consumer, queue_manager, callback, queue_manager_name
# ):
#
# assert consumer.queue_manager is queue_manager
#
# def produce_items():
# return map(str, range(4))
#
# def mock_visitor(callback):
# def inner(data):
# return callback({"data": data.encode()})
#
# return inner
#
# callback = mock_visitor(callback)
#
# queue_manager.clear()
#
# for item in produce_items():
# queue_manager.publish_request(item)
#
# requests = consumer.consume(n=4)
#
# for r in requests:
# queue_manager.publish_response(r, callback)
#
# assert queue_manager.output_queue.to_list()[:3] == ["00", "11", "22"]
#
# @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
# @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
# @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
# def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
# self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
# ):
#
# visitor.response_strategy = ForwardingStrategy()
#
# queue_manager.clear()
# storage.clear_bucket(bucket_name)
#
# for data, message in items:
# storage.put_object(**get_object_descriptor(message), data=pack_for_upload(data))
# queue_manager.publish_request(message)
#
# requests = consumer.consume(inactivity_timeout=5)
#
# for itm, req in zip(items, requests):
# logger.debug(f"Processing item {itm}")
# queue_manager.publish_response(req, visitor)
#
# assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"]
#
# @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
# def test_message_is_republished_when_callback_raises_processing_failure_exception(
# self, consumer, queue_manager, bucket_name, items
# ):
# class DebugError(Exception):
# pass
#
# def callback(_):
# raise ProcessingFailure
#
# def reject_patch(*args, **kwargs):
# raise DebugError
#
# queue_manager.reject = reject_patch
#
# queue_manager.clear()
#
# for data, message in items:
# queue_manager.publish_request(message)
#
# requests = consumer.consume()
#
# logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
# logger.addFilter(lambda record: False)
#
# # TODO:
# # Note above says order of mock and pika matters. Note, that you can produce an error for debugging, when
# # using `queue_manager.publish_response(next(requests), callback)` without the-with and while-block, where it
# # becomes obvious, that the test with the note above then uses the data from THIS here test.
# with pytest.raises(DebugError):
# while True:
# queue_manager.publish_response(next(requests), callback)