setup pika

This commit is contained in:
Julius Unverfehrt 2022-02-14 16:36:01 +01:00
parent 78aca3f40c
commit d770e1f717
9 changed files with 21 additions and 41 deletions

0
Dockerfile Normal file → Executable file
View File

0
README.md Normal file → Executable file
View File

0
config.yaml Normal file → Executable file
View File

View File

@ -1,17 +1,5 @@
version: '2'
services:
# minio:
# image: minio/minio
# ports:
# - "9000:9000"
# environment:
# - MINIO_ROOT_PASSWORD=password
# - MINIO_ROOT_USER=root
# volumes:
# - ./data/minio_store:/data
# command: server /data
# network_mode: "bridge"
rabbitmq:
image: docker.io/bitnami/rabbitmq:3.9
ports:

View File

@ -1,17 +1,18 @@
import sys
import pika
from retry import retry
from mini_queue.producer import produce_response
from mini_queue.utils.config import CONFIG
def callback(ch, method, properties, body):
print("Received %r" % body)
parameters = init_params()
queue = CONFIG.rabbitmq.queues.output
produce_response(parameters, queue, body)
def callback(channel, method, properties, body):
print("Received %r" % body)
response = body
channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
channel.basic_ack(delivery_tag=method.delivery_tag)
def init_params():
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(
@ -23,18 +24,12 @@ def init_params():
return parameters
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
#@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume(parameters, queue):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
try:
channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.basic_consume(queue=queue, auto_ack=False, on_message_callback=callback)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker:
pass
except pika.exceptions.AMQPChannelError:
pass
except pika.exceptions.AMQPConnectionError:
pass

View File

@ -1,10 +0,0 @@
import pika
def produce_response(parameters, queue, body):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
channel.basic_publish("", queue, body)
print(f" [x] Sent {body} on {queue}")
connection.close()

0
requirements.txt Normal file → Executable file
View File

View File

@ -1,7 +1,6 @@
import json
import pika
from mini_queue.producer import produce_response
from mini_queue.utils.config import CONFIG
@ -17,4 +16,12 @@ if __name__ == "__main__":
credentials=credentials,
)
body = json.dumps({"fileId": "234", "dossierId": "3403"})
produce_response(parameters, queue, body)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
channel.basic_publish("", queue, body)
print(f" [x] Sent {body} on {queue}")
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
print(json.loads(body))
break

0
setup.py Normal file → Executable file
View File