feat: rollback testing logic for send_request

parent 3c3580d3bc
commit 9c28498d8a

@@ -1,27 +1,24 @@
-import asyncio
 import gzip
 import json
-import time
 from operator import itemgetter
 
-from aio_pika.abc import AbstractIncomingMessage
 from kn_utils.logging import logger
 
 from pyinfra.config.loader import load_settings, local_pyinfra_root_path
-from pyinfra.queue.async_tenants import AsyncQueueManager
-from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings
+from pyinfra.queue.manager import QueueManager
+from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
 
 settings = load_settings(local_pyinfra_root_path / "config/")
 
 
-def upload_json_and_make_message_body(tenant_id: str):
+def upload_json_and_make_message_body():
     dossier_id, file_id, suffix = "dossier", "file", "json.gz"
     content = {
         "numberOfPages": 7,
         "sectionTexts": "data",
     }
 
-    object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
+    object_name = f"{dossier_id}/{file_id}.{suffix}"
     data = gzip.compress(json.dumps(content).encode("utf-8"))
 
     storage = get_s3_storage_from_settings(settings)
@@ -30,7 +27,6 @@ def upload_json_and_make_message_body(tenant_id: str):
     storage.put_object(object_name, data)
 
     message_body = {
-        "tenantId": tenant_id,
         "dossierId": dossier_id,
         "fileId": file_id,
         "targetFileExtension": suffix,
@@ -39,67 +35,33 @@ def upload_json_and_make_message_body(tenant_id: str):
     return message_body
 
 
-def tenant_event_message(tenant_id: str):
-    return {"tenantId": tenant_id}
+def main():
+    queue_manager = QueueManager(settings)
+    queue_manager.purge_queues()
 
+    message = upload_json_and_make_message_body()
 
-def on_message_callback(storage: S3Storage):
-    async def on_message(message: AbstractIncomingMessage) -> None:
-        async with message.process():
-            if not message.body:
-                raise ValueError
-            response = json.loads(message.body)
-            logger.info(f"Received {response}")
-            logger.info(f"Message headers: {message.properties.headers}")
-            await message.ack()
-            tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
-            suffix = response["responseFileExtension"]
-            result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
-            result = json.loads(gzip.decompress(result))
-            logger.info(f"Contents of result on storage: {result}")
-
-    return on_message
-
-
-async def send_tenant_event(queue_manager: AsyncQueueManager, tenant_id: str, event_type: str):
-    await queue_manager.purge_queues()
-
-    message = tenant_event_message(tenant_id)
-    if event_type == "create":
-        await queue_manager.publish_message_to_tenant_created_queue(message=message)
-    elif event_type == "delete":
-        await queue_manager.publish_message_to_tenant_deleted_queue(message=message)
-    else:
-        logger.warning(f"Event type '{event_type}' not known.")
-    await queue_manager.stop_consumers()
-
-
-async def send_service_request(queue_manager: AsyncQueueManager, tenant_id: str):
-    request_queue_name = f"{settings.rabbitmq.service_request_queue_prefix}_{tenant_id}"
-
-    await queue_manager.purge_queues()
-
-    message = upload_json_and_make_message_body(tenant_id)
-
-    await queue_manager.publish_message_to_input_queue(tenant_id=tenant_id, message=message)
-    logger.info(f"Put {message} on {request_queue_name}.")
+    queue_manager.publish_message_to_input_queue(message)
+    logger.info(f"Put {message} on {settings.rabbitmq.input_queue}.")
 
     storage = get_s3_storage_from_settings(settings)
 
-    response_queue_name = f"{settings.rabbitmq.service_response_queue_prefix}_{tenant_id}"
-    service_response_queue = await queue_manager.channel.get_queue(name=response_queue_name)
-
-    time.sleep(10)
-
-    callback = on_message_callback(storage)
-    await service_response_queue.consume(callback=callback)
-    await queue_manager.stop_consumers()
+    for method_frame, properties, body in queue_manager.channel.consume(
+        queue=settings.rabbitmq.output_queue, inactivity_timeout=15
+    ):
+        if not body:
+            break
+        response = json.loads(body)
+        logger.info(f"Received {response}")
+        logger.info(f"Message headers: {properties.headers}")
+        queue_manager.channel.basic_ack(method_frame.delivery_tag)
+        dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
+        suffix = message["responseFileExtension"]
+        print(f"{dossier_id}/{file_id}.{suffix}")
+        result = storage.get_object(f"{dossier_id}/{file_id}.{suffix}")
+        result = json.loads(gzip.decompress(result))
+        logger.info(f"Contents of result on storage: {result}")
+    queue_manager.stop_consuming()
 
 
 if __name__ == "__main__":
-    # tenant_ids = ["a", "b", "c", "d"]
-
-    # asyncio.run(send_tenant_event(AsyncQueueManager(settings), "test_1", "create"))
-
-    asyncio.run(send_service_request(AsyncQueueManager(settings), "redaction"))
-    # asyncio.run(consume_service_request(AsyncQueueManager(settings),"redaction"))
+    main()
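
The storage payload in this script is a gzip-compressed JSON document: upload_json_and_make_message_body() writes it, and the consume loop reads it back. A minimal sketch of that round-trip, using only the stdlib calls that appear in the diff (variable names here are illustrative):

import gzip
import json

# The request body the helper uploads: JSON, gzip-compressed.
content = {"numberOfPages": 7, "sectionTexts": "data"}
data = gzip.compress(json.dumps(content).encode("utf-8"))

# What the consume loop does with the object it fetches back.
restored = json.loads(gzip.decompress(data))
assert restored == content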
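
The rolled-back consume loop follows pika's blocking-consume pattern: with inactivity_timeout set, channel.consume() yields (None, None, None) once the queue has been idle that long, which is what the `if not body: break` check catches. A self-contained sketch of the same pattern against a bare pika channel, assuming a local broker and a queue named "output" (both placeholders, not taken from the commit):

import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# consume() returns a generator of (method, properties, body) tuples;
# after 15 idle seconds it yields (None, None, None) and the loop exits.
for method_frame, properties, body in channel.consume(
    queue="output", inactivity_timeout=15
):
    if not body:
        break
    print(json.loads(body))
    # Acknowledge each delivery explicitly, as the script does.
    channel.basic_ack(method_frame.delivery_tag)

# Cancel the consumer (requeuing any unacked deliveries) and close.
channel.cancel()
connection.close()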