import json from pyinfra.config import CONFIG as MAIN_CONFIG MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) from pyinfra.default_objects import get_component_factory from test.config import CONFIG as TEST_CONFIG import logging import time from unittest.mock import Mock import pika import pytest from testcontainers.compose import DockerCompose from pyinfra.exceptions import UnknownClient from pyinfra.locations import TEST_DIR, COMPOSE_PATH from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager, get_connection_params from pyinfra.queue.queue_manager.queue_manager import QueueManager from pyinfra.storage.adapters.azure import AzureStorageAdapter from pyinfra.storage.adapters.s3 import S3StorageAdapter from pyinfra.storage.clients.azure import get_azure_client from pyinfra.storage.clients.s3 import get_s3_client from pyinfra.storage.storage import Storage from pyinfra.visitor import QueueVisitor from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy from pyinfra.visitor.strategies.response.storage import StorageStrategy from test.config import CONFIG from test.queue.queue_manager_mock import QueueManagerMock from test.storage.adapter_mock import StorageAdapterMock from test.storage.client_mock import StorageClientMock logging.basicConfig() logger = logging.getLogger() # TODO: refactor all fixtures into cleanly separated modules pytest_plugins = [ "test.fixtures.consumer", "test.fixtures.input", "test.fixtures.pdf", "test.fixtures.server", "test.integration_tests.serve_test", ] logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1) logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1) @pytest.fixture(autouse=True) def mute_logger(): if not CONFIG.logging: logger.setLevel(logging.CRITICAL + 1) @pytest.fixture(scope="session") def bucket_name(): return "pyinfra-test-bucket" @pytest.fixture def storage_data(): with open(f"{TEST_DIR}/test_data/test_data.TEXT.json", "r") as f: data = json.load(f) return data @pytest.fixture def mock_response(storage_data): response = Mock(status_code=200) response.json.return_value = storage_data return response @pytest.fixture def mock_payload(): return json.dumps({"dossierId": "test", "fileId": "test"}) @pytest.fixture def mock_make_load_data(): def load_data(payload): return storage_data return load_data # def pytest_make_parametrize_id(config, val, argname): # return f"\n\t{argname}={val}\n" @pytest.fixture def storage(client_name, bucket_name, s3_backend, docker_compose): logger.debug("Setup for storage") storage = Storage(get_adapter(client_name, s3_backend)) storage.make_bucket(bucket_name) storage.clear_bucket(bucket_name) yield storage logger.debug("Teardown for storage") storage.clear_bucket(bucket_name) @pytest.fixture(params=["minio", "aws"]) def s3_backend(request): return request.param @pytest.fixture(scope="session", autouse=True) def docker_compose(sleep_seconds=30): if CONFIG.use_docker_fixture: logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") compose.start() logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ") time.sleep(sleep_seconds) yield compose compose.stop() else: yield None def get_pika_connection_params(): params = get_connection_params() return params def get_s3_params(s3_backend): params = CONFIG.storage[s3_backend] return params def get_adapter(client_name, s3_backend): if client_name == "mock": return StorageAdapterMock(StorageClientMock()) if client_name == "azure": return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string)) if client_name == "s3": return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend))) else: raise UnknownClient(client_name) def get_queue_manager(queue_manager_name) -> QueueManager: if queue_manager_name == "mock": return QueueManagerMock("input", "output") if queue_manager_name == "pika": return PikaQueueManager("input", "output", connection_params=get_pika_connection_params()) @pytest.fixture(scope="session") def queue_manager(queue_manager_name, docker_compose): def close_connections(): if queue_manager_name == "pika": try: queue_manager.connection.close() except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionWrongStateError, ConnectionResetError): logger.debug("Connection was already closed when attempting to close explicitly.") def close_channel(): if queue_manager_name == "pika": try: queue_manager.channel.close() except pika.exceptions.ChannelWrongStateError: logger.debug("Channel was already closed when attempting to close explicitly.") queue_manager = get_queue_manager(queue_manager_name) yield queue_manager close_connections() close_channel() @pytest.fixture(scope="session") def callback(): def inner(request): return [request["data"].decode() * 2] return inner @pytest.fixture def analysis_callback(callback): def inner(request): return callback(request) return inner @pytest.fixture def response_strategy(response_strategy_name, storage): if response_strategy_name == "storage": return StorageStrategy(storage) if response_strategy_name == "forwarding": return ForwardingStrategy() @pytest.fixture() def visitor(storage, analysis_callback, response_strategy, component_factory): return QueueVisitor( storage=storage, callback=analysis_callback, download_strategy=component_factory.get_download_strategy(), response_strategy=response_strategy, ) @pytest.fixture def file_descriptor_manager(component_factory): return component_factory.get_file_descriptor_manager() @pytest.fixture def component_factory(): CONFIG["service"]["operations"] = TEST_CONFIG.service.operations CONFIG["service"]["download_strategy"] = "single" return get_component_factory(CONFIG)