feat: wip for multiple tenants

Jonathan Kössler 2024-07-05 13:27:16 +02:00
parent c81d967aee
commit de41030e69
5 changed files with 275 additions and 143 deletions

View File

@@ -1,12 +1,9 @@
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
import multiprocessing
from threading import Thread
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
from pyinfra.queue.callback import Callback
# from pyinfra.queue.manager import QueueManager
from pyinfra.queue.sequential_tenants import QueueManager
from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager
from pyinfra.utils.opentelemetry import instrument_pika, setup_trace, instrument_app
from pyinfra.webserver.prometheus import (
@@ -47,22 +44,20 @@ def start_standard_queue_consumer(
if settings.tracing.enabled:
setup_trace(settings)
instrument_pika()
instrument_app(app)
# app = add_health_check_endpoint(app, queue_manager.is_ready)
app = add_health_check_endpoint(app, service_manager.is_ready)
webserver_thread = create_webserver_thread_from_settings(app, settings)
webserver_thread.start()
# queue_manager.start_consuming(callback)
# queue_manager.start_sequential_consume(callback)
# p1 = multiprocessing.Process(target=tenant_manager.start_consuming, daemon=True)
# p2 = multiprocessing.Process(target=service_manager.start_sequential_consume, kwargs={"callback":callback}, daemon=True)
thread = Thread(target=tenant_manager.start_consuming, daemon=True)
thread.start()
# p1.start()
# p2.start()
service_manager.start_sequential_consume(callback)
thread_t = Thread(target=tenant_manager.start_consuming, daemon=True)
thread_s = Thread(target=service_manager.start_sequential_basic_get, args=(callback,), daemon=True)
thread_t.start()
thread_s.start()
thread_t.join()
thread_s.join()
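A minimal sketch of the startup flow this hunk arrives at, assuming settings and callback are constructed as in the surrounding function; both managers share the class-level tenant event queue defined on BaseQueueManager:

from threading import Thread

from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager

def run_consumers(settings, callback):
    tenant_manager = TenantQueueManager(settings)
    service_manager = ServiceQueueManager(settings)
    # Tenant lifecycle events and service requests are consumed on separate
    # daemon threads so neither blocks the other.
    thread_t = Thread(target=tenant_manager.start_consuming, daemon=True)
    thread_s = Thread(target=service_manager.start_sequential_basic_get, args=(callback,), daemon=True)
    thread_t.start()
    thread_s.start()
    # join() keeps the main thread alive; the signal handlers registered in
    # BaseQueueManager still run there on SIGINT/SIGTERM.
    thread_t.join()
    thread_s.join()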

View File

@@ -8,6 +8,7 @@ import signal
import sys
import requests
import time
import threading
import pika.exceptions
from dynaconf import Dynaconf
from typing import Callable, Union
@@ -19,6 +20,7 @@ from retry import retry
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import queue_manager_validators
logger.set_level("DEBUG")
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter
@@ -26,7 +28,10 @@ MessageProcessor = Callable[[dict], dict]
class BaseQueueManager:
tenant_exchange = queue.Queue()
tenant_exchange_queue = queue.Queue()
_connection = None
_lock = threading.Lock()
should_stop = threading.Event()
def __init__(self, settings: Dynaconf):
validate_settings(settings, queue_manager_validators)
@@ -43,7 +48,7 @@ class BaseQueueManager:
signal.signal(signal.SIGINT, self._handle_stop_signal)
@staticmethod
def create_connection_parameters(settings: Dynaconf):
def create_connection_parameters(settings: Dynaconf) -> pika.ConnectionParameters:
credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password)
pika_connection_params = {
"host": settings.rabbitmq.host,
@@ -53,42 +58,46 @@ class BaseQueueManager:
}
return pika.ConnectionParameters(**pika_connection_params)
def get_connection(self) -> BlockingConnection:
with self._lock:
if not self._connection or self._connection.is_closed:
self._connection = pika.BlockingConnection(self.connection_parameters)
return self._connection
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger)
def establish_connection(self):
if self.connection and self.connection.is_open:
logger.debug("Connection to RabbitMQ already established.")
return
def establish_connection(self) -> None:
logger.info(f"Establishing connection to RabbitMQ for {self.__class__.__name__}...")
self.connection = self.get_connection()
if not self.channel or self.channel.is_closed:
logger.debug("Opening channel...")
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.initialize_queues()
logger.info(f"Connection to RabbitMQ established for {self.__class__.__name__}, channel open.")
logger.info("Establishing connection to RabbitMQ...")
logger.info(self.__class__.__name__)
self.connection = pika.BlockingConnection(parameters=self.connection_parameters)
logger.debug("Opening channel...")
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.initialize_queues()
logger.info("Connection to RabbitMQ established, channel open.")
def is_ready(self):
def is_ready(self) -> bool:
self.establish_connection()
return self.channel.is_open
def initialize_queues(self):
def initialize_queues(self) -> None:
raise NotImplementedError("Subclasses should implement this method")
def stop_consuming(self):
if self.channel and self.channel.is_open:
logger.info("Stopping consuming...")
self.channel.stop_consuming()
logger.info("Closing channel...")
self.channel.close()
def stop_consuming(self) -> None:
if not self.should_stop.is_set():
self.should_stop.set()
if self.channel and self.channel.is_open:
try:
self.channel.stop_consuming()
self.channel.close()
except Exception as e:
logger.error(f"Error stopping consuming: {e}", exc_info=True)
if self.connection and self.connection.is_open:
try:
self.connection.close()
except Exception as e:
logger.error(f"Error closing connection: {e}", exc_info=True)
if self.connection and self.connection.is_open:
logger.info("Closing connection to RabbitMQ...")
self.connection.close()
def _handle_stop_signal(self, signum, *args, **kwargs):
def _handle_stop_signal(self, signum, *args, **kwargs) -> None:
logger.info(f"Received signal {signum}, stopping consuming...")
self.stop_consuming()
sys.exit(0)
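The shutdown path is now flag-based: stop_consuming() sets the shared should_stop event first, and the polling loop in ServiceQueueManager checks it between rounds. A condensed standalone sketch of that pattern:

import signal
import threading

should_stop = threading.Event()

def handle_stop(signum, *args):
    # Mirrors _handle_stop_signal: set the flag, then tear down channels.
    should_stop.set()

signal.signal(signal.SIGTERM, handle_stop)
signal.signal(signal.SIGINT, handle_stop)

while not should_stop.is_set():
    ...  # poll queues here, as start_sequential_basic_get does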
@@ -102,7 +111,7 @@ class TenantQueueManager(BaseQueueManager):
self.tenant_deleted_queue_name = self.get_tenant_deleted_queue_name(settings)
self.tenant_events_dlq_name = self.get_tenant_events_dlq_name(settings)
def initialize_queues(self):
def initialize_queues(self) -> None:
self.channel.exchange_declare(exchange=self.tenant_exchange_name, exchange_type="topic")
self.channel.queue_declare(
@@ -134,7 +143,7 @@ class TenantQueueManager(BaseQueueManager):
)
@retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger)
def start_consuming(self):
def start_consuming(self) -> None:
try:
self.establish_connection()
@@ -147,49 +156,49 @@ class TenantQueueManager(BaseQueueManager):
finally:
self.stop_consuming()
def get_tenant_created_queue_name(self, settings: Dynaconf):
def get_tenant_created_queue_name(self, settings: Dynaconf) -> str:
return self.get_queue_name_with_suffix(
suffix=settings.rabbitmq.tenant_created_event_queue_suffix, pod_name=settings.kubernetes.pod_name
)
def get_tenant_deleted_queue_name(self, settings: Dynaconf):
def get_tenant_deleted_queue_name(self, settings: Dynaconf) -> str:
return self.get_queue_name_with_suffix(
suffix=settings.rabbitmq.tenant_deleted_event_queue_suffix, pod_name=settings.kubernetes.pod_name
)
def get_tenant_events_dlq_name(self, settings: Dynaconf):
def get_tenant_events_dlq_name(self, settings: Dynaconf) -> str:
return self.get_queue_name_with_suffix(
suffix=settings.rabbitmq.tenant_event_dlq_suffix, pod_name=settings.kubernetes.pod_name
)
def get_queue_name_with_suffix(self, suffix: str, pod_name: str):
def get_queue_name_with_suffix(self, suffix: str, pod_name: str) -> str:
if not self.use_default_queue_name() and pod_name:
return f"{pod_name}{suffix}"
return self.get_default_queue_name()
def use_default_queue_name(self):
def use_default_queue_name(self) -> bool:
return False
def get_default_queue_name(self):
raise NotImplementedError("Queue name method not implemented")
def on_tenant_created(self, ch: Channel, method, properties, body):
def on_tenant_created(self, ch: Channel, method, properties, body) -> None:
logger.info("Received tenant created event")
message = json.loads(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
tenant_id = message["tenantId"]
self.tenant_exchange.put(("create", tenant_id))
self.tenant_exchange_queue.put(("create", tenant_id))
def on_tenant_deleted(self, ch: Channel, method, properties, body):
def on_tenant_deleted(self, ch: Channel, method, properties, body) -> None:
logger.info("Received tenant deleted event")
message = json.loads(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
tenant_id = message["tenantId"]
self.tenant_exchange.put(("delete", tenant_id))
self.tenant_exchange_queue.put(("delete", tenant_id))
def purge_queues(self):
def purge_queues(self) -> None:
self.establish_connection()
try:
self.channel.queue_purge(self.tenant_created_queue_name)
@@ -198,6 +207,40 @@ class TenantQueueManager(BaseQueueManager):
except pika.exceptions.ChannelWrongStateError:
pass
def publish_message_to_tenant_created_queue(
self, message: Union[str, bytes, dict], properties: pika.BasicProperties = None
) -> None:
if isinstance(message, str):
message = message.encode("utf-8")
elif isinstance(message, dict):
message = json.dumps(message).encode("utf-8")
self.establish_connection()
self.channel.basic_publish(
exchange=self.tenant_exchange_name,
routing_key="tenant.created",
properties=properties,
body=message,
)
logger.info(f"Published message to queue {self.tenant_created_queue_name}.")
def publish_message_to_tenant_deleted_queue(
self, message: Union[str, bytes, dict], properties: pika.BasicProperties = None
) -> None:
if isinstance(message, str):
message = message.encode("utf-8")
elif isinstance(message, dict):
message = json.dumps(message).encode("utf-8")
self.establish_connection()
self.channel.basic_publish(
exchange=self.tenant_exchange_name,
routing_key="tenant.delete",
properties=properties,
body=message,
)
logger.info(f"Published message to queue {self.tenant_deleted_queue_name}.")
class ServiceQueueManager(BaseQueueManager):
def __init__(self, settings: Dynaconf):
@@ -205,21 +248,22 @@ class ServiceQueueManager(BaseQueueManager):
self.service_request_exchange_name = settings.rabbitmq.service_request_exchange_name
self.service_response_exchange_name = settings.rabbitmq.service_response_exchange_name
self.service_queue_prefix = settings.rabbitmq.service_request_queue_prefix
self.service_request_queue_prefix = settings.rabbitmq.service_request_queue_prefix
self.service_response_queue_prefix = settings.rabbitmq.service_response_queue_prefix
self.service_dlq_name = settings.rabbitmq.service_dlq_name
self.tenant_ids = self.get_initial_tenant_ids(tenant_endpoint_url=settings.storage.tenant_server.endpoint)
self._consuming = False
def initialize_queues(self):
def initialize_queues(self) -> None:
self.channel.exchange_declare(exchange=self.service_request_exchange_name, exchange_type="direct")
self.channel.exchange_declare(exchange=self.service_response_exchange_name, exchange_type="direct")
for tenant_id in self.tenant_ids:
queue_name = self.service_queue_prefix + "_" + tenant_id
response_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
self.channel.queue_declare(
queue=queue_name,
queue=response_queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "",
@@ -229,68 +273,74 @@ class ServiceQueueManager(BaseQueueManager):
},
)
self.channel.queue_bind(
queue=queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id
queue=response_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id
)
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=requests.exceptions.HTTPError)
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
self.channel.queue_declare(
queue=response_queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.service_dlq_name,
"x-expires": self.queue_expiration_time, # TODO: check if necessary
},
)
self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id)
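A worked example of the per-tenant naming introduced here, with hypothetical prefix values (the real ones come from settings.rabbitmq):

service_request_queue_prefix = "svc-req"    # assumed value, for illustration
service_response_queue_prefix = "svc-resp"  # assumed value, for illustration
tenant_id = "a"
# Each tenant gets a durable request and a durable response queue, both bound
# with the tenant id as routing key and dead-lettered to the shared service DLQ:
request_queue = f"{service_request_queue_prefix}_{tenant_id}"    # "svc-req_a"
response_queue = f"{service_response_queue_prefix}_{tenant_id}"  # "svc-resp_a"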
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger, exceptions=(requests.exceptions.HTTPError, requests.exceptions.ConnectionError))
def get_initial_tenant_ids(self, tenant_endpoint_url: str) -> list:
try:
response = requests.get(tenant_endpoint_url, timeout=10)
response.raise_for_status() # Raise an HTTPError for bad responses
response = requests.get(tenant_endpoint_url, timeout=10)
response.raise_for_status() # Raise an HTTPError for bad responses
if response.headers["content-type"].lower() == "application/json":
tenants = [tenant["tenantId"] for tenant in response.json()]
else:
logger.warning("Response is not in JSON format.")
except Exception as e:
logger.warning("An unexpected error occurred:", e)
return tenants
if response.headers["content-type"].lower() == "application/json":
tenants = [tenant["tenantId"] for tenant in response.json()]
return tenants
return []
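get_initial_tenant_ids assumes the tenant endpoint answers with a JSON array of objects carrying a tenantId field; a minimal sketch with a hypothetical URL:

import requests

response = requests.get("http://tenant-server/tenants", timeout=10)  # hypothetical endpoint
response.raise_for_status()
# Expected shape: [{"tenantId": "a"}, {"tenantId": "b"}, ...]
tenant_ids = [tenant["tenantId"] for tenant in response.json()]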
@retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger)
def start_sequential_consume(self, message_processor: Callable):
def start_sequential_basic_get(self, message_processor: Callable) -> None:
self.establish_connection()
self._consuming = True
try:
while self._consuming:
while not self.should_stop.is_set():
for tenant_id in self.tenant_ids:
queue_name = self.service_queue_prefix + "_" + tenant_id
queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
method_frame, properties, body = self.channel.basic_get(queue_name)
if method_frame:
logger.debug("PROCESSING MESSAGE")
on_message_callback = self._make_on_message_callback(message_processor, tenant_id)
on_message_callback(self.channel, method_frame, properties, body)
else:
logger.debug("No message returned")
time.sleep(self.connection_sleep)
logger.debug(f"No message returned for queue {queue_name}")
# time.sleep(self.connection_sleep)
time.sleep(0.1)
### Handle tenant events
self.check_tenant_exchange()
except KeyboardInterrupt:
logger.info("Exiting...")
finally:
self.stop_consuming()
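The loop above trades push-based basic_consume for a fair round-robin poll: one basic_get per tenant queue per round, a short sleep on empty queues, and a tenant-event drain between rounds. Condensed, using the names defined in this class:

while not self.should_stop.is_set():
    for tenant_id in self.tenant_ids:
        queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
        method_frame, properties, body = self.channel.basic_get(queue_name)
        if method_frame:
            # ack/nack happens inside the per-tenant callback
            self._make_on_message_callback(message_processor, tenant_id)(
                self.channel, method_frame, properties, body)
        else:
            time.sleep(0.1)  # brief back-off so idle tenants stay cheap
    self.check_tenant_exchange()  # apply queued create/delete events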
def check_tenant_exchange(self):
while True:
def check_tenant_exchange(self) -> None:
while not self.tenant_exchange_queue.empty():
try:
event, tenant = self.tenant_exchange.get(block=False)
event, tenant = self.tenant_exchange_queue.get_nowait()
if event == "create":
self.on_tenant_created(tenant)
elif event == "delete":
self.on_tenant_deleted(tenant)
else:
break
except Exception:
logger.debug("No tenant exchange events.")
except queue.Empty:
# time.sleep(self.connection_sleep)
break
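The two threads hand tenant events over through the class-level queue.Queue; a standalone sketch of that handoff (the producer side lives in the TenantQueueManager callbacks, the consumer side in check_tenant_exchange):

import queue

tenant_exchange_queue = queue.Queue()

# producer (tenant thread):
tenant_exchange_queue.put(("create", "a"))

# consumer (service thread):
while not tenant_exchange_queue.empty():
    try:
        event, tenant = tenant_exchange_queue.get_nowait()
    except queue.Empty:
        break  # drained between the empty() check and get_nowait()
    print(event, tenant)  # dispatch to on_tenant_created / on_tenant_deleted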
def publish_message_to_input_queue(
self, tenant_id: str, message: Union[str, bytes, dict], properties: pika.BasicProperties = None
):
) -> None:
if isinstance(message, str):
message = message.encode("utf-8")
elif isinstance(message, dict):
@@ -305,20 +355,22 @@ class ServiceQueueManager(BaseQueueManager):
)
logger.info(f"Published message to queue {tenant_id}.")
def purge_queues(self):
def purge_queues(self) -> None:
self.establish_connection()
try:
for tenant_id in self.tenant_ids:
queue_name = self.service_queue_prefix + "_" + tenant_id
self.channel.queue_purge(queue_name)
request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
self.channel.queue_purge(request_queue_name)
self.channel.queue_purge(response_queue_name)
logger.info("Queues purged.")
except pika.exceptions.ChannelWrongStateError:
pass
def on_tenant_created(self, tenant_id: str):
queue_name = self.service_queue_prefix + "_" + tenant_id
def on_tenant_created(self, tenant_id: str) -> None:
request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
self.channel.queue_declare(
queue=queue_name,
queue=request_queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "",
@@ -326,18 +378,36 @@ class ServiceQueueManager(BaseQueueManager):
"x-expires": self.queue_expiration_time, # TODO: check if necessary
},
)
self.channel.queue_bind(queue=queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id)
self.channel.queue_bind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id)
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
self.channel.queue_declare(
queue=response_queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.service_dlq_name,
"x-expires": self.queue_expiration_time, # TODO: check if necessary
},
)
self.channel.queue_bind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id)
self.tenant_ids.append(tenant_id)
logger.debug(f"Added tenant {tenant_id}.")
def on_tenant_deleted(self, tenant_id: str):
queue_name = self.service_queue_prefix + "_" + tenant_id
self.channel.queue_unbind(queue=queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id)
self.channel.queue_delete(queue_name)
def on_tenant_deleted(self, tenant_id: str) -> None:
request_queue_name = f"{self.service_request_queue_prefix}_{tenant_id}"
self.channel.queue_unbind(queue=request_queue_name, exchange=self.service_request_exchange_name, routing_key=tenant_id)
self.channel.queue_delete(request_queue_name)
response_queue_name = f"{self.service_response_queue_prefix}_{tenant_id}"
self.channel.queue_unbind(queue=response_queue_name, exchange=self.service_response_exchange_name, routing_key=tenant_id)
self.channel.queue_delete(response_queue_name)
self.tenant_ids.remove(tenant_id)
logger.debug(f"Deleted tenant {tenant_id}.")
def _make_on_message_callback(self, message_processor: MessageProcessor, tenant_id: str):
def _make_on_message_callback(self, message_processor: MessageProcessor, tenant_id: str) -> Callable:
def process_message_body_and_await_result(unpacked_message_body):
# Processing the message in a separate thread is necessary for the main thread pika client to be able to
# process data events (e.g. heartbeats) while the message is being processed.
@@ -390,16 +460,4 @@ class ServiceQueueManager(BaseQueueManager):
channel.basic_nack(method.delivery_tag, requeue=False)
raise
return on_message_callback
def stop_consuming(self):
self._consuming = False
if self.channel and self.channel.is_open:
logger.info("Stopping consuming...")
self.channel.stop_consuming()
logger.info("Closing channel...")
self.channel.close()
if self.connection and self.connection.is_open:
logger.info("Closing connection to RabbitMQ...")
self.connection.close()
return on_message_callback

View File

@@ -19,6 +19,17 @@ class DossierIdFileIdDownloadPayload(BaseModel):
return f"{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
class TenantIdDossierIdFileIdDownloadPayload(BaseModel):
tenantId: str
dossierId: str
fileId: str
targetFileExtension: str
@property
def targetFilePath(self):
return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
class DossierIdFileIdUploadPayload(BaseModel):
dossierId: str
fileId: str
@@ -27,6 +38,17 @@ class DossierIdFileIdUploadPayload(BaseModel):
@property
def responseFilePath(self):
return f"{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
class TenantIdDossierIdFileIdUploadPayload(BaseModel):
tenantId: str
dossierId: str
fileId: str
responseFileExtension: str
@property
def responseFilePath(self):
return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
class TargetResponseFilePathDownloadPayload(BaseModel):
@@ -55,7 +77,9 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
"""
try:
if "dossierId" in raw_payload:
if "tenantId" in raw_payload and "dossierId" in raw_payload:
payload = TenantIdDossierIdFileIdDownloadPayload(**raw_payload)
elif "tenantId" not in raw_payload and "dossierId" in raw_payload:
payload = DossierIdFileIdDownloadPayload(**raw_payload)
else:
payload = TargetResponseFilePathDownloadPayload(**raw_payload)
@@ -106,7 +130,9 @@ def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, dat
"""
try:
if "dossierId" in raw_payload:
if "tenantId" in raw_payload and "dossierId" in raw_payload:
payload = TenantIdDossierIdFileIdUploadPayload(**raw_payload)
elif "tenantId" not in raw_payload and "dossierId" in raw_payload:
payload = DossierIdFileIdUploadPayload(**raw_payload)
else:
payload = TargetResponseFilePathUploadPayload(**raw_payload)
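Model selection is now keyed purely on which fields the raw message carries; for illustration (the fallback model's field name is not visible in this diff and is assumed here):

raw = {"tenantId": "a", "dossierId": "d", "fileId": "f", "responseFileExtension": "json.gz"}
# -> TenantIdDossierIdFileIdUploadPayload, responseFilePath "a/d/f.json.gz"
raw = {"dossierId": "d", "fileId": "f", "responseFileExtension": "json.gz"}
# -> DossierIdFileIdUploadPayload, responseFilePath "d/f.json.gz"
raw = {"targetResponseFilePath": "some/path"}  # hypothetical field name
# -> TargetResponseFilePathUploadPayload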

View File

@@ -1,25 +1,27 @@
import gzip
import json
import time
from operator import itemgetter
from kn_utils.logging import logger
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
# from pyinfra.queue.manager import QueueManager
from pyinfra.queue.sequential_tenants import QueueManager
# from pyinfra.queue.sequential_tenants import QueueManager
from pyinfra.queue.threaded_tenants import ServiceQueueManager, TenantQueueManager
from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
settings = load_settings(local_pyinfra_root_path / "config/")
def upload_json_and_make_message_body():
def upload_json_and_make_message_body(tenant_id: str):
dossier_id, file_id, suffix = "dossier", "file", "json.gz"
content = {
"numberOfPages": 7,
"sectionTexts": "data",
}
object_name = f"{dossier_id}/{file_id}.{suffix}"
object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
data = gzip.compress(json.dumps(content).encode("utf-8"))
storage = get_s3_storage_from_settings(settings)
@@ -28,6 +30,7 @@ def upload_json_and_make_message_body():
storage.put_object(object_name, data)
message_body = {
"tenantId": tenant_id,
"dossierId": dossier_id,
"fileId": file_id,
"targetFileExtension": suffix,
@@ -36,18 +39,38 @@ def upload_json_and_make_message_body():
return message_body
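With tenant_id "a", the helper uploads to "a/dossier/file.json.gz" and returns a body carrying at least the fields visible in this hunk (the rest of the dict is cut off by the hunk boundary):

message_body = {
    "tenantId": "a",
    "dossierId": "dossier",
    "fileId": "file",
    "targetFileExtension": "json.gz",
    # further fields elided by the hunk boundary
}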
def main():
queue_manager = QueueManager(settings)
def tenant_event_message(tenant_id: str):
return {"tenantId": tenant_id}
def send_tenant_event(tenant_id: str, event_type: str):
queue_manager = TenantQueueManager(settings)
queue_manager.purge_queues()
message = tenant_event_message(tenant_id)
if event_type == "create":
queue_manager.publish_message_to_tenant_created_queue(message=message)
elif event_type == "delete":
queue_manager.publish_message_to_tenant_deleted_queue(message=message)
else:
logger.warning(f"Event type '{event_type}' not known.")
queue_manager.stop_consuming()
def send_service_request(tenant_id: str):
queue_manager = ServiceQueueManager(settings)
queue_name = f"{settings.rabbitmq.service_response_queue_prefix}_{tenant_id}"
queue_manager.purge_queues()
message = upload_json_and_make_message_body()
message = upload_json_and_make_message_body(tenant_id)
queue_manager.publish_message_to_input_queue(tenant_id="redaction", message=message)
logger.info(f"Put {message} on {settings.rabbitmq.input_queue}.")
queue_manager.publish_message_to_input_queue(tenant_id=tenant_id, message=message)
logger.info(f"Put {message} on {queue_name}.")
storage = get_s3_storage_from_settings(settings)
for method_frame, properties, body in queue_manager.channel.consume(
queue=settings.rabbitmq.output_queue, inactivity_timeout=15
queue=queue_name, inactivity_timeout=15
):
if not body:
break
@@ -55,14 +78,34 @@ def main():
logger.info(f"Received {response}")
logger.info(f"Message headers: {properties.headers}")
queue_manager.channel.basic_ack(method_frame.delivery_tag)
dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
suffix = message["responseFileExtension"]
print(f"{dossier_id}/{file_id}.{suffix}")
result = storage.get_object(f"{dossier_id}/{file_id}.{suffix}")
print(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
result = json.loads(gzip.decompress(result))
logger.info(f"Contents of result on storage: {result}")
break
queue_manager.stop_consuming()
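Taken together, these helpers give a manual end-to-end test mirroring the commented-out driver below, assuming the docker-compose stack at the end of this commit is up:

for tenant in ["a", "b"]:
    send_tenant_event(tenant_id=tenant, event_type="create")
for tenant in ["a", "b"]:
    send_service_request(tenant_id=tenant)
for tenant in ["a", "b"]:
    send_tenant_event(tenant_id=tenant, event_type="delete")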
if __name__ == "__main__":
main()
tenant_ids = ["a", "b", "c", "d"]
# with ccf.ThreadPoolExecutor() as executor:
# results = executor.map(main, tenant_ids)
# for tenant in tenant_ids:
# main(tenant)
send_service_request("redaction")
# for tenant in tenant_ids:
# send_tenant_event(tenant_id=tenant, event_type="create")
# # time.sleep(1)
# for tenant in tenant_ids:
# send_service_request(tenant_id=tenant)
# # time.sleep(1)
# for tenant in tenant_ids:
# send_tenant_event(tenant_id=tenant, event_type="delete")

View File

@@ -1,31 +1,41 @@
version: '2'
version: '3.8'
services:
minio:
image: minio/minio:RELEASE.2022-06-11T19-55-32Z
image: minio/minio:latest
container_name: minio
ports:
- "9000:9000"
environment:
- MINIO_ROOT_PASSWORD=password
- MINIO_ROOT_USER=root
volumes:
- /tmp/minio_store:/data
- /tmp/data/minio_store:/data
command: server /data
network_mode: "bridge"
network_mode: "bridge"
extra_hosts:
- "host.docker.internal:host-gateway"
rabbitmq:
image: docker.io/bitnami/rabbitmq:3.9.8
image: docker.io/bitnami/rabbitmq:latest
container_name: rabbitmq
ports:
- '4369:4369'
- '5551:5551'
- '5552:5552'
# - '4369:4369'
# - '5551:5551'
# - '5552:5552'
- '5672:5672'
- '25672:25672'
- '15672:15672'
# - '25672:25672'
environment:
- RABBITMQ_SECURE_PASSWORD=yes
- RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100%
- RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi
- RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS=true
network_mode: "bridge"
volumes:
- /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
volumes:
mdata:
- /tmp/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:15672" ]
interval: 30s
timeout: 10s
retries: 5
extra_hosts:
- "host.docker.internal:host-gateway"