From 562e383bdd582e805f5656594ebcebb1ed3f77e3 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 21 Jun 2022 14:02:51 +0200 Subject: [PATCH] file pattern matching WIP: added first bit of file pattern matching logic to multi dl strat --- config.yaml | 3 ++ pyinfra/visitor/strategies/download/multi.py | 52 +++++++++++++++----- test/integration_tests/serve_test.py | 1 - 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/config.yaml b/config.yaml index caf7eea..5ecda68 100755 --- a/config.yaml +++ b/config.yaml @@ -9,6 +9,9 @@ service: # specified in the request download_strategy: $DOWNLOAD_STRATEGY|single response_formatter: default + operations: + default: + input_file_pattern: //ORIGIN.pdf.gz probing_webserver: host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address diff --git a/pyinfra/visitor/strategies/download/multi.py b/pyinfra/visitor/strategies/download/multi.py index bb5b3b5..1cb50fb 100644 --- a/pyinfra/visitor/strategies/download/multi.py +++ b/pyinfra/visitor/strategies/download/multi.py @@ -1,6 +1,5 @@ from _operator import itemgetter from functools import partial -from typing import Collection from funcy import compose @@ -12,19 +11,35 @@ from pyinfra.utils.func import flift from pyinfra.visitor.strategies.download.download import DownloadStrategy +def get_default_operation2file_patterns(): + return CONFIG.service.operations + + class MultiDownloadStrategy(DownloadStrategy): - def __init__(self): + def __init__(self, operation2file_patterns=None): # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket) + self.operation2file_patterns = operation2file_patterns or get_default_operation2file_patterns() + + def __call__(self, storage, queue_item_body): + return self.download(storage, queue_item_body) + + def get_page_matcher(self, queue_item_body): + + pages = queue_item_body.get("pages", None) + + if "pages" in queue_item_body: + pages = "|".join(map(str, pages)) + page_matcher = "pages/id:(" + pages + ")." + + else: + page_matcher = "" - def get_page_matcher(self, pages): - pages = "|".join(map(str, pages)) - page_matcher = r".*id:(" + pages + r").*" return page_matcher - def get_names_of_objects_by_pages(self, storage, pages: Collection[int]): - matches_page = flift(self.get_page_matcher(pages)) - page_object_names = compose(matches_page, storage.get_all_object_names)(self.bucket_name) + def get_names_of_objects_by_pages(self, storage, file_pattern: str): + matches_pattern = flift(file_pattern) + page_object_names = compose(matches_pattern, storage.get_all_object_names)(self.bucket_name) return page_object_names def download_and_decompress_object(self, storage, object_names): @@ -32,7 +47,11 @@ class MultiDownloadStrategy(DownloadStrategy): return map(compose(decompress, download), object_names) def download(self, storage: Storage, queue_item_body): - page_object_names = self.get_names_of_objects_by_pages(storage, queue_item_body["pages"]) + file_pattern = self.instantiate_file_pattern_matcher(queue_item_body) + + print(file_pattern) + + page_object_names = self.get_names_of_objects_by_pages(storage, file_pattern) objects = self.download_and_decompress_object(storage, page_object_names) return objects @@ -54,5 +73,16 @@ class MultiDownloadStrategy(DownloadStrategy): return object_name - def __call__(self, storage, queue_item_body): - return self.download(storage, queue_item_body) + def instantiate_file_pattern_matcher(self, queue_item_body): + + operation = queue_item_body.get("operation", "default") + + file_pattern = self.operation2file_patterns[operation]["input_file_pattern"] + + dossier_id, file_id = itemgetter("dossierId", "fileId")(queue_item_body) + + file_pattern = file_pattern.replace("", dossier_id) + file_pattern = file_pattern.replace("", file_id) + file_pattern = file_pattern.replace("", self.get_page_matcher(queue_item_body)) + + return file_pattern diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 2eeffd3..97f931a 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -164,7 +164,6 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( def build_filepath(object_descriptor, page): object_name = object_descriptor["object_name"] parts = object_name.split("/") - parts.insert(-1, "pages") path = "/".join(parts) path = re.sub("id:\d", f"id:{page}", path)