From 294688ea6687e197c603cf3c88f7853b018ab50d Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 21 Aug 2023 10:24:54 +0200 Subject: [PATCH] RED-7002 Forward exceptions from thread context PyInfra now reports exceptions that happen inside the processing callback. Also refactors queue manager logging to fit the new logger by changing the `"%s", var` logic to f-strings, since that syntax is not supported by kn_utils logging. --- pyinfra/queue/queue_manager.py | 58 ++++++++++++++-------------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index 2d0d608..7301506 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -2,12 +2,11 @@ import atexit import concurrent.futures import json import logging -import signal -from kn_utils.logging import getLogger -from pathlib import Path - import pika import pika.exceptions +import signal +from kn_utils.logging import logger +from pathlib import Path from pika.adapters.blocking_connection import BlockingChannel from pyinfra.config import Config @@ -59,9 +58,6 @@ class QueueManager: """Handle RabbitMQ message reception & delivery""" def __init__(self, config: Config): - self.logger = getLogger(__name__) - self.logger.setLevel(config.logging_level_root) - self._input_queue = config.request_queue self._output_queue = config.response_queue self._dead_letter_queue = config.dead_letter_queue @@ -123,16 +119,16 @@ class QueueManager: """ callback = self._create_queue_callback(process_payload) self._set_consumer_token(None) - self.logger.info("Consuming from queue") + logger.info("Consuming from queue ...") try: self._open_channel() self._set_consumer_token(self._channel.basic_consume(self._input_queue, callback)) - self.logger.info("Registered with consumer-tag: %s", self._consumer_token) + logger.info(f"Registered with consumer-tag: {self._consumer_token}") self._channel.start_consuming() except Exception: - 
self.logger.error( + logger.error( "An unexpected exception occurred while consuming messages. Consuming will stop.", exc_info=True ) raise @@ -143,75 +139,67 @@ class QueueManager: def stop_consuming(self): if self._consumer_token and self._connection: - self.logger.info("Cancelling subscription for consumer-tag %s", self._consumer_token) + logger.info(f"Cancelling subscription for consumer-tag {self._consumer_token}") self._channel.stop_consuming(self._consumer_token) self._set_consumer_token(None) def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs): - self.logger.info("Received signal %s", signal_number) + logger.info(f"Received signal {signal_number}") self.stop_consuming() def _create_queue_callback(self, process_payload: PayloadProcessor): def process_message_body_and_await_result(unpacked_message_body): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor: - self.logger.debug("Processing payload in separate thread") + logger.debug("Processing payload in separate thread.") future = thread_pool_executor.submit(process_payload, unpacked_message_body) while future.running(): - self.logger.debug("Waiting for payload processing to finish") + logger.debug("Waiting for payload processing to finish...") self._connection.sleep(float(self._connection_sleep)) try: return future.result() except Exception as err: - raise ProcessingFailure("QueueMessagePayload processing failed") from err + raise ProcessingFailure(f"QueueMessagePayload processing failed: {repr(err)}") from err def acknowledge_message_and_publish_response(frame, headers, response_body): response_properties = pika.BasicProperties(headers=headers) if headers else None self._channel.basic_publish("", self._output_queue, json.dumps(response_body).encode(), response_properties) - self.logger.info( - "Result published, acknowledging incoming message with delivery_tag %s", - frame.delivery_tag, - ) + logger.info(f"Result published, acknowledging incoming message 
with delivery_tag {frame.delivery_tag}.") self._channel.basic_ack(frame.delivery_tag) def callback(_channel, frame, properties, body): - - self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag) - self.logger.debug("Message headers: %s", properties.headers) + logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}.") + logger.debug(f"Message headers: {properties.headers}") # Only try to process each message once. Re-queueing will be handled by the dead-letter-exchange. This # prevents endless retries on messages that are impossible to process. if frame.redelivered: - self.logger.info( - "Aborting message processing for delivery_tag %s due to it being redelivered", - frame.delivery_tag, + logger.info( + f"Aborting message processing for delivery_tag {frame.delivery_tag} due to it being redelivered.", ) self._channel.basic_nack(frame.delivery_tag, requeue=False) return try: - self.logger.debug("Processing (%s, %s, %s)", frame, properties, body) + logger.debug(f"Processing {frame}, {properties}, {body}") filtered_message_headers = safe_project(properties.headers, ["X-TENANT-ID"]) # TODO: parametrize key? message_body = {**json.loads(body), **filtered_message_headers} processing_result = process_message_body_and_await_result(message_body) - self.logger.info( - "Processed message with delivery_tag %s, publishing result to result-queue", - frame.delivery_tag, + logger.info( + f"Processed message with delivery_tag {frame.delivery_tag}, publishing result to result-queue." 
) acknowledge_message_and_publish_response(frame, filtered_message_headers, processing_result) - except ProcessingFailure: - self.logger.info( - "Processing message with delivery_tag %s failed, declining", - frame.delivery_tag, - ) + except ProcessingFailure as err: + logger.info(f"Processing message with delivery_tag {frame.delivery_tag} failed, declining.") + logger.debug(f"ProcessingFailure: {err}") self._channel.basic_nack(frame.delivery_tag, requeue=False) except Exception: n_attempts = _get_n_previous_attempts(properties) + 1 - self.logger.warning("Failed to process message, %s attempts", n_attempts, exc_info=True) + logger.warning(f"Failed to process message, {n_attempts} attempts", exc_info=True) self._channel.basic_nack(frame.delivery_tag, requeue=False) raise