removed direct download strategy usages in test setups
This commit is contained in:
parent
bb6b28bb4e
commit
1adeb4038a
@ -9,6 +9,10 @@ service:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: IN.gz
|
||||
default:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: IN.gz
|
||||
|
||||
storage:
|
||||
minio:
|
||||
@ -36,4 +40,4 @@ webserver:
|
||||
mock_analysis_endpoint: "http://127.0.0.1:5000"
|
||||
|
||||
use_docker_fixture: 0
|
||||
logging: 1
|
||||
logging: 0
|
||||
@ -5,6 +5,9 @@ from pyinfra.config import CONFIG as MAIN_CONFIG
|
||||
MAIN_CONFIG["retry"]["delay"] = 0.1
|
||||
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)
|
||||
|
||||
from pyinfra.default_objects import get_component_factory
|
||||
from test.config import CONFIG as TEST_CONFIG
|
||||
|
||||
import logging
|
||||
import time
|
||||
from unittest.mock import Mock
|
||||
@ -197,5 +200,23 @@ def response_strategy(response_strategy_name, storage):
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def visitor(storage, analysis_callback, response_strategy):
|
||||
return QueueVisitor(storage=storage, callback=analysis_callback, response_strategy=response_strategy)
|
||||
def visitor(storage, analysis_callback, response_strategy, component_factory):
|
||||
|
||||
return QueueVisitor(
|
||||
storage=storage,
|
||||
callback=analysis_callback,
|
||||
download_strategy=component_factory.get_download_strategy(),
|
||||
response_strategy=response_strategy,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def file_descriptor_manager(component_factory):
|
||||
return component_factory.get_file_descriptor_manager()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def component_factory():
|
||||
CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
|
||||
CONFIG["service"]["download_strategy"] = "single"
|
||||
return get_component_factory(CONFIG)
|
||||
|
||||
@ -80,13 +80,13 @@ from test.config import CONFIG as TEST_CONFIG
|
||||
@pytest.mark.parametrize(
|
||||
"many_to_n",
|
||||
[
|
||||
False,
|
||||
# True,
|
||||
True,
|
||||
False, # NOTE: has to be in this order, because who knows why... pytest
|
||||
],
|
||||
)
|
||||
def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n):
|
||||
|
||||
storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components
|
||||
storage, queue_manager, consumer, file_descriptor_manager = components
|
||||
|
||||
assert queue_manager.input_queue.to_list() == []
|
||||
assert queue_manager.output_queue.to_list() == []
|
||||
@ -134,7 +134,6 @@ def upload_data_to_storage_and_publish_requests_to_queue(
|
||||
|
||||
# TODO: refactor; too many params
|
||||
def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager):
|
||||
print(file_descriptor_manager.get_object_descriptor(message))
|
||||
storage.put_object(**file_descriptor_manager.get_object_descriptor(message), data=compress(data))
|
||||
queue_manager.publish_request(message)
|
||||
|
||||
@ -182,16 +181,16 @@ def components(components_type, real_components, test_components, bucket_name):
|
||||
else:
|
||||
raise ValueError(f"Unknown component type '{components_type}'.")
|
||||
|
||||
storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components
|
||||
storage, queue_manager, consumer, file_descriptor_manager = components
|
||||
|
||||
queue_manager.clear()
|
||||
storage.make_bucket(bucket_name)
|
||||
storage.clear_bucket(bucket_name)
|
||||
|
||||
yield storage, queue_manager, consumer, download_strategy, file_descriptor_manager
|
||||
yield storage, queue_manager, consumer, file_descriptor_manager
|
||||
|
||||
queue_manager.clear()
|
||||
# storage.clear_bucket(bucket_name)
|
||||
storage.clear_bucket(bucket_name)
|
||||
|
||||
|
||||
def decode(storage_item):
|
||||
@ -222,10 +221,7 @@ def real_components(url, download_strategy_type):
|
||||
storage = component_factory.get_storage()
|
||||
file_descriptor_manager = component_factory.get_file_descriptor_manager()
|
||||
|
||||
download_strategy = component_factory.get_download_strategy()
|
||||
|
||||
# consumer.visitor.download_strategy = download_strategy
|
||||
return storage, queue_manager, consumer, download_strategy, file_descriptor_manager
|
||||
return storage, queue_manager, consumer, file_descriptor_manager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -234,11 +230,11 @@ def download_strategy_type(many_to_n):
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_components(url, queue_manager, storage, many_to_n):
|
||||
def test_components(url, queue_manager, storage, download_strategy_type):
|
||||
|
||||
component_factory = ComponentFactory(CONFIG)
|
||||
|
||||
download_strategy = component_factory.get_download_strategy("multi" if many_to_n else "single")
|
||||
download_strategy = component_factory.get_download_strategy(download_strategy_type)
|
||||
file_descriptor_manager = component_factory.get_file_descriptor_manager()
|
||||
|
||||
visitor = QueueVisitor(
|
||||
@ -249,4 +245,4 @@ def test_components(url, queue_manager, storage, many_to_n):
|
||||
)
|
||||
consumer = Consumer(visitor, queue_manager)
|
||||
|
||||
return storage, queue_manager, consumer, download_strategy, file_descriptor_manager
|
||||
return storage, queue_manager, consumer, file_descriptor_manager
|
||||
|
||||
@ -5,7 +5,6 @@ from funcy import first
|
||||
|
||||
from pyinfra.utils.encoding import decompress
|
||||
from test.utils.storage import pack_for_upload
|
||||
from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@ -17,24 +16,24 @@ def body():
|
||||
class TestVisitor:
|
||||
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
|
||||
def test_given_a_input_queue_message_callback_pulls_the_data_from_storage(
|
||||
self, visitor, body, storage, bucket_name
|
||||
self, visitor, body, storage, bucket_name, file_descriptor_manager
|
||||
):
|
||||
storage.clear_bucket(bucket_name)
|
||||
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"content"))
|
||||
storage.put_object(**file_descriptor_manager.get_object_descriptor(body), data=pack_for_upload(b"content"))
|
||||
data_received = list(visitor.load_data(body))
|
||||
assert [{"data": b"content", "metadata": {}}] == data_received
|
||||
|
||||
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
|
||||
def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name):
|
||||
def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name, file_descriptor_manager):
|
||||
storage.clear_bucket(bucket_name)
|
||||
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2"))
|
||||
storage.put_object(**file_descriptor_manager.get_object_descriptor(body), data=pack_for_upload(b"2"))
|
||||
response_body = visitor.load_items_from_storage_and_process_with_callback(body)
|
||||
assert response_body["analysis_payloads"] == ["22"]
|
||||
|
||||
@pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
|
||||
def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name):
|
||||
def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name, file_descriptor_manager):
|
||||
storage.clear_bucket(bucket_name)
|
||||
storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2"))
|
||||
storage.put_object(**file_descriptor_manager.get_object_descriptor(body), data=pack_for_upload(b"2"))
|
||||
response_body = visitor(body)
|
||||
assert "data" not in response_body
|
||||
assert json.loads(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user