feat(opentelemetry): improve readability
This commit is contained in:
parent
e0b32fa448
commit
c18475a77d
@ -5,7 +5,7 @@ from kn_utils.logging import logger
|
||||
from pyinfra.config.loader import validate_settings, get_all_validators
|
||||
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
|
||||
from pyinfra.queue.manager import QueueManager
|
||||
from pyinfra.utils.opentelemetry import setup_trace, instrument_pika, instrument_app
|
||||
from pyinfra.utils.opentelemetry import setup_trace, instrument_pika
|
||||
from pyinfra.webserver.prometheus import (
|
||||
add_prometheus_endpoint,
|
||||
make_prometheus_processing_time_decorator_from_settings,
|
||||
@ -13,11 +13,15 @@ from pyinfra.webserver.prometheus import (
|
||||
from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings
|
||||
|
||||
|
||||
def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf):
|
||||
def start_standard_queue_consumer(
|
||||
process_fn: DataProcessor,
|
||||
settings: Dynaconf,
|
||||
app: FastAPI = None,
|
||||
):
|
||||
"""Default serving logic for research services.
|
||||
|
||||
Supplies /health, /ready and /prometheus endpoints (if enabled). The process_fn is monitored for processing time per
|
||||
call. Also traces the queue messages (if enabled).
|
||||
call. Also traces the queue messages via openTelemetry (if enabled).
|
||||
Workload is only received via queue messages. The message contains a file path to the data to be processed, which
|
||||
gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should
|
||||
return a json serializable object. This object is then uploaded to the storage. The response message is just the
|
||||
@ -29,7 +33,7 @@ def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataPr
|
||||
|
||||
logger.info(f"Starting webserver and queue consumer...")
|
||||
|
||||
app = FastAPI()
|
||||
app = app or FastAPI()
|
||||
|
||||
queue_manager = QueueManager(settings)
|
||||
|
||||
|
||||
@ -141,11 +141,10 @@ class QueueManager:
|
||||
logger.info("Processing payload in separate thread.")
|
||||
future = thread_pool_executor.submit(message_processor, unpacked_message_body)
|
||||
|
||||
# FIXME: This block is probably not necessary, but kept since the implications of removing it are
|
||||
# TODO: This block is probably not necessary, but kept since the implications of removing it are
|
||||
# unclear. Remove it in a future iteration where less changes are being made to the code base.
|
||||
while future.running():
|
||||
logger.debug("Waiting for payload processing to finish...")
|
||||
self.connection.process_data_events()
|
||||
self.connection.sleep(self.connection_sleep)
|
||||
|
||||
return future.result()
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
import time
|
||||
|
||||
from dynaconf import Dynaconf
|
||||
|
||||
from pyinfra.config.loader import load_settings, parse_args
|
||||
from pyinfra.examples import start_queue_consumer_with_prometheus_and_health_endpoints
|
||||
from pyinfra.examples import start_standard_queue_consumer
|
||||
|
||||
|
||||
def processor_mock(_data: dict, _message: dict) -> dict:
|
||||
@ -10,5 +12,6 @@ def processor_mock(_data: dict, _message: dict) -> dict:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
settings = load_settings(parse_args().settings_path)
|
||||
start_queue_consumer_with_prometheus_and_health_endpoints(processor_mock, settings)
|
||||
arguments = parse_args()
|
||||
settings = load_settings(arguments.settings_path)
|
||||
start_standard_queue_consumer(processor_mock, settings)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user