# pyinfra/tests/unit_test/queue_test.py
# 2024-04-16 16:19:45 +02:00
#
# 92 lines
# 3.0 KiB
# Python

import json
from sys import stdout
from time import sleep
import pika
import pytest
from kn_utils.logging import logger
# Send log output to stdout at DEBUG level while the tests run.
# NOTE(review): remove() presumably drops previously registered sinks first
# (loguru-style API) — confirm against kn_utils.logging.
logger.remove()
logger.add(sink=stdout, level="DEBUG")
def make_callback(process_time: float):
    """Build a message callback that simulates ``process_time`` seconds of work.

    Args:
        process_time: Number of seconds the returned callback sleeps on
            every invocation before reporting success.

    Returns:
        A one-argument callable. It ignores its argument, sleeps for
        ``process_time`` seconds and returns ``{"status": "success"}``.
    """

    def callback(x):
        # The payload is irrelevant here; only the processing delay matters.
        sleep(process_time)
        return {"status": "success"}

    return callback
def file_not_found_callback(x):
    """Message callback that always fails with ``FileNotFoundError``.

    Args:
        x: The incoming message; ignored.

    Raises:
        FileNotFoundError: Always, with the message ``"File not found"``.
    """
    raise FileNotFoundError("File not found")
class TestQueueManager:
    """Tests that drive a queue manager through publish/consume round trips.

    Every test purges the queues, publishes its input followed by
    ``stop_message`` and then calls ``start_consuming`` with a callback.

    NOTE(review): ``queue_manager``, ``input_message`` and ``stop_message`` are
    expected to be pytest fixtures provided elsewhere (not visible in this
    file). ``stop_message`` presumably makes ``start_consuming`` return —
    confirm against the fixture definitions.
    """

    def test_not_available_file_leads_to_message_rejection_without_crashing(
        self, queue_manager, input_message, stop_message
    ):
        """A callback raising ``FileNotFoundError`` must not escape the consumer.

        The test passes as long as ``start_consuming`` returns without an
        exception; the rejection itself is not asserted here.
        """
        queue_manager.purge_queues()
        queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        queue_manager.start_consuming(file_not_found_callback)

    def test_processing_of_several_messages(self, queue_manager, input_message, stop_message):
        """Each published input message yields one success response."""
        message_count = 2

        queue_manager.purge_queues()
        for _ in range(message_count):
            queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        callback = make_callback(1)
        queue_manager.start_consuming(callback)

        for _ in range(message_count):
            response = queue_manager.get_message_from_output_queue()
            assert response is not None
            # response[2] is the raw message body (bytes) holding JSON.
            assert json.loads(response[2].decode()) == {"status": "success"}

    def test_all_headers_beginning_with_x_are_forwarded(self, queue_manager, input_message, stop_message):
        """Headers of the input message are forwarded selectively to the response.

        ``X-TENANT-ID``, ``X-OTHER-HEADER`` and ``x-tenant_id`` must appear on
        the response with their original values, while
        ``x_should_not_be_forwarded`` must be absent.

        NOTE(review): the forwarding rule looks like a case-insensitive ``x-``
        prefix match — confirm against the queue manager implementation.
        """
        queue_manager.purge_queues()

        properties = pika.BasicProperties(
            headers={
                "X-TENANT-ID": "redaction",
                "X-OTHER-HEADER": "other-header-value",
                "x-tenant_id": "tenant-id-value",
                "x_should_not_be_forwarded": "should-not-be-forwarded-value",
            }
        )
        queue_manager.publish_message_to_input_queue(input_message, properties=properties)
        queue_manager.publish_message_to_input_queue(stop_message)

        callback = make_callback(0.2)
        queue_manager.start_consuming(callback)

        response = queue_manager.get_message_from_output_queue()
        # Fail with a clear assertion instead of a TypeError when nothing arrived.
        assert response is not None
        assert json.loads(response[2].decode()) == {"status": "success"}

        # response[1] carries the message properties, including the headers.
        assert response[1].headers["X-TENANT-ID"] == "redaction"
        assert response[1].headers["X-OTHER-HEADER"] == "other-header-value"
        assert response[1].headers["x-tenant_id"] == "tenant-id-value"
        assert "x_should_not_be_forwarded" not in response[1].headers

    def test_message_processing_does_not_block_heartbeat(self, queue_manager, input_message, stop_message):
        """A slow callback (15 s) still results in a delivered success response.

        NOTE(review): 15 seconds is presumably chosen to exceed the connection's
        heartbeat interval — confirm against the queue manager configuration.
        """
        queue_manager.purge_queues()
        queue_manager.publish_message_to_input_queue(input_message)
        queue_manager.publish_message_to_input_queue(stop_message)

        callback = make_callback(15)
        queue_manager.start_consuming(callback)

        response = queue_manager.get_message_from_output_queue()
        # Fail with a clear assertion instead of a TypeError when nothing arrived.
        assert response is not None
        assert json.loads(response[2].decode()) == {"status": "success"}