49 lines
1.0 KiB
Python
49 lines
1.0 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
|
|
from pyinfra.queue.manager import QueueManager
|
|
from pyinfra.storage.connection import get_storage
|
|
|
|
|
|
@pytest.fixture(scope="session")
def settings():
    """Load the project settings once for the whole test session.

    The settings are read via ``load_settings`` from the ``config/`` path
    below ``local_pyinfra_root_path``. Because the fixture is session-scoped,
    every test that requests it receives the same settings object.
    """
    config_dir = local_pyinfra_root_path / "config/"
    return load_settings(config_dir)
|
|
|
|
|
|
@pytest.fixture(scope="class")
def storage(storage_backend, settings):
    """Yield a storage connection with its bucket created, then clear it.

    ``storage_backend`` is written to ``settings.storage.backend`` before the
    connection is obtained through ``get_storage``. Note that this mutates
    the session-scoped ``settings`` object shared with other fixtures.

    Setup calls ``make_bucket()``; teardown (the code after ``yield``) calls
    ``clear_bucket()`` once the class-scoped fixture is finalised.
    """
    settings.storage.backend = storage_backend

    connection = get_storage(settings)
    connection.make_bucket()

    yield connection

    # Teardown: empty the bucket so later test classes start clean.
    connection.clear_bucket()
|
|
|
|
|
|
@pytest.fixture(scope="session")
def queue_manager(settings):
    """Yield a ``QueueManager`` built from settings with test overrides.

    The shared ``settings`` object is modified in place before the manager is
    constructed; there is no teardown step after the ``yield``.
    """
    # NOTE(review): the first two overrides are set on the top-level settings
    # object while the last two live under ``settings.rabbitmq`` -- confirm
    # that QueueManager actually reads them from these locations.
    settings.rabbitmq_heartbeat = 10
    settings.connection_sleep = 5

    settings.rabbitmq.max_retries = 3
    settings.rabbitmq.max_delay = 10

    yield QueueManager(settings)
|
|
|
|
|
|
@pytest.fixture
def input_message():
    """Return a JSON string naming the target and response file paths."""
    payload = {
        "targetFilePath": "test/target.json.gz",
        "responseFilePath": "test/response.json.gz",
    }
    return json.dumps(payload)
|
|
|
|
|
|
@pytest.fixture
def stop_message():
    """Return the literal ``"STOP"`` message string used by tests."""
    stop_signal = "STOP"
    return stop_signal
|