wrapped gzip.(de)compress

This commit is contained in:
Matthias Bisping 2022-06-18 23:15:22 +02:00
parent f795cdbc9c
commit 02da828268
9 changed files with 28 additions and 26 deletions

View File

@ -1,5 +1,4 @@
import logging
from itertools import repeat
from operator import attrgetter
from azure.storage.blob import ContainerClient, BlobServiceClient

View File

@ -4,5 +4,13 @@ import json
from pyinfra.server.packing import bytes_to_string
def compress(data: bytes):
    """Gzip-compress ``data`` and return the compressed bytes.

    Thin wrapper that delegates to :func:`gzip.compress` with its default
    arguments.
    """
    compressed = gzip.compress(data)
    return compressed
def decompress(data: bytes):
    """Decompress gzip-encoded ``data`` and return the original bytes.

    Thin wrapper that delegates to :func:`gzip.decompress`.
    """
    decompressed = gzip.decompress(data)
    return decompressed
def pack_for_upload(data: bytes):
return gzip.compress(json.dumps(bytes_to_string(data)).encode())
return compress(json.dumps(bytes_to_string(data)).encode())

View File

@ -1,9 +1,9 @@
import abc
import gzip
import logging
from pyinfra.config import parse_disjunction_string, CONFIG
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.utils.encoding import decompress
class DownloadStrategy(abc.ABC):
@ -13,7 +13,7 @@ class DownloadStrategy(abc.ABC):
data = self.__download(storage, object_descriptor)
logging.debug(f"Downloaded {object_descriptor}.")
assert isinstance(data, bytes)
data = gzip.decompress(data)
data = decompress(data)
return [data]
@staticmethod

View File

@ -1,6 +1,4 @@
import gzip
from _operator import itemgetter
from copy import deepcopy
from functools import partial
from typing import Collection
@ -9,7 +7,8 @@ from funcy import compose
from pyinfra.config import parse_disjunction_string, CONFIG
from pyinfra.exceptions import InvalidMessage
from pyinfra.storage.storage import Storage
from pyinfra.utils.func import flift, lift
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift
from pyinfra.visitor.strategies.download.download import DownloadStrategy
@ -29,8 +28,8 @@ class MultiDownloadStrategy(DownloadStrategy):
return page_object_names
def download_and_decompress_object(self, storage, object_names):
download = lift(partial(storage.get_object, self.bucket_name))
return compose(lift(gzip.decompress), download)(object_names)
download = partial(storage.get_object, self.bucket_name)
return map(compose(decompress, download), object_names)
def download(self, storage: Storage, queue_item_body):
page_object_names = self.get_names_of_objects_by_pages(storage, queue_item_body["pages"])
@ -43,9 +42,6 @@ class MultiDownloadStrategy(DownloadStrategy):
def get_key(key):
return key if key in body else False
# TODO: deepcopy still necessary?
body = deepcopy(body)
folder = f"/{get_key('pages') or get_key('images')}/"
if not folder:
raise InvalidMessage("Expected a folder like 'images' oder 'pages' to be specified in message.")

View File

@ -1,4 +1,3 @@
import gzip
import json
from collections import deque
from typing import Callable
@ -7,6 +6,7 @@ from funcy import omit, filter
from more_itertools import peekable
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.encoding import compress
from pyinfra.visitor.dispatch.dispatch import DispatchCallback
from pyinfra.visitor.dispatch.identifier_dispatch import IdentifierDispatchCallback
from pyinfra.visitor.strategies.response.response import ResponseStrategy
@ -22,7 +22,7 @@ class AggregationStorageStrategy(ResponseStrategy):
def put_object(self, data: bytes, storage_upload_info):
object_descriptor = self.get_response_object_descriptor(storage_upload_info)
self.storage.put_object(**object_descriptor, data=gzip.compress(data))
self.storage.put_object(**object_descriptor, data=compress(data))
return {**storage_upload_info, "responseFile": object_descriptor["object_name"]}
def merge_queue_items(self):

View File

@ -1,6 +1,6 @@
import gzip
import json
from pyinfra.utils.encoding import compress
from pyinfra.visitor.strategies.response.response import ResponseStrategy
@ -10,7 +10,7 @@ class StorageStrategy(ResponseStrategy):
def handle_response(self, body: dict):
response_object_descriptor = self.get_response_object_descriptor(body)
self.storage.put_object(**response_object_descriptor, data=gzip.compress(json.dumps(body).encode()))
self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode()))
body.pop("data")
body["responseFile"] = response_object_descriptor["object_name"]
return body

View File

@ -1,5 +1,4 @@
import argparse
import gzip
import os
from pathlib import Path
@ -7,6 +6,7 @@ from tqdm import tqdm
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.storage.storages import get_s3_storage
from pyinfra.utils.encoding import compress
def parse_args():
@ -31,7 +31,7 @@ def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension)
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
data = gzip.compress(result.encode())
data = compress(result.encode())
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
storage.put_object(bucket_name, path_gz, data)
@ -44,7 +44,7 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz)
with open(path, "rb") as f:
data = gzip.compress(f.read())
data = compress(f.read())
storage.put_object(bucket_name, path_gz, data)

View File

@ -1,4 +1,3 @@
import gzip
import json
import re
from itertools import starmap, repeat, chain
@ -16,6 +15,7 @@ from pyinfra.default_objects import (
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import unpack, pack
from pyinfra.utils.encoding import compress, decompress
from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.utils import get_download_strategy
from test.utils.input import pair_data_with_queue_message
@ -139,7 +139,7 @@ def upload_data_to_storage_and_publish_requests_to_queue(storage, queue_manager,
# TODO: refactor; too many params
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, download_strategy):
storage.put_object(**download_strategy.get_object_descriptor(message), data=gzip.compress(data))
storage.put_object(**download_strategy.get_object_descriptor(message), data=compress(data))
queue_manager.publish_request(message)
@ -156,7 +156,7 @@ def upload_data_to_folder_in_storage_and_publish_single_request_to_queue(
object_descriptor = download_strategy.get_object_descriptor(ref_message)
object_descriptor["object_name"] = build_filepath(object_descriptor, page)
storage.put_object(**object_descriptor, data=gzip.compress(data))
storage.put_object(**object_descriptor, data=compress(data))
queue_manager.publish_request(ref_message)
@ -200,7 +200,7 @@ def components(components_type, real_components, test_components, bucket_name):
def decode(storage_item):
storage_item = json.loads(gzip.decompress(storage_item).decode())
storage_item = json.loads(decompress(storage_item).decode())
if not isinstance(storage_item, list):
storage_item = [storage_item]

View File

@ -1,9 +1,8 @@
import gzip
import json
import pytest
from pyinfra.utils.encoding import pack_for_upload
from pyinfra.utils.encoding import pack_for_upload, decompress
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
@ -37,5 +36,5 @@ class TestVisitor:
response_body = visitor(body)
assert "data" not in response_body
assert json.loads(
gzip.decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"]))
decompress(storage.get_object(bucket_name=bucket_name, object_name=response_body["responseFile"]))
)["data"] == ["22"]