"""Pytest fixtures providing queue-message payloads in their raw and parsed forms.

The fixtures ``x_tenant_id`` and ``optional_processing_kwargs`` requested below
are not defined in this module; they must be supplied elsewhere (for example by
a conftest or by test parametrization).
"""
import gzip
import json

import pytest

from pyinfra.payload_processing.payload import LegacyQueueMessagePayload, QueueMessagePayload


def _optional_entries(x_tenant_id, optional_processing_kwargs) -> dict:
    """Build the optional entries shared by all raw payload fixtures.

    Args:
        x_tenant_id: Tenant identifier; the ``"X-TENANT-ID"`` key is only
            emitted when this value is truthy.
        optional_processing_kwargs: Extra key/value pairs to merge in; a falsy
            value (e.g. ``None``) is treated as "no extra entries".

    Returns:
        A new dict holding the tenant entry (if any) followed by the extra
        processing kwargs. The kwargs are merged last, so they win on a key clash.
    """
    tenant_entry = {"X-TENANT-ID": x_tenant_id} if x_tenant_id else {}
    return {**tenant_entry, **(optional_processing_kwargs or {})}


@pytest.fixture
def legacy_payload(x_tenant_id, optional_processing_kwargs):
    """Raw legacy message: dossier/file ids and file extensions plus optional entries."""
    return {
        "dossierId": "test",
        "fileId": "test",
        "targetFileExtension": "target.json.gz",
        "responseFileExtension": "response.json.gz",
        **_optional_entries(x_tenant_id, optional_processing_kwargs),
    }


@pytest.fixture
def target_file_path():
    """Path of the target file used by the path-based payload fixtures."""
    return "test/test.target.json.gz"


@pytest.fixture
def response_file_path():
    """Path of the response file used by the path-based payload fixtures."""
    return "test/test.response.json.gz"


@pytest.fixture
def payload(x_tenant_id, optional_processing_kwargs, target_file_path, response_file_path):
    """Raw path-based message: target/response file paths plus optional entries."""
    return {
        "targetFilePath": target_file_path,
        "responseFilePath": response_file_path,
        **_optional_entries(x_tenant_id, optional_processing_kwargs),
    }


@pytest.fixture
def legacy_queue_response_payload(x_tenant_id, optional_processing_kwargs):
    """Legacy queue response: only the dossier/file ids plus optional entries."""
    return {
        "dossierId": "test",
        "fileId": "test",
        **_optional_entries(x_tenant_id, optional_processing_kwargs),
    }


@pytest.fixture
def queue_response_payload(x_tenant_id, optional_processing_kwargs, target_file_path, response_file_path):
    """Path-based queue response; carries the same keys as the ``payload`` fixture."""
    return {
        "targetFilePath": target_file_path,
        "responseFilePath": response_file_path,
        **_optional_entries(x_tenant_id, optional_processing_kwargs),
    }


@pytest.fixture
def legacy_storage_payload(x_tenant_id, optional_processing_kwargs, processing_result_json):
    """Legacy message content with the processing result attached under ``"data"``."""
    return {
        "dossierId": "test",
        "fileId": "test",
        "targetFileExtension": "target.json.gz",
        "responseFileExtension": "response.json.gz",
        **_optional_entries(x_tenant_id, optional_processing_kwargs),
        # "data" is placed last so it cannot be overridden by the optional entries.
        "data": processing_result_json,
    }


@pytest.fixture
def storage_payload(x_tenant_id, optional_processing_kwargs, processing_result_json, target_file_path, response_file_path):
    """Path-based message content with the processing result attached under ``"data"``."""
    return {
        "targetFilePath": target_file_path,
        "responseFilePath": response_file_path,
        **_optional_entries(x_tenant_id, optional_processing_kwargs),
        # "data" is placed last so it cannot be overridden by the optional entries.
        "data": processing_result_json,
    }


@pytest.fixture
def legacy_parsed_payload(
    x_tenant_id, optional_processing_kwargs, target_file_path, response_file_path
) -> LegacyQueueMessagePayload:
    """``LegacyQueueMessagePayload`` built from the same values as ``legacy_payload``."""
    return LegacyQueueMessagePayload(
        dossier_id="test",
        file_id="test",
        x_tenant_id=x_tenant_id,
        target_file_extension="target.json.gz",
        response_file_extension="response.json.gz",
        target_file_type="json",
        target_compression_type="gz",
        response_file_type="json",
        response_compression_type="gz",
        target_file_path=target_file_path,
        response_file_path=response_file_path,
        processing_kwargs=optional_processing_kwargs or {},
    )


@pytest.fixture
def parsed_payload(
    x_tenant_id, optional_processing_kwargs, target_file_path, response_file_path
) -> QueueMessagePayload:
    """``QueueMessagePayload`` built from the same values as ``payload``."""
    return QueueMessagePayload(
        x_tenant_id=x_tenant_id,
        target_file_type="json",
        target_compression_type="gz",
        response_file_type="json",
        response_compression_type="gz",
        target_file_path=target_file_path,
        response_file_path=response_file_path,
        processing_kwargs=optional_processing_kwargs or {},
    )


@pytest.fixture
def target_json_file() -> bytes:
    """Gzip-compressed, UTF-8 encoded JSON document ``{"target": "test"}``."""
    encoded = json.dumps({"target": "test"}).encode("utf-8")
    return gzip.compress(encoded)


@pytest.fixture
def processing_result_json() -> dict:
    """Processing result stored under ``"data"`` by the storage payload fixtures."""
    return {"response": "test"}