diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py index 9f908d8..1aad2af 100644 --- a/pyinfra/storage/adapters/azure.py +++ b/pyinfra/storage/adapters/azure.py @@ -1,5 +1,4 @@ import logging -from itertools import repeat from operator import attrgetter from azure.storage.blob import ContainerClient, BlobServiceClient diff --git a/pyinfra/utils/encoding.py b/pyinfra/utils/encoding.py index d6ed7d3..1aedb22 100644 --- a/pyinfra/utils/encoding.py +++ b/pyinfra/utils/encoding.py @@ -4,5 +4,13 @@ import json from pyinfra.server.packing import bytes_to_string +def compress(data: bytes): + return gzip.compress(data) + + +def decompress(data: bytes): + return gzip.decompress(data) + + def pack_for_upload(data: bytes): - return gzip.compress(json.dumps(bytes_to_string(data)).encode()) + return compress(json.dumps(bytes_to_string(data)).encode()) diff --git a/pyinfra/visitor/strategies/download/download.py b/pyinfra/visitor/strategies/download/download.py index 574671f..0f7bc50 100644 --- a/pyinfra/visitor/strategies/download/download.py +++ b/pyinfra/visitor/strategies/download/download.py @@ -1,9 +1,9 @@ import abc -import gzip import logging from pyinfra.config import parse_disjunction_string, CONFIG from pyinfra.exceptions import DataLoadingFailure +from pyinfra.utils.encoding import decompress class DownloadStrategy(abc.ABC): @@ -13,7 +13,7 @@ class DownloadStrategy(abc.ABC): data = self.__download(storage, object_descriptor) logging.debug(f"Downloaded {object_descriptor}.") assert isinstance(data, bytes) - data = gzip.decompress(data) + data = decompress(data) return [data] @staticmethod diff --git a/pyinfra/visitor/strategies/download/multi.py b/pyinfra/visitor/strategies/download/multi.py index f09932c..bb5b3b5 100644 --- a/pyinfra/visitor/strategies/download/multi.py +++ b/pyinfra/visitor/strategies/download/multi.py @@ -1,6 +1,4 @@ -import gzip from _operator import itemgetter -from copy import deepcopy from functools import partial from typing import Collection @@ -9,7 +7,8 @@ from funcy import compose from pyinfra.config import parse_disjunction_string, CONFIG from pyinfra.exceptions import InvalidMessage from pyinfra.storage.storage import Storage -from pyinfra.utils.func import flift, lift +from pyinfra.utils.encoding import decompress +from pyinfra.utils.func import flift from pyinfra.visitor.strategies.download.download import DownloadStrategy @@ -29,8 +28,8 @@ class MultiDownloadStrategy(DownloadStrategy): return page_object_names def download_and_decompress_object(self, storage, object_names): - download = lift(partial(storage.get_object, self.bucket_name)) - return compose(lift(gzip.decompress), download)(object_names) + download = partial(storage.get_object, self.bucket_name) + return map(compose(decompress, download), object_names) def download(self, storage: Storage, queue_item_body): page_object_names = self.get_names_of_objects_by_pages(storage, queue_item_body["pages"]) @@ -43,9 +42,6 @@ class MultiDownloadStrategy(DownloadStrategy): def get_key(key): return key if key in body else False - # TODO: deepcopy still necessary? - body = deepcopy(body) - folder = f"/{get_key('pages') or get_key('images')}/" if not folder: raise InvalidMessage("Expected a folder like 'images' oder 'pages' to be specified in message.") diff --git a/pyinfra/visitor/strategies/response/aggregation.py b/pyinfra/visitor/strategies/response/aggregation.py index 28a6c9d..fb8024d 100644 --- a/pyinfra/visitor/strategies/response/aggregation.py +++ b/pyinfra/visitor/strategies/response/aggregation.py @@ -1,4 +1,3 @@ -import gzip import json from collections import deque from typing import Callable @@ -7,6 +6,7 @@ from funcy import omit, filter from more_itertools import peekable from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing +from pyinfra.utils.encoding import compress from pyinfra.visitor.dispatch.dispatch import DispatchCallback from pyinfra.visitor.dispatch.identifier_dispatch import IdentifierDispatchCallback from pyinfra.visitor.strategies.response.response import ResponseStrategy @@ -22,7 +22,7 @@ class AggregationStorageStrategy(ResponseStrategy): def put_object(self, data: bytes, storage_upload_info): object_descriptor = self.get_response_object_descriptor(storage_upload_info) - self.storage.put_object(**object_descriptor, data=gzip.compress(data)) + self.storage.put_object(**object_descriptor, data=compress(data)) return {**storage_upload_info, "responseFile": object_descriptor["object_name"]} def merge_queue_items(self): diff --git a/pyinfra/visitor/strategies/response/storage.py b/pyinfra/visitor/strategies/response/storage.py index 0749c38..b0be8cf 100644 --- a/pyinfra/visitor/strategies/response/storage.py +++ b/pyinfra/visitor/strategies/response/storage.py @@ -1,6 +1,6 @@ -import gzip import json +from pyinfra.utils.encoding import compress from pyinfra.visitor.strategies.response.response import ResponseStrategy @@ -10,7 +10,7 @@ class StorageStrategy(ResponseStrategy): def handle_response(self, body: dict): response_object_descriptor = self.get_response_object_descriptor(body) - self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode())) + self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode())) body.pop("data") body["responseFile"] = response_object_descriptor["object_name"] return body diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py index 88e1127..d9e7eaa 100644 --- a/scripts/manage_minio.py +++ b/scripts/manage_minio.py @@ -1,5 +1,4 @@ import argparse -import gzip import os from pathlib import Path @@ -7,6 +6,7 @@ from tqdm import tqdm from pyinfra.config import CONFIG, parse_disjunction_string from pyinfra.storage.storages import get_s3_storage +from pyinfra.utils.encoding import compress def parse_args(): @@ -31,7 +31,7 @@ def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension) def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: - data = gzip.compress(result.encode()) + data = compress(result.encode()) path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) storage.put_object(bucket_name, path_gz, data) @@ -44,7 +44,7 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz) with open(path, "rb") as f: - data = gzip.compress(f.read()) + data = compress(f.read()) storage.put_object(bucket_name, path_gz, data) diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index f6ad94f..9046e95 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -1,4 +1,3 @@ -import gzip import json import re from itertools import starmap, repeat, chain @@ -16,6 +15,7 @@ from pyinfra.default_objects import ( ) from pyinfra.queue.consumer import Consumer from pyinfra.server.packing import unpack, pack +from pyinfra.utils.encoding import compress, decompress from pyinfra.visitor import QueueVisitor from pyinfra.visitor.utils import get_download_strategy from test.utils.input import pair_data_with_queue_message @@ -139,7 +139,7 @@ def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager, # TODO: refactor; too many params def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy): - storage.put_object(**download_strategy.get_object_descriptor(message), data=gzip.compress(data)) + storage.put_object(**download_strategy.get_object_descriptor(message), data=compress(data)) queue_manager.publish_request(message) @@ -156,7 +156,7 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue( object_descriptor = download_strategy.get_object_descriptor(ref_message) object_descriptor["object_name"] = build_filepath(object_descriptor, page) - storage.put_object(**object_descriptor, data=gzip.compress(data)) + storage.put_object(**object_descriptor, data=compress(data)) queue_manager.publish_request(ref_message) @@ -200,7 +200,7 @@ def components(components_type, real_components, test_components, bucket_name): def decode(storage_item): - storage_item = json.loads(gzip.decompress(storage_item).decode()) + storage_item = json.loads(decompress(storage_item).decode()) if not isinstance(storage_item, list): storage_item = [storage_item] diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 8485a54..e5da21e 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -1,9 +1,8 @@ -import gzip import json import pytest -from pyinfra.utils.encoding import pack_for_upload +from pyinfra.utils.encoding import pack_for_upload, decompress from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy @@ -37,5 +36,5 @@ class TestVisitor: response_body = visitor(body) assert "data" not in response_body assert json.loads( - gzip.decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"])) + decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"])) )["data"] == ["22"]