diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 5ff27be..ad5191a 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -143,8 +143,14 @@ class PikaQueueManager(QueueManager): message = (frame, properties, body) return self.publish_response(message, visitor) - self.channel.basic_consume(self._input_queue, callback) - self.channel.start_consuming() + consumer_tag = None + + try: + consumer_tag = self.channel.basic_consume(self._input_queue, callback) + self.channel.start_consuming() + finally: + if consumer_tag: + self.channel.basic_cancel(consumer_tag) def clear(self): try: diff --git a/src/serve.py b/src/serve.py index 45025ab..435d632 100644 --- a/src/serve.py +++ b/src/serve.py @@ -40,6 +40,8 @@ def make_callback(analysis_endpoint): def main(): + logger = logging.getLogger("main") + show_banner() webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),)) @@ -58,6 +60,7 @@ def main(): consumer = Consumer(visitor, queue_manager) consumer.basic_consume_and_publish() except Exception as err: + logger.exception(err) raise ConsumerError from err try: