All components from payload processing downwards are tested. Tests that depend on Docker Compose are disabled by default because they take too long to run during development. Furthermore, the queue manager tests are not stable; a refactoring with inversion of control is urgently needed to make the components properly testable. The storage tests are stable and should be run once before releasing; this should be implemented via the CI script. Also adds the tenant ID and operation kwargs, if present, to the storage and queue responses.
82 lines
2.4 KiB
Python
82 lines
2.4 KiB
Python
import gzip
|
|
import json
|
|
import pytest
|
|
|
|
from pyinfra.config import get_config
|
|
from pyinfra.payload_processing.payload import get_queue_message_payload_parser
|
|
from pyinfra.payload_processing.processor import PayloadProcessor
|
|
from pyinfra.storage.storage_info import StorageInfo
|
|
from pyinfra.storage.storage_provider import StorageProviderMock
|
|
from pyinfra.storage.storages.mock import StorageMock
|
|
|
|
|
|
@pytest.fixture
def bucket_name():
    """Provide the bucket name used by the storage fixtures and tests of this module."""
    return "test_bucket"
|
|
|
|
|
|
@pytest.fixture
def storage_mock(target_json_file, target_file_path, bucket_name):
    """Create a ``StorageMock`` from the target JSON file, the target file path and the bucket name."""
    return StorageMock(target_json_file, target_file_path, bucket_name)
|
|
|
|
|
|
@pytest.fixture
def storage_info_mock(bucket_name):
    """Create a ``StorageInfo`` for the test bucket."""
    info = StorageInfo(bucket_name)
    return info
|
|
|
|
|
|
@pytest.fixture
def data_processor_mock(processing_result_json):
    """Provide a stand-in data processor that always returns ``processing_result_json``.

    The returned callable takes the data plus arbitrary keyword arguments and
    ignores all of them.
    """

    def _canned_processor(data, **kwargs):
        # The input is deliberately unused: the result is fixed by the fixture.
        return processing_result_json

    return _canned_processor
|
|
|
|
|
|
@pytest.fixture
def payload_processor(storage_mock, storage_info_mock, data_processor_mock):
    """Assemble a ``PayloadProcessor`` from the mocked storage and data processor.

    The storage provider wraps the storage mock and storage info mock; the
    payload parser is built from the configuration returned by ``get_config()``.
    """
    return PayloadProcessor(
        StorageProviderMock(storage_mock, storage_info_mock),
        get_queue_message_payload_parser(get_config()),
        data_processor_mock,
    )
|
|
|
|
|
|
@pytest.mark.parametrize("x_tenant_id", [None, "klaus"])
@pytest.mark.parametrize("optional_processing_kwargs", [{}, {"operation": "test"}])
class TestPayloadProcessor:
    """Check the response returned by the payload processor and the result it stores.

    Every test runs once per combination of the ``x_tenant_id`` and
    ``optional_processing_kwargs`` values listed in the class-level
    parametrization.
    """

    @staticmethod
    def _stored_json(storage, bucket, path):
        """Fetch an object from ``storage``, gunzip it and parse it as JSON."""
        compressed = storage.get_object(bucket, path)
        return json.loads(gzip.decompress(compressed).decode())

    def test_payload_processor_yields_correct_response_and_uploads_result_for_legacy_message(
        self,
        payload_processor,
        storage_mock,
        bucket_name,
        response_file_path,
        legacy_payload,
        legacy_queue_response_payload,
        legacy_storage_payload,
    ):
        """A legacy payload yields the legacy queue response and legacy storage payload."""
        response = payload_processor(legacy_payload)
        assert response == legacy_queue_response_payload

        # The stored object is read only after the response has been verified.
        stored = self._stored_json(storage_mock, bucket_name, response_file_path)
        assert stored == legacy_storage_payload

    def test_payload_processor_yields_correct_response_and_uploads_result(
        self,
        payload_processor,
        storage_mock,
        bucket_name,
        response_file_path,
        payload,
        queue_response_payload,
        storage_payload,
    ):
        """A payload yields the expected queue response and storage payload."""
        response = payload_processor(payload)
        assert response == queue_response_payload

        # The stored object is read only after the response has been verified.
        stored = self._stored_json(storage_mock, bucket_name, response_file_path)
        assert stored == storage_payload
|