From c97ae3d2c242dfc88a342955311dd488cb9a5f60 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Thu, 16 Feb 2023 09:44:43 +0100 Subject: [PATCH] Pull request #56: RED-6118 multi tenancy Merge in RR/pyinfra from RED-6118-multi-tenancy to master Squashed commit of the following: commit 0a1301f9d7a12a1097e6bf9a1bb0a94025312d0a Author: Julius Unverfehrt Date: Thu Feb 16 09:12:54 2023 +0100 delete (for now) not needed exception module commit 9b624f9c95c129bf186eaea8405a14d359ccb1ae Author: Julius Unverfehrt Date: Thu Feb 16 09:08:57 2023 +0100 implement message properties forwarding - revert tenant validation logic since this functionality is not wanted - implement request message properties forwarding to response message. Thus, all message headers including x-tenant-id are present in the response. commit ddac812d32eeec09d9434c32595875eb354767f8 Merge: ed4b495 6828c65 Author: Julius Unverfehrt Date: Wed Feb 15 17:00:54 2023 +0100 Merge branch 'master' of ssh://git.iqser.com:2222/rr/pyinfra into RED-6118-multi-tenancy commit ed4b4956c6cb6d201fc29b0318078dfb8fa99006 Author: Julius Unverfehrt Date: Wed Feb 15 10:00:28 2023 +0100 refactor commit 970fd72aa73ace97d36f129031fb143209c5076b Author: Julius Unverfehrt Date: Tue Feb 14 17:22:54 2023 +0100 RED-6118 make pyinfra multi-tenant ready - refactor message validation logic - add tenant validation step: - messages without header/tenant id are accepted for now, until multi-tenancy is implemented in backend - only valid tenant is 'redaction' commit 0f04e799620e01b3346eeaf86f3e941830824202 Author: Julius Unverfehrt Date: Tue Feb 14 15:42:28 2023 +0100 add dev scripts - add scripts to ease pyinfra development by allowing pyinfra to be run locally with a callback mock and a publishing script. 
--- pyinfra/queue/queue_manager.py | 7 +- scripts/docker-compose.yml | 31 +++++++++ scripts/mock_process_request.py | 100 +++++++++++++++++++++++++++ scripts/start_pyinfra.py | 54 +++++++++++++++ scripts/usage_pyinfra_dev_scripts.md | 18 +++++ 5 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 scripts/docker-compose.yml create mode 100644 scripts/mock_process_request.py create mode 100644 scripts/start_pyinfra.py create mode 100644 scripts/usage_pyinfra_dev_scripts.md diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index c901347..4210728 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -15,7 +15,7 @@ from pyinfra.config import Config CONFIG = Config() pika_logger = logging.getLogger("pika") -pika_logger.setLevel(CONFIG.logging_level_root) +pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter def get_connection_params(config: Config) -> pika.ConnectionParameters: @@ -163,6 +163,7 @@ class QueueManager: def callback(_channel, frame, properties, body): self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag) + self.logger.debug("Message headers: %s.", properties.headers) # Only try to process each message once. # Requeueing will be handled by the dead-letter-exchange. 
@@ -184,7 +185,9 @@ class QueueManager: self.logger.info( "Processed message with delivery_tag %s, publishing result to result-queue", frame.delivery_tag ) - self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode()) + self._channel.basic_publish( + "", self._output_queue, json.dumps(callback_result).encode(), properties + ) self.logger.info( "Result published, acknowledging incoming message with delivery_tag %s", frame.delivery_tag diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml new file mode 100644 index 0000000..311192d --- /dev/null +++ b/scripts/docker-compose.yml @@ -0,0 +1,31 @@ +version: '2' +services: + minio: + image: minio/minio:RELEASE.2022-06-11T19-55-32Z + ports: + - "9000:9000" + environment: + - MINIO_ROOT_PASSWORD=password + - MINIO_ROOT_USER=root + volumes: + - ./data/minio_store:/data + command: server /data + network_mode: "bridge" + rabbitmq: + image: docker.io/bitnami/rabbitmq:3.9.8 + ports: + - '4369:4369' + - '5551:5551' + - '5552:5552' + - '5672:5672' + - '25672:25672' + - '15672:15672' + environment: + - RABBITMQ_SECURE_PASSWORD=yes + - RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100% + - RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi + network_mode: "bridge" + volumes: + - /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami +volumes: + mdata: \ No newline at end of file diff --git a/scripts/mock_process_request.py b/scripts/mock_process_request.py new file mode 100644 index 0000000..a4a3f1a --- /dev/null +++ b/scripts/mock_process_request.py @@ -0,0 +1,100 @@ +import gzip +import json +import logging +from operator import itemgetter + +import pika + +from pyinfra.config import get_config +from pyinfra.storage.adapters.s3 import get_s3_storage + +CONFIG = get_config() +logging.basicConfig() +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def read_connection_params(): + credentials = pika.PlainCredentials(CONFIG.rabbitmq_username, CONFIG.rabbitmq_password) + parameters = 
pika.ConnectionParameters( + host=CONFIG.rabbitmq_host, + port=CONFIG.rabbitmq_port, + heartbeat=int(CONFIG.rabbitmq_heartbeat), + credentials=credentials, + ) + return parameters + + +def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: + channel = connection.channel() + channel.basic_qos(prefetch_count=1) + return channel + + +def declare_queue(channel, queue: str): + args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.dead_letter_queue} + return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args) + + +def make_connection() -> pika.BlockingConnection: + parameters = read_connection_params() + connection = pika.BlockingConnection(parameters) + return connection + + +def upload_and_make_message_body(): + bucket = CONFIG.storage_bucket + dossier_id, file_id, suffix = "dossier", "file", "json.gz" + content = {"key": "value"} + object_name = f"{dossier_id}/{file_id}.{suffix}" + data = gzip.compress(json.dumps(content).encode("utf-8")) + + storage = get_s3_storage(CONFIG) + if not storage.has_bucket(bucket): + storage.make_bucket(bucket) + storage.put_object(bucket, object_name, data) + + message_body = { + "dossierId": dossier_id, + "fileId": file_id, + "targetFileExtension": suffix, + "responseFileExtension": f"result.{suffix}", + } + return message_body + + +def main(): + connection = make_connection() + channel = make_channel(connection) + declare_queue(channel, CONFIG.request_queue) + declare_queue(channel, CONFIG.response_queue) + + message = upload_and_make_message_body() + message_encoded = json.dumps(message).encode("utf-8") + channel.basic_publish( + "", + CONFIG.request_queue, + # properties=pika.BasicProperties(headers=None), + properties=pika.BasicProperties(headers={"x-tenant-id": "redaction"}), + body=message_encoded, + ) + logger.info(f"Put {message} on {CONFIG.request_queue}") + + storage = get_s3_storage(CONFIG) + for method_frame, properties, body in 
channel.consume(queue=CONFIG.response_queue, inactivity_timeout=10): + if not body: + break + response = json.loads(body) + logger.info(f"Received {response}") + logger.info(f"Message headers: {properties.headers}") + channel.basic_ack(method_frame.delivery_tag) + dossier_id, file_id = itemgetter("dossierId", "fileId")(response) + suffix = message["responseFileExtension"] + result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}") + result = json.loads(gzip.decompress(result)) + logger.info(f"Contents of result on storage: {result}") + channel.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py new file mode 100644 index 0000000..9c151d2 --- /dev/null +++ b/scripts/start_pyinfra.py @@ -0,0 +1,54 @@ +import gzip +import json +import logging +from typing import Callable + +from pyinfra.config import get_config +from pyinfra.queue.queue_manager import QueueManager +from pyinfra.storage import get_storage + +logging.basicConfig() +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def make_callback(processor: Callable, config=get_config()): + bucket = config.storage_bucket + storage = get_storage(config) + + def callback(request_message): + dossier_id = request_message["dossierId"] + file_id = request_message["fileId"] + logger.info(f"Processing {dossier_id=} {file_id=} ...") + target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}" + response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}" + + if not storage.exists(bucket, target_file_name): + logger.warning(f"{target_file_name=} not present in {bucket=}, cancelling processing...") + return None + + object_bytes = storage.get_object(bucket, target_file_name) + object_bytes = gzip.decompress(object_bytes) + result_body = list(processor(object_bytes)) + + result = {**request_message, "data": result_body} + storage_bytes = 
gzip.compress(json.dumps(result).encode("utf-8")) + storage.put_object(bucket, response_file_name, storage_bytes) + + return {"dossierId": dossier_id, "fileId": file_id} + + return callback + + +def process(body): + return [{"response_key": "response_value"}] + + +def main(): + logger.info("Start consuming...") + queue_manager = QueueManager(get_config()) + queue_manager.start_consuming(make_callback(process)) + + +if __name__ == "__main__": + main() diff --git a/scripts/usage_pyinfra_dev_scripts.md b/scripts/usage_pyinfra_dev_scripts.md new file mode 100644 index 0000000..6d383d5 --- /dev/null +++ b/scripts/usage_pyinfra_dev_scripts.md @@ -0,0 +1,18 @@ +# Scripts Usage + +## Run pyinfra locally + +**Shell 1**: Start minio and rabbitmq containers +```bash +$ cd scripts && docker-compose up +``` + +**Shell 2**: Start pyinfra with callback mock +```bash +$ python scripts/start_pyinfra.py +``` + +**Shell 3**: Upload dummy content on storage and publish message +```bash +$ python scripts/mock_process_request.py +```