fix message encoding for response, rename some functions

This commit is contained in:
Julius Unverfehrt 2024-01-19 08:53:36 +01:00
parent b7f860f36b
commit fbbfc553ae
4 changed files with 17 additions and 9 deletions

View File

@ -9,7 +9,7 @@ from pyinfra.storage.utils import download_data_as_specified_in_message, upload_
DataProcessor = Callable[[Union[dict, bytes], dict], dict] DataProcessor = Callable[[Union[dict, bytes], dict], dict]
def make_payload_processor(data_processor: DataProcessor, settings: Dynaconf): def make_queue_message_callback(data_processor: DataProcessor, settings: Dynaconf):
"""Default callback for processing queue messages. """Default callback for processing queue messages.
Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage
will be configured to use that tenant id, otherwise the storage is configured as specified in the settings. will be configured to use that tenant id, otherwise the storage is configured as specified in the settings.
@ -19,7 +19,7 @@ def make_payload_processor(data_processor: DataProcessor, settings: Dynaconf):
The response message is just the original message. The response message is just the original message.
Adapt as needed. Adapt as needed.
""" """
def inner(queue_message_payload: dict) -> dict: def inner(queue_message_payload: dict) -> dict:
logger.info(f"Processing payload...") logger.info(f"Processing payload...")
@ -33,4 +33,4 @@ def make_payload_processor(data_processor: DataProcessor, settings: Dynaconf):
return queue_message_payload return queue_message_payload
return inner return inner

View File

@ -172,10 +172,9 @@ class QueueManager:
channel.basic_publish( channel.basic_publish(
"", "",
self.output_queue, self.output_queue,
result, json.dumps(result).encode(),
properties=pika.BasicProperties(headers=filtered_message_headers), properties=pika.BasicProperties(headers=filtered_message_headers),
) )
# FIXME: publish doesnt work in example script, explore, adapt, overcome
logger.info(f"Published result to queue {self.output_queue}.") logger.info(f"Published result to queue {self.output_queue}.")
channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_ack(delivery_tag=method.delivery_tag)

View File

@ -15,4 +15,9 @@ def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> t
def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thread: def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thread:
return threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)) """Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
"""
thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING))
thread.daemon = True
return thread

View File

@ -5,7 +5,7 @@ from fastapi import FastAPI
from pyinfra.config.loader import load_settings from pyinfra.config.loader import load_settings
from pyinfra.monitor.prometheus import make_prometheus_processing_time_decorator_from_settings, add_prometheus_endpoint from pyinfra.monitor.prometheus import make_prometheus_processing_time_decorator_from_settings, add_prometheus_endpoint
from pyinfra.queue.callback import make_payload_processor from pyinfra.queue.callback import make_queue_message_callback
from pyinfra.queue.manager import QueueManager from pyinfra.queue.manager import QueueManager
from pyinfra.webserver.utils import create_webserver_thread_from_settings from pyinfra.webserver.utils import create_webserver_thread_from_settings
@ -31,11 +31,15 @@ def main():
@app.get("/ready") @app.get("/ready")
@app.get("/health") @app.get("/health")
def check_health(): def check_health():
return queue_manager.is_ready() if queue_manager.is_ready():
return {"status": "OK"}, 200
else:
return {"status": "Service Unavailable"}, 503
webserver_thread = create_webserver_thread_from_settings(app, settings) webserver_thread = create_webserver_thread_from_settings(app, settings)
webserver_thread.start() webserver_thread.start()
callback = make_payload_processor(json_processor_mock, settings)
callback = make_queue_message_callback(json_processor_mock, settings)
queue_manager.start_consuming(callback) queue_manager.start_consuming(callback)