Pull request #57: Bugfix/RED-5277 investigate missing heartbeat error

Merge in RR/pyinfra from bugfix/RED-5277-investigate-missing-heartbeat-error to master

Squashed commit of the following:

commit 9e139e79e46c52014986f9afb2c6534281b55c10
Author: Viktor Seifert <viktor.seifert@iqser.com>
Date:   Wed Feb 15 14:56:44 2023 +0100

    RED-5277: Moved async processing to its own functions

commit 244a941299dbf75b254adcad8b068b2917c6bf79
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Wed Feb 15 11:26:00 2023 +0100

    Revert "only set git tag on release and master branches"

    This reverts commit 9066856d223f0646723fa1c62c444e16a9bb3ce9.

commit adb35db6fa6daf4b79263a918716c34905e8b3bc
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Wed Feb 15 11:11:07 2023 +0100

    increment version

commit 9066856d223f0646723fa1c62c444e16a9bb3ce9
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Wed Feb 15 11:10:49 2023 +0100

    only set git tag on release and master branches

commit ee11e018efdbc63a740008e7fa2415cbb12476ae
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Wed Feb 15 10:18:08 2023 +0100

    configure root logger in `__init__.py`
    only set log levels for other loggers, inherit config

commit 776399912ddf1e936138cceb2af981f27d333823
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Wed Feb 15 10:16:57 2023 +0100

    update dependency via `poetry update`

commit 804a8d9fbd1ded3e154fe9b3cafa32428522ca0f
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Wed Feb 15 10:16:25 2023 +0100

    increment version

commit cf057daed23d5f5b0f6f3a1a31e956e015e86368
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 17:59:55 2023 +0100

    update

commit 51717d85fce592b8bf38a8b5235faa04379cce1a
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 17:48:51 2023 +0100

    define sonar source

commit ace57c211a61d8e473a700da161806f882b19dc6
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 17:46:24 2023 +0100

    update plan

commit 1fcc00eb18ed692e2646873b4a233a00b5f6d93b
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 17:46:13 2023 +0100

    fix typo

commit 20b59768a68d985e1bf2fe6f93a1e6283bac5cb0
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 17:43:39 2023 +0100

    increment version

commit 8e7b4bf302b5591b2c490ad89c8a01a87c5b4741
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 17:11:59 2023 +0100

    get rid of extra logger

commit 3fd3eb255c252d1e208b88b475ec8a07c521619d
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 16:45:56 2023 +0100

    increment version

commit b0b5e5ebd94554cdafed6cff333d73a9ba08bea1
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 16:40:22 2023 +0100

    update

commit b87b3c351722d6949833c397178bc0354c754d90
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 16:38:41 2023 +0100

    fix tag issue from build

commit 73f3dcb280b6f905eeef3c69123b1252e6c934b1
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 14:21:57 2023 +0100

    add comments & update logging

commit 72a9e2c51f5bf98fc9f0803183fc1d28aaea9e35
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 12:06:09 2023 +0100

    cleanup comments

commit 587814944921f0f148e4d3c4c76d4edffff55bba
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 11:16:17 2023 +0100

    use thread executor in a `with` statement

commit 9561a6b447d98d2f0d536f63c0946d7bf1e2ca7d
Author: Francisco Schulz <Francisco.Schulz@iqser.com>
Date:   Tue Feb 14 10:42:49 2023 +0100

    fix unbound issue `callback_result` & shutdown thread executor

... and 23 more commits
This commit is contained in:
Francisco Schulz 2023-02-15 16:02:17 +01:00
parent 61efbdaffd
commit 1af171bd3f
11 changed files with 1871 additions and 1316 deletions

View File

@ -2,34 +2,41 @@
# See https://pre-commit.com/hooks.html for more hooks
exclude: ^(docs/|notebooks/|data/|src/secrets/|src/static/|src/templates/|tests)
default_language_version:
python: python3.8
python: python3.8
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
exclude: bamboo-specs/bamboo.yml
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
exclude: bamboo-specs/bamboo.yml
- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
args:
- "--max-line-length=120"
- "--ignore=F401,W503"
# - repo: https://github.com/pycqa/pylint
# rev: v2.16.1
# hooks:
# - id: pylint
# args:
# ["--max-line-length=120", "--errors-only", "--ignore-imports=true", ]
- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.10.1
hooks:
- id: isort
args: ["--profile", "black"]
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/psf/black
rev: 22.10.0
rev: 23.1.0
hooks:
- id: black
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
args:
- --line-length=120
- id: black
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
args:
- --line-length=120
# - repo: local
# hooks:
# - id: system
# name: PyLint
# entry: poetry run pylint
# language: system
# exclude: ^alembic/
# files: \.py$

View File

@ -36,7 +36,7 @@ install-dev:
poetry add --dev $(pkg)
requirements:
poetry export --without-hashes --output ./src/requirements.txt
poetry export --without-hashes --output requirements.txt
update-version:
poetry version prerelease

View File

@ -1,5 +1,9 @@
package buildjob;
import static com.atlassian.bamboo.specs.builders.task.TestParserTask.createJUnitParserTask;
import java.time.LocalTime;
import com.atlassian.bamboo.specs.api.BambooSpec;
import com.atlassian.bamboo.specs.api.builders.BambooKey;
import com.atlassian.bamboo.specs.api.builders.docker.DockerConfiguration;
@ -20,8 +24,11 @@ import com.atlassian.bamboo.specs.builders.task.VcsCheckoutTask;
import com.atlassian.bamboo.specs.builders.task.CleanWorkingDirectoryTask;
import com.atlassian.bamboo.specs.builders.task.VcsTagTask;
import com.atlassian.bamboo.specs.builders.trigger.BitbucketServerTrigger;
import com.atlassian.bamboo.specs.builders.trigger.ScheduledTrigger;
import com.atlassian.bamboo.specs.model.task.InjectVariablesScope;
import com.atlassian.bamboo.specs.api.builders.Variable;
import com.atlassian.bamboo.specs.util.BambooServer;
import com.atlassian.bamboo.specs.builders.task.ScriptTask;
import com.atlassian.bamboo.specs.model.task.ScriptTaskProperties.Location;
/**
@ -31,10 +38,16 @@ import com.atlassian.bamboo.specs.model.task.ScriptTaskProperties.Location;
*/
@BambooSpec
public class PlanSpec {
private static final String REPOSITORY_KEY = "RR";
// this is the repo name
private static final String SERVICE_NAME = "pyinfra";
private static final String SERVICE_KEY = SERVICE_NAME.toUpperCase().replaceAll("-", "");
private static final String SERVICE_KEY = SERVICE_NAME.toUpperCase().replaceAll("-", "").replaceAll("_", "");
private static final String PROJECT_NAME = "RED";
private static final String PROJECT_KEY = "RED";
private static final String HOST = "nexus.iqser.com";
private static final String PORT = "5001";;
private static final String INFRA_URI = HOST + ":" + PORT + "/infra/release_build:4.5.0";
private static final String MAVEN_URI = HOST + ":" + PORT + "/infra/maven:3.6.2-jdk-13-3.0.0";
/**
* Run main to publish plan on Bamboo
@ -47,78 +60,67 @@ public class PlanSpec {
bambooServer.publish(plan);
PlanPermissions planPermission = new PlanSpec().createPlanPermission(plan.getIdentifier());
bambooServer.publish(planPermission);
Plan secPlan = new PlanSpec().createSecBuild();
bambooServer.publish(secPlan);
PlanPermissions secPlanPermission = new PlanSpec().createPlanPermission(secPlan.getIdentifier());
bambooServer.publish(secPlanPermission);
}
private PlanPermissions createPlanPermission(PlanIdentifier planIdentifier) {
Permissions permission = new Permissions()
.userPermissions("atlbamboo", PermissionType.EDIT, PermissionType.VIEW, PermissionType.ADMIN,
.userPermissions("atlbamboo", PermissionType.EDIT, PermissionType.VIEW,
PermissionType.ADMIN,
PermissionType.CLONE, PermissionType.BUILD)
.groupPermissions("research", PermissionType.EDIT, PermissionType.VIEW, PermissionType.CLONE,
.groupPermissions("research", PermissionType.EDIT, PermissionType.VIEW,
PermissionType.CLONE,
PermissionType.BUILD)
.groupPermissions("Development", PermissionType.EDIT, PermissionType.VIEW, PermissionType.CLONE,
.groupPermissions("Development", PermissionType.EDIT, PermissionType.VIEW,
PermissionType.CLONE,
PermissionType.BUILD)
.groupPermissions("QA", PermissionType.EDIT, PermissionType.VIEW, PermissionType.CLONE,
PermissionType.BUILD)
.loggedInUserPermissions(PermissionType.VIEW)
.anonymousUserPermissionView();
return new PlanPermissions(planIdentifier.getProjectKey(), planIdentifier.getPlanKey()).permissions(permission);
return new PlanPermissions(planIdentifier.getProjectKey(), planIdentifier.getPlanKey())
.permissions(permission);
}
private Project project() {
return new Project()
.name("RED")
.key(new BambooKey("RED"));
.name(PROJECT_NAME)
.key(new BambooKey(PROJECT_KEY));
}
public Plan createBuildPlan() {
return new Plan(
project(),
SERVICE_NAME, new BambooKey(SERVICE_KEY))
.description("Build for pyinfra")
.description("Build for " + SERVICE_NAME)
// .variables()
.stages(
new Stage("Sonar Stage")
new Stage("Build Stage")
.jobs(
new Job("Sonar Job", new BambooKey("SONAR"))
new Job("Build Job", new BambooKey("BUILD"))
.tasks(
new CleanWorkingDirectoryTask()
.description("Clean working directory.")
.enabled(true),
new VcsCheckoutTask()
.description("Checkout default repository.")
.checkoutItems(new CheckoutItem().defaultRepository()),
.checkoutItems(new CheckoutItem()
.defaultRepository()),
// new ScriptTask()
// .description("Set config and keys.")
// .location(Location.FILE)
// .fileFromPath(
// "bamboo-specs/src/main/resources/scripts/config-keys.sh"),
new ScriptTask()
.description("Set config and keys.")
.inlineBody("mkdir -p ~/.ssh\n" +
"echo \"${bamboo.bamboo_agent_ssh}\" | base64 -d >> ~/.ssh/id_rsa\n"
+
"echo \"host vector.iqser.com\" > ~/.ssh/config\n"
+
"echo \" user bamboo-agent\" >> ~/.ssh/config\n"
+
"chmod 600 ~/.ssh/config ~/.ssh/id_rsa"),
new ScriptTask()
.description("Run Sonarqube scan.")
.description("Tag Version.")
.location(Location.FILE)
.fileFromPath(
"bamboo-specs/src/main/resources/scripts/sonar-scan.sh")
.argument(SERVICE_NAME))
.dockerConfiguration(
new DockerConfiguration()
.image("nexus.iqser.com:5001/infra/release_build:4.2.0")
.volume("/var/run/docker.sock",
"/var/run/docker.sock"))),
new Stage("Git Stage")
.jobs(
new Job("Git Tag Job", new BambooKey("GITTAG"))
.tasks(
new VcsCheckoutTask()
.description("Checkout default repository.")
.checkoutItems(new CheckoutItem().defaultRepository()),
new ScriptTask()
.description("Build git tag.")
.location(Location.FILE)
.fileFromPath(
"bamboo-specs/src/main/resources/scripts/git-tag.sh"),
"bamboo-specs/src/main/resources/scripts/git-tag.sh")
.argument(SERVICE_NAME),
new InjectVariablesTask()
.description("Inject git tag.")
.path("git.tag")
@ -130,7 +132,9 @@ public class PlanSpec {
.defaultRepository())
.dockerConfiguration(
new DockerConfiguration()
.image("nexus.iqser.com:5001/infra/release_build:4.4.1"))),
.image(INFRA_URI)
.volume("/var/run/docker.sock",
"/var/run/docker.sock"))),
new Stage("License Stage")
.jobs(
new Job("License Job", new BambooKey("LICENSE"))
@ -138,7 +142,8 @@ public class PlanSpec {
.tasks(
new VcsCheckoutTask()
.description("Checkout default repository.")
.checkoutItems(new CheckoutItem().defaultRepository()),
.checkoutItems(new CheckoutItem()
.defaultRepository()),
new ScriptTask()
.description("Build licence.")
.location(Location.FILE)
@ -146,18 +151,60 @@ public class PlanSpec {
"bamboo-specs/src/main/resources/scripts/create-licence.sh"))
.dockerConfiguration(
new DockerConfiguration()
.image("nexus.iqser.com:5001/infra/maven:3.6.2-jdk-13-3.0.0")
.image(MAVEN_URI)
.volume("/etc/maven/settings.xml",
"/usr/share/maven/ref/settings.xml")
.volume("/var/run/docker.sock",
"/var/run/docker.sock"))))
.linkedRepositories("RR / " + SERVICE_NAME)
.triggers(new BitbucketServerTrigger())
.linkedRepositories(REPOSITORY_KEY + " / " + SERVICE_NAME)
.triggers(
new BitbucketServerTrigger())
.planBranchManagement(
new PlanBranchManagement()
.createForVcsBranch()
.delete(new BranchCleanup()
.whenInactiveInRepositoryAfterDays(14))
.delete(
new BranchCleanup()
.whenInactiveInRepositoryAfterDays(
14))
.notificationForCommitters());
}
public Plan createSecBuild() {
return new Plan(project(), SERVICE_NAME + "-Sec", new BambooKey(SERVICE_KEY + "SEC"))
.description("Security Analysis Plan")
.stages(new Stage("Default Stage").jobs(
new Job("Sonar Job", new BambooKey("SONAR"))
.enabled(true)
.tasks(
new CleanWorkingDirectoryTask()
.description("Clean working directory.")
.enabled(true),
new VcsCheckoutTask()
.description("Checkout default repository.")
.checkoutItems(new CheckoutItem()
.defaultRepository()),
new ScriptTask()
.description("Set config and keys.")
.location(Location.FILE)
.fileFromPath("bamboo-specs/src/main/resources/scripts/config-keys.sh"),
new ScriptTask()
.description("Run Sonarqube scan.")
.location(Location.FILE)
.fileFromPath("bamboo-specs/src/main/resources/scripts/sonar-scan.sh")
.argument(SERVICE_NAME))
.dockerConfiguration(
new DockerConfiguration()
.image(INFRA_URI)
.volume("/var/run/docker.sock",
"/var/run/docker.sock"))))
.linkedRepositories(REPOSITORY_KEY + " / " + SERVICE_NAME)
.triggers(
new ScheduledTrigger()
.scheduleOnceDaily(LocalTime.of(23, 00)))
.planBranchManagement(
new PlanBranchManagement()
.createForVcsBranchMatching("release.*")
.notificationForCommitters());
}
}

View File

@ -67,18 +67,18 @@ else
if check_poetry_version
then
echo "updating version number by one prerelease increment"
poetry version prerelease
poetry version $(poetry version -s)-dev
fi
fi
if [[ $bamboo_planRepository_branchName =~ ^(master|hotfix/|bugfix/|feature/) ]]
if [[ $bamboo_planRepository_branchName =~ ^(master|release/|hotfix/|bugfix/|feature/) ]]
then
newVersion=$(poetry version -s)
else
newVersion="$(poetry version -s)-dev"
newVersion="${bamboo_planRepository_1_branch}_${bamboo_buildNumber}"
fi
echo "new build on $bamboo_planRepository_branchName with version: $newVersion"
echo "NEW BUILD on $bamboo_planRepository_branchName with version: $newVersion"
echo "gitTag=$newVersion" > git.tag

View File

@ -7,24 +7,22 @@ python3 -m venv build_venv
source build_venv/bin/activate
python3 -m pip install --upgrade pip
python3 -m pip install dependency-check
python3 -m pip install docker-compose
# python3 -m pip install docker-compose
python3 -m pip install coverage
# This is disabled since there are currently no tests in this project.
# If tests are added this can be enabled again
# echo "coverage calculation"
# coverage run -m pytest
# echo "coverage report generation"
# bash run_tests.sh
# if [ ! -f reports/coverage.xml ]
# then
# exit 1
# fi
# coverage report -m
# coverage xml
SERVICE_NAME=$1
project_name="RED"
pkg_src="pyinfra"
echo "dependency-check:aggregate"
mkdir -p reports
dependency-check --enableExperimental -f JSON -f XML \
dependency-check --enableExperimental -f JSON -f HTML -f XML \
--disableAssembly -s . -o reports --project $SERVICE_NAME --exclude ".git/**" --exclude "venv/**" \
--exclude "build_venv/**" --exclude "**/__pycache__/**" --exclude "bamboo-specs/**"
@ -32,9 +30,11 @@ if [[ -z "${bamboo_repository_pr_key}" ]]
then
echo "Sonar Scan for branch: ${bamboo_planRepository_1_branch}"
/usr/bin/sonar-scanner/bin/sonar-scanner -X\
-Dsonar.projectKey=RED_$SERVICE_NAME \
-Dsonar.projectKey=${project_name}_${SERVICE_NAME} \
-Dsonar.sources=${pkg_src} \
-Dsonar.host.url=https://sonarqube.iqser.com \
-Dsonar.login=${bamboo_sonarqube_api_token_secret} \
-Dsonar.branch.name=${bamboo_planRepository_1_branch} \
-Dsonar.dependencyCheck.jsonReportPath=reports/dependency-check-report.json \
-Dsonar.dependencyCheck.xmlReportPath=reports/dependency-check-report.xml \
-Dsonar.dependencyCheck.htmlReportPath=reports/dependency-check-report.html \
@ -43,7 +43,8 @@ then
else
echo "Sonar Scan for PR with key1: ${bamboo_repository_pr_key}"
/usr/bin/sonar-scanner/bin/sonar-scanner \
-Dsonar.projectKey=RED_$SERVICE_NAME \
-Dsonar.projectKey=${project_name}_${SERVICE_NAME} \
-Dsonar.sources=${pkg_src} \
-Dsonar.host.url=https://sonarqube.iqser.com \
-Dsonar.login=${bamboo_sonarqube_api_token_secret} \
-Dsonar.pullrequest.key=${bamboo_repository_pr_key} \

2765
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,3 +1,19 @@
from pyinfra import config, k8s_probes, queue, storage
__all__ = ["k8s_probes", "queue", "storage", "config"]
CONFIG = config.get_config()
import logging
import sys
# log config
LOG_FORMAT = "%(asctime)s [%(levelname)s] - [%(filename)s -> %(funcName)s() -> %(lineno)s] : %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler_format = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT)
stream_handler.setFormatter(stream_handler_format)
logger = logging.getLogger()
logger.setLevel(CONFIG.logging_level_root)
logger.addHandler(stream_handler)

View File

@ -25,6 +25,10 @@ class Config(object):
# Controls AMQP heartbeat timeout in seconds
self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60")
# Controls AMQP connection sleep timer in seconds
# important for heartbeat to come through while main function runs on other thread
self.rabbitmq_connection_sleep = read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5)
# Queue name for requests to the service
self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue")

View File

@ -1,4 +1,5 @@
import atexit
import concurrent.futures
import json
import logging
import signal
@ -7,14 +8,25 @@ from typing import Callable
import pika
import pika.exceptions
from pika.adapters.blocking_connection import BlockingChannel
from pyinfra.config import Config
CONFIG = Config()
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.WARNING)
pika_logger.setLevel(CONFIG.logging_level_root)
def get_connection_params(config: Config) -> pika.ConnectionParameters:
"""creates pika connection params from pyinfra.Config class
Args:
config (pyinfra.Config): standard pyinfra config class
Returns:
pika.ConnectionParameters: standard pika connection param object
"""
credentials = pika.PlainCredentials(username=config.rabbitmq_username, password=config.rabbitmq_password)
pika_connection_params = {
"host": config.rabbitmq_host,
@ -31,25 +43,41 @@ def _get_n_previous_attempts(props):
def token_file_name():
"""create filepath
Returns:
joblib.Path: filepath
"""
token_file_path = Path("/tmp") / "consumer_token.txt"
return token_file_path
class QueueManager(object):
class QueueManager:
"""Handle RabbitMQ message reception & delivery"""
def __init__(self, config: Config):
self.logger = logging.getLogger("queue_manager")
self.logger = logging.getLogger(__name__)
self.logger.setLevel(config.logging_level_root)
self._write_token = config.write_consumer_token == "True"
self._set_consumer_token(None)
self._connection_params = get_connection_params(config)
self._input_queue = config.request_queue
self._output_queue = config.response_queue
self._dead_letter_queue = config.dead_letter_queue
# controls how often we send out a life signal
self._heartbeat = config.rabbitmq_heartbeat
# controls for how long we only process data events (e.g. heartbeats),
# while the queue is blocked and we process the given callback function
self._connection_sleep = config.rabbitmq_connection_sleep
self._write_token = config.write_consumer_token == "True"
self._set_consumer_token(None)
self._connection_params = get_connection_params(config)
self._connection = pika.BlockingConnection(parameters=self._connection_params)
self._channel: BlockingChannel
# necessary to pods can be terminated/restarted in K8s/docker
atexit.register(self.stop_consuming)
signal.signal(signal.SIGTERM, self._handle_stop_signal)
signal.signal(signal.SIGINT, self._handle_stop_signal)
@ -65,92 +93,117 @@ class QueueManager(object):
token_file.write(text)
def _open_channel(self):
self._connection = pika.BlockingConnection(parameters=self._connection_params)
self._channel = self._connection.channel()
self._channel.basic_qos(prefetch_count=1)
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._dead_letter_queue}
args = {
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._dead_letter_queue,
}
self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True)
self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True)
def _close_channel(self):
self._channel.close()
self._connection.close()
def start_consuming(self, process_message_callback: Callable):
"""consumption handling
- stanard callback handling is enforced through wrapping process_message_callback in _create_queue_callback (implements threading to support heartbeats)
- initially sets consumer token to None
- tries to
- open channels
- set consumer token to basic_consume, passing in the standard callback and input queue name
- calls pika start_consuming method on the channels
- catches all Exceptions & stops consuming + closes channels
Args:
process_message_callback (Callable): function call passed into the queue manager from implementing service
"""
callback = self._create_queue_callback(process_message_callback)
self._set_consumer_token(None)
self.logger.info("Consuming from queue")
try:
self._open_channel()
self._set_consumer_token(self._channel.basic_consume(self._input_queue, callback))
self.logger.info(f"Registered with consumer-tag: {self._consumer_token}")
self.logger.info("Registered with consumer-tag: %s", self._consumer_token)
self._channel.start_consuming()
except Exception:
self.logger.warning("An unexpected exception occurred while consuming messages. Consuming will stop.")
self.logger.error(
"An unexpected exception occurred while consuming messages. Consuming will stop.", exc_info=True
)
raise
finally:
self.stop_consuming()
self._close_channel()
self.stop_consuming() # stopping consumption also closes the channel
self._connection.close()
def stop_consuming(self):
"""stop channel consumption & reset consumer token to None"""
if self._consumer_token and self._connection:
self.logger.info(f"Cancelling subscription for consumer-tag: {self._consumer_token}")
self.logger.info("Cancelling subscription for consumer-tag %s", self._consumer_token)
self._channel.stop_consuming(self._consumer_token)
self._set_consumer_token(None)
def _handle_stop_signal(self, signal_number, _stack_frame, *args, **kwargs):
self.logger.info(f"Received signal {signal_number}")
self.logger.info("Received signal %s", signal_number)
self.stop_consuming()
def _create_queue_callback(self, process_message_callback: Callable):
def process_message_body_and_await_result(unpacked_message_body):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
self.logger.debug("opening thread for callback")
future = thread_pool_executor.submit(process_message_callback, unpacked_message_body)
while future.running():
self.logger.debug("callback running in thread, processing data events in the meantime")
self._connection.sleep(float(self._connection_sleep))
self.logger.debug("fetching result from callback")
return future.result()
def callback(_channel, frame, properties, body):
self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag)
# Only try to process each message once.
# Requeueing will be handled by the dead-letter-exchange.
# This prevents endless retries on messages that are impossible to process.
if frame.redelivered:
self.logger.info(
f"Aborting message processing for delivery_tag {frame.delivery_tag} " f"due to it being redelivered"
"Aborting message processing for delivery_tag %s due to it being redelivered", frame.delivery_tag
)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
# returns nothing, so the function stops here
return
self.logger.debug(f"Processing {(frame, properties, body)}.")
self.logger.debug("Processing (%s, %s, %s).", frame, properties, body)
try:
unpacked_message_body = json.loads(body)
callback_result = process_message_body_and_await_result(json.loads(body))
should_publish_result, callback_result = process_message_callback(unpacked_message_body)
if should_publish_result:
if callback_result is not None:
self.logger.info(
f"Processed message with delivery_tag {frame.delivery_tag}, "
f"publishing result to result-queue"
"Processed message with delivery_tag %s, publishing result to result-queue", frame.delivery_tag
)
self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
self.logger.info(
f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}"
"Result published, acknowledging incoming message with delivery_tag %s", frame.delivery_tag
)
self._channel.basic_ack(frame.delivery_tag)
else:
self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, declining message")
self.logger.info("Processed message with delivery_tag %s, declining message", frame.delivery_tag)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
except Exception as ex:
n_attempts = _get_n_previous_attempts(properties) + 1
self.logger.warning(f"Failed to process message, {n_attempts} attempts, error: {str(ex)}")
self.logger.warning("Failed to process message, %s attempts", n_attempts, exc_info=True)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
raise ex
return callback
def clear(self):
"""purge input & output queues"""
try:
self._channel.queue_purge(self._input_queue)
self._channel.queue_purge(self._output_queue)

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pyinfra"
version = "1.1.1a7"
version = "1.1.2"
description = ""
authors = ["Francisco Schulz <francisco.schulz@iqser.com>"]
license = "All rights reseverd"
@ -20,6 +20,8 @@ funcy = "1.17"
[tool.poetry.group.dev.dependencies]
pytest = "^7.1.3"
ipykernel = "^6.16.0"
black = {version = "^23.1a1", allow-prereleases = true}
pylint = "^2.15.10"
[tool.pytest.ini_options]
minversion = "6.0"

44
requirements.txt Normal file
View File

@ -0,0 +1,44 @@
attrs==22.2.0 ; python_version >= "3.8" and python_version < "3.9"
azure-core==1.22.1 ; python_version >= "3.8" and python_version < "3.9"
azure-storage-blob==12.9.0 ; python_version >= "3.8" and python_version < "3.9"
bcrypt==4.0.1 ; python_version >= "3.8" and python_version < "3.9"
certifi==2022.12.7 ; python_version >= "3.8" and python_version < "3.9"
cffi==1.15.1 ; python_version >= "3.8" and python_version < "3.9"
charset-normalizer==3.0.1 ; python_version >= "3.8" and python_version < "3.9"
colorama==0.4.6 ; python_version >= "3.8" and python_version < "3.9" and sys_platform == "win32"
cryptography==39.0.1 ; python_version >= "3.8" and python_version < "3.9"
decorator==5.1.1 ; python_version >= "3.8" and python_version < "3.9"
deprecation==2.1.0 ; python_version >= "3.8" and python_version < "3.9"
distro==1.8.0 ; python_version >= "3.8" and python_version < "3.9"
docker-compose==1.29.2 ; python_version >= "3.8" and python_version < "3.9"
docker==6.0.1 ; python_version >= "3.8" and python_version < "3.9"
docker[ssh]==6.0.1 ; python_version >= "3.8" and python_version < "3.9"
dockerpty==0.4.1 ; python_version >= "3.8" and python_version < "3.9"
docopt==0.6.2 ; python_version >= "3.8" and python_version < "3.9"
funcy==1.17 ; python_version >= "3.8" and python_version < "3.9"
idna==3.4 ; python_version >= "3.8" and python_version < "3.9"
isodate==0.6.1 ; python_version >= "3.8" and python_version < "3.9"
jsonschema==3.2.0 ; python_version >= "3.8" and python_version < "3.9"
minio==7.1.3 ; python_version >= "3.8" and python_version < "3.9"
msrest==0.6.21 ; python_version >= "3.8" and python_version < "3.9"
oauthlib==3.2.2 ; python_version >= "3.8" and python_version < "3.9"
packaging==23.0 ; python_version >= "3.8" and python_version < "3.9"
paramiko==3.0.0 ; python_version >= "3.8" and python_version < "3.9"
pika==1.2.0 ; python_version >= "3.8" and python_version < "3.9"
py==1.11.0 ; python_version >= "3.8" and python_version < "3.9"
pycparser==2.21 ; python_version >= "3.8" and python_version < "3.9"
pynacl==1.5.0 ; python_version >= "3.8" and python_version < "3.9"
pyrsistent==0.19.3 ; python_version >= "3.8" and python_version < "3.9"
python-dotenv==0.21.1 ; python_version >= "3.8" and python_version < "3.9"
pywin32==305 ; python_version >= "3.8" and python_version < "3.9" and sys_platform == "win32"
pyyaml==5.4.1 ; python_version >= "3.8" and python_version < "3.9"
requests-oauthlib==1.3.1 ; python_version >= "3.8" and python_version < "3.9"
requests==2.28.2 ; python_version >= "3.8" and python_version < "3.9"
retry==0.9.2 ; python_version >= "3.8" and python_version < "3.9"
setuptools==67.3.1 ; python_version >= "3.8" and python_version < "3.9"
six==1.16.0 ; python_version >= "3.8" and python_version < "3.9"
testcontainers==3.4.2 ; python_version >= "3.8" and python_version < "3.9"
texttable==1.6.7 ; python_version >= "3.8" and python_version < "3.9"
urllib3==1.26.14 ; python_version >= "3.8" and python_version < "3.9"
websocket-client==0.59.0 ; python_version >= "3.8" and python_version < "3.9"
wrapt==1.14.1 ; python_version >= "3.8" and python_version < "3.9"