# pyinfra/tests/conftest.py
#
# 49 lines
# 1.0 KiB
# Python

import json
import pytest
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.connection import get_storage
@pytest.fixture(scope="session")
def settings():
    """Load the settings from the ``config/`` directory of the local pyinfra root.

    The fixture is session-scoped, so ``load_settings`` is called once and the
    resulting object is shared by everything that requests it. Other fixtures
    in this file assign attributes on that shared object.
    """
    config_dir = local_pyinfra_root_path / "config/"
    return load_settings(config_dir)
@pytest.fixture(scope="class")
def storage(storage_backend, settings):
    """Yield a storage object for ``storage_backend`` with its bucket created.

    ``storage_backend`` is written to ``settings.storage.backend`` before
    ``get_storage`` is called, which modifies the ``settings`` object passed
    in. ``storage_backend`` itself is provided from outside this file
    (presumably a fixture or parametrization -- confirm against the tests).

    Yields:
        The object returned by ``get_storage(settings)`` after
        ``make_bucket()`` has been called on it.
    """
    settings.storage.backend = storage_backend

    client = get_storage(settings)
    client.make_bucket()
    yield client
    # Teardown: executed when the class-scoped fixture is finalized.
    client.clear_bucket()
@pytest.fixture(scope="session")
def queue_manager(settings):
    """Yield a ``QueueManager`` built from ``settings`` after test overrides.

    Four values are assigned on the ``settings`` object before the manager is
    constructed; because ``settings`` is passed in, those assignments remain
    visible to anything else holding the same object.

    Yields:
        The ``QueueManager(settings)`` instance. No teardown code follows the
        ``yield``.
    """
    # NOTE(review): these two overrides are set directly on ``settings`` while
    # the two below go through ``settings.rabbitmq`` -- confirm that the
    # top-level attribute names are the ones QueueManager actually reads.
    settings.rabbitmq_heartbeat = 10
    settings.connection_sleep = 5

    settings.rabbitmq.max_retries = 3
    settings.rabbitmq.max_delay = 10

    manager = QueueManager(settings)
    yield manager
@pytest.fixture
def input_message():
    """Return a JSON string holding a target file path and a response file path."""
    payload = {
        "targetFilePath": "test/target.json.gz",
        "responseFilePath": "test/response.json.gz",
    }
    return json.dumps(payload)
@pytest.fixture
def stop_message():
    """Return the literal string ``"STOP"``."""
    message = "STOP"
    return message