diff --git a/pyinfra/queue/callback.py b/pyinfra/queue/callback.py index cedf686..3fa8f22 100644 --- a/pyinfra/queue/callback.py +++ b/pyinfra/queue/callback.py @@ -9,7 +9,7 @@ from pyinfra.storage.utils import download_data_as_specified_in_message, upload_ DataProcessor = Callable[[Union[dict, bytes], dict], dict] -def make_payload_processor(data_processor: DataProcessor, settings: Dynaconf): +def make_queue_message_callback(data_processor: DataProcessor, settings: Dynaconf): """Default callback for processing queue messages. Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage will be configured to use that tenant id, otherwise the storage is configured as specified in the settings. @@ -19,7 +19,7 @@ def make_payload_processor(data_processor: DataProcessor, settings: Dynaconf): The response message is just the original message. Adapt as needed. """ - + def inner(queue_message_payload: dict) -> dict: logger.info(f"Processing payload...") @@ -33,4 +33,4 @@ def make_payload_processor(data_processor: DataProcessor, settings: Dynaconf): return queue_message_payload - return inner \ No newline at end of file + return inner diff --git a/pyinfra/queue/manager.py b/pyinfra/queue/manager.py index d91e0be..34d5313 100644 --- a/pyinfra/queue/manager.py +++ b/pyinfra/queue/manager.py @@ -172,10 +172,9 @@ class QueueManager: channel.basic_publish( "", self.output_queue, - result, + json.dumps(result).encode(), properties=pika.BasicProperties(headers=filtered_message_headers), ) - # FIXME: publish doesnt work in example script, explore, adapt, overcome logger.info(f"Published result to queue {self.output_queue}.") channel.basic_ack(delivery_tag=method.delivery_tag) diff --git a/pyinfra/webserver/utils.py b/pyinfra/webserver/utils.py index c72c40e..70a9c2b 100644 --- a/pyinfra/webserver/utils.py +++ b/pyinfra/webserver/utils.py @@ -15,4 +15,9 @@ def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> t def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thread: - return threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)) + """Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join(). + Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated. + """ + thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)) + thread.daemon = True + return thread diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py index 0b1c4ec..ea959b5 100644 --- a/scripts/start_pyinfra.py +++ b/scripts/start_pyinfra.py @@ -5,7 +5,7 @@ from fastapi import FastAPI from pyinfra.config.loader import load_settings from pyinfra.monitor.prometheus import make_prometheus_processing_time_decorator_from_settings, add_prometheus_endpoint -from pyinfra.queue.callback import make_payload_processor +from pyinfra.queue.callback import make_queue_message_callback from pyinfra.queue.manager import QueueManager from pyinfra.webserver.utils import create_webserver_thread_from_settings @@ -31,11 +31,15 @@ def main(): @app.get("/ready") @app.get("/health") def check_health(): - return queue_manager.is_ready() + if queue_manager.is_ready(): + return {"status": "OK"}, 200 + else: + return {"status": "Service Unavailable"}, 503 webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread.start() - callback = make_payload_processor(json_processor_mock, settings) + + callback = make_queue_message_callback(json_processor_mock, settings) queue_manager.start_consuming(callback)