Pull request #56: RED-6118 multi tenancy

Merge in RR/pyinfra from RED-6118-multi-tenancy to master

Squashed commit of the following:

commit 0a1301f9d7a12a1097e6bf9a1bb0a94025312d0a
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Feb 16 09:12:54 2023 +0100

    delete (for now) not needed exception module

commit 9b624f9c95c129bf186eaea8405a14d359ccb1ae
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Feb 16 09:08:57 2023 +0100

    implement message properties forwarding

    - revert tenant validation logic since this functionality is not wanted
    - implement request message properties forwarding to response message.
    Thus, all message headers including x-tenant-id are present in the
    response.

commit ddac812d32eeec09d9434c32595875eb354767f8
Merge: ed4b495 6828c65
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Feb 15 17:00:54 2023 +0100

    Merge branch 'master' of ssh://git.iqser.com:2222/rr/pyinfra into RED-6118-multi-tenancy

commit ed4b4956c6cb6d201fc29b0318078dfb8fa99006
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Feb 15 10:00:28 2023 +0100

    refactor

commit 970fd72aa73ace97d36f129031fb143209c5076b
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 14 17:22:54 2023 +0100

    RED-6118 make pyinfra multi-tenant ready

    - refactor message validation logic
    - add tenant validation step:
    	- messages without header/tenant id are accepted for now, until
    	  multi-tenancy is implemented in backend
    	- only valid tenant is 'redaction'

commit 0f04e799620e01b3346eeaf86f3e941830824202
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 14 15:42:28 2023 +0100

    add dev scripts

    - add scripts to ease pyinfra development by making it possible to run
    pyinfra locally with a callback mock and a publishing script.
This commit is contained in:
Julius Unverfehrt 2023-02-16 09:44:43 +01:00
parent 6828c65396
commit c97ae3d2c2
5 changed files with 208 additions and 2 deletions

View File

@ -15,7 +15,7 @@ from pyinfra.config import Config
CONFIG = Config()
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(CONFIG.logging_level_root)
pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter
def get_connection_params(config: Config) -> pika.ConnectionParameters:
@ -163,6 +163,7 @@ class QueueManager:
def callback(_channel, frame, properties, body):
self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag)
self.logger.debug("Message headers: %s.", properties.headers)
# Only try to process each message once.
# Requeueing will be handled by the dead-letter-exchange.
@ -184,7 +185,9 @@ class QueueManager:
self.logger.info(
"Processed message with delivery_tag %s, publishing result to result-queue", frame.delivery_tag
)
self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
self._channel.basic_publish(
"", self._output_queue, json.dumps(callback_result).encode(), properties
)
self.logger.info(
"Result published, acknowledging incoming message with delivery_tag %s", frame.delivery_tag

View File

@ -0,0 +1,31 @@
# Local development stack for pyinfra: MinIO (S3-compatible storage) + RabbitMQ.
version: '2'
services:
  minio:
    image: minio/minio:RELEASE.2022-06-11T19-55-32Z
    ports:
      - "9000:9000"
    environment:
      # Dev-only credentials; never reuse outside local development.
      - MINIO_ROOT_PASSWORD=password
      - MINIO_ROOT_USER=root
    volumes:
      - ./data/minio_store:/data
    command: server /data
    network_mode: "bridge"
  rabbitmq:
    image: docker.io/bitnami/rabbitmq:3.9.8
    ports:
      - '4369:4369'
      - '5551:5551'
      - '5552:5552'
      - '5672:5672'
      - '25672:25672'
      - '15672:15672'
    environment:
      - RABBITMQ_SECURE_PASSWORD=yes
      - RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100%
      - RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi
    network_mode: "bridge"
    volumes:
      # NOTE(review): host-side source under /opt/bitnami looks unusual for a
      # bind mount — confirm the host:container order is intended.
      - /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
volumes:
  # NOTE(review): 'mdata' is declared but not referenced by any service above.
  mdata:

View File

@ -0,0 +1,100 @@
import gzip
import json
import logging
from operator import itemgetter
import pika
from pyinfra.config import get_config
from pyinfra.storage.adapters.s3 import get_s3_storage
# Module-wide configuration and root-logger setup for the publishing script.
CONFIG = get_config()
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def read_connection_params():
    """Build pika connection parameters from the global CONFIG."""
    return pika.ConnectionParameters(
        host=CONFIG.rabbitmq_host,
        port=CONFIG.rabbitmq_port,
        heartbeat=int(CONFIG.rabbitmq_heartbeat),
        credentials=pika.PlainCredentials(CONFIG.rabbitmq_username, CONFIG.rabbitmq_password),
    )
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
    """Open a channel on *connection*, limited to one unacknowledged message at a time."""
    new_channel = connection.channel()
    new_channel.basic_qos(prefetch_count=1)
    return new_channel
def declare_queue(channel, queue: str):
    """Declare *queue* durably, routing dead-lettered messages to the configured dead-letter queue."""
    dead_letter_args = {
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": CONFIG.dead_letter_queue,
    }
    return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=dead_letter_args)
def make_connection() -> pika.BlockingConnection:
    """Create a blocking RabbitMQ connection from the configured parameters."""
    return pika.BlockingConnection(read_connection_params())
def upload_and_make_message_body():
    """Upload a small gzipped JSON object to storage and return a matching request message body."""
    dossier_id = "dossier"
    file_id = "file"
    suffix = "json.gz"
    payload = gzip.compress(json.dumps({"key": "value"}).encode("utf-8"))
    bucket = CONFIG.storage_bucket
    storage = get_s3_storage(CONFIG)
    if not storage.has_bucket(bucket):
        storage.make_bucket(bucket)
    storage.put_object(bucket, f"{dossier_id}/{file_id}.{suffix}", payload)
    return {
        "dossierId": dossier_id,
        "fileId": file_id,
        "targetFileExtension": suffix,
        "responseFileExtension": f"result.{suffix}",
    }
def main():
    """Publish a mock request message and wait for the processed response.

    Declares the request/response queues, uploads a dummy object to storage,
    publishes a request carrying an ``x-tenant-id`` header, then consumes the
    response queue and fetches the result object from storage.
    """
    connection = make_connection()
    channel = make_channel(connection)
    declare_queue(channel, CONFIG.request_queue)
    declare_queue(channel, CONFIG.response_queue)
    message = upload_and_make_message_body()
    channel.basic_publish(
        "",
        CONFIG.request_queue,
        properties=pika.BasicProperties(headers={"x-tenant-id": "redaction"}),
        body=json.dumps(message).encode("utf-8"),
    )
    logger.info(f"Put {message} on {CONFIG.request_queue}")
    storage = get_s3_storage(CONFIG)
    # inactivity_timeout makes consume() yield (None, None, None) when the
    # queue stays empty; the falsy-body check below then ends the loop.
    for method_frame, properties, body in channel.consume(queue=CONFIG.response_queue, inactivity_timeout=10):
        if not body:
            break
        response = json.loads(body)
        logger.info(f"Received {response}")
        logger.info(f"Message headers: {properties.headers}")
        channel.basic_ack(method_frame.delivery_tag)
        dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
        suffix = message["responseFileExtension"]
        result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}")
        result = json.loads(gzip.decompress(result))
        logger.info(f"Contents of result on storage: {result}")
    channel.close()


if __name__ == "__main__":
    main()

54
scripts/start_pyinfra.py Normal file
View File

@ -0,0 +1,54 @@
import gzip
import json
import logging
from typing import Callable
from pyinfra.config import get_config
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage import get_storage
# Root-logger setup for the local pyinfra runner script.
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def make_callback(processor: Callable, config=None):
    """Build a queue callback that runs *processor* on a stored object and persists the result.

    :param processor: callable applied to the decompressed object bytes; must
        return an iterable of result items.
    :param config: optional configuration object. Defaults to ``get_config()``,
        resolved lazily per call — the original ``config=get_config()`` default
        was evaluated once at definition time (call-in-default-argument pitfall).
    :return: callback taking a request-message dict and returning a minimal
        response body, or ``None`` when the target object is missing.
    """
    config = get_config() if config is None else config
    bucket = config.storage_bucket
    storage = get_storage(config)

    def callback(request_message):
        # Expects 'dossierId', 'fileId', 'targetFileExtension' and
        # 'responseFileExtension' keys in the request message.
        dossier_id = request_message["dossierId"]
        file_id = request_message["fileId"]
        logger.info(f"Processing {dossier_id=} {file_id=} ...")
        target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
        response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"
        if not storage.exists(bucket, target_file_name):
            logger.warning(f"{target_file_name=} not present in {bucket=}, cancelling processing...")
            return None
        object_bytes = storage.get_object(bucket, target_file_name)
        object_bytes = gzip.decompress(object_bytes)
        result_body = list(processor(object_bytes))
        # Echo the request fields back alongside the processed data.
        result = {**request_message, "data": result_body}
        storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
        storage.put_object(bucket, response_file_name, storage_bytes)
        return {"dossierId": dossier_id, "fileId": file_id}

    return callback
def process(body):
    """Mock processor: ignore *body* and return a fixed single-item result list."""
    del body  # intentionally unused in the mock
    return [{"response_key": "response_value"}]
def main():
    """Run a QueueManager that consumes requests using the mock processor."""
    logger.info("Start consuming...")
    manager = QueueManager(get_config())
    manager.start_consuming(make_callback(process))


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,18 @@
# Scripts Usage
## Run pyinfra locally
**Shell 1**: Start minio and rabbitmq containers
```bash
$ cd scripts && docker-compose up
```
**Shell 2**: Start pyinfra with callback mock
```bash
$ python scripts/start_pyinfra.py
```
**Shell 3**: Upload dummy content to storage and publish a message
```bash
$ python scripts/mock_process_request.py
```