pyinfra/test/unit_tests/conftest.py
Matthias Bisping 0e47ba61c7 Pull request #34: Aws region
Merge in RR/pyinfra from aws_region to master

Squashed commit of the following:

commit 7d5513e8b2e46218fe6f030abf99cc05b3fed7f5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 28 16:08:20 2022 +0200

    changed test bucket

commit caba7b503edbc6147aa69f8044ac92a81e768abd
Merge: e76f0ab 52acc30
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 28 15:30:57 2022 +0200

    Merge branch 'master' of ssh://git.iqser.com:2222/rr/pyinfra into aws_region

commit e76f0ab1f29b774d9297453c82b3b5749ea4cb44
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 28 15:30:19 2022 +0200

    changed test bucket

commit 29b15be9337fb409f6a0bfbb29026baa4bbe7236
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Apr 28 15:08:43 2022 +0200

    added aws region to config
2022-04-28 16:29:15 +02:00

161 lines
4.7 KiB
Python

import json
import logging
import time
from unittest.mock import Mock
import pika
import pytest
import testcontainers.compose
from pyinfra.exceptions import UnknownClient
from pyinfra.locations import TEST_DIR, COMPOSE_PATH
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager, get_connection_params
from pyinfra.queue.queue_manager.queue_manager import QueueManager
from pyinfra.storage.adapters.azure import AzureStorageAdapter
from pyinfra.storage.adapters.s3 import S3StorageAdapter
from pyinfra.storage.clients.azure import get_azure_client
from pyinfra.storage.clients.s3 import get_s3_client
from pyinfra.storage.storage import Storage
from test.config import CONFIG
from test.queue.queue_manager_mock import QueueManagerMock
from test.storage.adapter_mock import StorageAdapterMock
from test.storage.client_mock import StorageClientMock
from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def bucket_name():
return "pyinfra-test-bucket"
@pytest.fixture
def storage_data():
with open(f"{TEST_DIR}/test_data/test_data.TEXT.json", "r") as f:
data = json.load(f)
return data
@pytest.fixture
def mock_response(storage_data):
response = Mock(status_code=200)
response.json.return_value = storage_data
return response
@pytest.fixture
def mock_payload():
return json.dumps({"dossierId": "test", "fileId": "test"})
@pytest.fixture
def mock_make_load_data():
def load_data(payload):
return storage_data
return load_data
@pytest.fixture(params=["minio", "aws"], scope="session")
def storage(client_name, bucket_name, request):
logger.debug("Setup for storage")
storage = Storage(get_adapter(client_name, request.param))
storage.make_bucket(bucket_name)
storage.clear_bucket(bucket_name)
yield storage
logger.debug("Teardown for storage")
storage.clear_bucket(bucket_name)
@pytest.fixture(scope="session", autouse=True)
def docker_compose(sleep_seconds=30):
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
compose.start()
logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
time.sleep(sleep_seconds)
yield compose
compose.stop()
def get_pika_connection_params():
params = get_connection_params()
return params
def get_s3_params(s3_backend):
params = CONFIG.storage[s3_backend]
return params
def get_adapter(client_name, s3_backend):
if client_name == "mock":
return StorageAdapterMock(StorageClientMock())
if client_name == "azure":
return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
if client_name == "s3":
return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend)))
else:
raise UnknownClient(client_name)
def get_queue_manager(queue_manager_name) -> QueueManager:
if queue_manager_name == "mock":
return QueueManagerMock("input", "output")
if queue_manager_name == "pika":
return PikaQueueManager("input", "output", connection_params=get_pika_connection_params())
@pytest.fixture(scope="session")
def queue_manager(queue_manager_name):
def close_connections():
if queue_manager_name == "pika":
try:
queue_manager.connection.close()
except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionWrongStateError, ConnectionResetError):
logger.debug("Connection was already closed when attempting to close explicitly.")
def close_channel():
if queue_manager_name == "pika":
try:
queue_manager.channel.close()
except pika.exceptions.ChannelWrongStateError:
logger.debug("Channel was already closed when attempting to close explicitly.")
queue_manager = get_queue_manager(queue_manager_name)
yield queue_manager
close_connections()
close_channel()
@pytest.fixture(scope="session")
def callback():
def inner(request):
return request["data"].decode() * 2
return inner
@pytest.fixture
def analysis_callback(callback):
def inner(request):
return callback(request)
return inner
@pytest.fixture
def response_strategy(response_strategy_name, storage):
if response_strategy_name == "storage":
return StorageStrategy(storage)
if response_strategy_name == "forwarding":
return ForwardingStrategy()
@pytest.fixture()
def visitor(storage, analysis_callback, response_strategy):
return QueueVisitor(storage, analysis_callback, response_strategy)