file pattern matching WIP: added first bit of file pattern matching logic to multi dl strat

parent 8e3badd053
commit 562e383bdd
@@ -9,6 +9,9 @@ service:
   # specified in the request
   download_strategy: $DOWNLOAD_STRATEGY|single
   response_formatter: default
+  operations:
+    default:
+      input_file_pattern: <dossier_id>/<file_id>/<id>ORIGIN.pdf.gz
 
 probing_webserver:
   host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address
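Note on the new operations block: get_default_operation2file_patterns() in the strategy below simply returns CONFIG.service.operations, so the YAML above presumably materialises as a nested mapping along these lines (a minimal sketch; the real CONFIG object is project-specific and only the values visible in the diff are used here):

    # Assumed dict shape of CONFIG.service.operations for the YAML above.
    operation2file_patterns = {
        "default": {
            "input_file_pattern": "<dossier_id>/<file_id>/<id>ORIGIN.pdf.gz",
        },
    }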
@@ -1,6 +1,5 @@
 from _operator import itemgetter
 from functools import partial
-from typing import Collection
 
 from funcy import compose
 
@@ -12,19 +11,35 @@ from pyinfra.utils.func import flift
 from pyinfra.visitor.strategies.download.download import DownloadStrategy
 
 
+def get_default_operation2file_patterns():
+    return CONFIG.service.operations
+
+
 class MultiDownloadStrategy(DownloadStrategy):
-    def __init__(self):
+    def __init__(self, operation2file_patterns=None):
         # TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket
         self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket)
+        self.operation2file_patterns = operation2file_patterns or get_default_operation2file_patterns()
 
+    def __call__(self, storage, queue_item_body):
+        return self.download(storage, queue_item_body)
+
-    def get_page_matcher(self, queue_item_body):
-
-        pages = queue_item_body.get("pages", None)
-
-        if "pages" in queue_item_body:
-            pages = "|".join(map(str, pages))
-            page_matcher = "pages/id:(" + pages + ")."
-
-        else:
-            page_matcher = ""
-
+    def get_page_matcher(self, pages):
+        pages = "|".join(map(str, pages))
+        page_matcher = r".*id:(" + pages + r").*"
         return page_matcher
 
-    def get_names_of_objects_by_pages(self, storage, pages: Collection[int]):
-        matches_page = flift(self.get_page_matcher(pages))
-        page_object_names = compose(matches_page, storage.get_all_object_names)(self.bucket_name)
+    def get_names_of_objects_by_pages(self, storage, file_pattern: str):
+        matches_pattern = flift(file_pattern)
+        page_object_names = compose(matches_pattern, storage.get_all_object_names)(self.bucket_name)
         return page_object_names
 
     def download_and_decompress_object(self, storage, object_names):
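To make the new matcher concrete: for pages [1, 2] the reworked get_page_matcher builds the regex ".*id:(1|2).*", and get_names_of_objects_by_pages uses it to filter the bucket's object names. A minimal standalone sketch of that behaviour, using plain re and a list comprehension in place of the project's flift and funcy.compose helpers, and with made-up object names:

    import re

    def page_matcher(pages):
        # Same construction as the new get_page_matcher: [1, 2] -> ".*id:(1|2).*"
        return r".*id:(" + "|".join(map(str, pages)) + r").*"

    # Hypothetical object names standing in for storage.get_all_object_names(bucket).
    object_names = [
        "d-1/f-1/id:1.ORIGIN.pdf.gz",
        "d-1/f-1/id:2.ORIGIN.pdf.gz",
        "d-1/f-1/id:7.ORIGIN.pdf.gz",
    ]

    pattern = re.compile(page_matcher([1, 2]))
    matching = [name for name in object_names if pattern.match(name)]
    # matching keeps the id:1 and id:2 objects; note the alternation is unanchored,
    # so a page id like 1 would also match names containing id:12.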
@@ -32,7 +47,11 @@ class MultiDownloadStrategy(DownloadStrategy):
         return map(compose(decompress, download), object_names)
 
     def download(self, storage: Storage, queue_item_body):
-        page_object_names = self.get_names_of_objects_by_pages(storage, queue_item_body["pages"])
+        file_pattern = self.instantiate_file_pattern_matcher(queue_item_body)
+
+        print(file_pattern)
+
+        page_object_names = self.get_names_of_objects_by_pages(storage, file_pattern)
         objects = self.download_and_decompress_object(storage, page_object_names)
 
         return objects
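Read together with the earlier hunk, download() now derives the object-name filter from the queue item instead of using queue_item_body["pages"] directly. A usage sketch, assuming the queue item carries the keys the diff reads (operation, dossierId, fileId, pages); Storage, CONFIG and the decompress step come from the surrounding project, so the actual call is shown in comments only:

    # Hypothetical queue item, limited to the keys referenced in the diff.
    queue_item_body = {
        "operation": "default",   # selects an entry under service.operations
        "dossierId": "d-123",
        "fileId": "f-456",
        "pages": [1, 2],
    }

    # With the project's storage object in place, the strategy is used as a callable:
    #
    #     strategy = MultiDownloadStrategy()
    #     objects = strategy(storage, queue_item_body)
    #
    # download() instantiates the file pattern, filters the bucket's object names
    # with it, and downloads and decompresses every matching object.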
@@ -54,5 +73,16 @@ class MultiDownloadStrategy(DownloadStrategy):
 
         return object_name
 
-    def __call__(self, storage, queue_item_body):
-        return self.download(storage, queue_item_body)
+    def instantiate_file_pattern_matcher(self, queue_item_body):
+
+        operation = queue_item_body.get("operation", "default")
+
+        file_pattern = self.operation2file_patterns[operation]["input_file_pattern"]
+
+        dossier_id, file_id = itemgetter("dossierId", "fileId")(queue_item_body)
+
+        file_pattern = file_pattern.replace("<dossier_id>", dossier_id)
+        file_pattern = file_pattern.replace("<file_id>", file_id)
+        file_pattern = file_pattern.replace("<id>", self.get_page_matcher(queue_item_body))
+
+        return file_pattern
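A worked example of the placeholder substitution performed by instantiate_file_pattern_matcher, with made-up ids, under the assumption that the page matcher is built from the pages list (the new get_page_matcher signature takes pages, whereas this WIP call still passes the whole queue_item_body):

    template = "<dossier_id>/<file_id>/<id>ORIGIN.pdf.gz"

    dossier_id, file_id, pages = "d-123", "f-456", [1, 2]
    page_matcher = r".*id:(" + "|".join(map(str, pages)) + r").*"

    file_pattern = (
        template.replace("<dossier_id>", dossier_id)
                .replace("<file_id>", file_id)
                .replace("<id>", page_matcher)
    )
    # file_pattern == "d-123/f-456/.*id:(1|2).*ORIGIN.pdf.gz"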
@@ -164,7 +164,6 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
 def build_filepath(object_descriptor, page):
     object_name = object_descriptor["object_name"]
     parts = object_name.split("/")
-    parts.insert(-1, "pages")
     path = "/".join(parts)
     path = re.sub("id:\d", f"id:{page}", path)
 
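For context on the touched upload helper: build_filepath rewrites the page id inside an object name via re.sub. A small standalone illustration with a made-up object name (note that "id:\d" matches a single digit only):

    import re

    # Hypothetical object name; in the real helper the path parts are split,
    # adjusted and re-joined before this substitution.
    object_name = "d-123/f-456/id:0.ORIGIN.pdf.gz"
    page = 3

    path = re.sub(r"id:\d", f"id:{page}", object_name)
    # path == "d-123/f-456/id:3.ORIGIN.pdf.gz"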