pyinfra/tests/conftest.py
2024-01-25 09:08:51 +01:00

47 lines
977 B
Python

import json
import pytest
from pyinfra.config.loader import load_settings, pyinfra_config_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.connection import get_storage
@pytest.fixture(scope="session")
def settings():
    """Load the project settings once per test session.

    Returns:
        Whatever ``load_settings`` produces for ``pyinfra_config_path``.
        The same object is shared by every fixture and test that requests
        ``settings`` during the session.
    """
    loaded = load_settings(pyinfra_config_path)
    return loaded
@pytest.fixture(scope="class")
def storage(storage_backend, settings):
    """Provide a storage connection with a bucket for one test class.

    Setup writes ``storage_backend`` into ``settings.storage.backend``,
    obtains the storage object via ``get_storage(settings)`` and calls
    ``make_bucket()`` on it. After the requesting tests are done,
    ``clear_bucket()`` is called on the same object.

    Args:
        storage_backend: Backend identifier to test against. It is not
            defined in this file view; presumably supplied by another
            fixture or by parametrization (confirm against the tests).
        settings: The session-wide settings object; note that it is
            modified in place and the change is not reverted here.

    Yields:
        The storage object returned by ``get_storage``.
    """
    # Select the backend before the connection is created.
    settings.storage.backend = storage_backend

    store = get_storage(settings)
    store.make_bucket()

    yield store

    # Teardown: empty the bucket once the tests using it have finished.
    store.clear_bucket()
@pytest.fixture(scope="session")
def queue_manager(settings):
    """Provide one ``QueueManager`` instance for the whole test session.

    Before the manager is constructed, ``settings.rabbitmq_heartbeat`` is
    set to 10 and ``settings.connection_sleep`` to 5 (units are not shown
    in this file). The shared ``settings`` object is modified in place.

    Args:
        settings: The session-wide settings object.

    Yields:
        The ``QueueManager`` built from the adjusted settings. Nothing runs
        after the yield, so this fixture performs no teardown of its own.
    """
    # Override the connection-related values used by the tests.
    settings.rabbitmq_heartbeat = 10
    settings.connection_sleep = 5

    manager = QueueManager(settings)
    yield manager
@pytest.fixture
def input_message():
    """Return a JSON-encoded request message for tests.

    Returns:
        A JSON string of an object with the keys ``targetFilePath`` and
        ``responseFilePath``, both pointing at ``test/*.json.gz`` paths.
    """
    payload = {
        "targetFilePath": "test/target.json.gz",
        "responseFilePath": "test/response.json.gz",
    }
    return json.dumps(payload)
@pytest.fixture
def stop_message():
    """Return the literal string ``"STOP"``.

    NOTE(review): presumably used by tests as a signal to stop consuming
    from the queue; confirm against the tests that request this fixture.
    """
    sentinel = "STOP"
    return sentinel