Julius Unverfehrt c09476cfae Update tests
All components from payload processing downwards are tested.

Tests that depend on docker compose have been disabled by default
because they take too long to use during development. Furthermore, the
queue manager tests are not stable, a refactoring with inversion of
control is urgently needed to make the components properly testable. The
storage tests are stable and should be run once before releasing, this
should be implemented via the CI script.

Also adds, if present, tenant Id and operation kwargs to storage and
queue response.
2023-08-22 17:33:22 +02:00

58 lines
2.6 KiB
Python

import logging
import pytest
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.mark.parametrize("storage_backend", ["azure", "s3"], scope="session")
@pytest.mark.parametrize("bucket_name", ["testbucket"], scope="session")
@pytest.mark.parametrize("monitoring_enabled", [False], scope="session")
class TestStorage:
def test_clearing_bucket_yields_empty_bucket(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
data_received = storage.get_all_objects(bucket_name)
assert not {*data_received}
def test_getting_object_put_in_bucket_is_object(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(bucket_name, "file", b"content")
data_received = storage.get_object(bucket_name, "file")
assert b"content" == data_received
def test_object_put_in_bucket_exists_on_storage(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(bucket_name, "file", b"content")
assert storage.exists(bucket_name, "file")
def test_getting_nested_object_put_in_bucket_is_nested_object(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(bucket_name, "folder/file", b"content")
data_received = storage.get_object(bucket_name, "folder/file")
assert b"content" == data_received
def test_getting_objects_put_in_bucket_are_objects(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(bucket_name, "file1", b"content 1")
storage.put_object(bucket_name, "folder/file2", b"content 2")
data_received = storage.get_all_objects(bucket_name)
assert {b"content 1", b"content 2"} == {*data_received}
def test_make_bucket_produces_bucket(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.make_bucket(bucket_name)
assert storage.has_bucket(bucket_name)
def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
storage.put_object(bucket_name, "file1", b"content 1")
storage.put_object(bucket_name, "file2", b"content 2")
full_names_received = storage.get_all_object_names(bucket_name)
assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received}
def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
with pytest.raises(Exception):
storage.get_object(bucket_name, "folder/file")