download strategy WIP: added 1 -> n upload logic
This commit is contained in:
parent
249c6203b2
commit
9962651d88
@ -30,8 +30,8 @@ def unique_hash(pages):
|
||||
|
||||
|
||||
def get_object_name(body):
|
||||
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
|
||||
object_name = f"{dossier_id}/{file_id}.{CONFIG.service.response_file_extension}"
|
||||
dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body)
|
||||
object_name = f"{dossier_id}/{file_id}.{target_file_extension}"
|
||||
return object_name
|
||||
|
||||
|
||||
@ -43,11 +43,11 @@ def get_response_object_name(body):
|
||||
if "id" not in body:
|
||||
body["id"] = 0
|
||||
|
||||
dossier_id, file_id, pages, idnt, response_file_extension = itemgetter(
|
||||
"dossierId", "fileId", "pages", "id", "responseFileExtension"
|
||||
dossier_id, file_id, pages, idnt= itemgetter(
|
||||
"dossierId", "fileId", "pages", "id"
|
||||
)(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}/{unique_hash(pages)}-id:{idnt}.{response_file_extension}"
|
||||
object_name = f"{dossier_id}/{file_id}/{unique_hash(pages)}-id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
|
||||
return object_name
|
||||
|
||||
|
||||
5
test/fixtures/input.py
vendored
5
test/fixtures/input.py
vendored
@ -140,3 +140,8 @@ def metadata(n_items):
|
||||
@pytest.fixture
|
||||
def packages(input_data_items, metadata):
|
||||
return lstarlift(pack)(zip(input_data_items, metadata))
|
||||
|
||||
|
||||
@pytest.fixture(params=[False])
|
||||
def many_to_many(request):
|
||||
return request.param
|
||||
|
||||
@ -68,11 +68,15 @@ from test.utils.input import pair_data_with_queue_message
|
||||
"real",
|
||||
],
|
||||
)
|
||||
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items):
|
||||
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_many):
|
||||
|
||||
storage, queue_manager, consumer = components
|
||||
|
||||
upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs)
|
||||
if many_to_many:
|
||||
upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs)
|
||||
else:
|
||||
upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, data_message_pairs)
|
||||
|
||||
consumer.consume_and_publish(n=n_items)
|
||||
outputs = get_data_uploaded_by_consumer(queue_manager, storage, bucket_name)
|
||||
|
||||
@ -101,6 +105,16 @@ def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager,
|
||||
queue_manager.publish_request(message)
|
||||
|
||||
|
||||
def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(storage, queue_manager, data_message_pairs):
|
||||
for i, (data, message) in enumerate(data_message_pairs):
|
||||
object_descriptor = get_object_descriptor(message)
|
||||
object_name = object_descriptor["object_name"]
|
||||
object_descriptor["object_name"] = f"{object_name}/pages/{i}"
|
||||
storage.put_object(**object_descriptor, data=gzip.compress(data))
|
||||
|
||||
queue_manager.publish_request(message)
|
||||
|
||||
|
||||
def get_data_uploaded_by_consumer(queue_manager, storage, bucket_name):
|
||||
names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
|
||||
uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user