RED-7002 Forward exceptions from thread context

PyInfra now reports exceptions that happen inside the processing
callback.
Also refactors queue manager logging to fit new logger by
changing "%s", var logic to f string, since this syntax is not supported
with knutlis logging.
This commit is contained in:
Julius Unverfehrt 2023-08-21 10:24:54 +02:00
parent 7187f0ec0c
commit 294688ea66

View File

@ -2,12 +2,11 @@ import atexit
import concurrent.futures
import json
import logging
import signal
from kn_utils.logging import getLogger
from pathlib import Path
import pika
import pika.exceptions
import signal
from kn_utils.logging import logger
from pathlib import Path
from pika.adapters.blocking_connection import BlockingChannel
from pyinfra.config import Config
@ -59,9 +58,6 @@ class QueueManager:
"""Handle RabbitMQ message reception & delivery"""
def __init__(self, config: Config):
self.logger = getLogger(__name__)
self.logger.setLevel(config.logging_level_root)
self._input_queue = config.request_queue
self._output_queue = config.response_queue
self._dead_letter_queue = config.dead_letter_queue
@ -123,16 +119,16 @@ class QueueManager:
"""
callback = self._create_queue_callback(process_payload)
self._set_consumer_token(None)
self.logger.info("Consuming from queue")
logger.info("Consuming from queue ...")
try:
self._open_channel()
self._set_consumer_token(self._channel.basic_consume(self._input_queue, callback))
self.logger.info("Registered with consumer-tag: %s", self._consumer_token)
logger.info(f"Registered with consumer-tag: {self._consumer_token}")
self._channel.start_consuming()
except Exception:
self.logger.error(
logger.error(
"An unexpected exception occurred while consuming messages. Consuming will stop.", exc_info=True
)
raise
@ -143,75 +139,67 @@ class QueueManager:
def stop_consuming(self):
if self._consumer_token and self._connection:
self.logger.info("Cancelling subscription for consumer-tag %s", self._consumer_token)
logger.info(f"Cancelling subscription for consumer-tag {self._consumer_token}")
self._channel.stop_consuming(self._consumer_token)
self._set_consumer_token(None)
def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs):
self.logger.info("Received signal %s", signal_number)
logger.info(f"Received signal {signal_number}")
self.stop_consuming()
def _create_queue_callback(self, process_payload: PayloadProcessor):
def process_message_body_and_await_result(unpacked_message_body):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
self.logger.debug("Processing payload in separate thread")
logger.debug("Processing payload in separate thread.")
future = thread_pool_executor.submit(process_payload, unpacked_message_body)
while future.running():
self.logger.debug("Waiting for payload processing to finish")
logger.debug("Waiting for payload processing to finish...")
self._connection.sleep(float(self._connection_sleep))
try:
return future.result()
except Exception as err:
raise ProcessingFailure("QueueMessagePayload processing failed") from err
raise ProcessingFailure(f"QueueMessagePayload processing failed: {repr(err)}") from err
def acknowledge_message_and_publish_response(frame, headers, response_body):
response_properties = pika.BasicProperties(headers=headers) if headers else None
self._channel.basic_publish("", self._output_queue, json.dumps(response_body).encode(), response_properties)
self.logger.info(
"Result published, acknowledging incoming message with delivery_tag %s",
frame.delivery_tag,
)
logger.info(f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}.")
self._channel.basic_ack(frame.delivery_tag)
def callback(_channel, frame, properties, body):
self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag)
self.logger.debug("Message headers: %s", properties.headers)
logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}.")
logger.debug(f"Message headers: {properties.headers}")
# Only try to process each message once. Re-queueing will be handled by the dead-letter-exchange. This
# prevents endless retries on messages that are impossible to process.
if frame.redelivered:
self.logger.info(
"Aborting message processing for delivery_tag %s due to it being redelivered",
frame.delivery_tag,
logger.info(
f"Aborting message processing for delivery_tag {frame.delivery_tag} due to it being redelivered.",
)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
return
try:
self.logger.debug("Processing (%s, %s, %s)", frame, properties, body)
logger.debug(f"Processing {frame}, {properties}, {body}")
filtered_message_headers = safe_project(properties.headers, ["X-TENANT-ID"]) # TODO: parametrize key?
message_body = {**json.loads(body), **filtered_message_headers}
processing_result = process_message_body_and_await_result(message_body)
self.logger.info(
"Processed message with delivery_tag %s, publishing result to result-queue",
frame.delivery_tag,
logger.info(
f"Processed message with delivery_tag {frame.delivery_tag}, publishing result to result-queue."
)
acknowledge_message_and_publish_response(frame, filtered_message_headers, processing_result)
except ProcessingFailure:
self.logger.info(
"Processing message with delivery_tag %s failed, declining",
frame.delivery_tag,
)
except ProcessingFailure as err:
logger.info(f"Processing message with delivery_tag {frame.delivery_tag} failed, declining.")
logger.debug(f"ProcessingFailure: {err}")
self._channel.basic_nack(frame.delivery_tag, requeue=False)
except Exception:
n_attempts = _get_n_previous_attempts(properties) + 1
self.logger.warning("Failed to process message, %s attempts", n_attempts, exc_info=True)
logger.warning(f"Failed to process message, {n_attempts}", exc_info=True)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
raise