Merge in RR/pyinfra from RED-6273-multi-tenant-storage to master
Squashed commit of the following:
commit 0fead1f8b59c9187330879b4e48d48355885c27c
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 28 15:02:22 2023 +0200
fix typos
commit 892a803726946876f8b8cd7905a0e73c419b2fb1
Author: Matthias Bisping <matthias.bisping@axbit.com>
Date: Tue Mar 28 14:41:49 2023 +0200
Refactoring
Replace custom storage caching logic with LRU decorator
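The squashed log does not show the refactored code itself; purely as a hedged sketch (the names below are placeholders, not the actual pyinfra API), replacing hand-rolled caching with functools.lru_cache looks roughly like this:

from functools import lru_cache


def _connect(connection_string: str) -> dict:
    # Placeholder for the real (unknown) storage client constructor.
    return {"connection_string": connection_string}


@lru_cache(maxsize=None)
def get_storage_connection(connection_string: str) -> dict:
    # lru_cache keys calls by their hashable arguments, so each unique
    # connection string is connected exactly once; later calls return the
    # cached object instead of reconnecting.
    return _connect(connection_string)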
commit eafcd90260731e3360ce960571f07dee8f521327
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Fri Mar 24 12:50:13 2023 +0100
fix bug in storage connection from endpoint
commit d0c9fb5b7d1c55ae2f90e8faa1efec9f7587c26a
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Fri Mar 24 11:49:34 2023 +0100
add logs to PayloadProcessor
- add log messages to determine whether the x-tenant
storage connection is working
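The commit only adds log statements; as an illustration of the intent (the actual messages and PayloadProcessor internals are not part of this log), such logging might look like:

import logging

logger = logging.getLogger(__name__)


def log_tenant_storage_status(tenant_id, connection):
    # Illustrative only: report whether a tenant-specific storage connection was established.
    if connection is not None:
        logger.info("x-tenant storage connection established for tenant %s.", tenant_id)
    else:
        logger.warning("No x-tenant storage connection for tenant %s.", tenant_id)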
commit 97309fe58037b90469cf7a3de342d4749a0edfde
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Fri Mar 24 10:41:59 2023 +0100
update PayloadProcessor
- introduce a storage cache so that each unique
storage connection is created only once
- add functionality to pass optional processing
kwargs (e.g. the operation key) in the queue message
to the processing function
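A hedged sketch of the optional-kwargs mechanism just described; the message schema and the processor signature are assumptions, not the real queue format:

import json


def handle_message(body: bytes, process):
    message = json.loads(body)
    # Optional processing kwargs (e.g. an operation key) travel inside the queue
    # message and are forwarded to the processing function only when present.
    processing_kwargs = message.get("processing_kwargs", {})
    return process(message["payload"], **processing_kwargs)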
commit d48e8108fdc0d463c89aaa0d672061ab7dca83a0
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Mar 22 13:34:43 2023 +0100
add multi-tenant storage connection 1st iteration
- forward x-tenant-id from queue message header to
payload processor
- add functions to retrieve storage infos from an
endpoint or from the config; this enables hashing and
caching of connections created from these infos (sketched below)
- add function to initialize storage connections
from storage infos
- streamline and refactor tests to make them more
readable and robust and to make it easier to add
new tests
- update payload processor with the first iteration
of multi-tenant storage connection support,
including connection caching and backwards compatibility
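As a sketch of the flow described in the bullets above (all names and the shape of the storage infos are assumptions): the X-TENant-ID header selects the storage infos, and because the infos are hashable they can key an LRU-cached connect function like the one sketched earlier:

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)  # frozen => hashable, so instances can key an LRU-cached connect()
class StorageInfo:
    endpoint: str
    bucket: str


def resolve_storage_info(headers: Optional[dict]) -> StorageInfo:
    tenant_id = (headers or {}).get("X-TENANT-ID")
    if tenant_id is None:
        # Backwards compatibility: without the header, fall back to the storage
        # configured for the service itself.
        return StorageInfo(endpoint="https://storage.example", bucket="default")
    # Hypothetical per-tenant lookup, e.g. against the endpoint mentioned above.
    return StorageInfo(endpoint="https://storage.example", bucket=tenant_id)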
commit 52c047c47b98e62d0b834a9b9b6c0e2bb0db41e5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Tue Mar 21 15:35:57 2023 +0100
add AES/GCM cipher functions
- decrypt x-tenant storage connection strings
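The commit only states that AES/GCM cipher functions were added. For reference, the usual AES-GCM pattern with the cryptography package looks like the following; whether pyinfra uses this package, and how key and nonce are stored or transported, is not shown here:

import os

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

key = AESGCM.generate_key(bit_length=256)   # shared secret, e.g. from configuration
nonce = os.urandom(12)                      # 96-bit nonce, unique per encryption
aesgcm = AESGCM(key)

# Example connection string for a tenant; GCM also authenticates the ciphertext.
ciphertext = aesgcm.encrypt(nonce, b"s3://tenant-a-bucket?key=...", None)
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
assert plaintext == b"s3://tenant-a-bucket?key=..."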
124 lines · 4.6 KiB · Python
import json
import logging
import time
from multiprocessing import Process

import pika
import pika.exceptions
import pytest

from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
from pyinfra.queue.queue_manager import QueueManager

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@pytest.fixture(scope="session")
def development_queue_manager(test_queue_config):
    test_queue_config.rabbitmq_heartbeat = 7200
    development_queue_manager = DevelopmentQueueManager(test_queue_config)
    yield development_queue_manager
    logger.info("Tearing down development queue manager...")
    try:
        development_queue_manager.close_channel()
    except pika.exceptions.ConnectionClosedByBroker:
        pass


@pytest.fixture(scope="session")
def payload_processing_time(test_queue_config, offset=5):
    # FIXME: this implicitly tests the heartbeat when running the end-to-end test. There should be another way to test
    # this explicitly.
    return test_queue_config.rabbitmq_heartbeat + offset


@pytest.fixture(scope="session")
def payload_processor(response_payload, payload_processing_time, payload_processor_type):
    def process(payload):
        time.sleep(payload_processing_time)
        return response_payload

    def process_with_failure(payload):
        raise MemoryError

    if payload_processor_type == "mock":
        return process
    elif payload_processor_type == "failing":
        return process_with_failure


@pytest.fixture(scope="session", autouse=True)
def start_queue_consumer(test_queue_config, payload_processor, sleep_seconds=5):
    def consume_queue():
        queue_manager.start_consuming(payload_processor)

    queue_manager = QueueManager(test_queue_config)
    p = Process(target=consume_queue)
    p.start()
    logger.info(f"Setting up consumer, waiting for {sleep_seconds}...")
    time.sleep(sleep_seconds)
    yield
    logger.info("Tearing down consumer...")
    p.terminate()


@pytest.fixture
def message_properties(message_headers):
    if not message_headers:
        return pika.BasicProperties(headers=None)
    elif message_headers == "X-TENANT-ID":
        return pika.BasicProperties(headers={"X-TENANT-ID": "redaction"})
    else:
        raise Exception(f"Invalid {message_headers=}.")


@pytest.mark.parametrize("x_tenant_id", [None])
class TestQueueManager:
    # FIXME: All tests here are wonky. This is due to the implementation of running the process-blocking queue_manager
    # in a subprocess. It is then very hard to interact directly with the subprocess. If you have a better idea, please
    # refactor; the tests here are insufficient to ensure the functionality of the queue manager!
    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_message_processing_does_not_block_heartbeat(
        self, development_queue_manager, payload, response_payload, payload_processing_time
    ):
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(payload)
        time.sleep(payload_processing_time + 10)
        _, _, body = development_queue_manager.get_response()
        result = json.loads(body)
        assert result == response_payload

    @pytest.mark.parametrize("message_headers", [None, "X-TENANT-ID"])
    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_queue_manager_forwards_message_headers(
        self,
        development_queue_manager,
        payload,
        response_payload,
        payload_processing_time,
        message_properties,
    ):
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(payload, message_properties)
        time.sleep(payload_processing_time + 10)
        _, properties, _ = development_queue_manager.get_response()
        assert properties.headers == message_properties.headers

    # FIXME: It is not possible to test the behavior of the queue manager directly, since it is running in a separate
    # process. You require logging to see if the exception is handled correctly. Hence, this test is only useful for
    # development, but insufficient to guarantee the correct behavior.
    @pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session")
    def test_failed_message_processing_is_handled(
        self,
        development_queue_manager,
        payload,
        response_payload,
        payload_processing_time,
    ):
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(payload)
        time.sleep(payload_processing_time + 10)
        _, _, body = development_queue_manager.get_response()
        assert not body
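The test module above relies on fixtures that are not defined in the file (test_queue_config, payload, response_payload, and the parametrized names payload_processor_type, message_headers, x_tenant_id); they presumably live in a conftest.py elsewhere in pyinfra. A minimal, purely illustrative sketch of such a conftest, just to make the dependencies explicit:

# conftest.py (illustrative sketch only, not the actual pyinfra fixtures)
import pytest


class DummyQueueConfig:
    # Placeholder; the real config presumably carries full RabbitMQ settings.
    rabbitmq_heartbeat = 60


@pytest.fixture(scope="session")
def test_queue_config():
    return DummyQueueConfig()


@pytest.fixture
def payload():
    return {"text": "hello"}


@pytest.fixture
def response_payload():
    return {"text": "hello", "processed": True}


# Names such as payload_processor_type, message_headers and x_tenant_id are
# supplied per test via @pytest.mark.parametrize, which overrides any default
# fixture of the same name.
@pytest.fixture(scope="session")
def payload_processor_type():
    return "mock"


@pytest.fixture
def message_headers():
    return None


@pytest.fixture
def x_tenant_id():
    return None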