125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
import gzip
|
|
import logging
|
|
from operator import itemgetter
|
|
|
|
import pytest
|
|
|
|
from pyinfra.exceptions import ProcessingFailure
|
|
from pyinfra.queue.consumer import Consumer
|
|
from pyinfra.visitor import get_object_descriptor, ForwardingStrategy
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def consumer(queue_manager, callback):
|
|
return Consumer(callback, queue_manager)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def access_callback():
|
|
return itemgetter("fileId")
|
|
|
|
|
|
@pytest.fixture()
|
|
def items():
|
|
def inner():
|
|
for i in range(3):
|
|
body = {
|
|
"dossierId": "folder",
|
|
"fileId": f"file{i}",
|
|
"targetFileExtension": "in.gz",
|
|
"responseFileExtension": "out.gz",
|
|
}
|
|
yield f"{i}".encode(), body
|
|
|
|
return list(inner())
|
|
|
|
|
|
class TestConsumer:
|
|
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
|
|
def test_consuming_empty_input_queue_does_not_put_anything_on_output_queue(self, consumer, queue_manager):
|
|
queue_manager.clear()
|
|
consumer.consume()
|
|
assert queue_manager.output_queue.empty()
|
|
|
|
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
|
|
def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
|
|
self, consumer, queue_manager, callback
|
|
):
|
|
def produce_items():
|
|
return map(str, range(3))
|
|
|
|
def mock_visitor(callback):
|
|
def inner(data):
|
|
return callback({"data": data.encode()})
|
|
|
|
return inner
|
|
|
|
callback = mock_visitor(callback)
|
|
|
|
queue_manager.clear()
|
|
|
|
for item in produce_items():
|
|
queue_manager.publish_request(item)
|
|
|
|
requests = consumer.consume()
|
|
|
|
for _, r in zip(produce_items(), requests):
|
|
queue_manager.publish_response(r, callback)
|
|
|
|
assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
|
|
|
|
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
|
|
@pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
|
|
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
|
|
def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
|
|
self, consumer, queue_manager, visitor, bucket_name, storage, items
|
|
):
|
|
|
|
visitor.response_strategy = ForwardingStrategy()
|
|
|
|
queue_manager.clear()
|
|
storage.clear_bucket(bucket_name)
|
|
|
|
for data, message in items:
|
|
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
|
|
queue_manager.publish_request(message)
|
|
|
|
requests = consumer.consume(inactivity_timeout=5)
|
|
|
|
for itm, req in zip(items, requests):
|
|
logger.debug(f"Processing item {itm}")
|
|
queue_manager.publish_response(req, visitor)
|
|
|
|
assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"]
|
|
|
|
@pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
|
|
def test_message_is_republished_when_callback_raises_processing_failure_exception(
|
|
self, consumer, queue_manager, bucket_name, items
|
|
):
|
|
class DebugError(Exception):
|
|
pass
|
|
|
|
def callback(_):
|
|
raise ProcessingFailure()
|
|
|
|
def reject_patch(*args, **kwargs):
|
|
raise DebugError()
|
|
|
|
queue_manager.reject = reject_patch
|
|
|
|
queue_manager.clear()
|
|
|
|
for data, message in items:
|
|
queue_manager.publish_request(message)
|
|
|
|
requests = consumer.consume()
|
|
|
|
logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
|
|
logger.addFilter(lambda record: False)
|
|
|
|
with pytest.raises(DebugError):
|
|
while True:
|
|
queue_manager.publish_response(next(requests), callback)
|