From e3abf2be0f50761d58711a178ee5d48a16dc786b Mon Sep 17 00:00:00 2001
From: Viktor Seifert
Date: Tue, 26 Jul 2022 13:15:07 +0200
Subject: [PATCH] Pull request #44: RED-4653

Merge in RR/pyinfra from RED-4653 to master

Squashed commit of the following:

commit 14ed6d2ee79f9a6bc4bad187dc775f7476a05d97
Author: Viktor Seifert
Date:   Tue Jul 26 11:08:16 2022 +0200

    RED-4653: Disabled coverage check since there are no tests at the moment

commit e926631b167d03e8cc0867db5b5c7d44d6612dcf
Author: Viktor Seifert
Date:   Tue Jul 26 10:58:50 2022 +0200

    RED-4653: Re-added test execution scripts

commit 94648cc449bbc392864197a1796f99f8953b7312
Author: Viktor Seifert
Date:   Tue Jul 26 10:50:42 2022 +0200

    RED-4653: Changed error case for processing messages to not requeue the
    message, since that will be handled by the DLQ logic

commit d77982dfedcec49482293d79818283c8d7a17dc7
Author: Viktor Seifert
Date:   Tue Jul 26 10:46:32 2022 +0200

    RED-4653: Removed unnecessary logging message

commit 8c00fd75bf04f8ecc0e9cda654f8e053d4cfb66f
Author: Viktor Seifert
Date:   Tue Jul 26 10:03:35 2022 +0200

    RED-4653: Re-added wrongly removed config

commit 759d72b3fa093b19f97e68d17bf53390cd5453c7
Author: Viktor Seifert
Date:   Tue Jul 26 09:57:47 2022 +0200

    RED-4653: Removed leftover Docker commands

commit 2ff5897ee38e39d6507278b6a82176be2450da16
Author: Viktor Seifert
Date:   Tue Jul 26 09:48:08 2022 +0200

    RED-4653: Removed leftover Docker config

commit 1074167aa98f9f59c0f0f534ba2f1ba09ffb0958
Author: Viktor Seifert
Date:   Tue Jul 26 09:41:21 2022 +0200

    RED-4653: Removed Docker build stage since it is not needed for a
    project that is used as a Python module

commit ec769c6cd74a74097d8ebe4800ea6e2ea86236cc
Author: Viktor Seifert
Date:   Mon Jul 25 16:11:50 2022 +0200

    RED-4653: Renamed function for better clarity and consistency

commit 96e8ac4316ac57aac90066f35422d333c532513b
Author: Viktor Seifert
Date:   Mon Jul 25 15:07:40 2022 +0200

    RED-4653: Added code to the queue manager to cancel the queue
    subscription on application exit so that it can exit gracefully

commit 64d8e0bd15730898274c08d34f9c34fbac559422
Author: Viktor Seifert
Date:   Mon Jul 25 13:57:06 2022 +0200

    RED-4653: Moved queue cancellation to a separate method so that it can
    be called on application exit

commit aff1d06364f5694c5922f37d961e401c12243221
Author: Viktor Seifert
Date:   Mon Jul 25 11:51:16 2022 +0200

    RED-4653: Re-ordered message processing so that the ack occurs after
    publishing the result, to prevent message loss

commit 9339186b86f2fe9653366c22fcdc9f7fc096b138
Author: Viktor Seifert
Date:   Fri Jul 22 18:07:25 2022 +0200

    RED-4653: Reordered code to acknowledge the message before publishing a
    result message

commit 2d6fe1cbd95cd86832b086c6dfbcfa62b3ffa16f
Author: Viktor Seifert
Date:   Fri Jul 22 17:00:04 2022 +0200

    RED-4653: Hopefully corrected the storage bucket env var name

commit 8f1ef0dd5532882cb12901721195d9acb336286c
Author: Viktor Seifert
Date:   Fri Jul 22 16:37:27 2022 +0200

    RED-4653: Switched to validating the connection URL via a regex since
    the validators lib parses our endpoints incorrectly

commit 8d0234fcc5ff7ed1ae7695a17856c6af050065bd
Author: Viktor Seifert
Date:   Fri Jul 22 15:02:54 2022 +0200

    RED-4653: Corrected exception creation

commit 098a62335b3b695ee409363d429ac07284de7138
Author: Viktor Seifert
Date:   Fri Jul 22 14:42:22 2022 +0200

    RED-4653: Added a descriptive error message for when the storage
    endpoint is not a correct URL

commit 379685f964a4de641ce6506713f1ea8914a3f5ab
Author: Viktor Seifert
Date:   Fri Jul 22 14:11:48 2022 +0200

    RED-4653: Removed variable re-use to make the code clearer

commit 4bf1a023453635568e16b1678ef5ad994c534045
Author: Viktor Seifert
Date:   Thu Jul 21 17:41:55 2022 +0200

    RED-4653: Added explicit conversion of the heartbeat config value to an
    int before passing it to pika

commit 8f2bc4e028aafdef893458d1433a05724f534fce
Author: Viktor Seifert
Date:   Mon Jul 18 16:41:31 2022 +0200

    RED-4653: Set heartbeat to a lower value so that disconnects are
    detected more quickly

... and 6 more commits
---
 Dockerfile                                         |  19 --
 Dockerfile_tests                                   |   0
 .../src/main/java/buildjob/PlanSpec.java           |  39 +---
 .../main/resources/scripts/docker-build.sh         |  13 --
 .../src/main/resources/scripts/sonar-scan.sh       |  18 +-
 banner.txt                                         |   6 -
 config.yaml                                        |  35 ----
 docker-compose.yml                                 |  32 ----
 pyinfra/config.py                                  |  95 +++++-----
 pyinfra/exceptions.py                              |  34 ----
 pyinfra/flask.py                                   |  64 -------
 pyinfra/locations.py                               |  18 --
 pyinfra/queue/consumer.py                          |  16 --
 pyinfra/queue/queue_manager.py                     | 111 ++++++++++++
 pyinfra/queue/queue_manager/__init__.py            |   0
 .../queue/queue_manager/pika_queue_manager.py      | 168 ------------------
 pyinfra/queue/queue_manager/queue_manager.py       |  51 ------
 pyinfra/storage/adapters/adapter.py                |  34 ----
 pyinfra/storage/adapters/azure.py                  |  43 +++--
 pyinfra/storage/adapters/s3.py                     |  66 +++++--
 pyinfra/storage/clients/__init__.py                |   0
 pyinfra/storage/clients/azure.py                   |  11 --
 pyinfra/storage/clients/s3.py                      |  40 -----
 pyinfra/storage/storage.py                         |  49 ++---
 pyinfra/storage/storages.py                        |  26 ---
 pyinfra/utils/banner.py                            |  21 ---
 pyinfra/visitor.py                                 |  91 ----------
 requirements.txt                                   |   6 -
 run_tests.sh                                       |   0
 scripts/manage_minio.py                            |  72 --------
 scripts/mock_client.py                             |  88 ---------
 src/serve.py                                       |  84 ---------
 test/config.py                                     |   5 -
 test/config.yaml                                   |  25 ---
 test/exploration_tests/__init__.py                 |   0
 .../data_json_request_test.py                      |  79 --------
 test/queue/__init__.py                             |   0
 test/queue/queue_manager_mock.py                   |  46 -----
 test/queue/queue_mock.py                           |   5 -
 test/storage/__init__.py                           |   0
 test/storage/adapter_mock.py                       |  30 ----
 test/storage/client_mock.py                        |  27 ---
 test/unit_tests/__init__.py                        |   0
 test/unit_tests/azure_adapter_test.py              |  10 --
 test/unit_tests/config_test.py                     |  45 -----
 test/unit_tests/conftest.py                        | 160 -----------------
 test/unit_tests/consumer_test.py                   | 126 -------------
 test/unit_tests/queue_visitor_test.py              |  38 ----
 test/unit_tests/storage_test.py                    |  52 ------
 49 files changed, 260 insertions(+), 1738 deletions(-)
 delete mode 100755 Dockerfile
 mode change 100755 => 100644 Dockerfile_tests
 delete mode 100755 bamboo-specs/src/main/resources/scripts/docker-build.sh
 delete mode 100644 banner.txt
 delete mode 100755 config.yaml
 delete mode 100755 docker-compose.yml
 delete mode 100644 pyinfra/exceptions.py
 delete mode 100644 pyinfra/flask.py
 delete mode 100644 pyinfra/locations.py
 delete mode 100644 pyinfra/queue/consumer.py
 create mode 100644 pyinfra/queue/queue_manager.py
 delete mode 100644 pyinfra/queue/queue_manager/__init__.py
 delete mode 100644 pyinfra/queue/queue_manager/pika_queue_manager.py
 delete mode 100644 pyinfra/queue/queue_manager/queue_manager.py
 delete mode 100644 pyinfra/storage/adapters/adapter.py
 delete mode 100644 pyinfra/storage/clients/__init__.py
 delete mode 100644 pyinfra/storage/clients/azure.py
 delete mode 100644 pyinfra/storage/clients/s3.py
 delete mode 100644 pyinfra/storage/storages.py
 delete mode 100644 pyinfra/utils/banner.py
 delete mode 100644 pyinfra/visitor.py
 mode change 100755 => 100644 run_tests.sh
 delete mode 100644 scripts/manage_minio.py
 delete mode 100644 scripts/mock_client.py
 delete mode 100644 src/serve.py
 delete mode 100644 test/config.py
 delete mode 100644 test/config.yaml
 delete mode 100644 test/exploration_tests/__init__.py
 delete mode 100644 test/exploration_tests/data_json_request_test.py
 delete mode 100644 test/queue/__init__.py
 delete mode 100644 test/queue/queue_manager_mock.py
 delete mode 100644 test/queue/queue_mock.py
 delete mode 100644 test/storage/__init__.py
 delete mode 100644 test/storage/adapter_mock.py
 delete mode 100644 test/storage/client_mock.py
 delete mode 100644 test/unit_tests/__init__.py
 delete mode 100644 test/unit_tests/azure_adapter_test.py
 delete mode 100644 test/unit_tests/config_test.py
 delete mode 100644 test/unit_tests/conftest.py
 delete mode 100644 test/unit_tests/consumer_test.py
 delete mode 100644 test/unit_tests/queue_visitor_test.py
 delete mode 100644 test/unit_tests/storage_test.py

diff --git a/Dockerfile b/Dockerfile
deleted file mode 100755
index 4c06a71..0000000
--- a/Dockerfile
+++ /dev/null
@@ -1,19 +0,0 @@
-FROM python:3.8
-
-# Use a virtual environment.
-RUN python -m venv /app/venv
-ENV PATH="/app/venv/bin:$PATH"
-
-# Upgrade pip.
-RUN python -m pip install --upgrade pip
-
-# Make a directory for the service files and copy the service repo into the container.
-WORKDIR /app/service
-COPY . .
-
-# Install module & dependencies
-RUN python3 -m pip install -e .
-RUN python3 -m pip install -r requirements.txt
-
-# Run the service loop.
-CMD ["python", "src/serve.py"]
diff --git a/Dockerfile_tests b/Dockerfile_tests
old mode 100755
new mode 100644
diff --git a/bamboo-specs/src/main/java/buildjob/PlanSpec.java b/bamboo-specs/src/main/java/buildjob/PlanSpec.java
index 6440aa9..20f339d 100644
--- a/bamboo-specs/src/main/java/buildjob/PlanSpec.java
+++ b/bamboo-specs/src/main/java/buildjob/PlanSpec.java
@@ -44,7 +44,7 @@ public class PlanSpec {
         //By default credentials are read from the '.credentials' file.
         BambooServer bambooServer = new BambooServer("http://localhost:8085");
-        Plan plan = new PlanSpec().createDockerBuildPlan();
+        Plan plan = new PlanSpec().createBuildPlan();
         bambooServer.publish(plan);
         PlanPermissions planPermission = new PlanSpec().createPlanPermission(plan.getIdentifier());
         bambooServer.publish(planPermission);
@@ -67,38 +67,12 @@ public class PlanSpec {
                 .key(new BambooKey("RED"));
     }
 
-    public Plan createDockerBuildPlan() {
+    public Plan createBuildPlan() {
         return new Plan(
                 project(),
                 SERVICE_NAME,
                 new BambooKey(SERVICE_KEY))
-                .description("Docker build for pyinfra")
+                .description("Build for pyinfra")
                 .stages(
-                        new Stage("Build Stage")
-                                .jobs(
-                                        new Job("Build Job", new BambooKey("BUILD"))
-                                                .tasks(
-                                                        new CleanWorkingDirectoryTask()
-                                                                .description("Clean working directory.")
-                                                                .enabled(true),
-                                                        new VcsCheckoutTask()
-                                                                .description("Checkout default repository.")
-                                                                .checkoutItems(new CheckoutItem().defaultRepository()),
-                                                        new ScriptTask()
-                                                                .description("Set config and keys.")
-                                                                .inlineBody("mkdir -p ~/.ssh\n" +
-                                                                        "echo \"${bamboo.bamboo_agent_ssh}\" | base64 -d >> ~/.ssh/id_rsa\n" +
-                                                                        "echo \"host vector.iqser.com\" > ~/.ssh/config\n" +
-                                                                        "echo \" user bamboo-agent\" >> ~/.ssh/config\n" +
-                                                                        "chmod 600 ~/.ssh/config ~/.ssh/id_rsa"),
-                                                        new ScriptTask()
-                                                                .description("Build Docker container.")
-                                                                .location(Location.FILE)
-                                                                .fileFromPath("bamboo-specs/src/main/resources/scripts/docker-build.sh")
-                                                                .argument(SERVICE_NAME))
-                                                .dockerConfiguration(
-                                                        new DockerConfiguration()
-                                                                .image("nexus.iqser.com:5001/infra/release_build:4.2.0")
-                                                                .volume("/var/run/docker.sock", "/var/run/docker.sock"))),
                         new Stage("Sonar Stage")
                                 .jobs(
                                         new Job("Sonar Job", new BambooKey("SONAR"))
@@ -120,12 +94,7 @@ public class PlanSpec {
                                                                 .description("Run Sonarqube scan.")
                                                                 .location(Location.FILE)
                                                                 .fileFromPath("bamboo-specs/src/main/resources/scripts/sonar-scan.sh")
-                                                                .argument(SERVICE_NAME),
-                                                        new ScriptTask()
-                                                                .description("Shut down any running docker containers.")
-                                                                .location(Location.FILE)
-                                                                .inlineBody("pip install docker-compose\n" +
-                                                                        "docker-compose down"))
+                                                                .argument(SERVICE_NAME))
                                                 .dockerConfiguration(
                                                         new DockerConfiguration()
                                                                 .image("nexus.iqser.com:5001/infra/release_build:4.2.0")
diff --git a/bamboo-specs/src/main/resources/scripts/docker-build.sh b/bamboo-specs/src/main/resources/scripts/docker-build.sh
deleted file mode 100755
index bce07f2..0000000
--- a/bamboo-specs/src/main/resources/scripts/docker-build.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/bin/bash
-set -e
-
-SERVICE_NAME=$1
-
-python3 -m venv build_venv
-source build_venv/bin/activate
-python3 -m pip install --upgrade pip
-
-echo "index-url = https://${bamboo_nexus_user}:${bamboo_nexus_password}@nexus.iqser.com/repository/python-combind/simple" >> pip.conf
-docker build -f Dockerfile -t nexus.iqser.com:5001/red/$SERVICE_NAME:${bamboo_version_tag} .
-echo "${bamboo_nexus_password}" | docker login --username "${bamboo_nexus_user}" --password-stdin nexus.iqser.com:5001 -docker push nexus.iqser.com:5001/red/$SERVICE_NAME:${bamboo_version_tag} diff --git a/bamboo-specs/src/main/resources/scripts/sonar-scan.sh b/bamboo-specs/src/main/resources/scripts/sonar-scan.sh index 0df9717..5ba9ff6 100755 --- a/bamboo-specs/src/main/resources/scripts/sonar-scan.sh +++ b/bamboo-specs/src/main/resources/scripts/sonar-scan.sh @@ -10,17 +10,15 @@ python3 -m pip install dependency-check python3 -m pip install docker-compose python3 -m pip install coverage -echo "docker-compose down" -docker-compose down -sleep 30 +# This is disabled since there are currently no tests in this project. +# If tests are added this can be enabled again +# echo "coverage report generation" +# bash run_tests.sh -echo "coverage report generation" -bash run_tests.sh - -if [ ! -f reports/coverage.xml ] -then - exit 1 -fi +# if [ ! -f reports/coverage.xml ] +# then +# exit 1 +# fi SERVICE_NAME=$1 diff --git a/banner.txt b/banner.txt deleted file mode 100644 index fa0940f..0000000 --- a/banner.txt +++ /dev/null @@ -1,6 +0,0 @@ - ___ _ _ ___ __ - o O O | _ \ | || | |_ _| _ _ / _| _ _ __ _ - o | _/ \_, | | | | ' \ | _| | '_| / _` | - TS__[O] _|_|_ _|__/ |___| |_||_| _|_|_ _|_|_ \__,_| - {======|_| ``` |_| ````|_|`````|_|`````|_|`````|_|`````|_|`````| -./o--000' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' `-0-0-' \ No newline at end of file diff --git a/config.yaml b/config.yaml deleted file mode 100755 index a7f8efe..0000000 --- a/config.yaml +++ /dev/null @@ -1,35 +0,0 @@ -service: - logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger - -probing_webserver: - host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address - port: $PROBING_WEBSERVER_PORT|8080 # Probe webserver port - mode: $PROBING_WEBSERVER_MODE|production # webserver mode: {development, production} - -rabbitmq: - host: $RABBITMQ_HOST|localhost # RabbitMQ host address - port: $RABBITMQ_PORT|5672 # RabbitMQ host port - user: $RABBITMQ_USERNAME|user # RabbitMQ username - password: $RABBITMQ_PASSWORD|bitnami # RabbitMQ password - heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds - - queues: - input: $REQUEST_QUEUE|request_queue # Requests to service - output: $RESPONSE_QUEUE|response_queue # Responses by service - dead_letter: $DEAD_LETTER_QUEUE|dead_letter_queue # Messages that failed to process - - callback: - analysis_endpoint: $ANALYSIS_ENDPOINT|"http://127.0.0.1:5000" - -storage: - backend: $STORAGE_BACKEND|s3 # The type of storage to use {s3, azure} - bucket: "STORAGE_BUCKET_NAME|STORAGE_AZURECONTAINERNAME|pyinfra-test-bucket" # The bucket / container to pull files specified in queue requests from - - s3: - endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000" - access_key: $STORAGE_KEY|root - secret_key: $STORAGE_SECRET|password - region: $STORAGE_REGION|"eu-west-1" - - azure: - connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100755 index 04e5296..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,32 +0,0 @@ -version: '2' -services: - minio: - image: minio/minio - ports: - - "9000:9000" - environment: - - MINIO_ROOT_PASSWORD=password - - MINIO_ROOT_USER=root - volumes: - - 
-      - ./data/minio_store:/data
-    command: server /data
-    network_mode: "bridge"
-  rabbitmq:
-    image: docker.io/bitnami/rabbitmq:3.9
-    ports:
-      - '4369:4369'
-      - '5551:5551'
-      - '5552:5552'
-      - '5672:5672'
-      - '25672:25672'
-      - '15672:15672'
-    environment:
-      - RABBITMQ_SECURE_PASSWORD=yes
-      - RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100%
-      - RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi
-    network_mode: "bridge"
-    volumes:
-      - /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
-volumes:
-  mdata:
-
diff --git a/pyinfra/config.py b/pyinfra/config.py
index 072a52e..8efc335 100644
--- a/pyinfra/config.py
+++ b/pyinfra/config.py
@@ -1,55 +1,58 @@
-"""Implements a config object with dot-indexing syntax."""
-import os
-from itertools import chain
-from operator import truth
-
-from envyaml import EnvYAML
-from funcy import first, juxt, butlast, last
-
-from pyinfra.locations import CONFIG_FILE
+from os import environ
 
 
-def _get_item_and_maybe_make_dotindexable(container, item):
-    ret = container[item]
-    return DotIndexable(ret) if isinstance(ret, dict) else ret
+def read_from_environment(environment_variable_name, default_value):
+    return environ.get(environment_variable_name, default_value)
 
 
-class DotIndexable:
-    def __init__(self, x):
-        self.x = x
+class Config(object):
+    def __init__(self):
+        # Logging level for service logger
+        self.logging_level_root = read_from_environment("LOGGING_LEVEL_ROOT", "DEBUG")
 
-    def __getattr__(self, item):
-        return _get_item_and_maybe_make_dotindexable(self.x, item)
+        # RabbitMQ host address
+        self.rabbitmq_host = read_from_environment("RABBITMQ_HOST", "localhost")
 
-    def __repr__(self):
-        return self.x.__repr__()
+        # RabbitMQ host port
+        self.rabbitmq_port = read_from_environment("RABBITMQ_PORT", "5672")
 
-    def __getitem__(self, item):
-        return self.__getattr__(item)
+        # RabbitMQ username
+        self.rabbitmq_username = read_from_environment("RABBITMQ_USERNAME", "user")
+
+        # RabbitMQ password
+        self.rabbitmq_password = read_from_environment("RABBITMQ_PASSWORD", "bitnami")
+
+        # Controls AMQP heartbeat timeout in seconds
+        self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60")
+
+        # Queue name for requests to the service
+        self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue")
+
+        # Queue name for responses by the service
+        self.response_queue = read_from_environment("RESPONSE_QUEUE", "response_queue")
+
+        # Queue name for failed messages
+        self.dead_letter_queue = read_from_environment("DEAD_LETTER_QUEUE", "dead_letter_queue")
+
+        # The type of storage to use {s3, azure}
+        self.storage_backend = read_from_environment("STORAGE_BACKEND", "s3")
+
+        # The bucket / container to pull files specified in queue requests from
+        self.storage_bucket = read_from_environment("STORAGE_BUCKET_NAME", "redaction")
+
+        # Endpoint for s3 storage
+        self.storage_endpoint = read_from_environment("STORAGE_ENDPOINT", "http://127.0.0.1:9000")
+
+        # User for s3 storage
+        self.storage_key = read_from_environment("STORAGE_KEY", "root")
+
+        # Password for s3 storage
+        self.storage_secret = read_from_environment("STORAGE_SECRET", "password")
+
+        # Connection string for Azure storage
+        self.storage_azureconnectionstring = read_from_environment("STORAGE_AZURECONNECTIONSTRING",
+                                                                   "DefaultEndpointsProtocol=...")
 
-
-class Config:
-    def __init__(self, config_path):
-        self.__config = EnvYAML(config_path)
-
-    def __getattr__(self, item):
-        if item in self.__config:
-            return _get_item_and_maybe_make_dotindexable(self.__config, item)
-
-    def __getitem__(self, item):
-        return self.__getattr__(item)
-
-
-CONFIG = Config(CONFIG_FILE)
-
-
-def parse_disjunction_string(disjunction_string):
-    def try_parse_env_var(disjunction_string):
-        try:
-            return os.environ[disjunction_string]
-        except KeyError:
-            return None
-
-    options = disjunction_string.split("|")
-    identifiers, fallback_value = juxt(butlast, last)(options)
-    return first(chain(filter(truth, map(try_parse_env_var, identifiers)), [fallback_value]))
+def get_config() -> Config:
+    return Config()
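
The YAML/`$VAR|default` mechanism above is replaced by plain environment lookups. A minimal usage sketch, assuming the variable is set by the deployment environment (the host value here is illustrative, and setting it in-process is only for demonstration):

    import os

    from pyinfra.config import get_config

    # Illustrative override; a real deployment would set this in the container env.
    os.environ["RABBITMQ_HOST"] = "rabbitmq.internal.example"

    config = get_config()
    print(config.rabbitmq_host)       # "rabbitmq.internal.example" (from the environment)
    print(config.rabbitmq_heartbeat)  # "60" - still a string; QueueManager casts it with int()
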
diff --git a/pyinfra/exceptions.py b/pyinfra/exceptions.py
deleted file mode 100644
index 56405d4..0000000
--- a/pyinfra/exceptions.py
+++ /dev/null
@@ -1,34 +0,0 @@
-class AnalysisFailure(Exception):
-    pass
-
-
-class DataLoadingFailure(Exception):
-    pass
-
-
-class ProcessingFailure(Exception):
-    pass
-
-
-class UnknownStorageBackend(ValueError):
-    pass
-
-
-class InvalidEndpoint(ValueError):
-    pass
-
-
-class UnknownClient(ValueError):
-    pass
-
-
-class ConsumerError(Exception):
-    pass
-
-
-class NoSuchContainer(KeyError):
-    pass
-
-
-class IntentionalTestException(RuntimeError):
-    pass
diff --git a/pyinfra/flask.py b/pyinfra/flask.py
deleted file mode 100644
index 549ef0c..0000000
--- a/pyinfra/flask.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import logging
-
-import requests
-from flask import Flask, jsonify
-from waitress import serve
-
-from pyinfra.config import CONFIG
-
-logger = logging.getLogger(__file__)
-logger.setLevel(CONFIG.service.logging_level)
-
-
-def run_probing_webserver(app, host=None, port=None, mode=None):
-    if not host:
-        host = CONFIG.probing_webserver.host
-
-    if not port:
-        port = CONFIG.probing_webserver.port
-
-    if not mode:
-        mode = CONFIG.probing_webserver.mode
-
-    if mode == "development":
-        app.run(host=host, port=port, debug=True)
-
-    elif mode == "production":
-        serve(app, host=host, port=port)
-
-
-def set_up_probing_webserver():
-    # TODO: implement meaningful checks
-    app = Flask(__name__)
-    informed_about_missing_prometheus_endpoint = False
-
-    @app.route("/ready", methods=["GET"])
-    def ready():
-        resp = jsonify("OK")
-        resp.status_code = 200
-        return resp
-
-    @app.route("/health", methods=["GET"])
-    def healthy():
-        resp = jsonify("OK")
-        resp.status_code = 200
-        return resp
-
-    @app.route("/prometheus", methods=["GET"])
-    def get_metrics_from_analysis_endpoint():
-        nonlocal informed_about_missing_prometheus_endpoint
-        try:
-            resp = requests.get(f"{CONFIG.rabbitmq.callback.analysis_endpoint}/prometheus")
-            resp.raise_for_status()
-        except ConnectionError:
-            return ""
-        except requests.exceptions.HTTPError as err:
-            if resp.status_code == 404:
-                if not informed_about_missing_prometheus_endpoint:
-                    logger.warning(f"Got no metrics from analysis prometheus endpoint: {err}")
-                    informed_about_missing_prometheus_endpoint = True
-            else:
-                logging.warning(f"Caught {err}")
-        return resp.text
-
-    return app
diff --git a/pyinfra/locations.py b/pyinfra/locations.py
deleted file mode 100644
index 54c5a17..0000000
--- a/pyinfra/locations.py
+++ /dev/null
@@ -1,18 +0,0 @@
-"""Defines constant paths relative to the module root path."""
-
-from pathlib import Path
-
-
-MODULE_DIR = Path(__file__).resolve().parents[0]
-
-PACKAGE_ROOT_DIR = MODULE_DIR.parents[0]
-
-TEST_DIR = PACKAGE_ROOT_DIR / "test"
-
-CONFIG_FILE = PACKAGE_ROOT_DIR / "config.yaml"
-
-TEST_CONFIG_FILE = TEST_DIR / "config.yaml"
-
-COMPOSE_PATH = PACKAGE_ROOT_DIR
-
-BANNER_FILE = PACKAGE_ROOT_DIR / "banner.txt"
diff --git a/pyinfra/queue/consumer.py b/pyinfra/queue/consumer.py
deleted file mode 100644
index 1072178..0000000
--- a/pyinfra/queue/consumer.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from pyinfra.queue.queue_manager.queue_manager import QueueManager
-
-
-class Consumer:
-    def __init__(self, callback, queue_manager: QueueManager):
-        self.queue_manager = queue_manager
-        self.callback = callback
-
-    def consume_and_publish(self):
-        self.queue_manager.consume_and_publish(self.callback)
-
-    def basic_consume_and_publish(self):
-        self.queue_manager.basic_consume_and_publish(self.callback)
-
-    def consume(self, **kwargs):
-        return self.queue_manager.consume(**kwargs)
diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py
new file mode 100644
index 0000000..17a7ada
--- /dev/null
+++ b/pyinfra/queue/queue_manager.py
@@ -0,0 +1,113 @@
+import atexit
+import json
+import logging
+import signal
+from typing import Callable
+
+import pika
+import pika.exceptions
+
+from pyinfra.config import get_config, Config
+
+CONFIG = get_config()
+
+pika_logger = logging.getLogger("pika")
+pika_logger.setLevel(logging.WARNING)
+
+
+def get_connection_params(config: Config) -> pika.ConnectionParameters:
+    credentials = pika.PlainCredentials(username=config.rabbitmq_username, password=config.rabbitmq_password)
+    pika_connection_params = {
+        "host": config.rabbitmq_host,
+        "port": config.rabbitmq_port,
+        "credentials": credentials,
+        "heartbeat": int(config.rabbitmq_heartbeat),
+    }
+
+    return pika.ConnectionParameters(**pika_connection_params)
+
+
+def _get_n_previous_attempts(props):
+    return 0 if props.headers is None else props.headers.get("x-retry-count", 0)
+
+
+class QueueManager(object):
+    def __init__(self, config: Config = CONFIG):
+        connection_params = get_connection_params(config)
+
+        atexit.register(self.stop_consuming)
+        signal.signal(signal.SIGTERM, self.stop_consuming)
+        signal.signal(signal.SIGINT, self.stop_consuming)
+
+        self._connection = pika.BlockingConnection(parameters=connection_params)
+        self._channel = self._connection.channel()
+        self._channel.basic_qos(prefetch_count=1)
+
+        args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.dead_letter_queue}
+
+        self._input_queue = config.request_queue
+        self._output_queue = config.response_queue
+
+        self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True)
+        self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True)
+
+        self._consumer_token = None
+
+        self.logger = logging.getLogger("queue_manager")
+        self.logger.setLevel(CONFIG.logging_level_root)
+
+    def start_consuming(self, process_message_callback: Callable):
+        callback = self._create_queue_callback(process_message_callback)
+
+        self.logger.info("Consuming from queue")
+        self._consumer_token = None
+        try:
+            self._consumer_token = self._channel.basic_consume(self._input_queue, callback)
+            self.logger.info(f"Registered with consumer-tag: {self._consumer_token}")
+            self._channel.start_consuming()
+        except Exception:
+            self.logger.warning("An unhandled exception occurred while consuming messages. Consuming will stop.")
+            raise
+        finally:
+            self.stop_consuming()
+
+    def stop_consuming(self, *_signal_args):
+        # Extra arguments are ignored so this also works as a signal handler (signum, frame).
+        if self._consumer_token and self._connection:
+            self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}")
+            self._channel.basic_cancel(self._consumer_token)
+            self._connection.close()
+
+        self._consumer_token = None
+
+    def _create_queue_callback(self, process_message_callback: Callable):
+        def callback(_channel, frame, properties, body):
+            self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
+            self.logger.debug(f"Processing {(frame, properties, body)}.")
+
+            try:
+                unpacked_message_body = json.loads(body)
+
+                callback_result = process_message_callback(unpacked_message_body)
+
+                self.logger.info("Processed message, publishing result to result-queue")
+                self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
+
+                self.logger.info(
+                    f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}")
+                self._channel.basic_ack(frame.delivery_tag)
+
+                self.logger.info(f"Message with delivery_tag {frame.delivery_tag} processed")
+            except Exception as ex:
+                n_attempts = _get_n_previous_attempts(properties) + 1
+                self.logger.warning(f"Failed to process message, {n_attempts} attempts, error: {str(ex)}")
+                self._channel.basic_nack(frame.delivery_tag, requeue=False)
+                raise ex
+
+        return callback
+
+    def clear(self):
+        try:
+            self._channel.queue_purge(self._input_queue)
+            self._channel.queue_purge(self._output_queue)
+        except pika.exceptions.ChannelWrongStateError:
+            pass
diff --git a/pyinfra/queue/queue_manager/__init__.py b/pyinfra/queue/queue_manager/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py
deleted file mode 100644
index ad5191a..0000000
--- a/pyinfra/queue/queue_manager/pika_queue_manager.py
+++ /dev/null
@@ -1,168 +0,0 @@
-import json
-import logging
-import time
-
-import pika
-
-from pyinfra.config import CONFIG
-from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure
-from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager
-
-logger = logging.getLogger("pika")
-logger.setLevel(logging.WARNING)
-
-logger = logging.getLogger(__name__)
-logger.setLevel(CONFIG.service.logging_level)
-
-
-def monkey_patch_queue_handle(channel, queue) -> QueueHandle:
-
-    empty_message = (None, None, None)
-
-    def is_empty_message(message):
-        return message == empty_message
-
-    queue_handle = QueueHandle()
-    queue_handle.empty = lambda: is_empty_message(channel.basic_get(queue))
-
-    def produce_items():
-
-        while True:
-            message = channel.basic_get(queue)
-
-            if is_empty_message(message):
-                break
-
-            method_frame, properties, body = message
-            channel.basic_ack(method_frame.delivery_tag)
-            yield json.loads(body)
-
-    queue_handle.to_list = lambda: list(produce_items())
-
-    return queue_handle
-
-
-def get_connection_params():
-
-    credentials = pika.PlainCredentials(username=CONFIG.rabbitmq.user, password=CONFIG.rabbitmq.password)
-    kwargs = {
-        "host": CONFIG.rabbitmq.host,
-        "port": CONFIG.rabbitmq.port,
-        "credentials": credentials,
-        "heartbeat": CONFIG.rabbitmq.heartbeat,
-    }
-    parameters = pika.ConnectionParameters(**kwargs)
-
-    return parameters
-
-
-def get_n_previous_attempts(props):
-    return 0 if props.headers is None else props.headers.get("x-retry-count", 0)
-
-
-def attempts_remain(n_attempts, max_attempts):
-    return n_attempts < max_attempts
-
-
-class PikaQueueManager(QueueManager):
-    def __init__(self, input_queue, output_queue, dead_letter_queue=None, connection_params=None):
-        super().__init__(input_queue, output_queue)
-
-        if not connection_params:
-            connection_params = get_connection_params()
-
-        self.connection = pika.BlockingConnection(parameters=connection_params)
-        self.channel = self.connection.channel()
-        self.channel.basic_qos(prefetch_count=1)
-
-        if not dead_letter_queue:
-            dead_letter_queue = CONFIG.rabbitmq.queues.dead_letter
-
-        args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": dead_letter_queue}
-
-        self.channel.queue_declare(input_queue, arguments=args, auto_delete=False, durable=True)
-        self.channel.queue_declare(output_queue, arguments=args, auto_delete=False, durable=True)
-
-    def republish(self, body, n_current_attempts, frame):
-        self.channel.basic_publish(
-            exchange="",
-            routing_key=self._input_queue,
-            body=body,
-            properties=pika.BasicProperties(headers={"x-retry-count": n_current_attempts}),
-        )
-        self.channel.basic_ack(delivery_tag=frame.delivery_tag)
-
-    def publish_request(self, request):
-        logger.debug(f"Publishing {request}")
-        self.channel.basic_publish("", self._input_queue, json.dumps(request).encode())
-
-    def reject(self, body, frame):
-        logger.error(f"Adding to dead letter queue: {body}")
-        self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False)
-
-    def publish_response(self, message, callback, max_attempts=3):
-
-        logger.debug(f"Processing {message}.")
-
-        frame, properties, body = message
-
-        n_attempts = get_n_previous_attempts(properties) + 1
-
-        try:
-            response = json.dumps(callback(json.loads(body)))
-            self.channel.basic_publish("", self._output_queue, response.encode())
-            self.channel.basic_ack(frame.delivery_tag)
-        except (ProcessingFailure, DataLoadingFailure):
-
-            logger.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}")
-
-            if attempts_remain(n_attempts, max_attempts):
-                self.republish(body, n_attempts, frame)
-            else:
-                self.reject(body, frame)
-
-    def pull_request(self):
-        return self.channel.basic_get(self._input_queue)
-
-    def consume(self, inactivity_timeout=None):
-        logger.debug("Consuming")
-        return self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
-
-    def consume_and_publish(self, visitor):
-
-        logger.info(f"Consuming with callback {visitor.callback.__name__}")
-
-        for message in self.consume():
-            self.publish_response(message, visitor)
-
-    def basic_consume_and_publish(self, visitor):
-
-        logger.info(f"Basic consuming with callback {visitor.callback.__name__}")
-
-        def callback(channel, frame, properties, body):
-            message = (frame, properties, body)
-            return self.publish_response(message, visitor)
-
-        consumer_tag = None
-
-        try:
-            consumer_tag = self.channel.basic_consume(self._input_queue, callback)
-            self.channel.start_consuming()
-        finally:
-            if consumer_tag:
-                self.channel.basic_cancel(consumer_tag)
-
-    def clear(self):
-        try:
-            self.channel.queue_purge(self._input_queue)
-            self.channel.queue_purge(self._output_queue)
-        except pika.exceptions.ChannelWrongStateError:
-            pass
-
-    @property
-    def input_queue(self) -> QueueHandle:
-        return monkey_patch_queue_handle(self.channel, self._input_queue)
-
-    @property
-    def output_queue(self) -> QueueHandle:
-        return monkey_patch_queue_handle(self.channel, self._output_queue)
diff --git a/pyinfra/queue/queue_manager/queue_manager.py b/pyinfra/queue/queue_manager/queue_manager.py
deleted file mode 100644
index d42bfc5..0000000
--- a/pyinfra/queue/queue_manager/queue_manager.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import abc
-
-
-class QueueHandle:
-    def empty(self) -> bool:
-        raise NotImplementedError
-
-    def to_list(self) -> list:
-        raise NotImplementedError
-
-
-class QueueManager(abc.ABC):
-    def __init__(self, input_queue, output_queue):
-        self._input_queue = input_queue
-        self._output_queue = output_queue
-
-    @abc.abstractmethod
-    def publish_request(self, request):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def publish_response(self, response, callback):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def pull_request(self):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def consume(self, **kwargs):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def clear(self):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def input_queue(self) -> QueueHandle:
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def output_queue(self) -> QueueHandle:
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def consume_and_publish(self, callback):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def basic_consume_and_publish(self, callback):
-        raise NotImplementedError
diff --git a/pyinfra/storage/adapters/adapter.py b/pyinfra/storage/adapters/adapter.py
deleted file mode 100644
index 57cff28..0000000
--- a/pyinfra/storage/adapters/adapter.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from abc import ABC, abstractmethod
-
-
-class StorageAdapter(ABC):
-    def __init__(self, client):
-        self.__client = client
-
-    @abstractmethod
-    def make_bucket(self, bucket_name):
-        raise NotImplementedError
-
-    @abstractmethod
-    def has_bucket(self, bucket_name):
-        raise NotImplementedError
-
-    @abstractmethod
-    def put_object(self, bucket_name, object_name, data):
-        raise NotImplementedError
-
-    @abstractmethod
-    def get_object(self, bucket_name, object_name):
-        raise NotImplementedError
-
-    @abstractmethod
-    def get_all_objects(self, bucket_name):
-        raise NotImplementedError
-
-    @abstractmethod
-    def clear_bucket(self, bucket_name):
-        raise NotImplementedError
-
-    @abstractmethod
-    def get_all_object_names(self, bucket_name):
-        raise NotImplementedError
diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/adapters/azure.py
index 334dbaf..8f59ea7 100644
--- a/pyinfra/storage/adapters/azure.py
+++ b/pyinfra/storage/adapters/azure.py
@@ -3,30 +3,33 @@ from itertools import repeat
 from operator import attrgetter
 
 from azure.storage.blob import ContainerClient, BlobServiceClient
+from retry import retry
 
-from pyinfra.storage.adapters.adapter import StorageAdapter
+from pyinfra.config import Config, get_config
+
+CONFIG = get_config()
+logger = logging.getLogger(__name__)
+logger.setLevel(CONFIG.logging_level_root)
 
-logger = logging.getLogger(__name__)
 logging.getLogger("azure").setLevel(logging.WARNING)
 logging.getLogger("urllib3").setLevel(logging.WARNING)
 
 
-class AzureStorageAdapter(StorageAdapter):
-    def __init__(self, client):
-        super().__init__(client=client)
-        self.__client: BlobServiceClient = self._StorageAdapter__client
+class AzureStorageAdapter(object):
+    def __init__(self, client: BlobServiceClient):
+        self._client: BlobServiceClient = client
 
     def has_bucket(self, bucket_name):
-        container_client = self.__client.get_container_client(bucket_name)
+        container_client = self._client.get_container_client(bucket_name)
        return container_client.exists()
 
     def make_bucket(self, bucket_name):
-        container_client = self.__client.get_container_client(bucket_name)
-        container_client if container_client.exists() else self.__client.create_container(bucket_name)
+        container_client = self._client.get_container_client(bucket_name)
+        container_client if container_client.exists() else self._client.create_container(bucket_name)
 
     def __provide_container_client(self, bucket_name) -> ContainerClient:
         self.make_bucket(bucket_name)
-        container_client = self.__client.get_container_client(bucket_name)
+        container_client = self._client.get_container_client(bucket_name)
         return container_client
 
     def put_object(self, bucket_name, object_name, data):
@@ -35,15 +37,19 @@ class AzureStorageAdapter(StorageAdapter):
         blob_client = container_client.get_blob_client(object_name)
         blob_client.upload_blob(data, overwrite=True)
 
+    @retry(tries=3, delay=5, jitter=(1, 3))
     def get_object(self, bucket_name, object_name):
         logger.debug(f"Downloading '{object_name}'...")
-        container_client = self.__provide_container_client(bucket_name)
-        blob_client = container_client.get_blob_client(object_name)
-        blob_data = blob_client.download_blob()
-        return blob_data.readall()
+
+        try:
+            container_client = self.__provide_container_client(bucket_name)
+            blob_client = container_client.get_blob_client(object_name)
+            blob_data = blob_client.download_blob()
+            return blob_data.readall()
+        except Exception as err:
+            raise Exception("Failed getting object from azure client") from err
 
     def get_all_objects(self, bucket_name):
-
         container_client = self.__provide_container_client(bucket_name)
         blobs = container_client.list_blobs()
         for blob in blobs:
@@ -55,7 +61,7 @@ class AzureStorageAdapter(StorageAdapter):
 
     def clear_bucket(self, bucket_name):
         logger.debug(f"Clearing Azure container '{bucket_name}'...")
-        container_client = self.__client.get_container_client(bucket_name)
+        container_client = self._client.get_container_client(bucket_name)
         blobs = container_client.list_blobs()
         container_client.delete_blobs(*blobs)
 
@@ -63,3 +69,8 @@ class AzureStorageAdapter(StorageAdapter):
         container_client = self.__provide_container_client(bucket_name)
         blobs = container_client.list_blobs()
         return zip(repeat(bucket_name), map(attrgetter("name"), blobs))
+
+
+def get_azure_storage(config: Config):
+    return AzureStorageAdapter(
+        BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring))
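
A sketch of wiring the reworked adapter through its factory, assuming STORAGE_AZURECONNECTIONSTRING holds a valid connection string; the container and blob names are illustrative:

    from pyinfra.config import get_config
    from pyinfra.storage.adapters.azure import get_azure_storage

    storage = get_azure_storage(get_config())

    # put_object creates the container on demand via make_bucket; get_object is
    # wrapped by @retry, so transient download failures are retried three times
    # with a 5s delay plus 1-3s of jitter.
    storage.put_object("pyinfra-test-bucket", "dossier-1/file-1.json", b'{"ok": true}')
    data = storage.get_object("pyinfra-test-bucket", "dossier-1/file-1.json")
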
+ + r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)") + + +class S3StorageAdapter(object): + def __init__(self, client: Minio): + self._client = client def make_bucket(self, bucket_name): if not self.has_bucket(bucket_name): - self.__client.make_bucket(bucket_name) + self._client.make_bucket(bucket_name) def has_bucket(self, bucket_name): - return self.__client.bucket_exists(bucket_name) + return self._client.bucket_exists(bucket_name) def put_object(self, bucket_name, object_name, data): logger.debug(f"Uploading '{object_name}'...") data = io.BytesIO(data) - self.__client.put_object(bucket_name, object_name, data, length=data.getbuffer().nbytes) + self._client.put_object(bucket_name, object_name, data, length=data.getbuffer().nbytes) + @retry(tries=3, delay=5, jitter=(1, 3)) def get_object(self, bucket_name, object_name): logger.debug(f"Downloading '{object_name}'...") response = None try: - response = self.__client.get_object(bucket_name, object_name) + response = self._client.get_object(bucket_name, object_name) return response.data except Exception as err: - raise DataLoadingFailure("Failed getting object from s3 client") from err + raise Exception("Failed getting object from s3 client") from err finally: if response: response.close() response.release_conn() def get_all_objects(self, bucket_name): - for obj in self.__client.list_objects(bucket_name, recursive=True): + for obj in self._client.list_objects(bucket_name, recursive=True): logger.debug(f"Downloading '{obj.object_name}'...") yield self.get_object(bucket_name, obj.object_name) def clear_bucket(self, bucket_name): logger.debug(f"Clearing S3 bucket '{bucket_name}'...") - objects = self.__client.list_objects(bucket_name, recursive=True) + objects = self._client.list_objects(bucket_name, recursive=True) for obj in objects: - self.__client.remove_object(bucket_name, obj.object_name) + self._client.remove_object(bucket_name, obj.object_name) def get_all_object_names(self, bucket_name): - objs = self.__client.list_objects(bucket_name, recursive=True) + objs = self._client.list_objects(bucket_name, recursive=True) return zip(repeat(bucket_name), map(attrgetter("object_name"), objs)) + + +def _parse_endpoint(endpoint): + parsed_url = urlparse(endpoint) + if URL_VALIDATOR.match(endpoint) and parsed_url.netloc and parsed_url.scheme in ALLOWED_CONNECTION_SCHEMES: + return {"secure": parsed_url.scheme == "https", "endpoint": parsed_url.netloc} + else: + raise Exception(f"The configured storage endpoint is not a valid url: {endpoint}") + + +def get_s3_storage(config: Config): + return S3StorageAdapter(Minio( + **_parse_endpoint(config.storage_endpoint), + access_key=config.storage_key, + secret_key=config.storage_secret, + # FIXME Is this still needed? 
Check and if yes, add it to config + # region=config.region, + )) diff --git a/pyinfra/storage/clients/__init__.py b/pyinfra/storage/clients/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyinfra/storage/clients/azure.py b/pyinfra/storage/clients/azure.py deleted file mode 100644 index 5c3751e..0000000 --- a/pyinfra/storage/clients/azure.py +++ /dev/null @@ -1,11 +0,0 @@ -from azure.storage.blob import BlobServiceClient - -from pyinfra.config import CONFIG - - -def get_azure_client(connection_string=None) -> BlobServiceClient: - - if not connection_string: - connection_string = CONFIG.storage.azure.connection_string - - return BlobServiceClient.from_connection_string(conn_str=connection_string) diff --git a/pyinfra/storage/clients/s3.py b/pyinfra/storage/clients/s3.py deleted file mode 100644 index f130989..0000000 --- a/pyinfra/storage/clients/s3.py +++ /dev/null @@ -1,40 +0,0 @@ -import re - -from minio import Minio - -from pyinfra.config import CONFIG -from pyinfra.exceptions import InvalidEndpoint - - -def parse_endpoint(endpoint): - # FIXME Greedy matching (.+) since we get random storage names on kubernetes (eg http://red-research-headless:9000) - # FIXME this has been broken and accepts invalid URLs - endpoint_pattern = r"(?Phttps?)*(?:://)*(?P
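
The regex-based check replaces the validators library mentioned in the commit log, which parsed the bare Kubernetes service names used as endpoints incorrectly. A sketch of the resulting behaviour (the endpoint values are illustrative):

    from pyinfra.storage.adapters.s3 import _parse_endpoint

    # The scheme decides the Minio "secure" flag; the netloc becomes the endpoint.
    print(_parse_endpoint("http://127.0.0.1:9000"))
    # -> {'secure': False, 'endpoint': '127.0.0.1:9000'}
    print(_parse_endpoint("https://red-research-headless:9000"))
    # -> {'secure': True, 'endpoint': 'red-research-headless:9000'}

    # Anything without an http(s) scheme and netloc raises:
    # Exception: The configured storage endpoint is not a valid url: minio:9000
    _parse_endpoint("minio:9000")
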
diff --git a/pyinfra/storage/clients/__init__.py b/pyinfra/storage/clients/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/pyinfra/storage/clients/azure.py b/pyinfra/storage/clients/azure.py
deleted file mode 100644
index 5c3751e..0000000
--- a/pyinfra/storage/clients/azure.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from azure.storage.blob import BlobServiceClient
-
-from pyinfra.config import CONFIG
-
-
-def get_azure_client(connection_string=None) -> BlobServiceClient:
-
-    if not connection_string:
-        connection_string = CONFIG.storage.azure.connection_string
-
-    return BlobServiceClient.from_connection_string(conn_str=connection_string)
diff --git a/pyinfra/storage/clients/s3.py b/pyinfra/storage/clients/s3.py
deleted file mode 100644
index f130989..0000000
--- a/pyinfra/storage/clients/s3.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import re
-
-from minio import Minio
-
-from pyinfra.config import CONFIG
-from pyinfra.exceptions import InvalidEndpoint
-
-
-def parse_endpoint(endpoint):
-    # FIXME Greedy matching (.+) since we get random storage names on kubernetes (eg http://red-research-headless:9000)
-    # FIXME this has been broken and accepts invalid URLs
-    endpoint_pattern = r"(?P<protocol>https?)*(?:://)*(?P<address>(?:(?:(?:\d{1,3}\.){3}\d{1,3})|.+)(?:\:\d+)?)"
-
-    match = re.match(endpoint_pattern, endpoint)
-
-    if not match:
-        raise InvalidEndpoint(f"Endpoint {endpoint} is invalid; expected {endpoint_pattern}")
-
-    return {"secure": match.group("protocol") == "https", "endpoint": match.group("address")}
-
-
-def get_s3_client(params=None) -> Minio:
-    """
-    Args:
-        params: dict like
-            {
-                "endpoint":
-                "access_key":
-                "secret_key":
-            }
-    """
-    if not params:
-        params = CONFIG.storage.s3
-
-    return Minio(
-        **parse_endpoint(params.endpoint),
-        access_key=params.access_key,
-        secret_key=params.secret_key,
-        region=params.region,
-    )
diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py
index c93ccf7..dcd777a 100644
--- a/pyinfra/storage/storage.py
+++ b/pyinfra/storage/storage.py
@@ -1,44 +1,21 @@
 import logging
 
-from retry import retry
-
-from pyinfra.config import CONFIG
-from pyinfra.exceptions import DataLoadingFailure
-from pyinfra.storage.adapters.adapter import StorageAdapter
+from pyinfra.config import get_config, Config
+from pyinfra.storage.adapters.azure import get_azure_storage
+from pyinfra.storage.adapters.s3 import get_s3_storage
 
+CONFIG = get_config()
 logger = logging.getLogger(__name__)
-logger.setLevel(CONFIG.service.logging_level)
+logger.setLevel(CONFIG.logging_level_root)
 
 
-class Storage:
-    def __init__(self, adapter: StorageAdapter):
-        self.__adapter = adapter
+def get_storage(config: Config):
 
-    def make_bucket(self, bucket_name):
-        self.__adapter.make_bucket(bucket_name)
+    if config.storage_backend == "s3":
+        storage = get_s3_storage(config)
+    elif config.storage_backend == "azure":
+        storage = get_azure_storage(config)
+    else:
+        raise Exception(f"Unknown storage backend '{config.storage_backend}'.")
 
-    def has_bucket(self, bucket_name):
-        return self.__adapter.has_bucket(bucket_name)
-
-    def put_object(self, bucket_name, object_name, data):
-        self.__adapter.put_object(bucket_name, object_name, data)
-
-    def get_object(self, bucket_name, object_name):
-        return self.__get_object(bucket_name, object_name)
-
-    @retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3))
-    def __get_object(self, bucket_name, object_name):
-        try:
-            return self.__adapter.get_object(bucket_name, object_name)
-        except Exception as err:
-            logging.error(err)
-            raise DataLoadingFailure from err
-
-    def get_all_objects(self, bucket_name):
-        return self.__adapter.get_all_objects(bucket_name)
-
-    def clear_bucket(self, bucket_name):
-        return self.__adapter.clear_bucket(bucket_name)
-
-    def get_all_object_names(self, bucket_name):
-        return self.__adapter.get_all_object_names(bucket_name)
+    return storage
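
Backend selection now lives in this get_storage factory rather than the deleted storages.py below; a short sketch of the dispatch, assuming the default s3 settings (the "gcs" value is illustrative):

    from pyinfra.config import get_config
    from pyinfra.storage.storage import get_storage

    config = get_config()          # storage_backend defaults to "s3"
    storage = get_storage(config)  # -> S3StorageAdapter; "azure" -> AzureStorageAdapter

    config.storage_backend = "gcs"
    # get_storage(config) would now raise: Unknown storage backend 'gcs'.
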
UnknownStorageBackend(f"Unknown storage backend '{storage_backend}'.") - - return storage diff --git a/pyinfra/utils/banner.py b/pyinfra/utils/banner.py deleted file mode 100644 index 20d066e..0000000 --- a/pyinfra/utils/banner.py +++ /dev/null @@ -1,21 +0,0 @@ -import logging - -from pyinfra.locations import BANNER_FILE - - -def show_banner(): - with open(BANNER_FILE) as f: - banner = "\n" + "".join(f.readlines()) + "\n" - - logger = logging.getLogger(__name__) - logger.propagate = False - - handler = logging.StreamHandler() - handler.setLevel(logging.INFO) - - formatter = logging.Formatter("") - - handler.setFormatter(formatter) - logger.addHandler(handler) - - logger.info(banner) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py deleted file mode 100644 index 1fbdab0..0000000 --- a/pyinfra/visitor.py +++ /dev/null @@ -1,91 +0,0 @@ -import abc -import gzip -import json -import logging -from operator import itemgetter -from typing import Callable - -from pyinfra.config import CONFIG, parse_disjunction_string -from pyinfra.exceptions import DataLoadingFailure -from pyinfra.storage.storage import Storage - - -def get_object_name(body): - dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body) - object_name = f"{dossier_id}/{file_id}.{target_file_extension}" - return object_name - - -def get_response_object_name(body): - dossier_id, file_id, response_file_extension = itemgetter("dossierId", "fileId", "responseFileExtension")(body) - object_name = f"{dossier_id}/{file_id}.{response_file_extension}" - return object_name - - -def get_object_descriptor(body): - return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": get_object_name(body)} - - -def get_response_object_descriptor(body): - return { - "bucket_name": parse_disjunction_string(CONFIG.storage.bucket), - "object_name": get_response_object_name(body), - } - - -class ResponseStrategy(abc.ABC): - @abc.abstractmethod - def handle_response(self, body): - pass - - def __call__(self, body): - return self.handle_response(body) - - -class StorageStrategy(ResponseStrategy): - def __init__(self, storage): - self.storage = storage - - def handle_response(self, body): - self.storage.put_object(**get_response_object_descriptor(body), data=gzip.compress(json.dumps(body).encode())) - body.pop("data") - return body - - -class ForwardingStrategy(ResponseStrategy): - def handle_response(self, body): - return body - - -class QueueVisitor: - def __init__(self, storage: Storage, callback: Callable, response_strategy): - self.storage = storage - self.callback = callback - self.response_strategy = response_strategy - - def load_data(self, body): - def download(): - logging.debug(f"Downloading {object_descriptor}...") - data = self.storage.get_object(**object_descriptor) - logging.debug(f"Downloaded {object_descriptor}.") - return data - - object_descriptor = get_object_descriptor(body) - - try: - return gzip.decompress(download()) - except Exception as err: - logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure from err - - def process_data(self, data, body): - return self.callback({**body, "data": data}) - - def load_and_process(self, body): - data = self.process_data(self.load_data(body), body) - result_body = {**body, "data": data} - return result_body - - def __call__(self, body): - result_body = self.load_and_process(body) - return self.response_strategy(result_body) diff --git a/requirements.txt b/requirements.txt index 
e67c02b..43409f0 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,9 @@ pika==1.2.0 retry==0.9.2 -envyaml==1.10.211231 minio==7.1.3 -Flask==2.1.1 -waitress==2.0.0 azure-core==1.22.1 azure-storage-blob==12.9.0 -requests==2.27.1 testcontainers==3.4.2 docker-compose==1.29.2 -tqdm==4.62.3 pytest~=7.0.1 funcy==1.17 -fpdf==1.7.2 diff --git a/run_tests.sh b/run_tests.sh old mode 100755 new mode 100644 diff --git a/scripts/manage_minio.py b/scripts/manage_minio.py deleted file mode 100644 index 88e1127..0000000 --- a/scripts/manage_minio.py +++ /dev/null @@ -1,72 +0,0 @@ -import argparse -import gzip -import os -from pathlib import Path - -from tqdm import tqdm - -from pyinfra.config import CONFIG, parse_disjunction_string -from pyinfra.storage.storages import get_s3_storage - - -def parse_args(): - parser = argparse.ArgumentParser() - - subparsers = parser.add_subparsers(help="sub-command help", dest="command") - - parser_add = subparsers.add_parser("add", help="Add file(s) to the MinIO store") - parser_add.add_argument("dossier_id") - add_group = parser_add.add_mutually_exclusive_group(required=True) - add_group.add_argument("--file", "-f") - add_group.add_argument("--directory", "-d") - - subparsers.add_parser("purge", help="Delete all files and buckets in the MinIO store") - - args = parser.parse_args() - return args - - -def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension): - return f"{dossier_id}/{file_id}{extension}" - - -def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None: - data = gzip.compress(result.encode()) - path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension) - storage.put_object(bucket_name, path_gz, data) - - -def add_file_compressed(storage, bucket_name, dossier_id, path) -> None: - if Path(path).suffix == ".pdf": - suffix_gz = ".ORIGIN.pdf.gz" - if Path(path).suffix == ".json": - suffix_gz = ".TEXT.json.gz" - path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz) - - with open(path, "rb") as f: - data = gzip.compress(f.read()) - storage.put_object(bucket_name, path_gz, data) - - -if __name__ == "__main__": - - storage = get_s3_storage() - bucket_name = parse_disjunction_string(CONFIG.storage.bucket) - - if not storage.has_bucket(bucket_name): - storage.make_bucket(bucket_name) - - args = parse_args() - - if args.command == "add": - - if args.file: - add_file_compressed(storage, bucket_name, args.dossier_id, args.file) - - elif args.directory: - for fname in tqdm([*os.listdir(args.directory)], desc="Adding files"): - path = Path(args.directory) / fname - add_file_compressed(storage, bucket_name, args.dossier_id, path) - - elif args.command == "purge": - storage.clear_bucket(bucket_name) diff --git a/scripts/mock_client.py b/scripts/mock_client.py deleted file mode 100644 index db620d0..0000000 --- a/scripts/mock_client.py +++ /dev/null @@ -1,88 +0,0 @@ -import argparse -import json - -import pika - -from pyinfra.config import CONFIG, parse_disjunction_string -from pyinfra.storage.storages import get_s3_storage - - -def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument("--bucket_name", "-b", required=True) - parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True) - args = parser.parse_args() - return args - - -def read_connection_params(): - credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password) - 
parameters = pika.ConnectionParameters( - host=CONFIG.rabbitmq.host, - port=CONFIG.rabbitmq.port, - heartbeat=CONFIG.rabbitmq.heartbeat, - credentials=credentials, - ) - return parameters - - -def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: - channel = connection.channel() - channel.basic_qos(prefetch_count=1) - return channel - - -def declare_queue(channel, queue: str): - args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.rabbitmq.queues.dead_letter} - return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args) - - -def make_connection() -> pika.BlockingConnection: - parameters = read_connection_params() - connection = pika.BlockingConnection(parameters) - return connection - - -def build_message_bodies(analyse_container_type, bucket_name): - def update_message(message_dict): - if analyse_container_type == "detr" or analyse_container_type == "image": - message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"}) - if analyse_container_type == "dl_error": - message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"}) - if analyse_container_type == "ner": - message_dict.update( - {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"} - ) - return message_dict - - storage = get_s3_storage() - for bucket_name, pdf_name in storage.get_all_object_names(bucket_name): - if "pdf" not in pdf_name: - continue - file_id = pdf_name.split(".")[0] - dossier_id, file_id = file_id.split("/") - message_dict = {"dossierId": dossier_id, "fileId": file_id} - update_message(message_dict) - yield json.dumps(message_dict).encode() - - -def main(args): - connection = make_connection() - channel = make_channel(connection) - declare_queue(channel, CONFIG.rabbitmq.queues.input) - declare_queue(channel, CONFIG.rabbitmq.queues.output) - - for body in build_message_bodies(args.analysis_container, args.bucket_name): - channel.basic_publish("", CONFIG.rabbitmq.queues.input, body) - print(f"Put {body} on {CONFIG.rabbitmq.queues.input}") - - for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1): - if not body: - break - print(f"Received {json.loads(body)}") - channel.basic_ack(method_frame.delivery_tag) - channel.close() - - -if __name__ == "__main__": - main(parse_args()) diff --git a/src/serve.py b/src/serve.py deleted file mode 100644 index 435d632..0000000 --- a/src/serve.py +++ /dev/null @@ -1,84 +0,0 @@ -import logging -from multiprocessing import Process - -import requests -from retry import retry - -from pyinfra.config import CONFIG -from pyinfra.exceptions import AnalysisFailure, ConsumerError -from pyinfra.flask import run_probing_webserver, set_up_probing_webserver -from pyinfra.queue.consumer import Consumer -from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager -from pyinfra.storage.storages import get_storage -from pyinfra.utils.banner import show_banner -from pyinfra.visitor import QueueVisitor, StorageStrategy - - -def make_callback(analysis_endpoint): - def callback(message): - def perform_operation(operation): - endpoint = f"{analysis_endpoint}/{operation}" - try: - logging.debug(f"Requesting analysis from {endpoint}...") - analysis_response = requests.post(endpoint, data=message["data"]) - analysis_response.raise_for_status() - analysis_response = analysis_response.json() - logging.debug(f"Received response.") - return 
diff --git a/src/serve.py b/src/serve.py
deleted file mode 100644
index 435d632..0000000
--- a/src/serve.py
+++ /dev/null
@@ -1,84 +0,0 @@
-import logging
-from multiprocessing import Process
-
-import requests
-from retry import retry
-
-from pyinfra.config import CONFIG
-from pyinfra.exceptions import AnalysisFailure, ConsumerError
-from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
-from pyinfra.queue.consumer import Consumer
-from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
-from pyinfra.storage.storages import get_storage
-from pyinfra.utils.banner import show_banner
-from pyinfra.visitor import QueueVisitor, StorageStrategy
-
-
-def make_callback(analysis_endpoint):
-    def callback(message):
-        def perform_operation(operation):
-            endpoint = f"{analysis_endpoint}/{operation}"
-            try:
-                logging.debug(f"Requesting analysis from {endpoint}...")
-                analysis_response = requests.post(endpoint, data=message["data"])
-                analysis_response.raise_for_status()
-                analysis_response = analysis_response.json()
-                logging.debug("Received response.")
-                return analysis_response
-            except Exception as err:
-                logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
-                raise AnalysisFailure() from err
-
-        operations = message.get("operations", ["/"])
-        results = map(perform_operation, operations)
-        result = dict(zip(operations, results))
-        if list(result.keys()) == ["/"]:
-            result = list(result.values())[0]
-        return result
-
-    return callback
-
-
-def main():
-    logger = logging.getLogger("main")
-
-    show_banner()
-
-    webserver = Process(target=run_probing_webserver, args=(set_up_probing_webserver(),))
-    logging.info("Starting webserver...")
-    webserver.start()
-
-    callback = make_callback(CONFIG.rabbitmq.callback.analysis_endpoint)
-    storage = get_storage(CONFIG.storage.backend)
-    response_strategy = StorageStrategy(storage)
-    visitor = QueueVisitor(storage, callback, response_strategy)
-
-    @retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
-    def consume():
-        try:  # RED-4049 queue manager needs to be in try scope to eventually throw Exception after connection loss.
-            queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)
-            consumer = Consumer(visitor, queue_manager)
-            consumer.basic_consume_and_publish()
-        except Exception as err:
-            logger.exception(err)
-            raise ConsumerError from err
-
-    try:
-        consume()
-    except KeyboardInterrupt:
-        pass
-    except ConsumerError:
-        webserver.terminate()
-        raise
-
-    webserver.join()
-
-
-if __name__ == "__main__":
-    logging_level = CONFIG.service.logging_level
-    logging.basicConfig(level=logging_level)
-    logging.getLogger("pika").setLevel(logging.ERROR)
-    logging.getLogger("flask").setLevel(logging.ERROR)
-    logging.getLogger("urllib3").setLevel(logging.ERROR)
-
-    main()
diff --git a/test/config.py b/test/config.py
deleted file mode 100644
index c8058f1..0000000
--- a/test/config.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from pyinfra.config import Config
-from pyinfra.locations import TEST_CONFIG_FILE
-
-
-CONFIG = Config(TEST_CONFIG_FILE)
diff --git a/test/config.yaml b/test/config.yaml
deleted file mode 100644
index d404715..0000000
--- a/test/config.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-storage:
-  minio:
-    endpoint: "http://127.0.0.1:9000"
-    access_key: root
-    secret_key: password
-    region: null
-
-  aws:
-    endpoint: https://s3.amazonaws.com
-    access_key: AKIA4QVP6D4LCDAGYGN2
-    secret_key: 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
-    region: $STORAGE_REGION|"eu-west-1"
-
-  azure:
-    connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
-
-  bucket: "pyinfra-test-bucket"
-
-webserver:
-  host: $SERVER_HOST|"127.0.0.1"  # webserver address
-  port: $SERVER_PORT|5000  # webserver port
-  mode: $SERVER_MODE|production  # webserver mode: {development, production}
-
-
-mock_analysis_endpoint: "http://127.0.0.1:5000"
\ No newline at end of file
diff --git a/test/exploration_tests/__init__.py b/test/exploration_tests/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/test/exploration_tests/data_json_request_test.py b/test/exploration_tests/data_json_request_test.py
deleted file mode 100644
index b53ee42..0000000
--- a/test/exploration_tests/data_json_request_test.py
+++ /dev/null
@@ -1,79 +0,0 @@
-import json
-from operator import itemgetter
-
-import pytest
-from flask import Flask, request, jsonify
-import fpdf
-
-
-def set_up_processing_server():
-    app = Flask(__name__)
-
-    @app.route("/ready", methods=["GET"])
-    def ready():
-        resp = jsonify("OK")
-        resp.status_code = 200
-        return resp
-
-    @app.route("/process", methods=["POST"])
-    def process():
-        payload = json.loads(request.json)
-        data = payload["data"].encode()
-        metadata = payload["metadata"]
-
-        response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))}
-
-        return jsonify(response_payload)
-
-    return app
-
-
-@pytest.fixture
-def server():
-    server = set_up_processing_server()
-    server.config.update({"TESTING": True})
-    return server
-
-
-@pytest.fixture
-def client(server):
-    return server.test_client()
-
-
-def test_server_ready_check(client):
-    response = client.get("/ready")
-    assert response.status_code == 200
-    assert response.json == "OK"
-
-
-@pytest.mark.parametrize("data_type", ["pdf", "bytestring"])
-def test_sending_bytes_through_json(client, data):
-    payload = {"data": data.decode("latin1"), "metadata": {"A": 1, "B": [2, 3]}}
-
-    response = client.post("/process", json=json.dumps(payload))
-
-    response_payload = response.json
-    data_type, metadata_type = itemgetter("data_type", "metadata_type")(response_payload)
-
-    assert data_type == "<class 'bytes'>"
-    assert metadata_type == "<class 'dict'>"
-
-
-@pytest.fixture
-def pdf():
-    pdf = fpdf.FPDF(unit="pt")
-    pdf.add_page()
-
-    return pdf_stream(pdf)
-
-
-def pdf_stream(pdf: fpdf.fpdf.FPDF):
-    return pdf.output(dest="S").encode("latin1")
-
-
-@pytest.fixture
-def data(data_type, pdf):
-    if data_type == "pdf":
-        return pdf
-    elif data_type == "bytestring":
-        return "content".encode("latin1")
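The latin1 round trip used in the test above deserves a note: latin1 maps every byte 0x00-0xFF to the Unicode code point of the same value, so bytes.decode("latin1") followed by str.encode("latin1") is lossless for arbitrary binary data, which is what makes it safe to tunnel PDF bytes through a JSON string. A self-contained sketch:

import json

# Arbitrary binary payload, including bytes that are not valid UTF-8.
blob = bytes(range(256))

# latin1 gives a 1:1 byte <-> code point mapping, so the decode/encode
# round trip through a JSON string is lossless for any byte string.
wire = json.dumps({"data": blob.decode("latin1")})
restored = json.loads(wire)["data"].encode("latin1")

assert restored == blob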
methods=["GET"]) - def ready(): - resp = jsonify("OK") - resp.status_code = 200 - return resp - - @app.route("/process", methods=["POST"]) - def process(): - payload = json.loads(request.json) - data = payload["data"].encode() - metadata = payload["metadata"] - - response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))} - - return jsonify(response_payload) - - return app - - -@pytest.fixture -def server(): - server = set_up_processing_server() - server.config.update({"TESTING": True}) - return server - - -@pytest.fixture -def client(server): - return server.test_client() - - -def test_server_ready_check(client): - response = client.get("/ready") - assert response.status_code == 200 - assert response.json == "OK" - - -@pytest.mark.parametrize("data_type", ["pdf", "bytestring"]) -def test_sending_bytes_through_json(client, data): - payload = {"data": data.decode("latin1"), "metadata": {"A": 1, "B": [2, 3]}} - - response = client.post("/process", json=json.dumps(payload)) - - response_payload = response.json - data_type, metadata_type = itemgetter("data_type", "metadata_type")(response_payload) - - assert data_type == "" - assert metadata_type == "" - - -@pytest.fixture -def pdf(): - pdf = fpdf.FPDF(unit="pt") - pdf.add_page() - - return pdf_stream(pdf) - - -def pdf_stream(pdf: fpdf.fpdf.FPDF): - return pdf.output(dest="S").encode("latin1") - - -@pytest.fixture -def data(data_type, pdf): - if data_type == "pdf": - return pdf - elif data_type == "bytestring": - return "content".encode("latin1") diff --git a/test/queue/__init__.py b/test/queue/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/queue/queue_manager_mock.py b/test/queue/queue_manager_mock.py deleted file mode 100644 index 063a68b..0000000 --- a/test/queue/queue_manager_mock.py +++ /dev/null @@ -1,46 +0,0 @@ -from pyinfra.queue.queue_manager.queue_manager import QueueManager, QueueHandle -from test.queue.queue_mock import QueueMock - - -def monkey_patch_queue_handle(queue) -> QueueHandle: - queue_handle = QueueHandle() - queue_handle.empty = lambda: not queue - queue_handle.to_list = lambda: list(queue) - return queue_handle - - -class QueueManagerMock(QueueManager): - def __init__(self, input_queue, output_queue): - super().__init__(QueueMock(), QueueMock()) - - def publish_request(self, request): - self._input_queue.append(request) - - def publish_response(self, message, callback): - self._output_queue.append(callback(message)) - - def pull_request(self): - return self._input_queue.popleft() - - def consume(self, **kwargs): - while self._input_queue: - yield self.pull_request() - - def consume_and_publish(self, callback): - for message in self.consume(): - self.publish_response(message, callback) - - def basic_consume_and_publish(self, callback): - raise NotImplementedError - - def clear(self): - self._input_queue.clear() - self._output_queue.clear() - - @property - def input_queue(self) -> QueueHandle: - return monkey_patch_queue_handle(self._input_queue) - - @property - def output_queue(self) -> QueueHandle: - return monkey_patch_queue_handle(self._output_queue) diff --git a/test/queue/queue_mock.py b/test/queue/queue_mock.py deleted file mode 100644 index c138f29..0000000 --- a/test/queue/queue_mock.py +++ /dev/null @@ -1,5 +0,0 @@ -from collections import deque - - -class QueueMock(deque): - pass diff --git a/test/storage/__init__.py b/test/storage/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/storage/adapter_mock.py 
diff --git a/test/storage/client_mock.py b/test/storage/client_mock.py
deleted file mode 100644
index c81b02b..0000000
--- a/test/storage/client_mock.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from itertools import repeat
-
-
-class StorageClientMock:
-    def __init__(self):
-        self.__data = {}
-
-    def make_bucket(self, bucket_name):
-        self.__data[bucket_name] = {}
-
-    def has_bucket(self, bucket_name):
-        return bucket_name in self.__data
-
-    def put_object(self, bucket_name, object_name, data):
-        self.__data[bucket_name][object_name] = data
-
-    def get_object(self, bucket_name, object_name):
-        return self.__data[bucket_name][object_name]
-
-    def get_all_objects(self, bucket_name):
-        return self.__data[bucket_name].values()
-
-    def clear_bucket(self, bucket_name):
-        self.__data[bucket_name] = {}
-
-    def get_all_object_names(self, bucket_name):
-        return zip(repeat(bucket_name), self.__data[bucket_name])
diff --git a/test/unit_tests/__init__.py b/test/unit_tests/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/test/unit_tests/azure_adapter_test.py b/test/unit_tests/azure_adapter_test.py
deleted file mode 100644
index 6807ffb..0000000
--- a/test/unit_tests/azure_adapter_test.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import pytest
-
-from pyinfra.storage.adapters.azure import AzureStorageAdapter
-from test.storage.client_mock import StorageClientMock
-
-
-@pytest.fixture
-def adapter():
-    adapter = AzureStorageAdapter(StorageClientMock())
-    return adapter
diff --git a/test/unit_tests/config_test.py b/test/unit_tests/config_test.py
deleted file mode 100644
index 5a5d610..0000000
--- a/test/unit_tests/config_test.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import os
-import tempfile
-
-import pytest
-import yaml
-
-from pyinfra.config import Config, parse_disjunction_string
-
-
-@pytest.fixture
-def config_file_content():
-    return {"A": [{"B": [1, 2]}, {"C": 3}, 4], "D": {"E": {"F": True}}}
-
-
-@pytest.fixture
-def config(config_file_content):
-    with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w") as f:
-        yaml.dump(config_file_content, f, default_flow_style=False)
-        yield Config(f.name)
-
-
-def test_dot_access_key_exists(config):
-    assert config.A == [{"B": [1, 2]}, {"C": 3}, 4]
-    assert config.D.E["F"]
-
-
-def test_access_key_exists(config):
-    assert config["A"] == [{"B": [1, 2]}, {"C": 3}, 4]
-    assert config["A"][0] == {"B": [1, 2]}
-    assert config["A"][0]["B"] == [1, 2]
-    assert config["A"][0]["B"][0] == 1
-
-
-def test_dot_access_key_does_not_exist(config):
-    assert config.B is None
-
-
-def test_access_key_does_not_exist(config):
-    assert config["B"] is None
-
-
-def test_parse_disjunction_string():
-    assert parse_disjunction_string("A|Bb|c") == "c"
-    os.environ["Bb"] = "d"
-    assert parse_disjunction_string("A|Bb|c") == "d"
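The last test pins down the `$VAR|fallback` semantics used throughout config.yaml: tokens are tried left to right, the first one naming a set environment variable wins, and otherwise the final token is returned as a literal default. A hedged re-implementation sketch (the real function lives in pyinfra.config and may differ in detail, e.g. in how `$` prefixes and quoted literals are handled):

import os


def parse_disjunction_string(value: str) -> str:
    # Sketch only: every token but the last is a candidate environment
    # variable name; the last token is the literal fallback.
    *candidates, default = value.split("|")
    for name in candidates:
        if name in os.environ:
            return os.environ[name]
    return default


assert parse_disjunction_string("A|Bb|c") == "c"
os.environ["Bb"] = "d"
assert parse_disjunction_string("A|Bb|c") == "d"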
diff --git a/test/unit_tests/conftest.py b/test/unit_tests/conftest.py
deleted file mode 100644
index 92d78cf..0000000
--- a/test/unit_tests/conftest.py
+++ /dev/null
@@ -1,160 +0,0 @@
-import json
-import logging
-import time
-from unittest.mock import Mock
-
-import pika
-import pytest
-import testcontainers.compose
-
-from pyinfra.exceptions import UnknownClient
-from pyinfra.locations import TEST_DIR, COMPOSE_PATH
-from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager, get_connection_params
-from pyinfra.queue.queue_manager.queue_manager import QueueManager
-from pyinfra.storage.adapters.azure import AzureStorageAdapter
-from pyinfra.storage.adapters.s3 import S3StorageAdapter
-from pyinfra.storage.clients.azure import get_azure_client
-from pyinfra.storage.clients.s3 import get_s3_client
-from pyinfra.storage.storage import Storage
-from test.config import CONFIG
-from test.queue.queue_manager_mock import QueueManagerMock
-from test.storage.adapter_mock import StorageAdapterMock
-from test.storage.client_mock import StorageClientMock
-from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor
-
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-
-
-@pytest.fixture(scope="session")
-def bucket_name():
-    return "pyinfra-test-bucket"
-
-
-@pytest.fixture
-def storage_data():
-    with open(f"{TEST_DIR}/test_data/test_data.TEXT.json", "r") as f:
-        data = json.load(f)
-    return data
-
-
-@pytest.fixture
-def mock_response(storage_data):
-    response = Mock(status_code=200)
-    response.json.return_value = storage_data
-    return response
-
-
-@pytest.fixture
-def mock_payload():
-    return json.dumps({"dossierId": "test", "fileId": "test"})
-
-
-@pytest.fixture
-def mock_make_load_data(storage_data):
-    def load_data(payload):
-        return storage_data
-
-    return load_data
-
-
-@pytest.fixture(params=["minio", "aws"], scope="session")
-def storage(client_name, bucket_name, request):
-    logger.debug("Setup for storage")
-    storage = Storage(get_adapter(client_name, request.param))
-    storage.make_bucket(bucket_name)
-    storage.clear_bucket(bucket_name)
-    yield storage
-    logger.debug("Teardown for storage")
-    storage.clear_bucket(bucket_name)
-
-
-@pytest.fixture(scope="session", autouse=True)
-def docker_compose(sleep_seconds=30):
-    logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
-    compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
-    compose.start()
-    logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup...")
-    time.sleep(sleep_seconds)
-    yield compose
-    compose.stop()
-
-
-def get_pika_connection_params():
-    params = get_connection_params()
-    return params
-
-
-def get_s3_params(s3_backend):
-    params = CONFIG.storage[s3_backend]
-
-    return params
-
-
-def get_adapter(client_name, s3_backend):
-    if client_name == "mock":
-        return StorageAdapterMock(StorageClientMock())
-    if client_name == "azure":
-        return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
-    if client_name == "s3":
-        return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend)))
-    else:
-        raise UnknownClient(client_name)
-
-
-def get_queue_manager(queue_manager_name) -> QueueManager:
-    if queue_manager_name == "mock":
-        return QueueManagerMock("input", "output")
-    if queue_manager_name == "pika":
-        return PikaQueueManager("input", "output", connection_params=get_pika_connection_params())
-
-
-@pytest.fixture(scope="session")
-def queue_manager(queue_manager_name):
-    def close_connections():
-        if queue_manager_name == "pika":
-            try:
-                queue_manager.connection.close()
-            except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionWrongStateError, ConnectionResetError):
-                logger.debug("Connection was already closed when attempting to close explicitly.")
-
-    def close_channel():
-        if queue_manager_name == "pika":
-            try:
-                queue_manager.channel.close()
-            except pika.exceptions.ChannelWrongStateError:
-                logger.debug("Channel was already closed when attempting to close explicitly.")
-
-    queue_manager = get_queue_manager(queue_manager_name)
-    yield queue_manager
-    close_connections()
-    close_channel()
-
-
-@pytest.fixture(scope="session")
-def callback():
-    def inner(request):
-        return request["data"].decode() * 2
-
-    return inner
-
-
-@pytest.fixture
-def analysis_callback(callback):
-    def inner(request):
-        return callback(request)
-
-    return inner
-
-
-@pytest.fixture
-def response_strategy(response_strategy_name, storage):
-    if response_strategy_name == "storage":
-        return StorageStrategy(storage)
-    if response_strategy_name == "forwarding":
-        return ForwardingStrategy()
-
-
-@pytest.fixture()
-def visitor(storage, analysis_callback, response_strategy):
-    return QueueVisitor(storage, analysis_callback, response_strategy)
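The fixed 30-second sleep in docker_compose is the blunt instrument here; polling the service ports until they accept connections is a common alternative. A sketch using only the standard library, assuming the compose file maps RabbitMQ to local port 5672 and MinIO to 9000 (adjust to the real mapping):

import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0) -> None:
    # Poll until a TCP connect succeeds or the deadline passes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return
        except OSError:
            time.sleep(0.5)
    raise TimeoutError(f"{host}:{port} did not become reachable within {timeout}s")


# Assumed service ports for this compose setup.
for service_port in (5672, 9000):
    wait_for_port("127.0.0.1", service_port)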
") - time.sleep(sleep_seconds) - yield compose - compose.stop() - - -def get_pika_connection_params(): - params = get_connection_params() - return params - - -def get_s3_params(s3_backend): - params = CONFIG.storage[s3_backend] - - return params - - -def get_adapter(client_name, s3_backend): - if client_name == "mock": - return StorageAdapterMock(StorageClientMock()) - if client_name == "azure": - return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string)) - if client_name == "s3": - return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend))) - else: - raise UnknownClient(client_name) - - -def get_queue_manager(queue_manager_name) -> QueueManager: - if queue_manager_name == "mock": - return QueueManagerMock("input", "output") - if queue_manager_name == "pika": - return PikaQueueManager("input", "output", connection_params=get_pika_connection_params()) - - -@pytest.fixture(scope="session") -def queue_manager(queue_manager_name): - def close_connections(): - if queue_manager_name == "pika": - try: - queue_manager.connection.close() - except (pika.exceptions.StreamLostError, pika.exceptions.ConnectionWrongStateError, ConnectionResetError): - logger.debug("Connection was already closed when attempting to close explicitly.") - - def close_channel(): - if queue_manager_name == "pika": - try: - queue_manager.channel.close() - except pika.exceptions.ChannelWrongStateError: - logger.debug("Channel was already closed when attempting to close explicitly.") - - queue_manager = get_queue_manager(queue_manager_name) - yield queue_manager - close_connections() - close_channel() - - -@pytest.fixture(scope="session") -def callback(): - def inner(request): - return request["data"].decode() * 2 - - return inner - - -@pytest.fixture -def analysis_callback(callback): - def inner(request): - return callback(request) - - return inner - - -@pytest.fixture -def response_strategy(response_strategy_name, storage): - if response_strategy_name == "storage": - return StorageStrategy(storage) - if response_strategy_name == "forwarding": - return ForwardingStrategy() - - -@pytest.fixture() -def visitor(storage, analysis_callback, response_strategy): - return QueueVisitor(storage, analysis_callback, response_strategy) diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py deleted file mode 100644 index 5874846..0000000 --- a/test/unit_tests/consumer_test.py +++ /dev/null @@ -1,126 +0,0 @@ -import gzip -import json -import logging -from operator import itemgetter - -import pytest - -from pyinfra.exceptions import ProcessingFailure -from pyinfra.queue.consumer import Consumer -from pyinfra.visitor import get_object_descriptor, ForwardingStrategy - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) - - -@pytest.fixture(scope="session") -def consumer(queue_manager, callback): - return Consumer(callback, queue_manager) - - -@pytest.fixture(scope="session") -def access_callback(): - return itemgetter("fileId") - - -@pytest.fixture() -def items(): - def inner(): - for i in range(3): - body = { - "dossierId": "folder", - "fileId": f"file{i}", - "targetFileExtension": "in.gz", - "responseFileExtension": "out.gz", - } - yield f"{i}".encode(), body - - return list(inner()) - - -class TestConsumer: - @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session") - def test_consuming_empty_input_queue_does_not_put_anything_on_output_queue(self, consumer, queue_manager): - queue_manager.clear() - consumer.consume() - assert 
diff --git a/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py
deleted file mode 100644
index c781429..0000000
--- a/test/unit_tests/queue_visitor_test.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import gzip
-import json
-
-import pytest
-
-from pyinfra.visitor import get_object_descriptor, get_response_object_descriptor
-
-
-@pytest.fixture()
-def body():
-    return {"dossierId": "folder", "fileId": "file", "targetFileExtension": "in.gz", "responseFileExtension": "out.gz"}
-
-
-@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
-class TestVisitor:
-    @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
-    def test_given_an_input_queue_message_callback_pulls_the_data_from_storage(
-        self, visitor, body, storage, bucket_name
-    ):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(**get_object_descriptor(body), data=gzip.compress(b"content"))
-        data_received = visitor.load_data(body)
-        assert b"content" == data_received
-
-    @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
-    def test_visitor_pulls_and_processes_data(self, visitor, body, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(**get_object_descriptor(body), data=gzip.compress("2".encode()))
-        response_body = visitor.load_and_process(body)
-        assert response_body["data"] == "22"
-
-    @pytest.mark.parametrize("response_strategy_name", ["storage"], scope="session")
-    def test_visitor_puts_response_on_storage(self, visitor, body, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(**get_object_descriptor(body), data=gzip.compress("2".encode()))
-        response_body = visitor(body)
-        assert "data" not in response_body
-        assert json.loads(gzip.decompress(storage.get_object(**get_response_object_descriptor(body))))["data"] == "22"
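The tests above always write gzip-compressed objects and read them back through the visitor; the underlying invariant is a plain gzip round trip keyed by the object descriptor. A self-contained sketch with a hypothetical helper mirroring the dossierId/fileId/extension naming scheme these message bodies suggest (the real helper is get_object_descriptor in pyinfra.visitor and may differ):

import gzip


def object_name(body: dict) -> str:
    # Hypothetical mirror of the naming scheme, inferred from the test bodies.
    return f"{body['dossierId']}/{body['fileId']}.{body['targetFileExtension']}"


body = {"dossierId": "folder", "fileId": "file", "targetFileExtension": "in.gz"}
store: dict[str, bytes] = {}

# Write compressed, read back, decompress: the round trip the tests rely on.
store[object_name(body)] = gzip.compress(b"content")
assert gzip.decompress(store["folder/file.in.gz"]) == b"content"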
diff --git a/test/unit_tests/storage_test.py b/test/unit_tests/storage_test.py
deleted file mode 100644
index 90354d7..0000000
--- a/test/unit_tests/storage_test.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import logging
-
-import pytest
-
-from pyinfra.exceptions import DataLoadingFailure
-
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-
-
-@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
-class TestStorage:
-    def test_clearing_bucket_yields_empty_bucket(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        data_received = storage.get_all_objects(bucket_name)
-        assert not {*data_received}
-
-    def test_getting_object_put_in_bucket_is_object(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(bucket_name, "file", b"content")
-        data_received = storage.get_object(bucket_name, "file")
-        assert b"content" == data_received
-
-    def test_getting_nested_object_put_in_bucket_is_nested_object(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(bucket_name, "folder/file", b"content")
-        data_received = storage.get_object(bucket_name, "folder/file")
-        assert b"content" == data_received
-
-    def test_getting_objects_put_in_bucket_are_objects(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(bucket_name, "file1", b"content 1")
-        storage.put_object(bucket_name, "folder/file2", b"content 2")
-        data_received = storage.get_all_objects(bucket_name)
-        assert {b"content 1", b"content 2"} == {*data_received}
-
-    def test_make_bucket_produces_bucket(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.make_bucket(bucket_name)
-        assert storage.has_bucket(bucket_name)
-
-    def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        storage.put_object(bucket_name, "file1", b"content 1")
-        storage.put_object(bucket_name, "file2", b"content 2")
-        full_names_received = storage.get_all_object_names(bucket_name)
-        assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received}
-
-    def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name):
-        storage.clear_bucket(bucket_name)
-        with pytest.raises(DataLoadingFailure):
-            storage.get_object(bucket_name, "folder/file")