clean up config hygiene; align queue manager and storage signature
This commit is contained in:
parent
e3abf2be0f
commit
db8f617aa7
@ -50,8 +50,9 @@ class Config(object):
|
|||||||
self.storage_secret = read_from_environment("STORAGE_SECRET", "password")
|
self.storage_secret = read_from_environment("STORAGE_SECRET", "password")
|
||||||
|
|
||||||
# Connection string for Azure storage
|
# Connection string for Azure storage
|
||||||
self.storage_azureconnectionstring = read_from_environment("STORAGE_AZURECONNECTIONSTRING",
|
self.storage_azureconnectionstring = read_from_environment(
|
||||||
"DefaultEndpointsProtocol=...")
|
"STORAGE_AZURECONNECTIONSTRING", "DefaultEndpointsProtocol=..."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_config() -> Config:
|
def get_config() -> Config:
|
||||||
|
|||||||
@ -9,8 +9,6 @@ import pika.exceptions
|
|||||||
|
|
||||||
from pyinfra.config import get_config, Config
|
from pyinfra.config import get_config, Config
|
||||||
|
|
||||||
CONFIG = get_config()
|
|
||||||
|
|
||||||
pika_logger = logging.getLogger("pika")
|
pika_logger = logging.getLogger("pika")
|
||||||
pika_logger.setLevel(logging.WARNING)
|
pika_logger.setLevel(logging.WARNING)
|
||||||
|
|
||||||
@ -32,7 +30,7 @@ def _get_n_previous_attempts(props):
|
|||||||
|
|
||||||
|
|
||||||
class QueueManager(object):
|
class QueueManager(object):
|
||||||
def __init__(self, config: Config = CONFIG):
|
def __init__(self, config: Config):
|
||||||
connection_params = get_connection_params(config)
|
connection_params = get_connection_params(config)
|
||||||
|
|
||||||
atexit.register(self.stop_consuming)
|
atexit.register(self.stop_consuming)
|
||||||
@ -43,7 +41,7 @@ class QueueManager(object):
|
|||||||
self._channel = self._connection.channel()
|
self._channel = self._connection.channel()
|
||||||
self._channel.basic_qos(prefetch_count=1)
|
self._channel.basic_qos(prefetch_count=1)
|
||||||
|
|
||||||
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.dead_letter_queue}
|
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": config.dead_letter_queue}
|
||||||
|
|
||||||
self._input_queue = config.request_queue
|
self._input_queue = config.request_queue
|
||||||
self._output_queue = config.response_queue
|
self._output_queue = config.response_queue
|
||||||
@ -54,7 +52,7 @@ class QueueManager(object):
|
|||||||
self._consumer_token = None
|
self._consumer_token = None
|
||||||
|
|
||||||
self.logger = logging.getLogger("queue_manager")
|
self.logger = logging.getLogger("queue_manager")
|
||||||
self.logger.setLevel(CONFIG.logging_level_root)
|
self.logger.setLevel(config.logging_level_root)
|
||||||
|
|
||||||
def start_consuming(self, process_message_callback: Callable):
|
def start_consuming(self, process_message_callback: Callable):
|
||||||
callback = self._create_queue_callback(process_message_callback)
|
callback = self._create_queue_callback(process_message_callback)
|
||||||
@ -91,7 +89,8 @@ class QueueManager(object):
|
|||||||
self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
|
self._channel.basic_publish("", self._output_queue, json.dumps(callback_result).encode())
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}")
|
f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}"
|
||||||
|
)
|
||||||
self._channel.basic_ack(frame.delivery_tag)
|
self._channel.basic_ack(frame.delivery_tag)
|
||||||
|
|
||||||
self.logger.info(f"Message with delivery_tag {frame.delivery_tag} processed")
|
self.logger.info(f"Message with delivery_tag {frame.delivery_tag} processed")
|
||||||
|
|||||||
@ -72,5 +72,4 @@ class AzureStorageAdapter(object):
|
|||||||
|
|
||||||
|
|
||||||
def get_azure_storage(config: Config):
|
def get_azure_storage(config: Config):
|
||||||
return AzureStorageAdapter(
|
return AzureStorageAdapter(BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring))
|
||||||
BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring))
|
|
||||||
|
|||||||
@ -16,11 +16,14 @@ logger = logging.getLogger(CONFIG.logging_level_root)
|
|||||||
|
|
||||||
ALLOWED_CONNECTION_SCHEMES = {"http", "https"}
|
ALLOWED_CONNECTION_SCHEMES = {"http", "https"}
|
||||||
URL_VALIDATOR = re.compile(
|
URL_VALIDATOR = re.compile(
|
||||||
r"^((" +
|
r"^(("
|
||||||
r"([A-Za-z]{3,9}:(?:\/\/)?)" +
|
+ r"([A-Za-z]{3,9}:(?:\/\/)?)"
|
||||||
r"(?:[\-;:&=\+\$,\w]+@)?" + r"[A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)" +
|
+ r"(?:[\-;:&=\+\$,\w]+@)?"
|
||||||
r"[A-Za-z0-9\.\-]+)" + r"((?:\/[\+~%\/\.\w\-_]*)?" +
|
+ r"[A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)"
|
||||||
r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)")
|
+ r"[A-Za-z0-9\.\-]+)"
|
||||||
|
+ r"((?:\/[\+~%\/\.\w\-_]*)?"
|
||||||
|
+ r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class S3StorageAdapter(object):
|
class S3StorageAdapter(object):
|
||||||
@ -79,10 +82,12 @@ def _parse_endpoint(endpoint):
|
|||||||
|
|
||||||
|
|
||||||
def get_s3_storage(config: Config):
|
def get_s3_storage(config: Config):
|
||||||
return S3StorageAdapter(Minio(
|
return S3StorageAdapter(
|
||||||
|
Minio(
|
||||||
**_parse_endpoint(config.storage_endpoint),
|
**_parse_endpoint(config.storage_endpoint),
|
||||||
access_key=config.storage_key,
|
access_key=config.storage_key,
|
||||||
secret_key=config.storage_secret,
|
secret_key=config.storage_secret,
|
||||||
# FIXME Is this still needed? Check and if yes, add it to config
|
# FIXME Is this still needed? Check and if yes, add it to config
|
||||||
# region=config.region,
|
# region=config.region,
|
||||||
))
|
)
|
||||||
|
)
|
||||||
|
|||||||
@ -4,10 +4,6 @@ from pyinfra.config import get_config, Config
|
|||||||
from pyinfra.storage.adapters.azure import get_azure_storage
|
from pyinfra.storage.adapters.azure import get_azure_storage
|
||||||
from pyinfra.storage.adapters.s3 import get_s3_storage
|
from pyinfra.storage.adapters.s3 import get_s3_storage
|
||||||
|
|
||||||
CONFIG = get_config()
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logger.setLevel(CONFIG.logging_level_root)
|
|
||||||
|
|
||||||
|
|
||||||
def get_storage(config: Config):
|
def get_storage(config: Config):
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user