From 3f645484d979f62b9bb2c289ea0528178f4b911a Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 11 Jul 2022 11:28:18 +0200 Subject: [PATCH] Pull request #41: RED-4564 Merge in RR/pyinfra from RED-4564 to master Squashed commit of the following: commit bf4c85a0ab9fed19a44508f2cbef6858cbb32259 Author: Viktor Seifert Date: Fri Jul 8 15:46:11 2022 +0200 RED-4564: POC-test to see if cancelling the consumer prevents messages from being stuck commit 12ebd186b220f263ac2275463b0c124e8f4210fc Author: Viktor Seifert Date: Fri Jul 8 14:32:05 2022 +0200 RED-4564: Print full exception with traceback when processing from the queue --- pyinfra/queue/queue_manager/pika_queue_manager.py | 10 ++++++++-- src/serve.py | 3 +++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 5ff27be..ad5191a 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -143,8 +143,14 @@ class PikaQueueManager(QueueManager): message = (frame, properties, body) return self.publish_response(message, visitor) - self.channel.basic_consume(self._input_queue, callback) - self.channel.start_consuming() + consumer_tag = None + + try: + consumer_tag = self.channel.basic_consume(self._input_queue, callback) + self.channel.start_consuming() + finally: + if consumer_tag: + self.channel.basic_cancel(consumer_tag) def clear(self): try: diff --git a/src/serve.py b/src/serve.py index 45025ab..435d632 100644 --- a/src/serve.py +++ b/src/serve.py @@ -40,6 +40,8 @@ def make_callback(analysis_endpoint): def main(): + logger = logging.getLogger("main") + show_banner() webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),)) @@ -58,6 +60,7 @@ def main(): consumer = Consumer(visitor, queue_manager) consumer.basic_consume_and_publish() except Exception as err: + logger.exception(err) raise ConsumerError from err try: