reorganized serve-test to use only default-objects instead of test-object

This commit is contained in:
Matthias Bisping 2022-05-31 16:13:47 +02:00
parent 93da0d12bb
commit dc4f578e94
4 changed files with 31 additions and 29 deletions

View File

@ -18,7 +18,8 @@ from pyinfra.visitor import QueueVisitor, AggregationStorageStrategy
@lru_cache(maxsize=None)
def get_consumer(callback):
def get_consumer(callback=None):
callback = callback or get_callback()
return Consumer(get_visitor(callback), get_queue_manager())
@ -40,7 +41,7 @@ def get_storage():
@lru_cache(maxsize=None)
def get_callback(analysis_base_url=None):
analysis_base_url = analysis_base_url or CONFIG.rabbitmq.callback.analysis_endpoint
return make_callback(analysis_base_url)
return Callback(analysis_base_url)
@lru_cache(maxsize=None)
@ -48,11 +49,6 @@ def get_response_strategy(storage=None):
return AggregationStorageStrategy(storage or get_storage())
@lru_cache(maxsize=None)
def make_callback(url):
return Callback(url)
class Callback:
def __init__(self, base_url):
self.base_url = base_url

View File

@ -3,7 +3,7 @@ from _operator import itemgetter
import pytest
from pyinfra.queue.consumer import Consumer
from test.utils.input import adorn_data_with_storage_info
from test.utils.input import pair_data_with_queue_message
@pytest.fixture(scope="session")
@ -19,4 +19,4 @@ def access_callback():
@pytest.fixture()
def items():
numbers = [f"{i}".encode() for i in range(3)]
return adorn_data_with_storage_info(numbers)
return pair_data_with_queue_message(numbers)

View File

@ -11,12 +11,15 @@ from funcy import compose, lzip, lpluck
from pyinfra.default_objects import (
get_callback,
get_response_strategy,
get_consumer,
get_queue_manager,
get_storage,
)
from pyinfra.queue.consumer import Consumer
from pyinfra.server.packing import bytes_to_string, unpack, pack
from pyinfra.utils.func import star
from pyinfra.visitor import get_object_descriptor, QueueVisitor
from test.utils.input import adorn_data_with_storage_info
from test.utils.input import pair_data_with_queue_message
logger = logging.getLogger(__name__)
@ -26,26 +29,30 @@ def freeze(data, metadata):
@pytest.fixture
def mixed_components(url, bucket_name, queue_manager, storage):
def test_components(url, queue_manager, storage):
callback = get_callback(url)
visitor = QueueVisitor(storage, callback, get_response_strategy(storage))
consumer = Consumer(visitor, queue_manager)
return visitor, queue_manager, storage, consumer
return storage, queue_manager, consumer
@pytest.fixture
def real_components(url):
callback = get_callback(url)
consumer = get_consumer(callback)
queue_manager = get_queue_manager()
storage = get_storage()
return storage, queue_manager, consumer
@pytest.fixture
def components(components_type, real_components, mixed_components):
def components(components_type, real_components, test_components):
if components_type == "real":
return real_components
elif components_type == "mixed":
return mixed_components
elif components_type == "test":
return test_components
else:
raise ValueError(f"Unknown components type '{components_type}'.")
@ -106,33 +113,32 @@ def decode(storage_item):
@pytest.mark.parametrize(
"client_name",
[
"mock",
# "s3",
# "azure",
# "mock",
"s3",
"azure",
],
scope="session",
)
@pytest.mark.parametrize(
"components_type",
[
"mixed",
# "test",
"real",
],
)
def test_serving(
server_process,
input_data_items,
unencoded_input_data,
metadata,
bucket_name,
components,
core_operation,
storage_item_has_metadata,
target_data_items,
targets,
):
visitor, _, storage, consumer = components
storage, queue_manager, consumer = components
consumer.queue_manager.clear()
queue_manager.clear()
storage.clear_bucket(bucket_name)
if storage_item_has_metadata:
@ -144,15 +150,15 @@ def test_serving(
targets = sorted(starmap(freeze, targets), key=itemgetter(0))
adorned_data_metadata_packs = adorn_data_with_storage_info(data_metadata_packs)
data_message_pairs = pair_data_with_queue_message(data_metadata_packs)
for data, message in adorned_data_metadata_packs:
for data, message in data_message_pairs:
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
consumer.queue_manager.publish_request(message)
queue_manager.publish_request(message)
consumer.consume_and_publish(n=len(adorned_data_metadata_packs))
consumer.consume_and_publish(n=len(data_message_pairs))
names_of_uploaded_files = lpluck("responseFile", consumer.queue_manager.output_queue.to_list())
names_of_uploaded_files = lpluck("responseFile", queue_manager.output_queue.to_list())
uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files))
outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0))
assert outputs == targets

View File

@ -1,7 +1,7 @@
from typing import Iterable
def adorn_data_with_storage_info(data: Iterable[bytes]):
def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {