Compare commits
2 Commits
master
...
release/3.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d2a9240d1 | ||
|
|
9f39be7077 |
@ -25,6 +25,10 @@ class Config(object):
|
|||||||
# Controls AMQP heartbeat timeout in seconds
|
# Controls AMQP heartbeat timeout in seconds
|
||||||
self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60")
|
self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60")
|
||||||
|
|
||||||
|
# Controls AMQP connection sleep timer in seconds
|
||||||
|
# important for heartbeat to come through while main function runs on other thread
|
||||||
|
self.rabbitmq_connection_sleep = read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5)
|
||||||
|
|
||||||
# Queue name for requests to the service
|
# Queue name for requests to the service
|
||||||
self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue")
|
self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue")
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import logging
|
|||||||
import signal
|
import signal
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
import pika
|
import pika
|
||||||
import pika.exceptions
|
import pika.exceptions
|
||||||
@ -46,6 +47,10 @@ class QueueManager(object):
|
|||||||
|
|
||||||
self._connection_params = get_connection_params(config)
|
self._connection_params = get_connection_params(config)
|
||||||
|
|
||||||
|
# controls for how long we only process data events (e.g. heartbeats),
|
||||||
|
# while the queue is blocked and we process the given callback function
|
||||||
|
self._connection_sleep = config.rabbitmq_connection_sleep
|
||||||
|
|
||||||
self._input_queue = config.request_queue
|
self._input_queue = config.request_queue
|
||||||
self._output_queue = config.response_queue
|
self._output_queue = config.response_queue
|
||||||
self._dead_letter_queue = config.dead_letter_queue
|
self._dead_letter_queue = config.dead_letter_queue
|
||||||
@ -110,6 +115,18 @@ class QueueManager(object):
|
|||||||
self.stop_consuming()
|
self.stop_consuming()
|
||||||
|
|
||||||
def _create_queue_callback(self, process_message_callback: Callable):
|
def _create_queue_callback(self, process_message_callback: Callable):
|
||||||
|
def process_message_body_and_await_result(unpacked_message_body):
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
|
||||||
|
self.logger.debug("opening thread for callback")
|
||||||
|
future = thread_pool_executor.submit(process_message_callback, unpacked_message_body)
|
||||||
|
|
||||||
|
while future.running():
|
||||||
|
self.logger.debug("callback running in thread, processing data events in the meantime")
|
||||||
|
self._connection.sleep(float(self._connection_sleep))
|
||||||
|
|
||||||
|
self.logger.debug("fetching result from callback")
|
||||||
|
return future.result()
|
||||||
|
|
||||||
def callback(_channel, frame, properties, body):
|
def callback(_channel, frame, properties, body):
|
||||||
self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
|
self.logger.info(f"Received message from queue with delivery_tag {frame.delivery_tag}")
|
||||||
|
|
||||||
@ -126,7 +143,7 @@ class QueueManager(object):
|
|||||||
try:
|
try:
|
||||||
unpacked_message_body = json.loads(body)
|
unpacked_message_body = json.loads(body)
|
||||||
|
|
||||||
should_publish_result, callback_result = process_message_callback(unpacked_message_body)
|
should_publish_result, callback_result = process_message_body_and_await_result(unpacked_message_body)
|
||||||
|
|
||||||
if should_publish_result:
|
if should_publish_result:
|
||||||
self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, "
|
self.logger.info(f"Processed message with delivery_tag {frame.delivery_tag}, "
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user