From 9e2ed6a9f9f83b7da84ec8538de4c4e1f39b4556 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 24 May 2022 14:51:24 +0200 Subject: [PATCH] fix: data was doubly encoded and hence always triggering the immediate upload path --- pyinfra/visitor.py | 12 ++++---- requirements.txt | 1 + test/integration_tests/serve_test.py | 45 ++++++++++++---------------- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index 5eccb94..bb0eb85 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -99,9 +99,9 @@ class AggregationStorageStrategy(ResponseStrategy): self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback() self.buffer = deque() - def put_object(self, data, metadata): + def put_object(self, data: bytes, metadata): object_descriptor = get_response_object_descriptor(metadata) - self.storage.put_object(**object_descriptor, data=gzip.compress(json.dumps(data).encode())) + self.storage.put_object(**object_descriptor, data=data) def merge_queue_items(self): merged_buffer_content = self.merger(self.buffer) @@ -109,13 +109,13 @@ class AggregationStorageStrategy(ResponseStrategy): return merged_buffer_content def upload_queue_items(self, metadata): - data = self.merge_queue_items() + data = json.dumps(self.merge_queue_items()).encode() self.put_object(data, metadata) def upload_or_aggregate(self, data, metadata): if isinstance(data, str): - self.put_object(data, metadata) + self.put_object(data.encode(), metadata) else: self.buffer.append(data) @@ -194,8 +194,8 @@ class QueueVisitor: def load_and_process(self, body): data_from_storage = self.load_data(body) - result = self.process_data(data_from_storage) - result = lmap(json.dumps, result) + result = list(self.process_data(data_from_storage)) + # result = lmap(json.dumps, result) result_body = {"result_data": result, **body} return result_body diff --git a/requirements.txt b/requirements.txt index dafad0c..c046d66 100755 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ pytest~=7.0.1 funcy==1.17 fpdf==1.7.2 PyMuPDF==1.19.6 +frozendict==2.3.2 diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 8feced8..9400d44 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -1,39 +1,32 @@ import gzip import json import logging -from itertools import starmap, repeat +from itertools import starmap, repeat, chain import pytest -from funcy import lfilter, lmap, compose, lzip +from frozendict import frozendict +from funcy import lfilter, compose, lzip from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback from pyinfra.server.packing import bytes_to_string, unpack, pack +from pyinfra.utils.func import star from pyinfra.visitor import get_object_descriptor from test.utils.input import adorn_data_with_storage_info logger = logging.getLogger(__name__) +def freeze(data, metadata): + return data, frozendict(metadata) + + @pytest.mark.parametrize("one_to_many", [False, True]) -@pytest.mark.parametrize("analysis_task", [False]) +@pytest.mark.parametrize("analysis_task", [False, True]) @pytest.mark.parametrize("n_items", [2]) @pytest.mark.parametrize("n_pages", [1]) @pytest.mark.parametrize("buffer_size", [2]) -@pytest.mark.parametrize( - "storage_item_has_metadata", - [ - True, - False - ], -) -@pytest.mark.parametrize( - "item_type", - [ - "string", - "image", - "pdf", - ], -) +@pytest.mark.parametrize("storage_item_has_metadata", [True, False]) +@pytest.mark.parametrize("item_type", ["string", "image", "pdf"]) def test_serving( server_process, input_data_items, @@ -45,7 +38,6 @@ def test_serving( target_data_items, targets, ): - print() callback = get_callback(endpoint) visitor = get_visitor(callback) @@ -63,6 +55,8 @@ def test_serving( metadata = repeat({}) targets = lzip(target_data_items, metadata) + targets = {*starmap(freeze, targets)} + adorned_data_metadata_packs = adorn_data_with_storage_info(data_metadata_packs) for data, message in adorned_data_metadata_packs: @@ -75,15 +69,14 @@ def test_serving( queue_manager.publish_response(req, visitor) def decode(storage_item): - repr = gzip.decompress(storage_item).decode().replace(r"\"", "'").replace('"', "").replace("'", '"') - storage_item = json.loads(repr) - data, metadata = unpack(storage_item) - return data, metadata + storage_item = json.loads(storage_item.decode()) + if not isinstance(storage_item, list): + storage_item = [storage_item] + + yield from map(compose(star(freeze), unpack), storage_item) names_of_uploaded_files = lfilter(".out", storage.get_all_object_names(bucket_name)) uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files] - outputs = lmap(decode, uploaded_files) - print(outputs) - print(targets) + outputs = {*chain(*map(decode, uploaded_files))} assert outputs == targets