Compare commits

...

4 Commits

Author SHA1 Message Date
francisco.schulz
86840ecaca update for version 1.3.2 2023-06-14 15:10:09 +02:00
francisco.schulz
225aa1f4ad use poetry 2023-06-14 09:54:34 +02:00
Francisco Schulz
8d2a9240d1 Pull request #60: RED-5277: add multi-threading, update config
Merge in RR/pyinfra from RED-5277-backport-REDv3.6 to release/3.6.x

* commit '9f39be70776c486b3387f96b17cd019a46fe2858':
  RED-5277: add multi-threading, update config
2023-02-22 10:56:40 +01:00
Francisco Schulz
9f39be7077 RED-5277: add multi-threading, update config 2023-02-22 10:54:08 +01:00
4 changed files with 1982 additions and 1 deletions

1919
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,10 @@ class Config(object):
# Controls AMQP heartbeat timeout in seconds
self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60")
# Controls AMQP connection sleep timer in seconds
# important for heartbeat to come through while main function runs on other thread
self.rabbitmq_connection_sleep = read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5)
# Queue name for requests to the service
self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue")

View File

@ -4,6 +4,7 @@ import logging
import signal
from typing import Callable
from pathlib import Path
import concurrent.futures
import pika
import pika.exceptions
@ -46,6 +47,10 @@ class QueueManager(object):
self._connection_params = get_connection_params(config)
# controls for how long we only process data events (e.g. heartbeats),
# while the queue is blocked and we process the given callback function
self._connection_sleep = config.rabbitmq_connection_sleep
self._input_queue = config.request_queue
self._output_queue = config.response_queue
self._dead_letter_queue = config.dead_letter_queue
@ -110,6 +115,18 @@ class QueueManager(object):
self.stop_consuming()
def _create_queue_callback(self, process_message_callback: Callable):
def process_message_body_and_await_result(unpacked_message_body):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
self.logger.debug("opening thread for callback")
future = thread_pool_executor.submit(process_message_callback, unpacked_message_body)
while future.running():
self.logger.debug("callback running in thread, processing data events in the meantime")
self._connection.sleep(float(self._connection_sleep))
self.logger.debug("fetching result from callback")
return future.result()
def callback(_channel, frame, properties, body):
self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
@ -126,7 +143,7 @@ class QueueManager(object):
try:
unpacked_message_body = json.loads(body)
should_publish_result, callback_result = process_message_callback(unpacked_message_body)
should_publish_result, callback_result = process_message_body_and_await_result(unpacked_message_body)
if should_publish_result:
self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, "

41
pyproject.toml Normal file
View File

@ -0,0 +1,41 @@
[tool.poetry]
name = "pyinfra"
version = "1.3.2"
description = ""
authors = ["Francisco Schulz <francisco.schulz@iqser.com>"]
license = "All rights reseverd"
readme = "README.md"
[tool.poetry.dependencies]
python = "~3.8"
pika = "^1.2.0"
retry = "^0.9.2"
minio = "^7.1.3"
azure-core = "^1.22.1"
azure-storage-blob = "^12.9.0"
testcontainers = "^3.4.2"
docker-compose = "^1.29.2"
funcy = "^1.17"
prometheus-client = "^0.16.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.1.3"
ipykernel = "^6.16.0"
black = {version = "^23.1a1", allow-prereleases = true}
pylint = "^2.15.10"
coverage = "^7.2.0"
requests = "^2.28.2"
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = [
"tests",
"integration",
]
log_cli = 1
log_cli_level = "DEBUG"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"