refactorig WIP

This commit is contained in:
Matthias Bisping 2022-02-16 15:49:38 +01:00
parent ca020ee64f
commit 5cd52928ce
16 changed files with 100 additions and 58 deletions

14
pyinfra/callback.py Normal file
View File

@ -0,0 +1,14 @@
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
def make_callback(body_processor, output_queue_name):
connection = make_connection()
channel = make_channel(connection)
declare_queue(channel, output_queue_name)
def callback(channel, method, _, body):
channel.basic_publish(exchange="", routing_key=output_queue_name, body=body_processor(body))
channel.basic_ack(delivery_tag=method.delivery_tag)
return callback

View File

@ -3,7 +3,7 @@
from envyaml import EnvYAML
from mini_queue.utils.locations import CONFIG_FILE
from pyinfra.locations import CONFIG_FILE
def _get_item_and_maybe_make_dotindexable(container, item):

42
pyinfra/consume.py Normal file
View File

@ -0,0 +1,42 @@
import logging
from typing import Callable
import pika
from retry import retry
from pyinfra.rabbitmq import make_connection, make_channel, declare_queue
class ConsumerError(Exception):
pass
# @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume(queue_name: str, on_message_callback: Callable):
logging.info("Starting mini-queue...")
connection = make_connection()
channel = make_channel(connection)
declare_queue(channel, queue_name)
logging.info("Starting webserver...")
while True:
try:
logging.info("Waiting for messages...")
channel.basic_consume(queue=queue_name, auto_ack=False, on_message_callback=on_message_callback)
channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker as err:
logging.critical(f"Caught a channel error: {err}, stopping.")
break
except pika.exceptions.AMQPChannelError as err:
logging.critical(f"Caught a channel error: {err}, stopping.")
break
except pika.exceptions.AMQPConnectionError as err:
logging.info("No AMPQ-connection found, retrying...")
logging.debug(err)
continue
raise ConsumerError(f"Error while consuming {queue_name}.")

View File

@ -5,11 +5,10 @@ from pathlib import Path
MODULE_DIR = Path(__file__).resolve().parents[0]
PACKAGE_ROOT_DIR = MODULE_DIR.parents[0]
REPO_ROOT_DIR = PACKAGE_ROOT_DIR.parents[0]
DOCKER_COMPOSE_FILE = REPO_ROOT_DIR.joinpath("docker-compose.yaml")
DOCKER_COMPOSE_FILE = PACKAGE_ROOT_DIR.joinpath("docker-compose.yaml")
CONFIG_FILE = REPO_ROOT_DIR.joinpath("config.yaml")
CONFIG_FILE = PACKAGE_ROOT_DIR.joinpath("config.yaml")
LOG_FILE = Path("/tmp/log.log")
DATA_DIR = PACKAGE_ROOT_DIR.joinpath("data")

View File

@ -4,8 +4,8 @@ from typing import Iterable
from minio import Minio
from mini_queue.utils.config import CONFIG
from mini_queue.utils.storage import StorageHandle
from pyinfra.config import CONFIG
from pyinfra.storage import StorageHandle
def get_minio_client(access_key=None, secret_key=None) -> Minio:

View File

@ -6,9 +6,9 @@ from operator import itemgetter
import pika
from mini_queue.utils.config import CONFIG
from mini_queue.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
from mini_queue.utils.minio import MinioHandle
from pyinfra.config import CONFIG
from pyinfra.utils.file import dossier_id_and_file_id_to_compressed_storage_pdf_object_name, download, unzip
from pyinfra.minio import MinioHandle
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
@ -87,3 +87,9 @@ def read_connection_params():
credentials=credentials,
)
return parameters
def make_connection() -> pika.BlockingConnection:
parameters = read_connection_params()
connection = pika.BlockingConnection(parameters)
return connection

View File

@ -6,8 +6,8 @@ from itertools import repeat
from operator import attrgetter
from typing import Iterable
from mini_queue.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
from mini_queue.utils.meta import NoAttemptsLeft, max_attempts
from pyinfra.utils.file import path_to_compressed_storage_pdf_object_name, provide_directory
from pyinfra.utils.meta import NoAttemptsLeft, max_attempts
class StorageHandle:

View File

@ -4,3 +4,4 @@ envyaml==1.8.210417
minio==7.1.1
Flask==2.0.2
waitress==2.0.0
tqdm==4.62.3

View File

@ -3,7 +3,7 @@ import os
from tqdm import tqdm
from mini_queue.utils.minio import MinioHandle
from pyinfra.minio import MinioHandle
def parse_args():

View File

@ -1,10 +1,8 @@
import json
import pika
from mini_queue.utils.config import CONFIG
from mini_queue.utils.minio import MinioHandle
from mini_queue.utils.rabbitmq import make_channel, declare_queue
from pyinfra.config import CONFIG
from pyinfra.minio import MinioHandle
from pyinfra.rabbitmq import make_channel, declare_queue, make_connection
def build_message_bodies():
@ -15,17 +13,9 @@ def build_message_bodies():
if __name__ == "__main__":
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
parameters = pika.ConnectionParameters(
host=CONFIG.rabbitmq.host,
port=CONFIG.rabbitmq.port,
heartbeat=CONFIG.rabbitmq.heartbeat,
credentials=credentials,
)
connection = pika.BlockingConnection(parameters)
connection = make_connection()
channel = make_channel(connection)
declare_queue(channel, CONFIG.rabbitmq.queues.input)
declare_queue(channel, CONFIG.rabbitmq.queues.output)

View File

@ -3,11 +3,11 @@
from distutils.core import setup
setup(
name="mini_queue",
name="pyinfra",
version="0.0.1",
description="",
author="",
author_email="",
url="",
packages=["mini_queue"],
packages=["pyinfra"],
)

View File

@ -1,12 +1,13 @@
import json
import logging
from multiprocessing import Process
import pika
from flask import Flask, jsonify
from waitress import serve
from mini_queue.utils.config import CONFIG
from mini_queue.utils.rabbitmq import make_channel, declare_queue, make_callback, read_connection_params
from pyinfra.config import CONFIG
from pyinfra.consume import consume, ConsumerError
from pyinfra.callback import make_callback
# TODO: implement meaningful checks
@ -34,40 +35,29 @@ def start_integrity_checks_webserver(mode="debug"):
def main():
logging.info("Starting mini-queue...")
def processor(body):
print(f"[OUT]: {json.loads(body)}")
return body
parameters = read_connection_params()
connection = pika.BlockingConnection(parameters)
channel = make_channel(connection)
declare_queue(channel, CONFIG.rabbitmq.queues.input)
callback = make_callback(body_processor=processor, output_queue_name=CONFIG.rabbitmq.queues.output)
logging.info("Starting webserver...")
webserver = Process(target=start_integrity_checks_webserver, args=("debug", ))
webserver.start()
p = Process(target=start_integrity_checks_webserver, args=("debug", ))
p.start()
try:
consume(CONFIG.rabbitmq.queues.input, callback)
except KeyboardInterrupt:
pass
except ConsumerError:
webserver.terminate()
raise
while True:
try:
channel.basic_consume(
queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=make_callback("dummy")
)
logging.info(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker as err:
logging.info(f"Caught a channel error: {err}, retrying...")
continue
except pika.exceptions.AMQPChannelError as err:
logging.critical(f"Caught a channel error: {err}, stopping...")
break
except pika.exceptions.AMQPConnectionError:
logging.info("Connection was closed, retrying...")
continue
p.join()
webserver.join()
if __name__ == "__main__":
logging_level = CONFIG.service.logging_level
logging.basicConfig(level=logging_level)
logging.getLogger("pika").setLevel(logging.ERROR)
main()