"""Tests for the visitor: loading data from storage, processing it, and storing responses."""
import gzip
import json

import pytest

from pyinfra.server.packing import bytes_to_string
from pyinfra.visitor import get_object_descriptor, get_response_object_descriptor


@pytest.fixture()
def body():
    """Return the request body shared by all tests in this module."""
    return {
        "dossierId": "folder",
        "fileId": "file",
        "targetFileExtension": "in.gz",
        "responseFileExtension": "out.gz",
    }


def _store_gzipped_json(storage, bucket_name, body, payload):
    """Empty the bucket, then upload ``payload`` as gzip-compressed JSON.

    The payload bytes are passed through ``bytes_to_string``, JSON-encoded,
    gzip-compressed and stored under the object descriptor derived from
    ``body``.
    """
    storage.clear_bucket(bucket_name)
    descriptor = get_object_descriptor(body)
    serialized = json.dumps(bytes_to_string(payload)).encode()
    storage.put_object(**descriptor, data=gzip.compress(serialized))


@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
class TestVisitor:
    """Visitor tests, run once per storage client named in the parametrization."""

    @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
    def test_given_a_input_queue_message_callback_pulls_the_data_from_storage(
        self, visitor, body, storage, bucket_name
    ):
        """``load_data`` returns the stored bytes together with empty metadata."""
        _store_gzipped_json(storage, bucket_name, body, b"content")

        data_received = visitor.load_data(body)

        expected = {"data": b"content", "metadata": {}}
        assert expected == data_received

    @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
    def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name):
        """Loading the stored item and processing it yields ``["22"]`` as response data."""
        _store_gzipped_json(storage, bucket_name, body, b"2")

        response_body = visitor.load_item_from_storage_and_process_with_callback(body)

        assert response_body["data"] == ["22"]

    @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
    def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name):
        """With the storage strategy, the result is read back from the response file, not the reply."""
        _store_gzipped_json(storage, bucket_name, body, b"2")

        response_body = visitor(body)

        # The reply itself must not carry the data; it only names the stored file.
        assert "data" not in response_body

        stored_blob = storage.get_object(
            bucket_name=bucket_name,
            object_name=response_body["responseFile"],
        )
        stored_response = json.loads(gzip.decompress(stored_blob))
        assert stored_response["data"] == ["22"]