refactor: finnish queue manager, queue manager tests, also add validation logic, integrate new settings

This commit is contained in:
Julius Unverfehrt 2024-01-16 14:21:41 +01:00
parent ebc519ee0d
commit 27917863c9
5 changed files with 8 additions and 165 deletions

View File

@ -1,36 +0,0 @@
import sys
from kn_utils.logging import logger
from pathlib import Path
from pyinfra.queue.queue_manager import token_file_name
def check_token_file():
"""
Checks if the token file of the QueueManager exists and is not empty, i.e. the queue manager has been started.
NOTE: This function suppresses all Exception's.
Returns True if the queue manager has been started, False otherwise
"""
try:
token_file_path = Path(token_file_name())
if token_file_path.exists():
with token_file_path.open(mode="r", encoding="utf8") as token_file:
contents = token_file.read().strip()
return contents != ""
# We intentionally do not handle exception here, since we're only using this in a short script.
# Take care to expand this if the intended use changes
except Exception as err:
logger.warning(f"{err}: Caught exception when reading from token file", exc_info=True)
return False
def run_checks():
if check_token_file():
sys.exit(0)
else:
sys.exit(1)

View File

@ -18,6 +18,8 @@ from pyinfra.utils.config_validation import validate_settings, queue_manager_val
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter
MessageProcessor = Callable[[dict], dict]
class QueueManager:
def __init__(self, settings: Dynaconf):
@ -129,7 +131,7 @@ class QueueManager:
self.establish_connection()
return self.channel.basic_get(self.output_queue, auto_ack=True)
def _make_on_message_callback(self, message_processor: Callable):
def _make_on_message_callback(self, message_processor: MessageProcessor):
def process_message_body_and_await_result(unpacked_message_body):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
logger.debug("Processing payload in separate thread.")
@ -163,7 +165,9 @@ class QueueManager:
else {}
)
logger.debug(f"Processing message with {filtered_message_headers=}.")
result = process_message_body_and_await_result({**json.loads(body), **filtered_message_headers})
result: dict = (
process_message_body_and_await_result({**json.loads(body), **filtered_message_headers}) or {}
)
channel.basic_publish(
"",

View File

@ -1,123 +0,0 @@
import json
import logging
import time
from multiprocessing import Process
import pika
import pika.exceptions
import pytest
from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
from pyinfra.queue.queue_manager import QueueManager
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def development_queue_manager(test_queue_config):
test_queue_config.rabbitmq_heartbeat = 7200
development_queue_manager = DevelopmentQueueManager(test_queue_config)
yield development_queue_manager
logger.info("Tearing down development queue manager...")
try:
development_queue_manager.close_channel()
except pika.exceptions.ConnectionClosedByBroker:
pass
@pytest.fixture(scope="session")
def payload_processing_time(test_queue_config, offset=5):
# FIXME: this implicitly tests the heartbeat when running the end-to-end test. There should be another way to test
# this explicitly.
return test_queue_config.rabbitmq_heartbeat + offset
@pytest.fixture(scope="session")
def payload_processor(response_payload, payload_processing_time, payload_processor_type):
def process(payload):
time.sleep(payload_processing_time)
return response_payload
def process_with_failure(payload):
raise MemoryError
if payload_processor_type == "mock":
return process
elif payload_processor_type == "failing":
return process_with_failure
@pytest.fixture(scope="session", autouse=True)
def start_queue_consumer(test_queue_config, payload_processor, sleep_seconds=5):
def consume_queue():
queue_manager.start_consuming(payload_processor)
queue_manager = QueueManager(test_queue_config)
p = Process(target=consume_queue)
p.start()
logger.info(f"Setting up consumer, waiting for {sleep_seconds}...")
time.sleep(sleep_seconds)
yield
logger.info("Tearing down consumer...")
p.terminate()
@pytest.fixture
def message_properties(message_headers):
if not message_headers:
return pika.BasicProperties(headers=None)
elif message_headers == "X-TENANT-ID":
return pika.BasicProperties(headers={"X-TENANT-ID": "redaction"})
else:
raise Exception(f"Invalid {message_headers=}.")
@pytest.mark.parametrize("x_tenant_id", [None])
class TestQueueManager:
# FIXME: All tests here are wonky. This is due to the implementation of running the process-blocking queue_manager
# in a subprocess. It is then very hard to interact directly with the subprocess. If you have a better idea, please
# refactor; the tests here are insufficient to ensure the functionality of the queue manager!
@pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
def test_message_processing_does_not_block_heartbeat(
self, development_queue_manager, payload, response_payload, payload_processing_time
):
development_queue_manager.clear_queues()
development_queue_manager.publish_request(payload)
time.sleep(payload_processing_time + 10)
_, _, body = development_queue_manager.get_response()
result = json.loads(body)
assert result == response_payload
@pytest.mark.parametrize("message_headers", [None, "X-TENANT-ID"])
@pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
def test_queue_manager_forwards_message_headers(
self,
development_queue_manager,
payload,
response_payload,
payload_processing_time,
message_properties,
):
development_queue_manager.clear_queues()
development_queue_manager.publish_request(payload, message_properties)
time.sleep(payload_processing_time + 10)
_, properties, _ = development_queue_manager.get_response()
assert properties.headers == message_properties.headers
# FIXME: It is not possible to test the behavior of the queue manager directly, since it is running in a separate
# process. You require logging to see if the exception is handled correctly. Hence, this test is only useful for
# development, but insufficient to guarantee the correct behavior.
@pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session")
def test_failed_message_processing_is_handled(
self,
development_queue_manager,
payload,
response_payload,
payload_processing_time,
):
development_queue_manager.clear_queues()
development_queue_manager.publish_request(payload)
time.sleep(payload_processing_time + 10)
_, _, body = development_queue_manager.get_response()
assert not body

View File

@ -6,7 +6,7 @@ import pika
import pytest
from kn_utils.logging import logger
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.queue.manager import QueueManager
logger.remove()
logger.add(sink=stdout, level="DEBUG")
@ -15,7 +15,7 @@ logger.add(sink=stdout, level="DEBUG")
def make_callback(process_time):
def callback(x):
sleep(process_time)
return json.dumps({"status": "success"}).encode("utf-8")
return json.dumps({"status": "success"})
return callback
@ -57,7 +57,6 @@ class TestQueueManager:
response = queue_manager.get_message_from_output_queue()
assert response is not None
assert response[2] == b'{"status": "success"}'
print(response)
def test_all_headers_beginning_with_x_are_forwarded(self, queue_manager, input_message, stop_message):
queue_manager.purge_queues()
@ -78,7 +77,6 @@ class TestQueueManager:
queue_manager.start_consuming(callback)
response = queue_manager.get_message_from_output_queue()
print(response)
assert response[2] == b'{"status": "success"}'