diff --git a/mini_queue/consumer.py b/mini_queue/consumer.py index c81aff8..f693fbc 100644 --- a/mini_queue/consumer.py +++ b/mini_queue/consumer.py @@ -1,27 +1,42 @@ +from functools import partial import pika - +from retry import retry from mini_queue.utils.config import CONFIG +from mini_queue.producer import produce_response -def get_message(parameters): - def callback(ch, method, properties, body): - print(" [x] Received %r" % body) - inp_queue = CONFIG.rabbitmq.queues.input - out_queue = CONFIG.rabbitmq.queues.output - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=inp_queue, durable=True) - channel.basic_consume(queue=inp_queue, - auto_ack=True, - on_message_callback=callback) - print(' [*] Waiting for messages. To exit press CTRL+C') - channel.start_consuming() +def callback(ch, method, properties, body): + print("Received %r" % body) + parameters = init_params() + queue = CONFIG.rabbitmq.queues.output + produce_response(parameters, queue, body) -if __name__ == "__main__": - +def init_params(): credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials) - get_message(parameters) + return parameters + + +@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) +def consume(parameters, queue): + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + while True: + try: + channel.basic_consume(queue=queue, + auto_ack=True, + on_message_callback=callback) + print(' [*] Waiting for messages. To exit press CTRL+C') + + channel.start_consuming() + except pika.exceptions.ConnectionClosedByBroker: + pass + except pika.exceptions.AMQPChannelError: + pass + except pika.exceptions.AMQPConnectionError: + pass + + diff --git a/mini_queue/producer.py b/mini_queue/producer.py new file mode 100644 index 0000000..f4ae359 --- /dev/null +++ b/mini_queue/producer.py @@ -0,0 +1,14 @@ +import pika + +from mini_queue.utils.config import CONFIG + + +def produce_response(parameters, queue, body): + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=queue, durable=True) + channel.basic_publish("", + queue, + body) + print(f" [x] Sent {body} on {queue}") + connection.close() \ No newline at end of file diff --git a/mini_queue/put_on_queue.py b/mini_queue/put_on_queue.py deleted file mode 100644 index 0db966a..0000000 --- a/mini_queue/put_on_queue.py +++ /dev/null @@ -1,21 +0,0 @@ -import pika - -from mini_queue.utils.config import CONFIG - -def put_message(parameters): - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - inp_queue = CONFIG.rabbitmq.queues.input - channel.queue_declare(queue=inp_queue, durable=True) - channel.basic_publish('', - inp_queue, - 'Hello Wald!') - print(" [x] Sent something'") - connection.close() - - -if __name__ == "__main__": - - credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials) - put_message(parameters) \ No newline at end of file diff --git a/mini_queue/run.py b/mini_queue/run.py new file mode 100644 index 0000000..564ef48 --- /dev/null +++ b/mini_queue/run.py @@ -0,0 +1,10 @@ +import pika +from mini_queue.consumer import consume, init_params +from mini_queue.utils.config import CONFIG + + +if __name__ == "__main__": + + queue = CONFIG.rabbitmq.queues.input + parameters = init_params() + consume(parameters, queue) diff --git a/mini_queue/utils/locations.py b/mini_queue/utils/locations.py index 0e34b40..7770a8a 100644 --- a/mini_queue/utils/locations.py +++ b/mini_queue/utils/locations.py @@ -1,15 +1,15 @@ """Defines constant paths relative to the module root path.""" +from pathlib import Path -from os import path -MODULE_DIR = path.dirname(path.abspath(__file__)) -PACKAGE_ROOT_DIR = path.dirname(MODULE_DIR) -REPO_ROOT_DIR = path.dirname(PACKAGE_ROOT_DIR) +MODULE_DIR = Path(__file__).resolve().parents[0] +PACKAGE_ROOT_DIR = MODULE_DIR.parents[0] +REPO_ROOT_DIR = PACKAGE_ROOT_DIR.parents[0] -DOCKER_COMPOSE_FILE = path.join(REPO_ROOT_DIR, "docker-compose.yaml") +DOCKER_COMPOSE_FILE = REPO_ROOT_DIR.joinpath("docker-compose.yaml") -CONFIG_FILE = path.join(REPO_ROOT_DIR, "config.yaml") -LOG_FILE = "/tmp/log.log" +CONFIG_FILE = REPO_ROOT_DIR.joinpath("config.yaml") +LOG_FILE = Path("/tmp/log.log") -DATA_DIR = path.join(PACKAGE_ROOT_DIR, "data") +DATA_DIR = PACKAGE_ROOT_DIR.joinpath("data") diff --git a/requirements.txt b/requirements.txt index df7f423..ecba2e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ pika +retry diff --git a/scripts/mock_publish.py b/scripts/mock_publish.py new file mode 100644 index 0000000..a17a5ab --- /dev/null +++ b/scripts/mock_publish.py @@ -0,0 +1,14 @@ +import pika +from mini_queue.producer import produce_response + +from mini_queue.utils.config import CONFIG + + + +if __name__ == "__main__": + + credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) + queue = CONFIG.rabbitmq.queues.input + parameters = pika.ConnectionParameters(host=CONFIG.rabbitmq.host, port=CONFIG.rabbitmq.port, heartbeat=CONFIG.rabbitmq.heartbeat, credentials=credentials) + body = "Pika pika!" + produce_response(parameters, queue, body) \ No newline at end of file