diff --git a/test/config.yaml b/test/config.yaml index 20fa617..69d72a1 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -9,6 +9,10 @@ service: input: subdir: "" extension: IN.gz + default: + input: + subdir: "" + extension: IN.gz storage: minio: @@ -36,4 +40,4 @@ webserver: mock_analysis_endpoint: "http://127.0.0.1:5000" use_docker_fixture: 0 -logging: 1 \ No newline at end of file +logging: 0 \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 7749b25..49991e8 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,6 +5,9 @@ from pyinfra.config import CONFIG as MAIN_CONFIG MAIN_CONFIG["retry"]["delay"] = 0.1 MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2) +from pyinfra.default_objects import get_component_factory +from test.config import CONFIG as TEST_CONFIG + import logging import time from unittest.mock import Mock @@ -197,5 +200,23 @@ def response_strategy(response_strategy_name, storage): @pytest.fixture() -def visitor(storage, analysis_callback, response_strategy): - return QueueVisitor(storage=storage, callback=analysis_callback, response_strategy=response_strategy) +def visitor(storage, analysis_callback, response_strategy, component_factory): + + return QueueVisitor( + storage=storage, + callback=analysis_callback, + download_strategy=component_factory.get_download_strategy(), + response_strategy=response_strategy, + ) + + +@pytest.fixture +def file_descriptor_manager(component_factory): + return component_factory.get_file_descriptor_manager() + + +@pytest.fixture +def component_factory(): + CONFIG["service"]["operations"] = TEST_CONFIG.service.operations + CONFIG["service"]["download_strategy"] = "single" + return get_component_factory(CONFIG) diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index f64a27a..2fbfb4d 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -80,13 +80,13 @@ from test.config import CONFIG as TEST_CONFIG @pytest.mark.parametrize( "many_to_n", [ - False, - # True, + True, + False, # NOTE: has to be in this order, because who knows why... pytest ], ) def test_serving(server_process, bucket_name, components, targets, data_message_pairs, n_items, many_to_n): - storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components + storage, queue_manager, consumer, file_descriptor_manager = components assert queue_manager.input_queue.to_list() == [] assert queue_manager.output_queue.to_list() == [] @@ -134,7 +134,6 @@ def upload_data_to_storage_and_publish_requests_to_queue( # TODO: refactor; too many params def upload_data_to_storage_and_publish_request_to_queue(storage, queue_manager, data, message, file_descriptor_manager): - print(file_descriptor_manager.get_object_descriptor(message)) storage.put_object(**file_descriptor_manager.get_object_descriptor(message), data=compress(data)) queue_manager.publish_request(message) @@ -182,16 +181,16 @@ def components(components_type, real_components, test_components, bucket_name): else: raise ValueError(f"Unknown component type '{components_type}'.") - storage, queue_manager, consumer, download_strategy, file_descriptor_manager = components + storage, queue_manager, consumer, file_descriptor_manager = components queue_manager.clear() storage.make_bucket(bucket_name) storage.clear_bucket(bucket_name) - yield storage, queue_manager, consumer, download_strategy, file_descriptor_manager + yield storage, queue_manager, consumer, file_descriptor_manager queue_manager.clear() - # storage.clear_bucket(bucket_name) + storage.clear_bucket(bucket_name) def decode(storage_item): @@ -222,10 +221,7 @@ def real_components(url, download_strategy_type): storage = component_factory.get_storage() file_descriptor_manager = component_factory.get_file_descriptor_manager() - download_strategy = component_factory.get_download_strategy() - - # consumer.visitor.download_strategy = download_strategy - return storage, queue_manager, consumer, download_strategy, file_descriptor_manager + return storage, queue_manager, consumer, file_descriptor_manager @pytest.fixture @@ -234,11 +230,11 @@ def download_strategy_type(many_to_n): @pytest.fixture -def test_components(url, queue_manager, storage, many_to_n): +def test_components(url, queue_manager, storage, download_strategy_type): component_factory = ComponentFactory(CONFIG) - download_strategy = component_factory.get_download_strategy("multi" if many_to_n else "single") + download_strategy = component_factory.get_download_strategy(download_strategy_type) file_descriptor_manager = component_factory.get_file_descriptor_manager() visitor = QueueVisitor( @@ -249,4 +245,4 @@ def test_components(url, queue_manager, storage, many_to_n): ) consumer = Consumer(visitor, queue_manager) - return storage, queue_manager, consumer, download_strategy, file_descriptor_manager + return storage, queue_manager, consumer, file_descriptor_manager diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py index 23c177c..78dde54 100644 --- a/test/unit_tests/queue_visitor_test.py +++ b/test/unit_tests/queue_visitor_test.py @@ -5,7 +5,6 @@ from funcy import first from pyinfra.utils.encoding import decompress from test.utils.storage import pack_for_upload -from pyinfra.visitor.strategies.download.single import SingleDownloadStrategy @pytest.fixture() @@ -17,24 +16,24 @@ def body(): class TestVisitor: @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") def test_given_a_input_queue_message_callback_pulls_the_data_from_storage( - self, visitor, body, storage, bucket_name + self, visitor, body, storage, bucket_name, file_descriptor_manager ): storage.clear_bucket(bucket_name) - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"content")) + storage.put_object(**file_descriptor_manager.get_object_descriptor(body), data=pack_for_upload(b"content")) data_received = list(visitor.load_data(body)) assert [{"data": b"content", "metadata": {}}] == data_received @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session") - def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name): + def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name, file_descriptor_manager): storage.clear_bucket(bucket_name) - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2")) + storage.put_object(**file_descriptor_manager.get_object_descriptor(body), data=pack_for_upload(b"2")) response_body = visitor.load_items_from_storage_and_process_with_callback(body) assert response_body["analysis_payloads"] == ["22"] @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session") - def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name): + def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name, file_descriptor_manager): storage.clear_bucket(bucket_name) - storage.put_object(**SingleDownloadStrategy().get_object_descriptor(body), data=pack_for_upload(b"2")) + storage.put_object(**file_descriptor_manager.get_object_descriptor(body), data=pack_for_upload(b"2")) response_body = visitor(body) assert "data" not in response_body assert json.loads(