import logging import sys import pika from retry import retry from mini_queue.utils.config import CONFIG def callback(channel, method, properties, body): logging.info(" [R] Received %r" % body) sys.sleep(1) response = body channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response) channel.basic_ack(delivery_tag=method.delivery_tag) def init_params(): credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) parameters = pika.ConnectionParameters( host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials, ) return parameters def main(): logging.info(" [S] Startet happy pikachu!") parameters = init_params() connection = pika.BlockingConnection(parameters) channel = connection.channel() #channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True) while True: try: channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=callback) logging.info(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming() except pika.exceptions.ConnectionClosedByBroker as err: logging.info("Caught a channel error: {}, stopping...".format(err)) continue except pika.exceptions.AMQPChannelError as err: logging.warning("Caught a channel error: {}, stopping...".format(err)) break except pika.exceptions.AMQPConnectionError: logging.info("Connection was closed, retrying...") continue if __name__ == "__main__": logging_level = CONFIG.service.logging_level logging.basicConfig(level=logging_level) main()