From 0dee98b23d361537f8e073dbbb6f86b76051ec2e Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 7 Jun 2022 15:11:38 +0200 Subject: [PATCH] consumer test adjustment WIP --- test/unit_tests/consumer_test.py | 151 +++++++++++++++++-------------- 1 file changed, 82 insertions(+), 69 deletions(-) diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py index 3a8b2a6..3d8f1c3 100644 --- a/test/unit_tests/consumer_test.py +++ b/test/unit_tests/consumer_test.py @@ -3,8 +3,10 @@ import logging from operator import itemgetter import pytest +from funcy import lmap, lmapcat from pyinfra.exceptions import ProcessingFailure +from pyinfra.utils.encoding import pack_for_upload from pyinfra.visitor import get_object_descriptor, ForwardingStrategy logger = logging.getLogger() @@ -17,83 +19,94 @@ class TestConsumer: consumer.consume() assert queue_manager.output_queue.empty() - @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") - def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order( - self, consumer, queue_manager, callback + # @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") + # def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order( + # self, consumer, queue_manager, callback + # ): + # def produce_items(): + # return map(str, range(3)) + # + # def mock_visitor(callback): + # def inner(data): + # print("data", data) + # return callback({"data": data.encode()}) + # + # return inner + # + # callback = mock_visitor(callback) + # + # print(11111111111) + # queue_manager.clear() + # print(22222222222) + # + # for item in produce_items(): + # print("item", item) + # queue_manager.publish_request(item) + # + # requests = consumer.consume(n=3) + # print(33333333333) + # + # print(requests) + # print(list(produce_items())) + # print(44444444444) + # for _, r in zip(produce_items(), requests): + # print(1) + # queue_manager.publish_response(r, callback) + # print(55555555555) + # + # assert queue_manager.output_queue.to_list() == ["00", "11", "22"] + + # @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") + # @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") + # @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") + # def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( + # self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder + # ): + # + # visitor.response_strategy = ForwardingStrategy() + # + # queue_manager.clear() + # storage.clear_bucket(bucket_name) + # + # for data, message in items: + # storage.put_object(**get_object_descriptor(message), data=pack_for_upload(data)) + # queue_manager.publish_request(message) + # + # requests = consumer.consume(inactivity_timeout=5) + # + # for itm, req in zip(items, requests): + # logger.debug(f"Processing item {itm}") + # queue_manager.publish_response(req, visitor) + # + # assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"] + + @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") + def test_message_is_republished_when_callback_raises_processing_failure_exception( + self, consumer, queue_manager, bucket_name, items ): - def produce_items(): - return map(str, range(3)) + class DebugError(Exception): + pass - def mock_visitor(callback): - def inner(data): - return callback({"data": data.encode()}) + def callback(_): + raise ProcessingFailure - return inner + def reject_patch(*args, **kwargs): + raise DebugError - callback = mock_visitor(callback) + queue_manager.reject = reject_patch queue_manager.clear() - for item in produce_items(): - queue_manager.publish_request(item) + for data, message in items: + queue_manager.publish_request(message) requests = consumer.consume() - for _, r in zip(produce_items(), requests): - queue_manager.publish_response(r, callback) + logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager") + logger.addFilter(lambda record: False) - assert queue_manager.output_queue.to_list() == ["00", "11", "22"] - - @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") - @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") - @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") - def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( - self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder - ): - - visitor.response_strategy = ForwardingStrategy() - - queue_manager.clear() - storage.clear_bucket(bucket_name) - - for data, message in items: - print(data) - storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) - queue_manager.publish_request(message) - - requests = consumer.consume(inactivity_timeout=5) - - for itm, req in zip(items, requests): - logger.debug(f"Processing item {itm}") - queue_manager.publish_response(req, visitor) - - assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"] - - # @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") - # def test_message_is_republished_when_callback_raises_processing_failure_exception( - # self, consumer, queue_manager, bucket_name, items - # ): - # class DebugError(Exception): - # pass - # - # def callback(_): - # raise ProcessingFailure - # - # def reject_patch(*args, **kwargs): - # raise DebugError - # - # queue_manager.reject = reject_patch - # - # queue_manager.clear() - # - # for data, message in items: - # queue_manager.publish_request(message) - # - # requests = consumer.consume() - # - # logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager") - # logger.addFilter(lambda record: False) - # - # with pytest.raises(DebugError): - # while True: - # queue_manager.publish_response(next(requests), callback) + # TODO: for some reason this code now interferes with the commented out tests. all tests work on their own, but + # not together. + with pytest.raises(DebugError): + while True: + queue_manager.publish_response(next(requests), callback)