From 9c28498d8ae09c56e6d04f3e9cdcb99a24b3140f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=B6ssler?= Date: Fri, 12 Jul 2024 15:12:46 +0200 Subject: [PATCH] feat: rollback testing logic for send_request --- scripts/send_request.py | 92 ++++++++++++----------------------------- 1 file changed, 27 insertions(+), 65 deletions(-) diff --git a/scripts/send_request.py b/scripts/send_request.py index 9fe3f08..d1f1fda 100644 --- a/scripts/send_request.py +++ b/scripts/send_request.py @@ -1,27 +1,24 @@ -import asyncio import gzip import json -import time from operator import itemgetter -from aio_pika.abc import AbstractIncomingMessage from kn_utils.logging import logger from pyinfra.config.loader import load_settings, local_pyinfra_root_path -from pyinfra.queue.async_tenants import AsyncQueueManager -from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings +from pyinfra.queue.manager import QueueManager +from pyinfra.storage.storages.s3 import get_s3_storage_from_settings settings = load_settings(local_pyinfra_root_path / "config/") -def upload_json_and_make_message_body(tenant_id: str): +def upload_json_and_make_message_body(): dossier_id, file_id, suffix = "dossier", "file", "json.gz" content = { "numberOfPages": 7, "sectionTexts": "data", } - object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}" + object_name = f"{dossier_id}/{file_id}.{suffix}" data = gzip.compress(json.dumps(content).encode("utf-8")) storage = get_s3_storage_from_settings(settings) @@ -30,7 +27,6 @@ def upload_json_and_make_message_body(tenant_id: str): storage.put_object(object_name, data) message_body = { - "tenantId": tenant_id, "dossierId": dossier_id, "fileId": file_id, "targetFileExtension": suffix, @@ -39,67 +35,33 @@ def upload_json_and_make_message_body(tenant_id: str): return message_body -def tenant_event_message(tenant_id: str): - return {"tenantId": tenant_id} +def main(): + queue_manager = QueueManager(settings) + queue_manager.purge_queues() + message = upload_json_and_make_message_body() -def on_message_callback(storage: S3Storage): - async def on_message(message: AbstractIncomingMessage) -> None: - async with message.process(): - if not message.body: - raise ValueError - response = json.loads(message.body) - logger.info(f"Received {response}") - logger.info(f"Message headers: {message.properties.headers}") - await message.ack() - tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response) - suffix = response["responseFileExtension"] - result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}") - result = json.loads(gzip.decompress(result)) - logger.info(f"Contents of result on storage: {result}") - - return on_message - - -async def send_tenant_event(queue_manager: AsyncQueueManager, tenant_id: str, event_type: str): - await queue_manager.purge_queues() - - message = tenant_event_message(tenant_id) - if event_type == "create": - await queue_manager.publish_message_to_tenant_created_queue(message=message) - elif event_type == "delete": - await queue_manager.publish_message_to_tenant_deleted_queue(message=message) - else: - logger.warning(f"Event type '{event_type}' not known.") - await queue_manager.stop_consumers() - - -async def send_service_request(queue_manager: AsyncQueueManager, tenant_id: str): - request_queue_name = f"{settings.rabbitmq.service_request_queue_prefix}_{tenant_id}" - - await queue_manager.purge_queues() - - message = upload_json_and_make_message_body(tenant_id) - - await queue_manager.publish_message_to_input_queue(tenant_id=tenant_id, message=message) - logger.info(f"Put {message} on {request_queue_name}.") + queue_manager.publish_message_to_input_queue(message) + logger.info(f"Put {message} on {settings.rabbitmq.input_queue}.") storage = get_s3_storage_from_settings(settings) - - response_queue_name = f"{settings.rabbitmq.service_response_queue_prefix}_{tenant_id}" - service_response_queue = await queue_manager.channel.get_queue(name=response_queue_name) - - time.sleep(10) - - callback = on_message_callback(storage) - await service_response_queue.consume(callback=callback) - await queue_manager.stop_consumers() + for method_frame, properties, body in queue_manager.channel.consume( + queue=settings.rabbitmq.output_queue, inactivity_timeout=15 + ): + if not body: + break + response = json.loads(body) + logger.info(f"Received {response}") + logger.info(f"Message headers: {properties.headers}") + queue_manager.channel.basic_ack(method_frame.delivery_tag) + dossier_id, file_id = itemgetter("dossierId", "fileId")(response) + suffix = message["responseFileExtension"] + print(f"{dossier_id}/{file_id}.{suffix}") + result = storage.get_object(f"{dossier_id}/{file_id}.{suffix}") + result = json.loads(gzip.decompress(result)) + logger.info(f"Contents of result on storage: {result}") + queue_manager.stop_consuming() if __name__ == "__main__": - # tenant_ids = ["a", "b", "c", "d"] - - # asyncio.run(send_tenant_event(AsyncQueueManager(settings), "test_1", "create")) - - asyncio.run(send_service_request(AsyncQueueManager(settings), "redaction")) - # asyncio.run(consume_service_request(AsyncQueueManager(settings),"redaction")) + main()