import gzip import json from operator import itemgetter import pytest import requests from pyinfra.payload_processing.processor import make_payload_processor @pytest.fixture def target_file(): contents = {"numberOfPages": 10, "content1": "value1", "content2": "value2"} return gzip.compress(json.dumps(contents).encode("utf-8")) @pytest.fixture def file_names(payload): dossier_id, file_id, target_suffix, response_suffix = itemgetter( "dossierId", "fileId", "targetFileExtension", "responseFileExtension", )(payload) return f"{dossier_id}/{file_id}.{target_suffix}", f"{dossier_id}/{file_id}.{response_suffix}" @pytest.fixture(scope="session") def payload_processor(test_storage_config): def file_processor_mock(json_file: dict): return [json_file] yield make_payload_processor(file_processor_mock, test_storage_config) @pytest.mark.parametrize("storage_backend", ["s3"], scope="session") @pytest.mark.parametrize("bucket_name", ["testbucket"], scope="session") @pytest.mark.parametrize("monitoring_enabled", [True, False], scope="session") @pytest.mark.parametrize("x_tenant_id", [None]) class TestPayloadProcessor: def test_payload_processor_yields_correct_response_and_uploads_result( self, payload_processor, storage, bucket_name, payload, response_payload, target_file, file_names, ): storage.clear_bucket(bucket_name) storage.put_object(bucket_name, file_names[0], target_file) response = payload_processor(payload) assert response == response_payload data_received = storage.get_object(bucket_name, file_names[1]) assert json.loads((gzip.decompress(data_received)).decode("utf-8")) == { **payload, "data": [json.loads(gzip.decompress(target_file).decode("utf-8"))], } def test_catching_of_processing_failure(self, payload_processor, storage, bucket_name, payload): storage.clear_bucket(bucket_name) with pytest.raises(Exception): payload_processor(payload) def test_prometheus_endpoint_is_available(self, test_storage_config, monitoring_enabled, storage_backend, x_tenant_id): if monitoring_enabled: resp = requests.get( f"http://{test_storage_config.prometheus_host}:{test_storage_config.prometheus_port}/prometheus" ) assert resp.status_code == 200