pyinfra/tests/unit_tests/processor_test.py
Julius Unverfehrt c09476cfae Update tests
All components from payload processing downwards are tested.

Tests that depend on docker compose have been disabled by default
because they take too long to use during development. Furthermore, the
queue manager tests are not stable, a refactoring with inversion of
control is urgently needed to make the components properly testable. The
storage tests are stable and should be run once before releasing, this
should be implemented via the CI script.

Also adds, if present, tenant Id and operation kwargs to storage and
queue response.
2023-08-22 17:33:22 +02:00

82 lines
2.4 KiB
Python

import gzip
import json
import pytest
from pyinfra.config import get_config
from pyinfra.payload_processing.payload import get_queue_message_payload_parser
from pyinfra.payload_processing.processor import PayloadProcessor
from pyinfra.storage.storage_info import StorageInfo
from pyinfra.storage.storage_provider import StorageProviderMock
from pyinfra.storage.storages.mock import StorageMock
@pytest.fixture
def bucket_name():
return "test_bucket"
@pytest.fixture
def storage_mock(target_json_file, target_file_path, bucket_name):
storage = StorageMock(target_json_file, target_file_path, bucket_name)
return storage
@pytest.fixture
def storage_info_mock(bucket_name):
return StorageInfo(bucket_name)
@pytest.fixture
def data_processor_mock(processing_result_json):
def inner(data, **kwargs):
return processing_result_json
return inner
@pytest.fixture
def payload_processor(storage_mock, storage_info_mock, data_processor_mock):
storage_provider = StorageProviderMock(storage_mock, storage_info_mock)
payload_parser = get_queue_message_payload_parser(get_config())
return PayloadProcessor(storage_provider, payload_parser, data_processor_mock)
@pytest.mark.parametrize("x_tenant_id", [None, "klaus"])
@pytest.mark.parametrize("optional_processing_kwargs", [{}, {"operation": "test"}])
class TestPayloadProcessor:
def test_payload_processor_yields_correct_response_and_uploads_result_for_legacy_message(
self,
payload_processor,
storage_mock,
bucket_name,
response_file_path,
legacy_payload,
legacy_queue_response_payload,
legacy_storage_payload,
):
response = payload_processor(legacy_payload)
assert response == legacy_queue_response_payload
data_stored = storage_mock.get_object(bucket_name, response_file_path)
assert json.loads(gzip.decompress(data_stored).decode()) == legacy_storage_payload
def test_payload_processor_yields_correct_response_and_uploads_result(
self,
payload_processor,
storage_mock,
bucket_name,
response_file_path,
payload,
queue_response_payload,
storage_payload,
):
response = payload_processor(payload)
assert response == queue_response_payload
data_stored = storage_mock.get_object(bucket_name, response_file_path)
assert json.loads(gzip.decompress(data_stored).decode()) == storage_payload