From d770e1f71799979da7c2f061b5db83ebdc8f6f06 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 14 Feb 2022 16:36:01 +0100 Subject: [PATCH] setup pika --- Dockerfile | 0 README.md | 0 config.yaml | 0 docker-compose.yaml | 12 ------------ mini_queue/consumer.py | 29 ++++++++++++----------------- mini_queue/producer.py | 10 ---------- requirements.txt | 0 scripts/mock_publish.py | 11 +++++++++-- setup.py | 0 9 files changed, 21 insertions(+), 41 deletions(-) mode change 100644 => 100755 Dockerfile mode change 100644 => 100755 README.md mode change 100644 => 100755 config.yaml delete mode 100644 mini_queue/producer.py mode change 100644 => 100755 requirements.txt mode change 100644 => 100755 setup.py diff --git a/Dockerfile b/Dockerfile old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/config.yaml b/config.yaml old mode 100644 new mode 100755 diff --git a/docker-compose.yaml b/docker-compose.yaml index 346d93e..d6d75a2 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,17 +1,5 @@ version: '2' services: -# minio: -# image: minio/minio -# ports: -# - "9000:9000" -# environment: -# - MINIO_ROOT_PASSWORD=password -# - MINIO_ROOT_USER=root -# volumes: -# - ./data/minio_store:/data -# command: server /data -# network_mode: "bridge" - rabbitmq: image: docker.io/bitnami/rabbitmq:3.9 ports: diff --git a/mini_queue/consumer.py b/mini_queue/consumer.py index 89990b0..ae5e076 100644 --- a/mini_queue/consumer.py +++ b/mini_queue/consumer.py @@ -1,17 +1,18 @@ +import sys import pika from retry import retry -from mini_queue.producer import produce_response from mini_queue.utils.config import CONFIG -def callback(ch, method, properties, body): - print("Received %r" % body) - parameters = init_params() - queue = CONFIG.rabbitmq.queues.output - produce_response(parameters, queue, body) +def callback(channel, method, properties, body): + print("Received %r" % body) + response = body + channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response) + channel.basic_ack(delivery_tag=method.delivery_tag) + def init_params(): credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) parameters = pika.ConnectionParameters( @@ -23,18 +24,12 @@ def init_params(): return parameters -@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) +#@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) def consume(parameters, queue): connection = pika.BlockingConnection(parameters) channel = connection.channel() - try: - channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback) - print(" [*] Waiting for messages. To exit press CTRL+C") + channel.basic_consume(queue=queue, auto_ack=False, on_message_callback=callback) + print(" [*] Waiting for messages. To exit press CTRL+C") + + channel.start_consuming() - channel.start_consuming() - except pika.exceptions.ConnectionClosedByBroker: - pass - except pika.exceptions.AMQPChannelError: - pass - except pika.exceptions.AMQPConnectionError: - pass diff --git a/mini_queue/producer.py b/mini_queue/producer.py deleted file mode 100644 index 8d829d6..0000000 --- a/mini_queue/producer.py +++ /dev/null @@ -1,10 +0,0 @@ -import pika - - -def produce_response(parameters, queue, body): - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=queue, durable=True) - channel.basic_publish("", queue, body) - print(f" [x] Sent {body} on {queue}") - connection.close() diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 diff --git a/scripts/mock_publish.py b/scripts/mock_publish.py index 542ab65..b5df849 100644 --- a/scripts/mock_publish.py +++ b/scripts/mock_publish.py @@ -1,7 +1,6 @@ import json import pika -from mini_queue.producer import produce_response from mini_queue.utils.config import CONFIG @@ -17,4 +16,12 @@ if __name__ == "__main__": credentials=credentials, ) body = json.dumps({"fileId": "234", "dossierId": "3403"}) - produce_response(parameters, queue, body) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=queue, durable=True) + channel.basic_publish("", queue, body) + print(f" [x] Sent {body} on {queue}") + + for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): + print(json.loads(body)) + break \ No newline at end of file diff --git a/setup.py b/setup.py old mode 100644 new mode 100755