import logging import pytest from funcy import pluck, lflatten from pyinfra.config import CONFIG from pyinfra.default_objects import ComponentFactory from pyinfra.exceptions import ProcessingFailure from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy from test.utils.storage import pack_for_upload logger = logging.getLogger() @pytest.mark.xfail( reason="NOTE: Something is messed up in the test setups." " These tests fail when run together with other tests. Do not know yet which ones and why." ) class TestConsumer: @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") def test_consuming_empty_input_queue_does_not_put_anything_on_output_queue(self, consumer, queue_manager): queue_manager.clear() consumer.consume() assert queue_manager.output_queue.empty() @pytest.mark.parametrize( "queue_manager_name", [ "pika", # NOTE: pika must come first. Test fails IFF pika is in second position, for whatever reason. "mock", ], scope="session", ) def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order( self, consumer, queue_manager, callback, queue_manager_name ): assert consumer.queue_manager is queue_manager def produce_items(): return map(str, range(4)) def mock_visitor(callback): def inner(data): return callback({"data": data.encode()}) return inner callback = mock_visitor(callback) queue_manager.clear() for item in produce_items(): queue_manager.publish_request(item) requests = consumer.consume(n=4) for r in requests: queue_manager.publish_response(r, callback) assert queue_manager.output_queue.to_list()[:3] == ["00", "11", "22"] @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session") @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order( self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder, ): file_descriptor_manager = ComponentFactory(CONFIG).get_file_descriptor_manager() visitor.response_strategy = ForwardingStrategy() queue_manager.clear() storage.clear_bucket(bucket_name) for data, message in items: storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=pack_for_upload(data)) queue_manager.publish_request(message) requests = consumer.consume(inactivity_timeout=5) for itm, req in zip(items, requests): logger.debug(f"Processing item {itm}") queue_manager.publish_response(req, visitor) assert lflatten(pluck("analysis_payloads", queue_manager.output_queue.to_list())) == ["00", "11", "22"] @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session") def test_message_is_republished_when_callback_raises_processing_failure_exception( self, consumer, queue_manager, bucket_name, items ): class DebugError(Exception): pass def callback(_): raise ProcessingFailure def reject_patch(*args, **kwargs): raise DebugError queue_manager.reject = reject_patch queue_manager.clear() for data, message in items: queue_manager.publish_request(message) requests = consumer.consume() logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager") logger.addFilter(lambda record: False) # TODO: # The note above says order of mock and pika matters. Note, that you can produce an error for debugging, when # using `queue_manager.publish_response(next(requests), callback)` without the-with and while-block, where it # becomes obvious, that the test with the note above then uses the data from THIS here test. with pytest.raises(DebugError): while True: queue_manager.publish_response(next(requests), callback)