diff --git a/config.yaml b/config.yaml index f2af8f2..73b2943 100755 --- a/config.yaml +++ b/config.yaml @@ -7,9 +7,9 @@ rabbitmq: heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds queues: # Names of queues for... - input: input_queue # requests to service - output: response_queue # responses by service - dead_letter: dead_letter_queue # messages that failed to process + input: image_request_queue # requests to service + output: image_response_queue # responses by service + dead_letter: image_dead_letter_queue # messages that failed to process prefetch_count: 1 diff --git a/mini_queue/consumer.py b/mini_queue/consumer.py index 8b73873..05f9718 100644 --- a/mini_queue/consumer.py +++ b/mini_queue/consumer.py @@ -1,17 +1,10 @@ -import sys import pika +import sys from retry import retry from mini_queue.utils.config import CONFIG -def callback(channel, method, properties, body): - print("Received %r" % body) - response = body - channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response) - channel.basic_ack(delivery_tag=method.delivery_tag) - - def init_params(): credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) parameters = pika.ConnectionParameters( @@ -21,13 +14,3 @@ def init_params(): credentials=credentials, ) return parameters - - -# @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) -def consume(parameters, queue): - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.basic_consume(queue=queue, auto_ack=False, on_message_callback=callback) - print(" [*] Waiting for messages. To exit press CTRL+C") - - channel.start_consuming() diff --git a/mini_queue/run.py b/mini_queue/run.py index a262f63..e73b4b7 100644 --- a/mini_queue/run.py +++ b/mini_queue/run.py @@ -1,10 +1,35 @@ -from mini_queue.consumer import consume, init_params +from mini_queue.consumer import init_params from mini_queue.utils.config import CONFIG +import sys +import pika +from retry import retry + +from mini_queue.utils.config import CONFIG + + +def callback(channel, method, properties, body): + print("Received %r" % body) + response = body + channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response) + channel.basic_ack(delivery_tag=method.delivery_tag) if __name__ == "__main__": print("startet happy pikachu!") - queue = CONFIG.rabbitmq.queues.input + parameters = init_params() - consume(parameters, queue) + + connection = pika.BlockingConnection(parameters) + + channel = connection.channel() + + channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True) + + while True: + try: + channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=callback) + print(" [*] Waiting for messages. To exit press CTRL+C") + channel.start_consuming() + except: + pass diff --git a/scripts/mock_publish.py b/scripts/mock_publish.py index 6543cc3..336998f 100644 --- a/scripts/mock_publish.py +++ b/scripts/mock_publish.py @@ -8,7 +8,6 @@ from mini_queue.utils.config import CONFIG if __name__ == "__main__": credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - queue = CONFIG.rabbitmq.queues.input parameters = pika.ConnectionParameters( host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, @@ -18,9 +17,13 @@ if __name__ == "__main__": body = json.dumps({"fileId": "234", "dossierId": "3403"}) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.queue_declare(queue=queue, durable=True) - channel.basic_publish("", queue, body) - print(f" [x] Sent {body} on {queue}") + + channel.queue_declare(queue=CONFIG.rabbitmq.queues.input, durable=True) + channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True) + + channel.basic_publish("", CONFIG.rabbitmq.queues.input, body) + + print(f" [x] Put {body} on {CONFIG.rabbitmq.queues.input}") for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): print(json.loads(body))