From 9962651d885d9b8100b5521a8a217dcf834fc90a Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Fri, 10 Jun 2022 14:06:13 +0200 Subject: [PATCH] download strategy WIP: added 1 -> n upload logic --- pyinfra/visitor.py | 10 +++++----- test/fixtures/input.py | 5 +++++ test/integration_tests/serve_test.py | 18 ++++++++++++++++-- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index 2beb3ce..d7d560e 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -30,8 +30,8 @@ def unique_hash(pages): def get_object_name(body): - dossier_id, file_id = itemgetter("dossierId", "fileId")(body) - object_name = f"{dossier_id}/{file_id}.{CONFIG.service.response_file_extension}" + dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body) + object_name = f"{dossier_id}/{file_id}.{target_file_extension}" return object_name @@ -43,11 +43,11 @@ def get_response_object_name(body): if "id" not in body: body["id"] = 0 - dossier_id, file_id, pages, idnt, response_file_extension = itemgetter( - "dossierId", "fileId", "pages", "id", "responseFileExtension" + dossier_id, file_id, pages, idnt= itemgetter( + "dossierId", "fileId", "pages", "id" )(body) - object_name = f"{dossier_id}/{file_id}/{unique_hash(pages)}-id:{idnt}.{response_file_extension}" + object_name = f"{dossier_id}/{file_id}/{unique_hash(pages)}-id:{idnt}.{CONFIG.service.response_file_extension}" return object_name diff --git a/test/fixtures/input.py b/test/fixtures/input.py index b27b43f..8c76f7f 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -140,3 +140,8 @@ def metadata(n_items): @pytest.fixture def packages(input_data_items, metadata): return lstarlift(pack)(zip(input_data_items, metadata)) + + +@pytest.fixture(params=[False]) +def many_to_many(request): + return request.param diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index bfbd4a6..494b91c 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -68,11 +68,15 @@ from test.utils.input import pair_data_with_queue_message "real", ], ) -def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items): +def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_many): storage, queue_manager, consumer = components - upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs) + if many_to_many: + upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs) + else: + upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs) + consumer.consume_and_publish(n=n_items) outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name) @@ -101,6 +105,16 @@ def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, queue_manager.publish_request(message) +def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs): + for i, (data, message) in enumerate(data_message_pairs): + object_descriptor = get_object_descriptor(message) + object_name = object_descriptor["object_name"] + object_descriptor["object_name"] = f"{object_name}/pages/{i}" + storage.put_object(**object_descriptor, data=gzip.compress(data)) + + queue_manager.publish_request(message) + + def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name): names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list()) uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))