This commit is contained in:
Julius Unverfehrt 2022-02-14 14:15:27 +01:00
parent 4507bbb4e6
commit fce35894b4
7 changed files with 79 additions and 46 deletions

View File

@ -1,27 +1,42 @@
from functools import partial
import pika
from retry import retry
from mini_queue.utils.config import CONFIG
from mini_queue.producer import produce_response
def get_message(parameters):
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
inp_queue = CONFIG.rabbitmq.queues.input
out_queue = CONFIG.rabbitmq.queues.output
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=inp_queue, durable=True)
channel.basic_consume(queue=inp_queue,
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
def callback(ch, method, properties, body):
print("Received %r" % body)
parameters = init_params()
queue = CONFIG.rabbitmq.queues.output
produce_response(parameters, queue, body)
if __name__ == "__main__":
def init_params():
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials)
get_message(parameters)
return parameters
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume(parameters, queue):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
while True:
try:
channel.basic_consume(queue=queue,
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker:
pass
except pika.exceptions.AMQPChannelError:
pass
except pika.exceptions.AMQPConnectionError:
pass

14
mini_queue/producer.py Normal file
View File

@ -0,0 +1,14 @@
import pika
from mini_queue.utils.config import CONFIG
def produce_response(parameters, queue, body):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
channel.basic_publish("",
queue,
body)
print(f" [x] Sent {body} on {queue}")
connection.close()

View File

@ -1,21 +0,0 @@
import pika
from mini_queue.utils.config import CONFIG
def put_message(parameters):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
inp_queue = CONFIG.rabbitmq.queues.input
channel.queue_declare(queue=inp_queue, durable=True)
channel.basic_publish('',
inp_queue,
'Hello Wald!')
print(" [x] Sent something'")
connection.close()
if __name__ == "__main__":
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials)
put_message(parameters)

10
mini_queue/run.py Normal file
View File

@ -0,0 +1,10 @@
import pika
from mini_queue.consumer import consume, init_params
from mini_queue.utils.config import CONFIG
if __name__ == "__main__":
queue = CONFIG.rabbitmq.queues.input
parameters = init_params()
consume(parameters, queue)

View File

@ -1,15 +1,15 @@
"""Defines constant paths relative to the module root path."""
from pathlib import Path
from os import path
MODULE_DIR = path.dirname(path.abspath(__file__))
PACKAGE_ROOT_DIR = path.dirname(MODULE_DIR)
REPO_ROOT_DIR = path.dirname(PACKAGE_ROOT_DIR)
MODULE_DIR = Path(__file__).resolve().parents[0]
PACKAGE_ROOT_DIR = MODULE_DIR.parents[0]
REPO_ROOT_DIR = PACKAGE_ROOT_DIR.parents[0]
DOCKER_COMPOSE_FILE = path.join(REPO_ROOT_DIR, "docker-compose.yaml")
DOCKER_COMPOSE_FILE = REPO_ROOT_DIR.joinpath("docker-compose.yaml")
CONFIG_FILE = path.join(REPO_ROOT_DIR, "config.yaml")
LOG_FILE = "/tmp/log.log"
CONFIG_FILE = REPO_ROOT_DIR.joinpath("config.yaml")
LOG_FILE = Path("/tmp/log.log")
DATA_DIR = path.join(PACKAGE_ROOT_DIR, "data")
DATA_DIR = PACKAGE_ROOT_DIR.joinpath("data")

View File

@ -1 +1,2 @@
pika
retry

14
scripts/mock_publish.py Normal file
View File

@ -0,0 +1,14 @@
import pika
from mini_queue.producer import produce_response
from mini_queue.utils.config import CONFIG
if __name__ == "__main__":
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
queue = CONFIG.rabbitmq.queues.input
parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials)
body = "Pika pika!"
produce_response(parameters, queue, body)