diff --git a/pyinfra/examples.py b/pyinfra/examples.py index 8856c5b..9f8a227 100644 --- a/pyinfra/examples.py +++ b/pyinfra/examples.py @@ -1,12 +1,9 @@ from dynaconf import Dynaconf from fastapi import FastAPI from kn_utils.logging import logger -import multiprocessing from threading import Thread from pyinfra.config.loader import get_pyinfra_validators, validate_settings from pyinfra.queue.callback import Callback -# from pyinfra.queue.manager import QueueManager -from pyinfra.queue.sequential_tenants import QueueManager from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager from pyinfra.utils.opentelemetry import instrument_pika, setup_trace, instrument_app from pyinfra.webserver.prometheus import ( @@ -47,22 +44,20 @@ def start_standard_queue_consumer( if settings.tracing.enabled: setup_trace(settings) - + instrument_pika() instrument_app(app) - - # app = add_health_check_endpoint(app, queue_manager.is_ready) + app = add_health_check_endpoint(app, service_manager.is_ready) webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread.start() - # queue_manager.start_consuming(callback) - # queue_manager.start_sequential_consume(callback) - # p1 = multiprocessing.Process(target=tenant_manager.start_consuming, daemon=True) - # p2 = multiprocessing.Process(target=service_manager.start_sequential_consume, kwargs={"callback":callback}, daemon=True) - thread = Thread(target=tenant_manager.start_consuming, daemon=True) - thread.start() - # p1.start() - # p2.start() - service_manager.start_sequential_consume(callback) \ No newline at end of file + thread_t = Thread(target=tenant_manager.start_consuming, daemon=True) + thread_s = Thread(target=service_manager.start_sequential_basic_get, args=(callback,), daemon=True) + + thread_t.start() + thread_s.start() + + thread_t.join() + thread_s.join() diff --git a/pyinfra/queue/threaded_tenants.py b/pyinfra/queue/threaded_tenants.py index 6ec9327..24fe9c3 100644 --- a/pyinfra/queue/threaded_tenants.py +++ b/pyinfra/queue/threaded_tenants.py @@ -8,6 +8,7 @@ import signal import sys import requests import time +import threading import pika.exceptions from dynaconf import Dynaconf from typing import Callable, Union @@ -19,6 +20,7 @@ from retry import retry from pyinfra.config.loader import validate_settings from pyinfra.config.validators import queue_manager_validators +logger.set_level("DEBUG") pika_logger = logging.getLogger("pika") pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter @@ -26,7 +28,10 @@ MessageProcessor = Callable[[dict], dict] class BaseQueueManager: - tenant_exchange = queue.Queue() + tenant_exchange_queue = queue.Queue() + _connection = None + _lock = threading.Lock() + should_stop = threading.Event() def __init__(self, settings: Dynaconf): validate_settings(settings, queue_manager_validators) @@ -43,7 +48,7 @@ class BaseQueueManager: signal.signal(signal.SIGINT, self._handle_stop_signal) @staticmethod - def create_connection_parameters(settings: Dynaconf): + def create_connection_parameters(settings: Dynaconf) -> pika.ConnectionParameters: credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password) pika_connection_params = { "host": settings.rabbitmq.host, @@ -53,42 +58,46 @@ class BaseQueueManager: } return pika.ConnectionParameters(**pika_connection_params) + def get_connection(self) -> BlockingConnection: + with self._lock: + if not self._connection or self._connection.is_closed: + self._connection = pika.BlockingConnection(self.connection_parameters) + return self._connection + @retry(tries=3, delay=5, jitter=(1, 3), logger=logger) - def establish_connection(self): - if self.connection and self.connection.is_open: - logger.debug("Connection to RabbitMQ already established.") - return + def establish_connection(self) -> None: + logger.info(f"Establishing connection to RabbitMQ for {self.__class__.__name__}...") + self.connection = self.get_connection() + if not self.channel or self.channel.is_closed: + logger.debug("Opening channel...") + self.channel = self.connection.channel() + self.channel.basic_qos(prefetch_count=1) + self.initialize_queues() + logger.info(f"Connection to RabbitMQ established for {self.__class__.__name__}, channel open.") - logger.info("Establishing connection to RabbitMQ...") - logger.info(self.__class__.__name__) - self.connection = pika.BlockingConnection(parameters=self.connection_parameters) - - logger.debug("Opening channel...") - self.channel = self.connection.channel() - self.channel.basic_qos(prefetch_count=1) - self.initialize_queues() - - logger.info("Connection to RabbitMQ established, channel open.") - - def is_ready(self): + def is_ready(self) -> bool: self.establish_connection() return self.channel.is_open - def initialize_queues(self): + def initialize_queues(self) -> None: raise NotImplementedError("Subclasses should implement this method") - def stop_consuming(self): - if self.channel and self.channel.is_open: - logger.info("Stopping consuming...") - self.channel.stop_consuming() - logger.info("Closing channel...") - self.channel.close() + def stop_consuming(self) -> None: + if not self.should_stop.is_set(): + self.should_stop.set() + if self.channel and self.channel.is_open: + try: + self.channel.stop_consuming() + self.channel.close() + except Exception as e: + logger.error(f"Error stopping consuming: {e}", exc_info=True) + if self.connection and self.connection.is_open: + try: + self.connection.close() + except Exception as e: + logger.error(f"Error closing connection: {e}", exc_info=True) - if self.connection and self.connection.is_open: - logger.info("Closing connection to RabbitMQ...") - self.connection.close() - - def _handle_stop_signal(self, signum, *args, **kwargs): + def _handle_stop_signal(self, signum, *args, **kwargs) -> None: logger.info(f"Received signal {signum}, stopping consuming...") self.stop_consuming() sys.exit(0) @@ -102,7 +111,7 @@ class TenantQueueManager(BaseQueueManager): self.tenant_deleted_queue_name = self.get_tenant_deleted_queue_name(settings) self.tenant_events_dlq_name = self.get_tenant_events_dlq_name(settings) - def initialize_queues(self): + def initialize_queues(self) -> None: self.channel.exchange_declare(exchange=self.tenant_exchange_name, exchange_type="topic") self.channel.queue_declare( @@ -134,7 +143,7 @@ class TenantQueueManager(BaseQueueManager): ) @retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger) - def start_consuming(self): + def start_consuming(self) -> None: try: self.establish_connection() @@ -147,49 +156,49 @@ class TenantQueueManager(BaseQueueManager): finally: self.stop_consuming() - def get_tenant_created_queue_name(self, settings: Dynaconf): + def get_tenant_created_queue_name(self, settings: Dynaconf) -> str: return self.get_queue_name_with_suffix( suffix=settings.rabbitmq.tenant_created_event_queue_suffix, pod_name=settings.kubernetes.pod_name ) - def get_tenant_deleted_queue_name(self, settings: Dynaconf): + def get_tenant_deleted_queue_name(self, settings: Dynaconf) -> str: return self.get_queue_name_with_suffix( suffix=settings.rabbitmq.tenant_deleted_event_queue_suffix, pod_name=settings.kubernetes.pod_name ) - def get_tenant_events_dlq_name(self, settings: Dynaconf): + def get_tenant_events_dlq_name(self, settings: Dynaconf) -> str: return self.get_queue_name_with_suffix( suffix=settings.rabbitmq.tenant_event_dlq_suffix, pod_name=settings.kubernetes.pod_name ) - def get_queue_name_with_suffix(self, suffix: str, pod_name: str): + def get_queue_name_with_suffix(self, suffix: str, pod_name: str) -> str: if not self.use_default_queue_name() and pod_name: return f"{pod_name}{suffix}" return self.get_default_queue_name() - def use_default_queue_name(self): + def use_default_queue_name(self) -> bool: return False def get_default_queue_name(self): raise NotImplementedError("Queue name method not implemented") - def on_tenant_created(self, ch: Channel, method, properties, body): + def on_tenant_created(self, ch: Channel, method, properties, body) -> None: logger.info("Received tenant created event") message = json.loads(body) ch.basic_ack(delivery_tag=method.delivery_tag) tenant_id = message["tenantId"] - self.tenant_exchange.put(("create", tenant_id)) + self.tenant_exchange_queue.put(("create", tenant_id)) - def on_tenant_deleted(self, ch: Channel, method, properties, body): + def on_tenant_deleted(self, ch: Channel, method, properties, body) -> None: logger.info("Received tenant deleted event") message = json.loads(body) ch.basic_ack(delivery_tag=method.delivery_tag) tenant_id = message["tenantId"] - self.tenant_exchange.put(("delete", tenant_id)) + self.tenant_exchange_queue.put(("delete", tenant_id)) - def purge_queues(self): + def purge_queues(self) -> None: self.establish_connection() try: self.channel.queue_purge(self.tenant_created_queue_name) @@ -198,6 +207,40 @@ class TenantQueueManager(BaseQueueManager): except pika.exceptions.ChannelWrongStateError: pass + def publish_message_to_tenant_created_queue( + self, message: Union[str, bytes, dict], properties: pika.BasicProperties = None + ) -> None: + if isinstance(message, str): + message = message.encode("utf-8") + elif isinstance(message, dict): + message = json.dumps(message).encode("utf-8") + + self.establish_connection() + self.channel.basic_publish( + exchange=self.tenant_exchange_name, + routing_key="tenant.created", + properties=properties, + body=message, + ) + logger.info(f"Published message to queue {self.tenant_created_queue_name}.") + + def publish_message_to_tenant_deleted_queue( + self, message: Union[str, bytes, dict], properties: pika.BasicProperties = None + ) -> None: + if isinstance(message, str): + message = message.encode("utf-8") + elif isinstance(message, dict): + message = json.dumps(message).encode("utf-8") + + self.establish_connection() + self.channel.basic_publish( + exchange=self.tenant_exchange_name, + routing_key="tenant.delete", + properties=properties, + body=message, + ) + logger.info(f"Published message to queue {self.tenant_deleted_queue_name}.") + class ServiceQueueManager(BaseQueueManager): def __init__(self, settings: Dynaconf): @@ -205,21 +248,22 @@ class ServiceQueueManager(BaseQueueManager): self.service_request_exchange_name = settings.rabbitmq.service_request_exchange_name self.service_response_exchange_name = settings.rabbitmq.service_response_exchange_name - self.service_queue_prefix = settings.rabbitmq.service_request_queue_prefix + + self.service_request_queue_prefix = settings.rabbitmq.service_request_queue_prefix + self.service_response_queue_prefix = settings.rabbitmq.service_response_queue_prefix + self.service_dlq_name = settings.rabbitmq.service_dlq_name self.tenant_ids = self.get_initial_tenant_ids(tenant_endpoint_url=settings.storage.tenant_server.endpoint) - self._consuming = False - - def initialize_queues(self): + def initialize_queues(self) -> None: self.channel.exchange_declare(exchange=self.service_request_exchange_name, exchange_type="direct") self.channel.exchange_declare(exchange=self.service_response_exchange_name, exchange_type="direct") for tenant_id in self.tenant_ids: - queue_name = self.service_queue_prefix + "_" + tenant_id + response_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}" self.channel.queue_declare( - queue=queue_name, + queue=response_queue_name, durable=True, arguments={ "x-dead-letter-exchange": "", @@ -229,68 +273,74 @@ class ServiceQueueManager(BaseQueueManager): }, ) self.channel.queue_bind( - queue=queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id + queue=response_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id ) - @retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=requests.exceptions.HTTPError) + response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" + self.channel.queue_declare( + queue=response_queue_name, + durable=True, + arguments={ + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": self.service_dlq_name, + "x-expires": self.queue_expiration_time, # TODO: check if necessary + }, + ) + self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id) + + @retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(requests.exceptions.HTTPError, requests.exceptions.ConnectionError)) def get_initial_tenant_ids(self, tenant_endpoint_url: str) -> list: - try: - response = requests.get(tenant_endpoint_url, timeout=10) - response.raise_for_status() # Raise an HTTPError for bad responses + response = requests.get(tenant_endpoint_url, timeout=10) + response.raise_for_status() # Raise an HTTPError for bad responses - if response.headers["content-type"].lower() == "application/json": - tenants = [tenant["tenantId"] for tenant in response.json()] - else: - logger.warning("Response is not in JSON format.") - except Exception as e: - logger.warning("An unexpected error occurred:", e) - - return tenants + if response.headers["content-type"].lower() == "application/json": + tenants = [tenant["tenantId"] for tenant in response.json()] + return tenants + return [] @retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger) - def start_sequential_consume(self, message_processor: Callable): + def start_sequential_basic_get(self, message_processor: Callable) -> None: self.establish_connection() - self._consuming = True - try: - while self._consuming: + while not self.should_stop.is_set(): for tenant_id in self.tenant_ids: - queue_name = self.service_queue_prefix + "_" + tenant_id + queue_name = f"{self.service_request_queue_prefix}_{tenant_id}" method_frame, properties, body = self.channel.basic_get(queue_name) if method_frame: + logger.debug("PROCESSING MESSAGE") on_message_callback = self._make_on_message_callback(message_processor, tenant_id) on_message_callback(self.channel, method_frame, properties, body) else: - logger.debug("No message returned") - time.sleep(self.connection_sleep) + logger.debug(f"No message returned for queue {queue_name}") + # time.sleep(self.connection_sleep) + time.sleep(0.1) ### Handle tenant events self.check_tenant_exchange() + except KeyboardInterrupt: logger.info("Exiting...") finally: self.stop_consuming() - def check_tenant_exchange(self): - while True: + def check_tenant_exchange(self) -> None: + while not self.tenant_exchange_queue.empty(): try: - event, tenant = self.tenant_exchange.get(block=False) + event, tenant = self.tenant_exchange_queue.get_nowait() if event == "create": self.on_tenant_created(tenant) elif event == "delete": self.on_tenant_deleted(tenant) - else: - break - except Exception: - logger.debug("No tenant exchange events.") + except queue.Empty: + # time.sleep(self.connection_sleep) break def publish_message_to_input_queue( self, tenant_id: str, message: Union[str, bytes, dict], properties: pika.BasicProperties = None - ): + ) -> None: if isinstance(message, str): message = message.encode("utf-8") elif isinstance(message, dict): @@ -305,20 +355,22 @@ class ServiceQueueManager(BaseQueueManager): ) logger.info(f"Published message to queue {tenant_id}.") - def purge_queues(self): + def purge_queues(self) -> None: self.establish_connection() try: for tenant_id in self.tenant_ids: - queue_name = self.service_queue_prefix + "_" + tenant_id - self.channel.queue_purge(queue_name) + request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}" + response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" + self.channel.queue_purge(request_queue_name) + self.channel.queue_purge(response_queue_name) logger.info("Queues purged.") except pika.exceptions.ChannelWrongStateError: pass - def on_tenant_created(self, tenant_id: str): - queue_name = self.service_queue_prefix + "_" + tenant_id + def on_tenant_created(self, tenant_id: str) -> None: + request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}" self.channel.queue_declare( - queue=queue_name, + queue=request_queue_name, durable=True, arguments={ "x-dead-letter-exchange": "", @@ -326,18 +378,36 @@ class ServiceQueueManager(BaseQueueManager): "x-expires": self.queue_expiration_time, # TODO: check if necessary }, ) - self.channel.queue_bind(queue=queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id) + self.channel.queue_bind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id) + + response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" + self.channel.queue_declare( + queue=response_queue_name, + durable=True, + arguments={ + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": self.service_dlq_name, + "x-expires": self.queue_expiration_time, # TODO: check if necessary + }, + ) + self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id) + self.tenant_ids.append(tenant_id) logger.debug(f"Added tenant {tenant_id}.") - def on_tenant_deleted(self, tenant_id: str): - queue_name = self.service_queue_prefix + "_" + tenant_id - self.channel.queue_unbind(queue=queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id) - self.channel.queue_delete(queue_name) + def on_tenant_deleted(self, tenant_id: str) -> None: + request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}" + self.channel.queue_unbind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id) + self.channel.queue_delete(request_queue_name) + + response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}" + self.channel.queue_unbind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id) + self.channel.queue_delete(response_queue_name) + self.tenant_ids.remove(tenant_id) logger.debug(f"Deleted tenant {tenant_id}.") - def _make_on_message_callback(self, message_processor: MessageProcessor, tenant_id: str): + def _make_on_message_callback(self, message_processor: MessageProcessor, tenant_id: str) -> Callable: def process_message_body_and_await_result(unpacked_message_body): # Processing the message in a separate thread is necessary for the main thread pika client to be able to # process data events (e.g. heartbeats) while the message is being processed. @@ -390,16 +460,4 @@ class ServiceQueueManager(BaseQueueManager): channel.basic_nack(method.delivery_tag, requeue=False) raise - return on_message_callback - - def stop_consuming(self): - self._consuming = False - if self.channel and self.channel.is_open: - logger.info("Stopping consuming...") - self.channel.stop_consuming() - logger.info("Closing channel...") - self.channel.close() - - if self.connection and self.connection.is_open: - logger.info("Closing connection to RabbitMQ...") - self.connection.close() \ No newline at end of file + return on_message_callback \ No newline at end of file diff --git a/pyinfra/storage/utils.py b/pyinfra/storage/utils.py index 36b5b81..3e14a2d 100644 --- a/pyinfra/storage/utils.py +++ b/pyinfra/storage/utils.py @@ -19,6 +19,17 @@ class DossierIdFileIdDownloadPayload(BaseModel): return f"{self.dossierId}/{self.fileId}.{self.targetFileExtension}" +class TenantIdDossierIdFileIdDownloadPayload(BaseModel): + tenantId: str + dossierId: str + fileId: str + targetFileExtension: str + + @property + def targetFilePath(self): + return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.targetFileExtension}" + + class DossierIdFileIdUploadPayload(BaseModel): dossierId: str fileId: str @@ -27,6 +38,17 @@ class DossierIdFileIdUploadPayload(BaseModel): @property def responseFilePath(self): return f"{self.dossierId}/{self.fileId}.{self.responseFileExtension}" + + +class TenantIdDossierIdFileIdUploadPayload(BaseModel): + tenantId: str + dossierId: str + fileId: str + responseFileExtension: str + + @property + def responseFilePath(self): + return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.responseFileExtension}" class TargetResponseFilePathDownloadPayload(BaseModel): @@ -55,7 +77,9 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) - """ try: - if "dossierId" in raw_payload: + if "tenantId" in raw_payload and "dossierId" in raw_payload: + payload = TenantIdDossierIdFileIdDownloadPayload(**raw_payload) + elif "tenantId" not in raw_payload and "dossierId" in raw_payload: payload = DossierIdFileIdDownloadPayload(**raw_payload) else: payload = TargetResponseFilePathDownloadPayload(**raw_payload) @@ -106,7 +130,9 @@ def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, dat """ try: - if "dossierId" in raw_payload: + if "tenantId" in raw_payload and "dossierId" in raw_payload: + payload = TenantIdDossierIdFileIdUploadPayload(**raw_payload) + elif "tenantId" not in raw_payload and "dossierId" in raw_payload: payload = DossierIdFileIdUploadPayload(**raw_payload) else: payload = TargetResponseFilePathUploadPayload(**raw_payload) diff --git a/scripts/send_request.py b/scripts/send_request.py index c7d2046..76b640d 100644 --- a/scripts/send_request.py +++ b/scripts/send_request.py @@ -1,25 +1,27 @@ import gzip import json +import time from operator import itemgetter from kn_utils.logging import logger from pyinfra.config.loader import load_settings, local_pyinfra_root_path # from pyinfra.queue.manager import QueueManager -from pyinfra.queue.sequential_tenants import QueueManager +# from pyinfra.queue.sequential_tenants import QueueManager +from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager from pyinfra.storage.storages.s3 import get_s3_storage_from_settings settings = load_settings(local_pyinfra_root_path / "config/") -def upload_json_and_make_message_body(): +def upload_json_and_make_message_body(tenant_id: str): dossier_id, file_id, suffix = "dossier", "file", "json.gz" content = { "numberOfPages": 7, "sectionTexts": "data", } - object_name = f"{dossier_id}/{file_id}.{suffix}" + object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}" data = gzip.compress(json.dumps(content).encode("utf-8")) storage = get_s3_storage_from_settings(settings) @@ -28,6 +30,7 @@ def upload_json_and_make_message_body(): storage.put_object(object_name, data) message_body = { + "tenantId": tenant_id, "dossierId": dossier_id, "fileId": file_id, "targetFileExtension": suffix, @@ -36,18 +39,38 @@ def upload_json_and_make_message_body(): return message_body -def main(): - queue_manager = QueueManager(settings) +def tenant_event_message(tenant_id: str): + return {"tenantId": tenant_id} + + +def send_tenant_event(tenant_id: str, event_type: str): + queue_manager = TenantQueueManager(settings) + queue_manager.purge_queues() + message = tenant_event_message(tenant_id) + if event_type == "create": + queue_manager.publish_message_to_tenant_created_queue(message=message) + elif event_type == "delete": + queue_manager.publish_message_to_tenant_deleted_queue(message=message) + else: + logger.warning(f"Event type '{event_type}' not known.") + queue_manager.stop_consuming() + + +def send_service_request(tenant_id: str): + queue_manager = ServiceQueueManager(settings) + queue_name = f"{settings.rabbitmq.service_response_queue_prefix}_{tenant_id}" + queue_manager.purge_queues() - message = upload_json_and_make_message_body() + message = upload_json_and_make_message_body(tenant_id) - queue_manager.publish_message_to_input_queue(tenant_id="redaction", message=message) - logger.info(f"Put {message} on {settings.rabbitmq.input_queue}.") + queue_manager.publish_message_to_input_queue(tenant_id=tenant_id, message=message) + logger.info(f"Put {message} on {queue_name}.") storage = get_s3_storage_from_settings(settings) + for method_frame, properties, body in queue_manager.channel.consume( - queue=settings.rabbitmq.output_queue, inactivity_timeout=15 + queue=queue_name, inactivity_timeout=15 ): if not body: break @@ -55,14 +78,34 @@ def main(): logger.info(f"Received {response}") logger.info(f"Message headers: {properties.headers}") queue_manager.channel.basic_ack(method_frame.delivery_tag) - dossier_id, file_id = itemgetter("dossierId", "fileId")(response) + tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response) suffix = message["responseFileExtension"] - print(f"{dossier_id}/{file_id}.{suffix}") - result = storage.get_object(f"{dossier_id}/{file_id}.{suffix}") + print(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}") + result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}") result = json.loads(gzip.decompress(result)) logger.info(f"Contents of result on storage: {result}") + break queue_manager.stop_consuming() if __name__ == "__main__": - main() + tenant_ids = ["a", "b", "c", "d"] + # with ccf.ThreadPoolExecutor() as executor: + # results = executor.map(main, tenant_ids) + # for tenant in tenant_ids: + # main(tenant) + + send_service_request("redaction") + + # for tenant in tenant_ids: + # send_tenant_event(tenant_id=tenant, event_type="create") + + # # time.sleep(1) + + # for tenant in tenant_ids: + # send_service_request(tenant_id=tenant) + + # # time.sleep(1) + + # for tenant in tenant_ids: + # send_tenant_event(tenant_id=tenant, event_type="delete") \ No newline at end of file diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index c53537c..108a437 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -1,31 +1,41 @@ -version: '2' +version: '3.8' services: minio: - image: minio/minio:RELEASE.2022-06-11T19-55-32Z + image: minio/minio:latest + container_name: minio ports: - "9000:9000" environment: - MINIO_ROOT_PASSWORD=password - MINIO_ROOT_USER=root volumes: - - /tmp/minio_store:/data + - /tmp/data/minio_store:/data command: server /data - network_mode: "bridge" + network_mode: "bridge" + extra_hosts: + - "host.docker.internal:host-gateway" rabbitmq: - image: docker.io/bitnami/rabbitmq:3.9.8 + image: docker.io/bitnami/rabbitmq:latest + container_name: rabbitmq ports: - - '4369:4369' - - '5551:5551' - - '5552:5552' + # - '4369:4369' + # - '5551:5551' + # - '5552:5552' - '5672:5672' - - '25672:25672' - '15672:15672' + # - '25672:25672' environment: - RABBITMQ_SECURE_PASSWORD=yes - RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100% - RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi + - RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS=true network_mode: "bridge" volumes: - - /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami -volumes: - mdata: \ No newline at end of file + - /tmp/bitnami/rabbitmq/.rabbitmq/:/data/bitnami + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:15672" ] + interval: 30s + timeout: 10s + retries: 5 + extra_hosts: + - "host.docker.internal:host-gateway"