"""Pytest fixtures providing settings, storage, a queue manager and sample messages."""

import json

import pytest

from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.connection import get_storage


@pytest.fixture(scope="session")
def settings():
    """Load the settings from the local ``config/`` directory once per test session."""
    config_dir = local_pyinfra_root_path / "config/"
    return load_settings(config_dir)


@pytest.fixture(scope="class")
def storage(storage_backend, settings):
    """Yield a storage object for the requested backend with its bucket created.

    ``storage_backend`` is another fixture (not defined in this part of the
    file) whose value is written to ``settings.storage.backend``. Note that
    ``settings`` is session-scoped, so this assignment modifies the shared
    settings object rather than a private copy.

    The bucket is created before the storage is handed out and cleared again
    during fixture teardown.
    """
    settings.storage.backend = storage_backend

    backend_storage = get_storage(settings)
    backend_storage.make_bucket()

    yield backend_storage

    # Teardown: empty the bucket that was set up above.
    backend_storage.clear_bucket()


@pytest.fixture(scope="session")
def queue_manager(settings):
    """Yield a ``QueueManager`` built from the settings with test-specific overrides.

    The overrides are applied to the shared session-scoped ``settings``
    object before the manager is constructed.
    """
    # NOTE(review): these two overrides are set on the top-level settings
    # object, while the two below are set under ``settings.rabbitmq`` --
    # confirm that QueueManager reads each value from where it is set here.
    settings.rabbitmq_heartbeat = 10
    settings.connection_sleep = 5

    settings.rabbitmq.max_retries = 3
    settings.rabbitmq.max_delay = 10

    yield QueueManager(settings)


@pytest.fixture
def input_message():
    """Return a JSON string carrying a target file path and a response file path."""
    payload = {
        "targetFilePath": "test/target.json.gz",
        "responseFilePath": "test/response.json.gz",
    }
    return json.dumps(payload)


@pytest.fixture
def stop_message():
    """Return the literal ``"STOP"`` message string."""
    return "STOP"