diff --git a/.dockerignore b/.dockerignore index 1269488..b4d8935 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1 +1,106 @@ data +/build_venv/ +/.venv/ +/misc/ +/incl/image_service/test/ +/scratch/ +/bamboo-specs/ +README.md +Dockerfile +*idea +*misc +*egg-innfo +*pycache* + +# Git +.git +.gitignore + +# CI +.codeclimate.yml +.travis.yml +.taskcluster.yml + +# Docker +.docker + +# Byte-compiled / optimized / DLL files +__pycache__/ +*/__pycache__/ +*/*/__pycache__/ +*/*/*/__pycache__/ +*.py[cod] +*/*.py[cod] +*/*/*.py[cod] +*/*/*/*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/** +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.cache +nosetests.xml +coverage.xml + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Virtual environment +.env/ +.venv/ +#venv/ + +# PyCharm +.idea + +# Python mode for VIM +.ropeproject +*/.ropeproject +*/*/.ropeproject +*/*/*/.ropeproject + +# Vim swap files +*.swp +*/*.swp +*/*/*.swp +*/*/*/*.swp \ No newline at end of file diff --git a/Dockerfile_tests b/Dockerfile_tests new file mode 100755 index 0000000..b16eccc --- /dev/null +++ b/Dockerfile_tests @@ -0,0 +1,19 @@ +ARG BASE_ROOT="nexus.iqser.com:5001/red/" +ARG VERSION_TAG="dev" + +FROM ${BASE_ROOT}pyinfra:${VERSION_TAG} + +EXPOSE 5000 +EXPOSE 8080 + +RUN python3 -m pip install coverage + +# Make a directory for the service files and copy the service repo into the container. +WORKDIR /app/service +COPY . . + +# Install module & dependencies +RUN python3 -m pip install -e . +RUN python3 -m pip install -r requirements.txt + +CMD coverage run -m pytest pyinfra/test/ -x && coverage report -m && coverage xml diff --git a/bamboo-specs/src/main/java/buildjob/PlanSpec.java b/bamboo-specs/src/main/java/buildjob/PlanSpec.java index 481810e..6440aa9 100644 --- a/bamboo-specs/src/main/java/buildjob/PlanSpec.java +++ b/bamboo-specs/src/main/java/buildjob/PlanSpec.java @@ -72,8 +72,8 @@ public class PlanSpec { project(), SERVICE_NAME, new BambooKey(SERVICE_KEY)) .description("Docker build for pyinfra") - // .variables() - .stages(new Stage("Build Stage") + .stages( + new Stage("Build Stage") .jobs( new Job("Build Job", new BambooKey("BUILD")) .tasks( @@ -98,7 +98,9 @@ public class PlanSpec { .dockerConfiguration( new DockerConfiguration() .image("nexus.iqser.com:5001/infra/release_build:4.2.0") - .volume("/var/run/docker.sock", "/var/run/docker.sock")), + .volume("/var/run/docker.sock", "/var/run/docker.sock"))), + new Stage("Sonar Stage") + .jobs( new Job("Sonar Job", new BambooKey("SONAR")) .tasks( new CleanWorkingDirectoryTask() @@ -174,6 +176,4 @@ public class PlanSpec { .whenInactiveInRepositoryAfterDays(14)) .notificationForCommitters()); } - - } diff --git a/bamboo-specs/src/main/resources/scripts/docker-build.sh b/bamboo-specs/src/main/resources/scripts/docker-build.sh index 93019b0..bce07f2 100755 --- a/bamboo-specs/src/main/resources/scripts/docker-build.sh +++ b/bamboo-specs/src/main/resources/scripts/docker-build.sh @@ -11,4 +11,3 @@ echo "index-url = https://${bamboo_nexus_user}:${bamboo_nexus_password}@nexus.iq docker build -f Dockerfile -t nexus.iqser.com:5001/red/$SERVICE_NAME:${bamboo_version_tag} . echo "${bamboo_nexus_password}" | docker login --username "${bamboo_nexus_user}" --password-stdin nexus.iqser.com:5001 docker push nexus.iqser.com:5001/red/$SERVICE_NAME:${bamboo_version_tag} - diff --git a/bamboo-specs/src/main/resources/scripts/sonar-scan.sh b/bamboo-specs/src/main/resources/scripts/sonar-scan.sh index 632f69f..f8716e2 100755 --- a/bamboo-specs/src/main/resources/scripts/sonar-scan.sh +++ b/bamboo-specs/src/main/resources/scripts/sonar-scan.sh @@ -6,9 +6,29 @@ export JAVA_HOME=/usr/bin/sonar-scanner/jre python3 -m venv build_venv source build_venv/bin/activate python3 -m pip install --upgrade pip +python3 -m pip install dependency-check +python3 -m pip install docker-compose -pip install -e . -pip install -r requirements.txt +echo "coverage report generation" + +python3 -m pip install coverage + +# Install module & dependencies +#python3 -m pip install -e . +#python3 -m pip install -r requirements.txt + +echo "docker-compose down" +docker-compose down +sleep 30 + +bash run_tests.sh + +if [ ! -f reports/coverage.xml ] +then + exit 1 +fi + +#coverage run -m pytest pyinfra/test/ -x && coverage report -m && coverage xml SERVICE_NAME=$1 diff --git a/banner.txt b/banner.txt new file mode 100644 index 0000000..fa0940f --- /dev/null +++ b/banner.txt @@ -0,0 +1,6 @@ + ___ _ _ ___ __ + o O O | _ \ | || | |_ _| _ _ / _| _ _ __ _ + o | _/ \_, | | | | ' \ | _| | '_| / _` | + TS__[O] _|_|_ _|__/ |___| |_||_| _|_|_ _|_|_ \__,_| + {======|_| ``` |_| ````|_|`````|_|`````|_|`````|_|`````|_|`````| +./o--000' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yml similarity index 86% rename from docker-compose.yaml rename to docker-compose.yml index 4f4a6fa..04e5296 100755 --- a/docker-compose.yaml +++ b/docker-compose.yml @@ -22,6 +22,8 @@ services: - '15672:15672' environment: - RABBITMQ_SECURE_PASSWORD=yes + - RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100% + - RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi network_mode: "bridge" volumes: - /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami diff --git a/pyinfra/config.py b/pyinfra/config.py index 937b760..1e94f08 100644 --- a/pyinfra/config.py +++ b/pyinfra/config.py @@ -5,18 +5,6 @@ from envyaml import EnvYAML from pyinfra.locations import CONFIG_FILE -def make_art(): - return """ - ___ _ _ ___ __ - o O O | _ \ | || | |_ _| _ _ / _| _ _ __ _ - o | _/ \_, | | | | ' \ | _| | '_| / _` | - TS__[O] _|_|_ _|__/ |___| |_||_| _|_|_ _|_|_ \__,_| - {======|_| ``` |_| ````|_|`````|_|`````|_|`````|_|`````|_|`````| -./o--000' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' - -""" - - def _get_item_and_maybe_make_dotindexable(container, item): ret = container[item] return DotIndexable(ret) if isinstance(ret, dict) else ret diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py index 9b37df5..474cc6f 100644 --- a/pyinfra/exceptions.py +++ b/pyinfra/exceptions.py @@ -24,3 +24,7 @@ class UnknownClient(ValueError): class ConsumerError(Exception): pass + + +class NoSuchContainer(KeyError): + pass diff --git a/pyinfra/locations.py b/pyinfra/locations.py index 5183a8b..abd6a88 100644 --- a/pyinfra/locations.py +++ b/pyinfra/locations.py @@ -12,3 +12,7 @@ TEST_DIR = MODULE_DIR / "test" CONFIG_FILE = PACKAGE_ROOT_DIR / "config.yaml" TEST_CONFIG_FILE = TEST_DIR / "config.yaml" + +COMPOSE_PATH = PACKAGE_ROOT_DIR + +BANNER_FILE = PACKAGE_ROOT_DIR / "banner.txt" diff --git a/pyinfra/queue/consumer.py b/pyinfra/queue/consumer.py index 6dd7d60..c21cb26 100644 --- a/pyinfra/queue/consumer.py +++ b/pyinfra/queue/consumer.py @@ -9,5 +9,5 @@ class Consumer: def consume_and_publish(self): self.queue_manager.consume_and_publish(self.callback) - def consume(self): - yield from self.queue_manager.consume() + def consume(self, **kwargs): + return self.queue_manager.consume(**kwargs) diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 3b91fea..4346658 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -41,22 +41,18 @@ def monkey_patch_queue_handle(channel, queue) -> QueueHandle: return queue_handle -def get_connection(): +def get_connection_params(): credentials = pika.PlainCredentials(username=CONFIG.rabbitmq.user, password=CONFIG.rabbitmq.password) - kwargs = { "host": CONFIG.rabbitmq.host, "port": CONFIG.rabbitmq.port, "credentials": credentials, "heartbeat": CONFIG.rabbitmq.heartbeat, } - parameters = pika.ConnectionParameters(**kwargs) - connection = pika.BlockingConnection(parameters=parameters) - - return connection + return parameters def get_n_previous_attempts(props): @@ -68,10 +64,13 @@ def attempts_remain(n_attempts, max_attempts): class PikaQueueManager(QueueManager): - def __init__(self, input_queue, output_queue, dead_letter_queue=None): + def __init__(self, input_queue, output_queue, dead_letter_queue=None, connection_params=None): super().__init__(input_queue, output_queue) - self.connection = get_connection() + if not connection_params: + connection_params = get_connection_params() + + self.connection = pika.BlockingConnection(parameters=connection_params) self.channel = self.connection.channel() if not dead_letter_queue: @@ -123,9 +122,9 @@ class PikaQueueManager(QueueManager): def pull_request(self): return self.channel.basic_get(self._input_queue) - def consume(self): + def consume(self, inactivity_timeout=None): logger.debug("Consuming") - return self.channel.consume(self._input_queue) + return self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout) def consume_and_publish(self, visitor): @@ -135,8 +134,11 @@ class PikaQueueManager(QueueManager): self.publish_response(message, visitor) def clear(self): - self.channel.queue_purge(self._input_queue) - self.channel.queue_purge(self._output_queue) + try: + self.channel.queue_purge(self._input_queue) + self.channel.queue_purge(self._output_queue) + except pika.exceptions.ChannelWrongStateError: + pass @property def input_queue(self) -> QueueHandle: diff --git a/pyinfra/queue/queue_manager/queue_manager.py b/pyinfra/queue/queue_manager/queue_manager.py index a8af9d3..44b6baf 100644 --- a/pyinfra/queue/queue_manager/queue_manager.py +++ b/pyinfra/queue/queue_manager/queue_manager.py @@ -27,7 +27,7 @@ class QueueManager(abc.ABC): pass @abc.abstractmethod - def consume(self): + def consume(self, **kwargs): pass @abc.abstractmethod diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py index 532da38..aa28279 100644 --- a/pyinfra/storage/adapters/azure.py +++ b/pyinfra/storage/adapters/azure.py @@ -13,18 +13,6 @@ logging.getLogger("azure").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) -def _retry(exceptions=Exception): - def inner(func): - @retry(exceptions=exceptions, delay=5, jitter=(0, 3), max_delay=60) - @wraps(func) - def inner(*args, **kwargs): - return func(*args, **kwargs) - - return inner - - return inner - - class AzureStorageAdapter(StorageAdapter): def __init__(self, client): super().__init__(client=client) diff --git a/pyinfra/storage/clients/s3.py b/pyinfra/storage/clients/s3.py index 19ee87e..943d88c 100644 --- a/pyinfra/storage/clients/s3.py +++ b/pyinfra/storage/clients/s3.py @@ -1,4 +1,3 @@ -import logging import re from minio import Minio @@ -9,6 +8,7 @@ from pyinfra.exceptions import InvalidEndpoint def parse_endpoint(endpoint): # FIXME Greedy matching (.+) since we get random storage names on kubernetes (eg http://red-research-headless:9000) + # FIXME this has been broken and accepts invalid URLs endpoint_pattern = r"(?Phttps?)*(?:://)*(?P
(?:(?:(?:\d{1,3}\.){3}\d{1,3})|.+)(?:\:\d+)?)" match = re.match(endpoint_pattern, endpoint) diff --git a/pyinfra/test/queue/queue_manager_mock.py b/pyinfra/test/queue/queue_manager_mock.py index 0f0cbd6..1928f12 100644 --- a/pyinfra/test/queue/queue_manager_mock.py +++ b/pyinfra/test/queue/queue_manager_mock.py @@ -22,7 +22,7 @@ class QueueManagerMock(QueueManager): def pull_request(self): return self._input_queue.popleft() - def consume(self): + def consume(self, **kwargs): while self._input_queue: yield self.pull_request() diff --git a/pyinfra/test/unit_tests/conftest.py b/pyinfra/test/unit_tests/conftest.py index 46b3ac5..7f9be90 100644 --- a/pyinfra/test/unit_tests/conftest.py +++ b/pyinfra/test/unit_tests/conftest.py @@ -1,12 +1,15 @@ import json import logging +import time from unittest.mock import Mock +import pika import pytest +import testcontainers.compose from pyinfra.exceptions import UnknownClient -from pyinfra.locations import TEST_DIR -from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager +from pyinfra.locations import TEST_DIR, COMPOSE_PATH +from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager, get_connection_params from pyinfra.queue.queue_manager.queue_manager import QueueManager from pyinfra.storage.adapters.azure import AzureStorageAdapter from pyinfra.storage.adapters.s3 import S3StorageAdapter @@ -66,13 +69,35 @@ def storage(client_name, bucket_name, request): storage.clear_bucket(bucket_name) +@pytest.fixture(scope="session", autouse=True) +def docker_compose(sleep_seconds=30): + logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...") + compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml") + compose.start() + logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ") + time.sleep(sleep_seconds) + yield compose + compose.stop() + + +def get_pika_connection_params(): + params = get_connection_params() + return params + + +def get_s3_params(s3_backend): + params = CONFIG.storage[s3_backend] + + return params + + def get_adapter(client_name, s3_backend): if client_name == "mock": return StorageAdapterMock(StorageClientMock()) if client_name == "azure": return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string)) if client_name == "s3": - return S3StorageAdapter(get_s3_client(CONFIG.storage[s3_backend])) + return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend))) else: raise UnknownClient(client_name) @@ -81,15 +106,29 @@ def get_queue_manager(queue_manager_name) -> QueueManager: if queue_manager_name == "mock": return QueueManagerMock("input", "output") if queue_manager_name == "pika": - return PikaQueueManager("input", "output") + return PikaQueueManager("input", "output", connection_params=get_pika_connection_params()) @pytest.fixture(scope="session") def queue_manager(queue_manager_name): + def close_connections(): + if queue_manager_name == "pika": + try: + queue_manager.connection.close() + except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionWrongStateError, ConnectionResetError): + logger.debug("Connection was already closed when attempting to close explicitly.") + + def close_channel(): + if queue_manager_name == "pika": + try: + queue_manager.channel.close() + except pika.exceptions.ChannelWrongStateError: + logger.debug("Channel was already closed when attempting to close explicitly.") + queue_manager = get_queue_manager(queue_manager_name) yield queue_manager - if queue_manager_name == "pika": - queue_manager.connection.close() + close_connections() + close_channel() @pytest.fixture(scope="session") diff --git a/pyinfra/test/unit_tests/consumer_test.py b/pyinfra/test/unit_tests/consumer_test.py index 0846bad..5874846 100644 --- a/pyinfra/test/unit_tests/consumer_test.py +++ b/pyinfra/test/unit_tests/consumer_test.py @@ -9,6 +9,9 @@ from pyinfra.exceptions import ProcessingFailure from pyinfra.queue.consumer import Consumer from pyinfra.visitor import get_object_descriptor, ForwardingStrategy +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + @pytest.fixture(scope="session") def consumer(queue_manager, callback): @@ -85,10 +88,11 @@ class TestConsumer: storage.put_object(**get_object_descriptor(message), data=gzip.compress(data)) queue_manager.publish_request(message) - requests = consumer.consume() + requests = consumer.consume(inactivity_timeout=5) - for _, r in zip(items, requests): - queue_manager.publish_response(r, visitor) + for itm, req in zip(items, requests): + logger.debug(f"Processing item {itm}") + queue_manager.publish_response(req, visitor) assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"] diff --git a/pyinfra/utils/__init__.py b/pyinfra/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/utils/banner.py b/pyinfra/utils/banner.py new file mode 100644 index 0000000..20d066e --- /dev/null +++ b/pyinfra/utils/banner.py @@ -0,0 +1,21 @@ +import logging + +from pyinfra.locations import BANNER_FILE + + +def show_banner(): + with open(BANNER_FILE) as f: + banner = "\n" + "".join(f.readlines()) + "\n" + + logger = logging.getLogger(__name__) + logger.propagate = False + + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + + formatter = logging.Formatter("") + + handler.setFormatter(formatter) + logger.addHandler(handler) + + logger.info(banner) diff --git a/requirements.txt b/requirements.txt index c13ae24..64f0c27 100755 --- a/requirements.txt +++ b/requirements.txt @@ -7,10 +7,7 @@ waitress==2.0.0 azure-core==1.22.1 azure-storage-blob==12.9.0 requests==2.27.1 -# dev +testcontainers==3.4.2 docker-compose==1.29.2 tqdm==4.62.3 -dependency-check - -pyinfra~=0.0.1 -pytest~=7.0.1 \ No newline at end of file +pytest~=7.0.1 diff --git a/run_tests.sh b/run_tests.sh new file mode 100755 index 0000000..e245822 --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,11 @@ +echo "${bamboo_nexus_password}" | docker login --username "${bamboo_nexus_user}" --password-stdin nexus.iqser.com:5001 +docker build -f Dockerfile_tests -t pyinfra-tests:dev . +docker tag pyinfra-tests:dev nexus.iqser.com:5001/red/pyinfra-tests:dev +docker push nexus.iqser.com:5001/red/pyinfra-tests:dev + +rnd=$(date +"%s") +name=pyinfra-tests-${rnd} + +echo "running tests container" + +docker run --rm --net=host --name $name -v $PWD:$PWD -w $PWD -v /var/run/docker.sock:/var/run/docker.sock pyinfra-tests:dev diff --git a/src/serve.py b/src/serve.py index 7e26ce4..236d42f 100644 --- a/src/serve.py +++ b/src/serve.py @@ -4,12 +4,13 @@ from multiprocessing import Process import requests from retry import retry -from pyinfra.config import CONFIG, make_art +from pyinfra.config import CONFIG from pyinfra.exceptions import AnalysisFailure, ConsumerError from pyinfra.flask import run_probing_webserver, set_up_probing_webserver from pyinfra.queue.consumer import Consumer from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager from pyinfra.storage.storages import get_storage +from pyinfra.utils.banner import show_banner from pyinfra.visitor import QueueVisitor, StorageStrategy @@ -39,9 +40,9 @@ def make_callback(analysis_endpoint): def main(): + show_banner() webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),)) - logging.info(make_art()) logging.info("Starting webserver...") webserver.start()