From 11c9f8a073ef228768fe31605d411129e1f79a4c Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 22 Mar 2022 15:48:27 +0100 Subject: [PATCH] Pull request #25: Fixes Merge in RR/pyinfra from fixes to master Squashed commit of the following: commit e3eff12ccdea52e041cc7a14cda72d3e32aa2144 Author: Julius Unverfehrt Date: Tue Mar 22 15:42:35 2022 +0100 black commit 2bc520c849ea4e833cb60b2c97626da6636d3155 Author: Julius Unverfehrt Date: Tue Mar 22 15:42:08 2022 +0100 adjust mock script commit 429b6b8f3a3fc8aa35515395712057d1c7bec13e Author: Julius Unverfehrt Date: Tue Mar 22 15:41:42 2022 +0100 change scope for retry consume commit 7488394e313270fe7ba356c40d810e7cb3c706ee Author: Julius Unverfehrt Date: Tue Mar 22 15:39:39 2022 +0100 add heartbeat to AMQP connection commit 004c5fa805bfb982f55de533bc109fa21bacfbc8 Author: Julius Unverfehrt Date: Tue Mar 22 15:38:15 2022 +0100 Adjust error handling for missing prometheus endpoint: error is logged not raised --- pyinfra/config.py | 15 +++++++-------- pyinfra/flask.py | 2 +- pyinfra/queue/queue_manager/pika_queue_manager.py | 9 +++++++-- scripts/mock_client.py | 3 +++ src/serve.py | 4 ++-- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pyinfra/config.py b/pyinfra/config.py index 5927171..937b760 100644 --- a/pyinfra/config.py +++ b/pyinfra/config.py @@ -7,14 +7,13 @@ from pyinfra.locations import CONFIG_FILE def make_art(): return """ -______ _____ __ -| ___ \ |_ _| / _| -| |_/ / _ | | _ __ | |_ _ __ __ _ -| __/ | | || || '_ \| _| '__/ _` | -| | | |_| || || | | | | | | | (_| | -\_| \__, \___/_| |_|_| |_| \__,_| - __/ | - |___/ + ___ _ _ ___ __ + o O O | _ \ | || | |_ _| _ _ / _| _ _ __ _ + o | _/ \_, | | | | ' \ | _| | '_| / _` | + TS__[O] _|_|_ _|__/ |___| |_||_| _|_|_ _|_|_ \__,_| + {======|_| ``` |_| ````|_|`````|_|`````|_|`````|_|`````|_|`````| +./o--000' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' + """ diff --git a/pyinfra/flask.py b/pyinfra/flask.py index fcddb97..549ef0c 100644 --- a/pyinfra/flask.py +++ b/pyinfra/flask.py @@ -58,7 +58,7 @@ def set_up_probing_webserver(): logger.warning(f"Got no metrics from analysis prometheus endpoint: {err}") informed_about_missing_prometheus_endpoint = True else: - raise err + logging.warning(f"Caught {err}") return resp.text return app diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 75e1596..3b91fea 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -45,7 +45,12 @@ def get_connection(): credentials = pika.PlainCredentials(username=CONFIG.rabbitmq.user, password=CONFIG.rabbitmq.password) - kwargs = {"host": CONFIG.rabbitmq.host, "port": CONFIG.rabbitmq.port, "credentials": credentials} + kwargs = { + "host": CONFIG.rabbitmq.host, + "port": CONFIG.rabbitmq.port, + "credentials": credentials, + "heartbeat": CONFIG.rabbitmq.heartbeat, + } parameters = pika.ConnectionParameters(**kwargs) @@ -96,7 +101,7 @@ class PikaQueueManager(QueueManager): def publish_response(self, message, callback, max_attempts=3): - logger.debug(f"Publishing response for {message}.") + logger.debug(f"Processing {message}.") frame, properties, body = message diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 89b2b90..e7699fa 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -54,6 +54,8 @@ def build_message_bodies(analyse_container_type): storage = get_s3_storage() for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket): + if "pdf" not in pdf_name: + continue file_id = pdf_name.split(".")[0] dossier_id, file_id = file_id.split("/") message_dict = {"dossierId": dossier_id, "fileId": file_id} @@ -74,6 +76,7 @@ def main(args): for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): print(f"Received {json.loads(body)}") channel.basic_ack(method_frame.delivery_tag) + channel.close() if __name__ == "__main__": diff --git a/src/serve.py b/src/serve.py index 1948abf..7e26ce4 100644 --- a/src/serve.py +++ b/src/serve.py @@ -54,11 +54,11 @@ def main(): @retry(ConsumerError, tries=3, delay=5, jitter=(1, 3)) def consume(): + consumer = Consumer(visitor, queue_manager) try: - consumer = Consumer(visitor, queue_manager) consumer.consume_and_publish() except Exception as err: - raise ConsumerError from err + raise ConsumerError() from err try: consume()