pyinfra/mini_queue/consumer.py
Julius Unverfehrt f2c1ee5a95 blacked
2022-02-14 14:16:41 +01:00

39 lines
1.3 KiB
Python

from functools import partial
import pika
from retry import retry
from mini_queue.utils.config import CONFIG
from mini_queue.producer import produce_response
def callback(ch, method, properties, body):
    """Handle one delivered message: print it, then forward it.

    The signature matches what ``basic_consume`` passes to its
    ``on_message_callback``; only ``body`` is used here.

    Args:
        ch: Channel the message arrived on (unused).
        method: Delivery metadata (unused).
        properties: Message properties (unused).
        body: Message payload, passed through unchanged.
    """
    print("Received %r" % body)
    # Connection parameters are rebuilt for every message and handed to
    # produce_response together with the configured output queue name.
    # NOTE(review): produce_response presumably publishes `body` to that
    # queue -- confirm in mini_queue.producer.
    produce_response(init_params(), CONFIG.rabbitmq.queues.output, body)
def init_params():
    """Build pika connection parameters from the ``CONFIG.rabbitmq`` settings.

    Returns:
        A ``pika.ConnectionParameters`` carrying host, port, heartbeat and
        plain user/password credentials read from ``CONFIG.rabbitmq``.
    """
    rabbitmq = CONFIG.rabbitmq
    auth = pika.PlainCredentials(rabbitmq.user, rabbitmq.password)
    return pika.ConnectionParameters(
        host=rabbitmq.host,
        port=rabbitmq.port,
        heartbeat=rabbitmq.heartbeat,
        credentials=auth,
    )
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume(parameters, queue):
    """Consume messages from ``queue`` and pass each one to ``callback``.

    Opens a blocking connection, registers ``callback`` as an auto-acking
    consumer and blocks in ``start_consuming``.

    Recovery strategy (the pattern shown in the pika documentation):

    * Connection errors other than a broker-initiated close are NOT caught
      here. They propagate to the ``@retry`` decorator, which calls this
      function again so a *fresh* connection and channel are created.
    * A connection closed by the broker, or a channel-level error, is
      reported and ends consumption; the function then returns.

    The previous implementation swallowed all of these inside a
    ``while True`` loop that reused the already-dead channel, so it spun
    forever without reconnecting and the ``@retry`` decorator never fired.

    Args:
        parameters: ``pika.ConnectionParameters`` used to open the connection.
        queue: Name of the queue to consume from.
    """
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback)
        print(" [*] Waiting for messages. To exit press CTRL+C")
        channel.start_consuming()
    # ConnectionClosedByBroker is itself an AMQPConnectionError, so it must
    # be handled here to keep it away from the retry decorator.
    except pika.exceptions.ConnectionClosedByBroker as err:
        print(" [!] Connection closed by broker, stopping consumer: %r" % (err,))
    # Channel errors (e.g. a missing queue) are not fixed by reconnecting.
    except pika.exceptions.AMQPChannelError as err:
        print(" [!] Channel error, stopping consumer: %r" % (err,))
    finally:
        # Release the socket on every exit path (including CTRL+C); a
        # connection that is already closed must not be closed again.
        if connection.is_open:
            connection.close()