Pull request #25: Fixes
Merge in RR/pyinfra from fixes to master
Squashed commit of the following:
commit e3eff12ccdea52e041cc7a14cda72d3e32aa2144
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 22 15:42:35 2022 +0100
black
commit 2bc520c849ea4e833cb60b2c97626da6636d3155
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 22 15:42:08 2022 +0100
adjust mock script
commit 429b6b8f3a3fc8aa35515395712057d1c7bec13e
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 22 15:41:42 2022 +0100
change scope for retry consume
commit 7488394e313270fe7ba356c40d810e7cb3c706ee
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 22 15:39:39 2022 +0100
add heartbeat to AMQP connection
commit 004c5fa805bfb982f55de533bc109fa21bacfbc8
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 22 15:38:15 2022 +0100
Adjust error handling for missing prometheus endpoint: error is logged not raised
This commit is contained in:
parent
1b1da50faf
commit
11c9f8a073
@ -7,14 +7,13 @@ from pyinfra.locations import CONFIG_FILE
|
||||
|
||||
def make_art():
|
||||
return """
|
||||
______ _____ __
|
||||
| ___ \ |_ _| / _|
|
||||
| |_/ / _ | | _ __ | |_ _ __ __ _
|
||||
| __/ | | || || '_ \| _| '__/ _` |
|
||||
| | | |_| || || | | | | | | | (_| |
|
||||
\_| \__, \___/_| |_|_| |_| \__,_|
|
||||
__/ |
|
||||
|___/
|
||||
___ _ _ ___ __
|
||||
o O O | _ \ | || | |_ _| _ _ / _| _ _ __ _
|
||||
o | _/ \_, | | | | ' \ | _| | '_| / _` |
|
||||
TS__[O] _|_|_ _|__/ |___| |_||_| _|_|_ _|_|_ \__,_|
|
||||
{======|_| ``` |_| ````|_|`````|_|`````|_|`````|_|`````|_|`````|
|
||||
./o--000' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-'
|
||||
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ def set_up_probing_webserver():
|
||||
logger.warning(f"Got no metrics from analysis prometheus endpoint: {err}")
|
||||
informed_about_missing_prometheus_endpoint = True
|
||||
else:
|
||||
raise err
|
||||
logging.warning(f"Caught {err}")
|
||||
return resp.text
|
||||
|
||||
return app
|
||||
|
||||
@ -45,7 +45,12 @@ def get_connection():
|
||||
|
||||
credentials = pika.PlainCredentials(username=CONFIG.rabbitmq.user, password=CONFIG.rabbitmq.password)
|
||||
|
||||
kwargs = {"host": CONFIG.rabbitmq.host, "port": CONFIG.rabbitmq.port, "credentials": credentials}
|
||||
kwargs = {
|
||||
"host": CONFIG.rabbitmq.host,
|
||||
"port": CONFIG.rabbitmq.port,
|
||||
"credentials": credentials,
|
||||
"heartbeat": CONFIG.rabbitmq.heartbeat,
|
||||
}
|
||||
|
||||
parameters = pika.ConnectionParameters(**kwargs)
|
||||
|
||||
@ -96,7 +101,7 @@ class PikaQueueManager(QueueManager):
|
||||
|
||||
def publish_response(self, message, callback, max_attempts=3):
|
||||
|
||||
logger.debug(f"Publishing response for {message}.")
|
||||
logger.debug(f"Processing {message}.")
|
||||
|
||||
frame, properties, body = message
|
||||
|
||||
|
||||
@ -54,6 +54,8 @@ def build_message_bodies(analyse_container_type):
|
||||
|
||||
storage = get_s3_storage()
|
||||
for bucket_name, pdf_name in storage.get_all_object_names(CONFIG.storage.bucket):
|
||||
if "pdf" not in pdf_name:
|
||||
continue
|
||||
file_id = pdf_name.split(".")[0]
|
||||
dossier_id, file_id = file_id.split("/")
|
||||
message_dict = {"dossierId": dossier_id, "fileId": file_id}
|
||||
@ -74,6 +76,7 @@ def main(args):
|
||||
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
|
||||
print(f"Received {json.loads(body)}")
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
channel.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@ -54,11 +54,11 @@ def main():
|
||||
|
||||
@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
|
||||
def consume():
|
||||
consumer = Consumer(visitor, queue_manager)
|
||||
try:
|
||||
consumer = Consumer(visitor, queue_manager)
|
||||
consumer.consume_and_publish()
|
||||
except Exception as err:
|
||||
raise ConsumerError from err
|
||||
raise ConsumerError() from err
|
||||
|
||||
try:
|
||||
consume()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user