# NOTE: Something is wrong in the test setup: these tests fail when run together with other tests. It is not yet known # which ones and why. # import logging # from operator import itemgetter # # import pytest # from funcy import lmapcat # # from pyinfra.exceptions import ProcessingFailure # from pyinfra.utils.encoding import pack_for_upload # from pyinfra.visitor import get_object_descriptor, ForwardingStrategy # # logger = logging.getLogger() # # # class TestConsumer: # @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") # def test_consuming_empty_input_queue_does_not_put_anything_on_output_queue(self, consumer, queue_manager): # queue_manager.clear() # consumer.consume() # assert queue_manager.output_queue.empty() # # @pytest.mark.parametrize( # "queue_manager_name", # [ # "pika", # NOTE: pika must come first. The test fails if and only if pika is in second place; the reason is not yet known. # "mock", # ], # scope="session", # ) # def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order( # self, consumer, queue_manager, callback, queue_manager_name # ): # # assert consumer.queue_manager is queue_manager # # def produce_items(): # return map(str, range(4)) # # def mock_visitor(callback): # def inner(data): # return callback({"data": data.encode()}) # # return inner # # callback = mock_visitor(callback) # # queue_manager.clear() # # for item in produce_items(): # queue_manager.publish_request(item) # # requests = consumer.consume(n=4) # # for r in requests: # queue_manager.publish_response(r, callback) # # assert queue_manager.output_queue.to_list()[:3] == ["00", "11", "22"] # # @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") # @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") # @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") # def
# test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( # self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder # ): # # visitor.response_strategy = ForwardingStrategy() # # queue_manager.clear() # storage.clear_bucket(bucket_name) # # for data, message in items: # storage.put_object(**get_object_descriptor(message), data=pack_for_upload(data)) # queue_manager.publish_request(message) # # requests = consumer.consume(inactivity_timeout=5) # # for itm, req in zip(items, requests): # logger.debug(f"Processing item {itm}") # queue_manager.publish_response(req, visitor) # # assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"] # # @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") # def test_message_is_republished_when_callback_raises_processing_failure_exception( # self, consumer, queue_manager, bucket_name, items # ): # class DebugError(Exception): # pass # # def callback(_): # raise ProcessingFailure # # def reject_patch(*args, **kwargs): # raise DebugError # # queue_manager.reject = reject_patch # # queue_manager.clear() # # for data, message in items: # queue_manager.publish_request(message) # # requests = consumer.consume() # # logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager") # logger.addFilter(lambda record: False) # # # TODO: # # The note above says the order of mock and pika matters. Note that you can produce an error for debugging by # # using `queue_manager.publish_response(next(requests), callback)` without the with- and while-blocks; it then # # becomes obvious that the test with the note above uses the data from THIS test. # with pytest.raises(DebugError): # while True: # queue_manager.publish_response(next(requests), callback)