fix: data was doubly encoded and hence always triggering the immediate upload path
This commit is contained in:
parent
ab56c9a173
commit
9e2ed6a9f9
@ -99,9 +99,9 @@ class AggregationStorageStrategy(ResponseStrategy):
|
||||
self.dispatch_callback = dispatch_callback or IdentifierDispatchCallback()
|
||||
self.buffer = deque()
|
||||
|
||||
def put_object(self, data, metadata):
|
||||
def put_object(self, data: bytes, metadata):
|
||||
object_descriptor = get_response_object_descriptor(metadata)
|
||||
self.storage.put_object(**object_descriptor, data=gzip.compress(json.dumps(data).encode()))
|
||||
self.storage.put_object(**object_descriptor, data=data)
|
||||
|
||||
def merge_queue_items(self):
|
||||
merged_buffer_content = self.merger(self.buffer)
|
||||
@ -109,13 +109,13 @@ class AggregationStorageStrategy(ResponseStrategy):
|
||||
return merged_buffer_content
|
||||
|
||||
def upload_queue_items(self, metadata):
|
||||
data = self.merge_queue_items()
|
||||
data = json.dumps(self.merge_queue_items()).encode()
|
||||
self.put_object(data, metadata)
|
||||
|
||||
def upload_or_aggregate(self, data, metadata):
|
||||
|
||||
if isinstance(data, str):
|
||||
self.put_object(data, metadata)
|
||||
self.put_object(data.encode(), metadata)
|
||||
|
||||
else:
|
||||
self.buffer.append(data)
|
||||
@ -194,8 +194,8 @@ class QueueVisitor:
|
||||
|
||||
def load_and_process(self, body):
|
||||
data_from_storage = self.load_data(body)
|
||||
result = self.process_data(data_from_storage)
|
||||
result = lmap(json.dumps, result)
|
||||
result = list(self.process_data(data_from_storage))
|
||||
# result = lmap(json.dumps, result)
|
||||
result_body = {"result_data": result, **body}
|
||||
return result_body
|
||||
|
||||
|
||||
@ -14,3 +14,4 @@ pytest~=7.0.1
|
||||
funcy==1.17
|
||||
fpdf==1.7.2
|
||||
PyMuPDF==1.19.6
|
||||
frozendict==2.3.2
|
||||
|
||||
@ -1,39 +1,32 @@
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
from itertools import starmap, repeat
|
||||
from itertools import starmap, repeat, chain
|
||||
|
||||
import pytest
|
||||
from funcy import lfilter, lmap, compose, lzip
|
||||
from frozendict import frozendict
|
||||
from funcy import lfilter, compose, lzip
|
||||
|
||||
from pyinfra.default_objects import get_visitor, get_queue_manager, get_storage, get_consumer, get_callback
|
||||
from pyinfra.server.packing import bytes_to_string, unpack, pack
|
||||
from pyinfra.utils.func import star
|
||||
from pyinfra.visitor import get_object_descriptor
|
||||
from test.utils.input import adorn_data_with_storage_info
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def freeze(data, metadata):
|
||||
return data, frozendict(metadata)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("one_to_many", [False, True])
|
||||
@pytest.mark.parametrize("analysis_task", [False])
|
||||
@pytest.mark.parametrize("analysis_task", [False, True])
|
||||
@pytest.mark.parametrize("n_items", [2])
|
||||
@pytest.mark.parametrize("n_pages", [1])
|
||||
@pytest.mark.parametrize("buffer_size", [2])
|
||||
@pytest.mark.parametrize(
|
||||
"storage_item_has_metadata",
|
||||
[
|
||||
True,
|
||||
False
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"item_type",
|
||||
[
|
||||
"string",
|
||||
"image",
|
||||
"pdf",
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("storage_item_has_metadata", [True, False])
|
||||
@pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
|
||||
def test_serving(
|
||||
server_process,
|
||||
input_data_items,
|
||||
@ -45,7 +38,6 @@ def test_serving(
|
||||
target_data_items,
|
||||
targets,
|
||||
):
|
||||
print()
|
||||
|
||||
callback = get_callback(endpoint)
|
||||
visitor = get_visitor(callback)
|
||||
@ -63,6 +55,8 @@ def test_serving(
|
||||
metadata = repeat({})
|
||||
targets = lzip(target_data_items, metadata)
|
||||
|
||||
targets = {*starmap(freeze, targets)}
|
||||
|
||||
adorned_data_metadata_packs = adorn_data_with_storage_info(data_metadata_packs)
|
||||
|
||||
for data, message in adorned_data_metadata_packs:
|
||||
@ -75,15 +69,14 @@ def test_serving(
|
||||
queue_manager.publish_response(req, visitor)
|
||||
|
||||
def decode(storage_item):
|
||||
repr = gzip.decompress(storage_item).decode().replace(r"\"", "'").replace('"', "").replace("'", '"')
|
||||
storage_item = json.loads(repr)
|
||||
data, metadata = unpack(storage_item)
|
||||
return data, metadata
|
||||
storage_item = json.loads(storage_item.decode())
|
||||
if not isinstance(storage_item, list):
|
||||
storage_item = [storage_item]
|
||||
|
||||
yield from map(compose(star(freeze), unpack), storage_item)
|
||||
|
||||
names_of_uploaded_files = lfilter(".out", storage.get_all_object_names(bucket_name))
|
||||
uploaded_files = [storage.get_object(bucket_name, fn) for fn in names_of_uploaded_files]
|
||||
|
||||
outputs = lmap(decode, uploaded_files)
|
||||
print(outputs)
|
||||
print(targets)
|
||||
outputs = {*chain(*map(decode, uploaded_files))}
|
||||
assert outputs == targets
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user