From 5cd52928cea3a93168af1be1141fecf1fd320443 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Wed, 16 Feb 2022 15:49:38 +0100 Subject: [PATCH] refactorig WIP --- {mini_queue => pyinfra}/__init__.py | 0 pyinfra/callback.py | 14 ++++++ {mini_queue/utils => pyinfra}/config.py | 2 +- pyinfra/consume.py | 42 ++++++++++++++++++ {mini_queue/utils => pyinfra}/locations.py | 5 +-- {mini_queue/utils => pyinfra}/minio.py | 4 +- {mini_queue/utils => pyinfra}/rabbitmq.py | 12 ++++-- {mini_queue/utils => pyinfra}/storage.py | 4 +- {mini_queue => pyinfra}/utils/__init__.py | 0 {mini_queue => pyinfra}/utils/file.py | 0 {mini_queue => pyinfra}/utils/meta.py | 0 requirements.txt | 1 + scripts/manage_minio.py | 2 +- scripts/mock_client.py | 18 ++------ setup.py | 4 +- src/serve.py | 50 +++++++++------------- 16 files changed, 100 insertions(+), 58 deletions(-) rename {mini_queue => pyinfra}/__init__.py (100%) create mode 100644 pyinfra/callback.py rename {mini_queue/utils => pyinfra}/config.py (94%) create mode 100644 pyinfra/consume.py rename {mini_queue/utils => pyinfra}/locations.py (61%) rename {mini_queue/utils => pyinfra}/minio.py (97%) rename {mini_queue/utils => pyinfra}/rabbitmq.py (88%) rename {mini_queue/utils => pyinfra}/storage.py (97%) rename {mini_queue => pyinfra}/utils/__init__.py (100%) rename {mini_queue => pyinfra}/utils/file.py (100%) rename {mini_queue => pyinfra}/utils/meta.py (100%) diff --git a/mini_queue/__init__.py b/pyinfra/__init__.py similarity index 100% rename from mini_queue/__init__.py rename to pyinfra/__init__.py diff --git a/pyinfra/callback.py b/pyinfra/callback.py new file mode 100644 index 0000000..c9e3667 --- /dev/null +++ b/pyinfra/callback.py @@ -0,0 +1,14 @@ +from pyinfra.rabbitmq import make_connection, make_channel, declare_queue + + +def make_callback(body_processor, output_queue_name): + + connection = make_connection() + channel = make_channel(connection) + declare_queue(channel, output_queue_name) + + def callback(channel, method, _, body): + channel.basic_publish(exchange="", routing_key=output_queue_name, body=body_processor(body)) + channel.basic_ack(delivery_tag=method.delivery_tag) + + return callback diff --git a/mini_queue/utils/config.py b/pyinfra/config.py similarity index 94% rename from mini_queue/utils/config.py rename to pyinfra/config.py index c04a5e1..2e5f071 100644 --- a/mini_queue/utils/config.py +++ b/pyinfra/config.py @@ -3,7 +3,7 @@ from envyaml import EnvYAML -from mini_queue.utils.locations import CONFIG_FILE +from pyinfra.locations import CONFIG_FILE def _get_item_and_maybe_make_dotindexable(container, item): diff --git a/pyinfra/consume.py b/pyinfra/consume.py new file mode 100644 index 0000000..a074885 --- /dev/null +++ b/pyinfra/consume.py @@ -0,0 +1,42 @@ +import logging +from typing import Callable + +import pika +from retry import retry + +from pyinfra.rabbitmq import make_connection, make_channel, declare_queue + + +class ConsumerError(Exception): + pass + + +# @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) +def consume(queue_name: str, on_message_callback: Callable): + + logging.info("Starting mini-queue...") + connection = make_connection() + channel = make_channel(connection) + declare_queue(channel, queue_name) + logging.info("Starting webserver...") + + while True: + try: + logging.info("Waiting for messages...") + channel.basic_consume(queue=queue_name, auto_ack=False, on_message_callback=on_message_callback) + channel.start_consuming() + + except pika.exceptions.ConnectionClosedByBroker as err: + logging.critical(f"Caught a channel error: {err}, stopping.") + break + + except pika.exceptions.AMQPChannelError as err: + logging.critical(f"Caught a channel error: {err}, stopping.") + break + + except pika.exceptions.AMQPConnectionError as err: + logging.info("No AMPQ-connection found, retrying...") + logging.debug(err) + continue + + raise ConsumerError(f"Error while consuming {queue_name}.") diff --git a/mini_queue/utils/locations.py b/pyinfra/locations.py similarity index 61% rename from mini_queue/utils/locations.py rename to pyinfra/locations.py index 7770a8a..330da8b 100644 --- a/mini_queue/utils/locations.py +++ b/pyinfra/locations.py @@ -5,11 +5,10 @@ from pathlib import Path MODULE_DIR = Path(__file__).resolve().parents[0] PACKAGE_ROOT_DIR = MODULE_DIR.parents[0] -REPO_ROOT_DIR = PACKAGE_ROOT_DIR.parents[0] -DOCKER_COMPOSE_FILE = REPO_ROOT_DIR.joinpath("docker-compose.yaml") +DOCKER_COMPOSE_FILE = PACKAGE_ROOT_DIR.joinpath("docker-compose.yaml") -CONFIG_FILE = REPO_ROOT_DIR.joinpath("config.yaml") +CONFIG_FILE = PACKAGE_ROOT_DIR.joinpath("config.yaml") LOG_FILE = Path("/tmp/log.log") DATA_DIR = PACKAGE_ROOT_DIR.joinpath("data") diff --git a/mini_queue/utils/minio.py b/pyinfra/minio.py similarity index 97% rename from mini_queue/utils/minio.py rename to pyinfra/minio.py index 1b8c5c0..4455257 100644 --- a/mini_queue/utils/minio.py +++ b/pyinfra/minio.py @@ -4,8 +4,8 @@ from typing import Iterable from minio import Minio -from mini_queue.utils.config import CONFIG -from mini_queue.utils.storage import StorageHandle +from pyinfra.config import CONFIG +from pyinfra.storage import StorageHandle def get_minio_client(access_key=None, secret_key=None) -> Minio: diff --git a/mini_queue/utils/rabbitmq.py b/pyinfra/rabbitmq.py similarity index 88% rename from mini_queue/utils/rabbitmq.py rename to pyinfra/rabbitmq.py index a69b47f..1114a16 100644 --- a/mini_queue/utils/rabbitmq.py +++ b/pyinfra/rabbitmq.py @@ -6,9 +6,9 @@ from operator import itemgetter import pika -from mini_queue.utils.config import CONFIG -from mini_queue.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip -from mini_queue.utils.minio import MinioHandle +from pyinfra.config import CONFIG +from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip +from pyinfra.minio import MinioHandle def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: @@ -87,3 +87,9 @@ def read_connection_params(): credentials=credentials, ) return parameters + + +def make_connection() -> pika.BlockingConnection: + parameters = read_connection_params() + connection = pika.BlockingConnection(parameters) + return connection diff --git a/mini_queue/utils/storage.py b/pyinfra/storage.py similarity index 97% rename from mini_queue/utils/storage.py rename to pyinfra/storage.py index 22e4053..e1cae48 100644 --- a/mini_queue/utils/storage.py +++ b/pyinfra/storage.py @@ -6,8 +6,8 @@ from itertools import repeat from operator import attrgetter from typing import Iterable -from mini_queue.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory -from mini_queue.utils.meta import NoAttemptsLeft, max_attempts +from pyinfra.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory +from pyinfra.utils.meta import NoAttemptsLeft, max_attempts class StorageHandle: diff --git a/mini_queue/utils/__init__.py b/pyinfra/utils/__init__.py similarity index 100% rename from mini_queue/utils/__init__.py rename to pyinfra/utils/__init__.py diff --git a/mini_queue/utils/file.py b/pyinfra/utils/file.py similarity index 100% rename from mini_queue/utils/file.py rename to pyinfra/utils/file.py diff --git a/mini_queue/utils/meta.py b/pyinfra/utils/meta.py similarity index 100% rename from mini_queue/utils/meta.py rename to pyinfra/utils/meta.py diff --git a/requirements.txt b/requirements.txt index 2b2cfa6..ae2cb1a 100755 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ envyaml==1.8.210417 minio==7.1.1 Flask==2.0.2 waitress==2.0.0 +tqdm==4.62.3 diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 5577f06..278f32c 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -3,7 +3,7 @@ import os from tqdm import tqdm -from mini_queue.utils.minio import MinioHandle +from pyinfra.minio import MinioHandle def parse_args(): diff --git a/scripts/mock_client.py b/scripts/mock_client.py index f30da16..34bbe32 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -1,10 +1,8 @@ import json -import pika - -from mini_queue.utils.config import CONFIG -from mini_queue.utils.minio import MinioHandle -from mini_queue.utils.rabbitmq import make_channel, declare_queue +from pyinfra.config import CONFIG +from pyinfra.minio import MinioHandle +from pyinfra.rabbitmq import make_channel, declare_queue, make_connection def build_message_bodies(): @@ -15,17 +13,9 @@ def build_message_bodies(): if __name__ == "__main__": - credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - parameters = pika.ConnectionParameters( - host=CONFIG.rabbitmq.host, - port=CONFIG.rabbitmq.port, - heartbeat=CONFIG.rabbitmq.heartbeat, - credentials=credentials, - ) - connection = pika.BlockingConnection(parameters) + connection = make_connection() channel = make_channel(connection) - declare_queue(channel, CONFIG.rabbitmq.queues.input) declare_queue(channel, CONFIG.rabbitmq.queues.output) diff --git a/setup.py b/setup.py index f080f6a..b268e3b 100755 --- a/setup.py +++ b/setup.py @@ -3,11 +3,11 @@ from distutils.core import setup setup( - name="mini_queue", + name="pyinfra", version="0.0.1", description="", author="", author_email="", url="", - packages=["mini_queue"], + packages=["pyinfra"], ) diff --git a/src/serve.py b/src/serve.py index fbc73e9..020b003 100644 --- a/src/serve.py +++ b/src/serve.py @@ -1,12 +1,13 @@ +import json import logging from multiprocessing import Process -import pika from flask import Flask, jsonify from waitress import serve -from mini_queue.utils.config import CONFIG -from mini_queue.utils.rabbitmq import make_channel, declare_queue, make_callback, read_connection_params +from pyinfra.config import CONFIG +from pyinfra.consume import consume, ConsumerError +from pyinfra.callback import make_callback # TODO: implement meaningful checks @@ -34,40 +35,29 @@ def start_integrity_checks_webserver(mode="debug"): def main(): - logging.info("Starting mini-queue...") + def processor(body): + print(f"[OUT]: {json.loads(body)}") + return body - parameters = read_connection_params() - connection = pika.BlockingConnection(parameters) - channel = make_channel(connection) - declare_queue(channel, CONFIG.rabbitmq.queues.input) + callback = make_callback(body_processor=processor, output_queue_name=CONFIG.rabbitmq.queues.output) - logging.info("Starting webserver...") + webserver = Process(target=start_integrity_checks_webserver, args=("debug", )) + webserver.start() - p = Process(target=start_integrity_checks_webserver, args=("debug", )) - p.start() + try: + consume(CONFIG.rabbitmq.queues.input, callback) + except KeyboardInterrupt: + pass + except ConsumerError: + webserver.terminate() + raise - while True: - try: - channel.basic_consume( - queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback("dummy") - ) - logging.info(" [*] Waiting for messages. To exit press CTRL+C") - channel.start_consuming() - - except pika.exceptions.ConnectionClosedByBroker as err: - logging.info(f"Caught a channel error: {err}, retrying...") - continue - except pika.exceptions.AMQPChannelError as err: - logging.critical(f"Caught a channel error: {err}, stopping...") - break - except pika.exceptions.AMQPConnectionError: - logging.info("Connection was closed, retrying...") - continue - - p.join() + webserver.join() if __name__ == "__main__": logging_level = CONFIG.service.logging_level logging.basicConfig(level=logging_level) + logging.getLogger("pika").setLevel(logging.ERROR) + main()