Refactoring: turn the multi-download strategy into Downloader; remove the single-download strategy
This commit is contained in:
parent
afffdeb993
commit
efd36d0fc4
@ -1,5 +1,6 @@
|
||||
"""Implements a config object with dot-indexing syntax."""
|
||||
import os
|
||||
from functools import partial
|
||||
from itertools import chain
|
||||
from operator import truth
|
||||
from typing import Iterable
|
||||
@ -47,20 +48,23 @@ class Config:
|
||||
def __setitem__(self, key, value):
|
||||
self.__config.key = value
|
||||
|
||||
def to_dict(self):
|
||||
return to_dict(self.__config.export())
|
||||
def to_dict(self, frozen=True):
|
||||
return to_dict(self.__config.export(), frozen=frozen)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.to_dict())
|
||||
|
||||
|
||||
def to_dict(v):
|
||||
def to_dict(v, frozen=True):
|
||||
def make_dict(*args, **kwargs):
|
||||
return (frozendict if frozen else dict)(*args, **kwargs)
|
||||
|
||||
if isinstance(v, list):
|
||||
return tuple(map(to_dict, v))
|
||||
return tuple(map(partial(to_dict, frozen=frozen), v))
|
||||
elif isinstance(v, DotIndexable):
|
||||
return frozendict({k: to_dict(v) for k, v in v.x.items()})
|
||||
return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.x.items()})
|
||||
elif isinstance(v, dict):
|
||||
return frozendict({k: to_dict(v) for k, v in v.items()})
|
||||
return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.items()})
|
||||
else:
|
||||
return v
|
||||
|
||||
|
||||
@ -17,8 +17,7 @@ from pyinfra.storage import storages
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
|
||||
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
|
||||
from pyinfra.visitor.strategies.download.multi import MultiDownloadStrategy
|
||||
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
|
||||
from pyinfra.visitor.strategies.download.multi import Downloader
|
||||
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -54,9 +53,8 @@ class ComponentFactory:
|
||||
@lru_cache(maxsize=None)
|
||||
def get_visitor(self, callback):
|
||||
return QueueVisitor(
|
||||
storage=self.get_storage(),
|
||||
callback=callback,
|
||||
download_strategy=self.get_download_strategy(),
|
||||
downloader=self.get_downloader(),
|
||||
response_strategy=self.get_response_strategy(),
|
||||
response_formatter=self.get_response_formatter(),
|
||||
)
|
||||
@ -93,24 +91,13 @@ class ComponentFactory:
|
||||
return self.config.service.operations
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_download_strategy(self, download_strategy_type=None):
|
||||
download_strategies = {
|
||||
"single": self.get_single_download_strategy(),
|
||||
"multi": self.get_multi_download_strategy(),
|
||||
}
|
||||
return download_strategies.get(
|
||||
download_strategy_type or self.config.service.download_strategy,
|
||||
self.get_single_download_strategy(),
|
||||
def get_downloader(self, storage=None):
    """Build the ``Downloader`` for this factory's configuration.

    Args:
        storage: storage to download from; when ``None``, the storage
            returned by ``get_storage()`` is used

    Returns:
        a ``Downloader`` constructed from the storage, the bucket name parsed
        from ``config.storage.bucket`` and the factory's file descriptor manager
    """
    # Compare against None explicitly so that a valid but falsy storage object
    # is not silently replaced by the default storage.
    if storage is None:
        storage = self.get_storage()

    return Downloader(
        storage=storage,
        bucket_name=parse_disjunction_string(self.config.storage.bucket),
        file_descriptor_manager=self.get_file_descriptor_manager(),
    )
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_single_download_strategy(self):
|
||||
return SingleDownloadStrategy(self.get_file_descriptor_manager())
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_multi_download_strategy(self):
|
||||
return MultiDownloadStrategy(self.get_file_descriptor_manager())
|
||||
|
||||
|
||||
class Callback:
|
||||
def __init__(self, base_url):
|
||||
|
||||
0
pyinfra/visitor/downloader.py
Normal file
0
pyinfra/visitor/downloader.py
Normal file
@ -2,36 +2,34 @@ from functools import partial
|
||||
|
||||
from funcy import compose
|
||||
|
||||
from pyinfra.config import parse_disjunction_string, CONFIG
|
||||
from pyinfra.file_descriptor_manager import FileDescriptorManager
|
||||
from pyinfra.storage.storage import Storage
|
||||
from pyinfra.utils.encoding import decompress
|
||||
from pyinfra.utils.func import flift
|
||||
from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
|
||||
|
||||
class MultiDownloadStrategy(DownloadStrategy):
|
||||
def __init__(self, file_descriptor_manager: FileDescriptorManager):
|
||||
# TODO: pass in bucket name from outside / introduce closure-like abstraction for the bucket
|
||||
self.bucket_name = parse_disjunction_string(CONFIG.storage.bucket)
|
||||
super().__init__(file_descriptor_manager=file_descriptor_manager)
|
||||
class Downloader:
|
||||
def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
|
||||
self.storage = storage
|
||||
self.bucket_name = bucket_name
|
||||
self.file_descriptor_manager = file_descriptor_manager
|
||||
|
||||
def __call__(self, storage, queue_item_body):
|
||||
return self.download(storage, queue_item_body)
|
||||
def __call__(self, queue_item_body):
|
||||
return self.download(queue_item_body)
|
||||
|
||||
def get_names_of_objects_by_pages(self, storage, file_pattern: str):
|
||||
def get_names_of_objects_by_pattern(self, file_pattern: str):
|
||||
matches_pattern = flift(file_pattern)
|
||||
page_object_names = compose(matches_pattern, storage.get_all_object_names)(self.bucket_name)
|
||||
page_object_names = compose(matches_pattern, self.storage.get_all_object_names)(self.bucket_name)
|
||||
return page_object_names
|
||||
|
||||
def download_and_decompress_object(self, storage, object_names):
|
||||
download = partial(storage.get_object, self.bucket_name)
|
||||
def download_and_decompress_object(self, object_names):
|
||||
download = partial(self.storage.get_object, self.bucket_name)
|
||||
return map(compose(decompress, download), object_names)
|
||||
|
||||
def download(self, storage: Storage, queue_item_body):
|
||||
def download(self, queue_item_body):
|
||||
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
|
||||
|
||||
page_object_names = self.get_names_of_objects_by_pages(storage, file_pattern)
|
||||
objects = self.download_and_decompress_object(storage, page_object_names)
|
||||
page_object_names = self.get_names_of_objects_by_pattern(file_pattern)
|
||||
objects = self.download_and_decompress_object(page_object_names)
|
||||
|
||||
return objects
|
||||
|
||||
@ -1,36 +0,0 @@
|
||||
import logging
|
||||
|
||||
from pyinfra.exceptions import DataLoadingFailure
|
||||
from pyinfra.file_descriptor_manager import FileDescriptorManager
|
||||
from pyinfra.utils.encoding import decompress
|
||||
from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
|
||||
|
||||
class SingleDownloadStrategy(DownloadStrategy):
    """Download strategy that loads a single object per queue item."""

    def __init__(self, file_descriptor_manager: FileDescriptorManager):
        super().__init__(file_descriptor_manager=file_descriptor_manager)

    def __call__(self, storage, queue_item_body):
        """Shorthand for ``download``."""
        return self.download(storage, queue_item_body)

    def download(self, storage, queue_item_body):
        """Return a one-element list holding the decompressed object for the queue item."""
        return self._load_data(storage, queue_item_body)

    def _load_data(self, storage, queue_item_body):
        # The file descriptor manager turns the message body into the keyword
        # arguments that are passed on to ``storage.get_object``.
        descriptor = self.file_descriptor_manager.get_input_object_descriptor(queue_item_body)

        logging.debug(f"Downloading {descriptor}...")
        raw = self.__download(storage, descriptor)
        logging.debug(f"Downloaded {descriptor}.")

        assert isinstance(raw, bytes)
        return [decompress(raw)]

    @staticmethod
    def __download(storage, object_descriptor):
        # Any storage error is logged and re-raised as DataLoadingFailure,
        # chained to the original exception.
        try:
            return storage.get_object(**object_descriptor)
        except Exception as err:
            logging.warning(f"Loading data from storage failed for {object_descriptor}.")
            raise DataLoadingFailure from err
|
||||
@ -28,13 +28,10 @@ class StorageStrategy(ResponseStrategy):
|
||||
def get_response_object_name(body):
|
||||
"""TODO: refactor by using FileDescriptorManager"""
|
||||
|
||||
if "pages" not in body:
|
||||
body["pages"] = []
|
||||
|
||||
if "id" not in body:
|
||||
body["id"] = 0
|
||||
|
||||
dossier_id, file_id, pages, idnt = itemgetter("dossierId", "fileId", "pages", "id")(body)
|
||||
dossier_id, file_id, idnt = itemgetter("dossierId", "fileId", "id")(body)
|
||||
|
||||
object_name = f"{dossier_id}/{file_id}/id:{idnt}.{CONFIG.service.response_file_extension}"
|
||||
|
||||
|
||||
@ -2,24 +2,22 @@ from typing import Callable
|
||||
|
||||
from funcy import lflatten, compose, itervalues, lfilter
|
||||
|
||||
from pyinfra.storage.storage import Storage
|
||||
from pyinfra.server.debugging import inspect
|
||||
from pyinfra.utils.func import lift
|
||||
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
|
||||
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
|
||||
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
|
||||
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
|
||||
from pyinfra.visitor.strategies.download.download import DownloadStrategy
|
||||
from pyinfra.visitor.strategies.download.multi import Downloader
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
from pyinfra.visitor.strategies.response.storage import StorageStrategy
|
||||
from pyinfra.visitor.utils import standardize
|
||||
|
||||
|
||||
class QueueVisitor:
|
||||
def __init__(
|
||||
self,
|
||||
storage: Storage,
|
||||
callback: Callable,
|
||||
download_strategy: DownloadStrategy,
|
||||
downloader: Downloader,
|
||||
parsing_strategy: BlobParsingStrategy = None,
|
||||
response_strategy: ResponseStrategy = None,
|
||||
response_formatter: ResponseFormatter = None,
|
||||
@ -27,20 +25,19 @@ class QueueVisitor:
|
||||
"""Processes queue messages that specify items on a storage to process with a given callback.
|
||||
|
||||
Args:
|
||||
storage: storage to pull items specified by queue message from
|
||||
callback: callback to apply to storage items
|
||||
download_strategy: behaviour for loading items from the storage
|
||||
downloader: behaviour for loading items from the storage
|
||||
parsing_strategy: behaviour for interpreting storage items
|
||||
response_strategy: behaviour for response production
|
||||
|
||||
Returns:
|
||||
depends on response strategy
|
||||
"""
|
||||
self.storage = storage
|
||||
self.callback = callback
|
||||
self.download_strategy = download_strategy
|
||||
self.downloader = downloader
|
||||
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
|
||||
self.response_strategy = response_strategy or StorageStrategy(storage)
|
||||
self.response_strategy = response_strategy
|
||||
assert self.response_strategy is not None
|
||||
self.response_formatter = response_formatter or IdentityResponseFormatter()
|
||||
|
||||
def __call__(self, queue_item_body):
|
||||
@ -68,8 +65,8 @@ class QueueVisitor:
|
||||
data = compose(
|
||||
lift(standardize),
|
||||
lift(self.parsing_strategy),
|
||||
self.download_strategy,
|
||||
)(self.storage, queue_item_body)
|
||||
self.downloader,
|
||||
)(queue_item_body)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
@ -28,7 +28,6 @@ from pyinfra.storage.storage import Storage
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
|
||||
from pyinfra.visitor.strategies.response.storage import StorageStrategy
|
||||
from test.config import CONFIG
|
||||
from test.queue.queue_manager_mock import QueueManagerMock
|
||||
from test.storage.adapter_mock import StorageAdapterMock
|
||||
from test.storage.client_mock import StorageClientMock
|
||||
@ -54,7 +53,7 @@ logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1)
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mute_logger():
|
||||
if not CONFIG.logging:
|
||||
if not TEST_CONFIG.logging:
|
||||
logger.setLevel(logging.CRITICAL + 1)
|
||||
|
||||
|
||||
@ -112,7 +111,7 @@ def s3_backend(request):
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def docker_compose(sleep_seconds=30):
|
||||
if CONFIG.use_docker_fixture:
|
||||
if TEST_CONFIG.use_docker_fixture:
|
||||
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
|
||||
compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
|
||||
compose.start()
|
||||
@ -130,7 +129,7 @@ def get_pika_connection_params():
|
||||
|
||||
|
||||
def get_s3_params(s3_backend):
|
||||
params = CONFIG.storage[s3_backend]
|
||||
params = TEST_CONFIG.storage[s3_backend]
|
||||
|
||||
return params
|
||||
|
||||
@ -139,7 +138,7 @@ def get_adapter(client_name, s3_backend):
|
||||
if client_name == "mock":
|
||||
return StorageAdapterMock(StorageClientMock())
|
||||
if client_name == "azure":
|
||||
return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
|
||||
return AzureStorageAdapter(get_azure_client(TEST_CONFIG.storage.azure.connection_string))
|
||||
if client_name == "s3":
|
||||
return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend)))
|
||||
else:
|
||||
@ -203,9 +202,8 @@ def response_strategy(response_strategy_name, storage):
|
||||
def visitor(storage, analysis_callback, response_strategy, component_factory):
|
||||
|
||||
return QueueVisitor(
|
||||
storage=storage,
|
||||
callback=analysis_callback,
|
||||
download_strategy=component_factory.get_download_strategy(),
|
||||
downloader=component_factory.get_downloader(storage),
|
||||
response_strategy=response_strategy,
|
||||
)
|
||||
|
||||
@ -217,6 +215,7 @@ def file_descriptor_manager(component_factory):
|
||||
|
||||
@pytest.fixture
|
||||
def component_factory():
|
||||
CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
|
||||
CONFIG["service"]["download_strategy"] = "single"
|
||||
return get_component_factory(CONFIG)
|
||||
MAIN_CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
|
||||
MAIN_CONFIG["service"]["download_strategy"] = "single"
|
||||
|
||||
return get_component_factory(MAIN_CONFIG)
|
||||
|
||||
29
test/fixtures/consumer.py
vendored
29
test/fixtures/consumer.py
vendored
@ -1,3 +1,6 @@
|
||||
from operator import itemgetter
|
||||
from typing import Iterable
|
||||
|
||||
import pytest
|
||||
|
||||
from pyinfra.queue.consumer import Consumer
|
||||
@ -6,3 +9,29 @@ from pyinfra.queue.consumer import Consumer
|
||||
@pytest.fixture(scope="session")
def consumer(queue_manager, callback):
    """Session-scoped ``Consumer`` built from the callback and the queue manager."""
    session_consumer = Consumer(callback, queue_manager)
    return session_consumer
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def access_callback():
    """Callable that extracts the ``fileId`` entry from a mapping."""
    file_id_of = itemgetter("fileId")
    return file_id_of
|
||||
|
||||
|
||||
@pytest.fixture()
def items():
    """Pairs of (data, queue message) for the byte strings b"0", b"1" and b"2"."""
    payloads = [str(i).encode() for i in range(3)]
    return pair_data_with_queue_message(payloads)
|
||||
|
||||
|
||||
def pair_data_with_queue_message(data: Iterable[bytes]):
    """Pair each data blob with a queue message body.

    The i-th blob gets a body whose ``fileId`` is ``f"file{i}"``; all other
    body fields are the same constants for every blob.

    Args:
        data: blobs to pair with message bodies

    Returns:
        list of ``(blob, body)`` tuples in input order
    """
    return [
        (
            blob,
            {
                "dossierId": "folder",
                "fileId": f"file{index}",
                "targetFileExtension": "in.gz",
                "responseFileExtension": "out.gz",
                "pages": [0, 2, 3],
            },
        )
        for index, blob in enumerate(data)
    ]
|
||||
|
||||
@ -6,12 +6,10 @@ from operator import itemgetter
|
||||
import pytest
|
||||
from funcy import compose, first, second, pluck, lflatten, lzip
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.default_objects import ComponentFactory, get_component_factory
|
||||
from pyinfra.queue.consumer import Consumer
|
||||
from pyinfra.config import CONFIG, DotIndexable
|
||||
from pyinfra.default_objects import get_component_factory
|
||||
from pyinfra.server.packing import unpack, pack
|
||||
from pyinfra.utils.encoding import compress, decompress
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from test.config import CONFIG as TEST_CONFIG
|
||||
|
||||
|
||||
|
||||
@ -1,13 +1,13 @@
|
||||
import logging
|
||||
from operator import itemgetter
|
||||
|
||||
import pytest
|
||||
from funcy import lmapcat
|
||||
from funcy import pluck, lflatten
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.default_objects import ComponentFactory
|
||||
from pyinfra.exceptions import ProcessingFailure
|
||||
from test.utils.storage import pack_for_upload
|
||||
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
|
||||
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
|
||||
from test.utils.storage import pack_for_upload
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@ -26,7 +26,7 @@ class TestConsumer:
|
||||
@pytest.mark.parametrize(
|
||||
"queue_manager_name",
|
||||
[
|
||||
"pika", # NOTE: pika must come first. Test fails IFF pika is in second place, for whatever reason.
|
||||
"pika", # NOTE: pika must come first. Test fails IFF pika is in second position, for whatever reason.
|
||||
"mock",
|
||||
],
|
||||
scope="session",
|
||||
@ -64,8 +64,9 @@ class TestConsumer:
|
||||
@pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
|
||||
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
|
||||
def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
|
||||
self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
|
||||
self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder,
|
||||
):
|
||||
file_descriptor_manager = ComponentFactory(CONFIG).get_file_descriptor_manager()
|
||||
|
||||
visitor.response_strategy = ForwardingStrategy()
|
||||
|
||||
@ -73,7 +74,7 @@ class TestConsumer:
|
||||
storage.clear_bucket(bucket_name)
|
||||
|
||||
for data, message in items:
|
||||
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(message), data=pack_for_upload(data))
|
||||
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=pack_for_upload(data))
|
||||
queue_manager.publish_request(message)
|
||||
|
||||
requests = consumer.consume(inactivity_timeout=5)
|
||||
@ -82,7 +83,7 @@ class TestConsumer:
|
||||
logger.debug(f"Processing item {itm}")
|
||||
queue_manager.publish_response(req, visitor)
|
||||
|
||||
assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"]
|
||||
assert lflatten(pluck("analysis_payloads", queue_manager.output_queue.to_list())) == ["00", "11", "22"]
|
||||
|
||||
@pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
|
||||
def test_message_is_republished_when_callback_raises_processing_failure_exception(
|
||||
@ -110,7 +111,7 @@ class TestConsumer:
|
||||
logger.addFilter(lambda record: False)
|
||||
|
||||
# TODO:
|
||||
# Note above says order of mock and pika matters. Note, that you can produce an error for debugging, when
|
||||
# The note above says the order of mock and pika matters. You can provoke an error for debugging by
|
||||
# calling `queue_manager.publish_response(next(requests), callback)` without the with-block and the while-block; it
|
||||
# then becomes obvious that the test carrying the note above uses the data from THIS test.
|
||||
with pytest.raises(DebugError):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user