From c9bfc767a896126c2be2e0c6385d880bd39d0237 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 26 Apr 2022 13:12:54 +0200 Subject: [PATCH 1/6] Pull request #32: restructuring: moved test out of module scope Merge in RR/pyinfra from partial_responses to master Squashed commit of the following: commit afd67d87a6349c4b97453a12274c6ccf5e976339 Author: Matthias Bisping Date: Tue Apr 26 12:48:12 2022 +0200 updated test container dockerfile for new location of tests directory commit 37881da08ebedf0f2d0c6c2b267bdb47818a0da1 Author: Matthias Bisping Date: Tue Apr 26 12:45:12 2022 +0200 restructuring: moved test out of module scope --- Dockerfile_tests | 2 +- pyinfra/locations.py | 2 +- {pyinfra/test => test}/__init__.py | 0 {pyinfra/test => test}/config.py | 0 {pyinfra/test => test}/config.yaml | 0 {pyinfra/test => test}/exploration_tests/__init__.py | 0 .../exploration_tests/data_json_request_test.py | 0 {pyinfra/test => test}/queue/__init__.py | 0 {pyinfra/test => test}/queue/queue_manager_mock.py | 2 +- {pyinfra/test => test}/queue/queue_mock.py | 0 {pyinfra/test => test}/storage/__init__.py | 0 {pyinfra/test => test}/storage/adapter_mock.py | 2 +- {pyinfra/test => test}/storage/client_mock.py | 0 {pyinfra/test => test}/unit_tests/__init__.py | 0 {pyinfra/test => test}/unit_tests/azure_adapter_test.py | 2 +- {pyinfra/test => test}/unit_tests/config_test.py | 0 {pyinfra/test => test}/unit_tests/conftest.py | 8 ++++---- {pyinfra/test => test}/unit_tests/consumer_test.py | 0 {pyinfra/test => test}/unit_tests/queue_visitor_test.py | 0 {pyinfra/test => test}/unit_tests/storage_test.py | 0 20 files changed, 9 insertions(+), 9 deletions(-) rename {pyinfra/test => test}/__init__.py (100%) rename {pyinfra/test => test}/config.py (100%) rename {pyinfra/test => test}/config.yaml (100%) rename {pyinfra/test => test}/exploration_tests/__init__.py (100%) rename pyinfra/test/exploration_tests/request_test.py => test/exploration_tests/data_json_request_test.py (100%) rename {pyinfra/test => test}/queue/__init__.py (100%) rename {pyinfra/test => test}/queue/queue_manager_mock.py (96%) rename {pyinfra/test => test}/queue/queue_mock.py (100%) rename {pyinfra/test => test}/storage/__init__.py (100%) rename {pyinfra/test => test}/storage/adapter_mock.py (94%) rename {pyinfra/test => test}/storage/client_mock.py (100%) rename {pyinfra/test => test}/unit_tests/__init__.py (100%) rename {pyinfra/test => test}/unit_tests/azure_adapter_test.py (74%) rename {pyinfra/test => test}/unit_tests/config_test.py (100%) rename {pyinfra/test => test}/unit_tests/conftest.py (95%) rename {pyinfra/test => test}/unit_tests/consumer_test.py (100%) rename {pyinfra/test => test}/unit_tests/queue_visitor_test.py (100%) rename {pyinfra/test => test}/unit_tests/storage_test.py (100%) diff --git a/Dockerfile_tests b/Dockerfile_tests index b16eccc..fd1164f 100755 --- a/Dockerfile_tests +++ b/Dockerfile_tests @@ -16,4 +16,4 @@ COPY . . RUN python3 -m pip install -e . RUN python3 -m pip install -r requirements.txt -CMD coverage run -m pytest pyinfra/test/ -x && coverage report -m && coverage xml +CMD coverage run -m pytest test/ -x && coverage report -m && coverage xml diff --git a/pyinfra/locations.py b/pyinfra/locations.py index abd6a88..54c5a17 100644 --- a/pyinfra/locations.py +++ b/pyinfra/locations.py @@ -7,7 +7,7 @@ MODULE_DIR = Path(__file__).resolve().parents[0] PACKAGE_ROOT_DIR = MODULE_DIR.parents[0] -TEST_DIR = MODULE_DIR / "test" +TEST_DIR = PACKAGE_ROOT_DIR / "test" CONFIG_FILE = PACKAGE_ROOT_DIR / "config.yaml" diff --git a/pyinfra/test/__init__.py b/test/__init__.py similarity index 100% rename from pyinfra/test/__init__.py rename to test/__init__.py diff --git a/pyinfra/test/config.py b/test/config.py similarity index 100% rename from pyinfra/test/config.py rename to test/config.py diff --git a/pyinfra/test/config.yaml b/test/config.yaml similarity index 100% rename from pyinfra/test/config.yaml rename to test/config.yaml diff --git a/pyinfra/test/exploration_tests/__init__.py b/test/exploration_tests/__init__.py similarity index 100% rename from pyinfra/test/exploration_tests/__init__.py rename to test/exploration_tests/__init__.py diff --git a/pyinfra/test/exploration_tests/request_test.py b/test/exploration_tests/data_json_request_test.py similarity index 100% rename from pyinfra/test/exploration_tests/request_test.py rename to test/exploration_tests/data_json_request_test.py diff --git a/pyinfra/test/queue/__init__.py b/test/queue/__init__.py similarity index 100% rename from pyinfra/test/queue/__init__.py rename to test/queue/__init__.py diff --git a/pyinfra/test/queue/queue_manager_mock.py b/test/queue/queue_manager_mock.py similarity index 96% rename from pyinfra/test/queue/queue_manager_mock.py rename to test/queue/queue_manager_mock.py index 58c40a3..063a68b 100644 --- a/pyinfra/test/queue/queue_manager_mock.py +++ b/test/queue/queue_manager_mock.py @@ -1,5 +1,5 @@ from pyinfra.queue.queue_manager.queue_manager import QueueManager, QueueHandle -from pyinfra.test.queue.queue_mock import QueueMock +from test.queue.queue_mock import QueueMock def monkey_patch_queue_handle(queue) -> QueueHandle: diff --git a/pyinfra/test/queue/queue_mock.py b/test/queue/queue_mock.py similarity index 100% rename from pyinfra/test/queue/queue_mock.py rename to test/queue/queue_mock.py diff --git a/pyinfra/test/storage/__init__.py b/test/storage/__init__.py similarity index 100% rename from pyinfra/test/storage/__init__.py rename to test/storage/__init__.py diff --git a/pyinfra/test/storage/adapter_mock.py b/test/storage/adapter_mock.py similarity index 94% rename from pyinfra/test/storage/adapter_mock.py rename to test/storage/adapter_mock.py index 98f3d81..5443ade 100644 --- a/pyinfra/test/storage/adapter_mock.py +++ b/test/storage/adapter_mock.py @@ -1,5 +1,5 @@ from pyinfra.storage.adapters.adapter import StorageAdapter -from pyinfra.test.storage.client_mock import StorageClientMock +from test.storage.client_mock import StorageClientMock class StorageAdapterMock(StorageAdapter): diff --git a/pyinfra/test/storage/client_mock.py b/test/storage/client_mock.py similarity index 100% rename from pyinfra/test/storage/client_mock.py rename to test/storage/client_mock.py diff --git a/pyinfra/test/unit_tests/__init__.py b/test/unit_tests/__init__.py similarity index 100% rename from pyinfra/test/unit_tests/__init__.py rename to test/unit_tests/__init__.py diff --git a/pyinfra/test/unit_tests/azure_adapter_test.py b/test/unit_tests/azure_adapter_test.py similarity index 74% rename from pyinfra/test/unit_tests/azure_adapter_test.py rename to test/unit_tests/azure_adapter_test.py index d758668..6807ffb 100644 --- a/pyinfra/test/unit_tests/azure_adapter_test.py +++ b/test/unit_tests/azure_adapter_test.py @@ -1,7 +1,7 @@ import pytest from pyinfra.storage.adapters.azure import AzureStorageAdapter -from pyinfra.test.storage.client_mock import StorageClientMock +from test.storage.client_mock import StorageClientMock @pytest.fixture diff --git a/pyinfra/test/unit_tests/config_test.py b/test/unit_tests/config_test.py similarity index 100% rename from pyinfra/test/unit_tests/config_test.py rename to test/unit_tests/config_test.py diff --git a/pyinfra/test/unit_tests/conftest.py b/test/unit_tests/conftest.py similarity index 95% rename from pyinfra/test/unit_tests/conftest.py rename to test/unit_tests/conftest.py index 7f9be90..92d78cf 100644 --- a/pyinfra/test/unit_tests/conftest.py +++ b/test/unit_tests/conftest.py @@ -16,10 +16,10 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter from pyinfra.storage.clients.azure import get_azure_client from pyinfra.storage.clients.s3 import get_s3_client from pyinfra.storage.storage import Storage -from pyinfra.test.config import CONFIG -from pyinfra.test.queue.queue_manager_mock import QueueManagerMock -from pyinfra.test.storage.adapter_mock import StorageAdapterMock -from pyinfra.test.storage.client_mock import StorageClientMock +from test.config import CONFIG +from test.queue.queue_manager_mock import QueueManagerMock +from test.storage.adapter_mock import StorageAdapterMock +from test.storage.client_mock import StorageClientMock from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor logger = logging.getLogger(__name__) diff --git a/pyinfra/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py similarity index 100% rename from pyinfra/test/unit_tests/consumer_test.py rename to test/unit_tests/consumer_test.py diff --git a/pyinfra/test/unit_tests/queue_visitor_test.py b/test/unit_tests/queue_visitor_test.py similarity index 100% rename from pyinfra/test/unit_tests/queue_visitor_test.py rename to test/unit_tests/queue_visitor_test.py diff --git a/pyinfra/test/unit_tests/storage_test.py b/test/unit_tests/storage_test.py similarity index 100% rename from pyinfra/test/unit_tests/storage_test.py rename to test/unit_tests/storage_test.py From 52acc3080e862c8400e8a6428f3c874e6f92820f Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 28 Apr 2022 15:13:21 +0200 Subject: [PATCH 2/6] Pull request #33: added aws region to config Merge in RR/pyinfra from aws_region to master Squashed commit of the following: commit 29b15be9337fb409f6a0bfbb29026baa4bbe7236 Author: Matthias Bisping Date: Thu Apr 28 15:08:43 2022 +0200 added aws region to config --- config.yaml | 3 ++- pyinfra/storage/clients/s3.py | 7 ++++++- test/config.yaml | 2 ++ test/unit_tests/conftest.py | 2 +- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/config.yaml b/config.yaml index 31c6571..079b09a 100755 --- a/config.yaml +++ b/config.yaml @@ -23,12 +23,13 @@ rabbitmq: storage: backend: $STORAGE_BACKEND|s3 # The type of storage to use {s3, azure} - bucket: "STORAGE_BUCKET_NAME|STORAGE_AZURECONTAINERNAME|pyinfra-test-bucket" # The bucket / container to pull files specified in queue requests from + bucket: "STORAGE_BUCKET_NAME|STORAGE_AZURECONTAINERNAME|pyinfra-test-bucket2" # The bucket / container to pull files specified in queue requests from s3: endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000" access_key: $STORAGE_KEY|root secret_key: $STORAGE_SECRET|password + region: $STORAGE_REGION|"eu-west-1" azure: connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" diff --git a/pyinfra/storage/clients/s3.py b/pyinfra/storage/clients/s3.py index 943d88c..f130989 100644 --- a/pyinfra/storage/clients/s3.py +++ b/pyinfra/storage/clients/s3.py @@ -32,4 +32,9 @@ def get_s3_client(params=None) -> Minio: if not params: params = CONFIG.storage.s3 - return Minio(**parse_endpoint(params.endpoint), access_key=params.access_key, secret_key=params.secret_key) + return Minio( + **parse_endpoint(params.endpoint), + access_key=params.access_key, + secret_key=params.secret_key, + region=params.region, + ) diff --git a/test/config.yaml b/test/config.yaml index 72bddfc..d404715 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -3,11 +3,13 @@ storage: endpoint: "http://127.0.0.1:9000" access_key: root secret_key: password + region: null aws: endpoint: https://s3.amazonaws.com access_key: AKIA4QVP6D4LCDAGYGN2 secret_key: 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 + region: $STORAGE_REGION|"eu-west-1" azure: connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" diff --git a/test/unit_tests/conftest.py b/test/unit_tests/conftest.py index 92d78cf..c614ee0 100644 --- a/test/unit_tests/conftest.py +++ b/test/unit_tests/conftest.py @@ -28,7 +28,7 @@ logger.setLevel(logging.DEBUG) @pytest.fixture(scope="session") def bucket_name(): - return "pyinfra-test-bucket" + return "pyinfra-test-bucket2" @pytest.fixture From 0e47ba61c76696b4e2471a9929f9b4c240234dda Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 28 Apr 2022 16:29:15 +0200 Subject: [PATCH 3/6] Pull request #34: Aws region Merge in RR/pyinfra from aws_region to master Squashed commit of the following: commit 7d5513e8b2e46218fe6f030abf99cc05b3fed7f5 Author: Matthias Bisping Date: Thu Apr 28 16:08:20 2022 +0200 changed test bucket commit caba7b503edbc6147aa69f8044ac92a81e768abd Merge: e76f0ab 52acc30 Author: Matthias Bisping Date: Thu Apr 28 15:30:57 2022 +0200 Merge branch 'master' of ssh://git.iqser.com:2222/rr/pyinfra into aws_region commit e76f0ab1f29b774d9297453c82b3b5749ea4cb44 Author: Matthias Bisping Date: Thu Apr 28 15:30:19 2022 +0200 changed test bucket commit 29b15be9337fb409f6a0bfbb29026baa4bbe7236 Author: Matthias Bisping Date: Thu Apr 28 15:08:43 2022 +0200 added aws region to config --- config.yaml | 2 +- test/unit_tests/conftest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config.yaml b/config.yaml index 079b09a..a7f8efe 100755 --- a/config.yaml +++ b/config.yaml @@ -23,7 +23,7 @@ rabbitmq: storage: backend: $STORAGE_BACKEND|s3 # The type of storage to use {s3, azure} - bucket: "STORAGE_BUCKET_NAME|STORAGE_AZURECONTAINERNAME|pyinfra-test-bucket2" # The bucket / container to pull files specified in queue requests from + bucket: "STORAGE_BUCKET_NAME|STORAGE_AZURECONTAINERNAME|pyinfra-test-bucket" # The bucket / container to pull files specified in queue requests from s3: endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000" diff --git a/test/unit_tests/conftest.py b/test/unit_tests/conftest.py index c614ee0..92d78cf 100644 --- a/test/unit_tests/conftest.py +++ b/test/unit_tests/conftest.py @@ -28,7 +28,7 @@ logger.setLevel(logging.DEBUG) @pytest.fixture(scope="session") def bucket_name(): - return "pyinfra-test-bucket2" + return "pyinfra-test-bucket" @pytest.fixture From d95b4839172ee456adf614debdca58f7c3f17d1d Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 2 May 2022 14:55:38 +0200 Subject: [PATCH 4/6] Pull request #35: fix: storage data access failure now triggers dead letter queue publishing Merge in RR/pyinfra from storage-data-access-failure-dead-letter-queue-fix to master Squashed commit of the following: commit 1f1fcdf0357fe7817b36eac7e369d5ef75ec5af4 Author: Julius Unverfehrt Date: Mon May 2 14:27:29 2022 +0200 fix: storage data access failure now triggers dead letter queue publishing --- pyinfra/queue/queue_manager/pika_queue_manager.py | 5 +++-- pyinfra/storage/adapters/s3.py | 3 +++ pyinfra/storage/storage.py | 9 ++++++++- pyinfra/visitor.py | 2 +- src/serve.py | 2 +- test/unit_tests/storage_test.py | 7 +++++++ 6 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 5cde19f..8274048 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -1,10 +1,11 @@ import json import logging +import time import pika from pyinfra.config import CONFIG -from pyinfra.exceptions import ProcessingFailure +from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager logger = logging.getLogger("pika") @@ -111,7 +112,7 @@ class PikaQueueManager(QueueManager): response = json.dumps(callback(json.loads(body))) self.channel.basic_publish("", self._output_queue, response.encode()) self.channel.basic_ack(frame.delivery_tag) - except ProcessingFailure: + except (ProcessingFailure, DataLoadingFailure): logger.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}") diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/adapters/s3.py index 04ecbb4..06a1f69 100644 --- a/pyinfra/storage/adapters/s3.py +++ b/pyinfra/storage/adapters/s3.py @@ -5,6 +5,7 @@ from operator import attrgetter from minio import Minio +from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter logger = logging.getLogger(__name__) @@ -34,6 +35,8 @@ class S3StorageAdapter(StorageAdapter): try: response = self.__client.get_object(bucket_name, object_name) return response.data + except Exception as err: + raise DataLoadingFailure("Failed getting object from s3 client") from err finally: if response: response.close() diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index 827c914..0d50093 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,3 +1,4 @@ +from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter @@ -15,7 +16,13 @@ class Storage: self.__adapter.put_object(bucket_name, object_name, data) def get_object(self, bucket_name, object_name): - return self.__adapter.get_object(bucket_name, object_name) + return self.__get_object(bucket_name, object_name) + + def __get_object(self, bucket_name, object_name): + try: + return self.__adapter.get_object(bucket_name, object_name) + except Exception as err: + raise DataLoadingFailure from err def get_all_objects(self, bucket_name): return self.__adapter.get_all_objects(bucket_name) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index e49a4cd..1fbdab0 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -76,7 +76,7 @@ class QueueVisitor: return gzip.decompress(download()) except Exception as err: logging.warning(f"Loading data from storage failed for {object_descriptor}.") - raise DataLoadingFailure() from err + raise DataLoadingFailure from err def process_data(self, data, body): return self.callback({**body, "data": data}) diff --git a/src/serve.py b/src/serve.py index f670e74..2fc4f9f 100644 --- a/src/serve.py +++ b/src/serve.py @@ -59,7 +59,7 @@ def main(): try: consumer.basic_consume_and_publish() except Exception as err: - raise ConsumerError() from err + raise ConsumerError from err try: consume() diff --git a/test/unit_tests/storage_test.py b/test/unit_tests/storage_test.py index efb7b62..90354d7 100644 --- a/test/unit_tests/storage_test.py +++ b/test/unit_tests/storage_test.py @@ -2,6 +2,8 @@ import logging import pytest +from pyinfra.exceptions import DataLoadingFailure + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -43,3 +45,8 @@ class TestStorage: storage.put_object(bucket_name, "file2", b"content 2") full_names_received = storage.get_all_object_names(bucket_name) assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received} + + def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name): + storage.clear_bucket(bucket_name) + with pytest.raises(DataLoadingFailure): + storage.get_object(bucket_name, "folder/file") From 4f18dbc60da104f08f5eecf6c52fd3de7c787f49 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 17 May 2022 14:41:03 +0200 Subject: [PATCH 5/6] Pull request #36: RED-4049 fix: Pyinfra now retries to reconnect after AMPQ connection loss and eventually throws an error if unsuccessful, forcing container restart. Merge in RR/pyinfra from RED-4049-AMQP-reconnect to master Squashed commit of the following: commit 1baafb781c9f623faf4223043e1185fd4ecbbe0e Author: Julius Unverfehrt Date: Tue May 17 14:36:47 2022 +0200 RED-4049 fix: Pyinfra now retries to reconnect after AMPQ connection loss and eventually throws an error if unsuccessful, forcing container restart. --- src/serve.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/serve.py b/src/serve.py index 2fc4f9f..45025ab 100644 --- a/src/serve.py +++ b/src/serve.py @@ -51,12 +51,11 @@ def main(): response_strategy = StorageStrategy(storage) visitor = QueueVisitor(storage, callback, response_strategy) - queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output) - @retry(ConsumerError, tries=3, delay=5, jitter=(1, 3)) def consume(): - consumer = Consumer(visitor, queue_manager) - try: + try: # RED-4049 queue manager needs to be in try scope to eventually throw Exception after connection loss. + queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output) + consumer = Consumer(visitor, queue_manager) consumer.basic_consume_and_publish() except Exception as err: raise ConsumerError from err From 6d31cbe6353c1ab06e510b746de1a9d84d742a84 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Tue, 31 May 2022 13:13:18 +0200 Subject: [PATCH 6/6] Pull request #37: RED-4124 PyInfra now tries three times to download an unobtainable object before republishing message, removed unreadable stack trace print if message is put on dead letter queue since allready logged where the exeption is raised. Merge in RR/pyinfra from RED-4124-s3-retry to master Squashed commit of the following: commit 40e4dc3712fc692115b8274226430a431bf0192f Author: Julius Unverfehrt Date: Tue May 31 12:59:45 2022 +0200 RED-4124 PyInfra now tries three times to download an unobtainable object before republishing message, removed unreadable stack trace print if message is put on dead letter queue since allready logged where the exeption is raised. --- pyinfra/queue/queue_manager/pika_queue_manager.py | 2 +- pyinfra/storage/storage.py | 10 ++++++++++ scripts/mock_client.py | 8 ++++++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 8274048..5ff27be 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -97,7 +97,7 @@ class PikaQueueManager(QueueManager): self.channel.basic_publish("", self._input_queue, json.dumps(request).encode()) def reject(self, body, frame): - logger.exception(f"Adding to dead letter queue: {body}") + logger.error(f"Adding to dead letter queue: {body}") self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False) def publish_response(self, message, callback, max_attempts=3): diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index 0d50093..c93ccf7 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,6 +1,14 @@ +import logging + +from retry import retry + +from pyinfra.config import CONFIG from pyinfra.exceptions import DataLoadingFailure from pyinfra.storage.adapters.adapter import StorageAdapter +logger = logging.getLogger(__name__) +logger.setLevel(CONFIG.service.logging_level) + class Storage: def __init__(self, adapter: StorageAdapter): @@ -18,10 +26,12 @@ class Storage: def get_object(self, bucket_name, object_name): return self.__get_object(bucket_name, object_name) + @retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3)) def __get_object(self, bucket_name, object_name): try: return self.__adapter.get_object(bucket_name, object_name) except Exception as err: + logging.error(err) raise DataLoadingFailure from err def get_all_objects(self, bucket_name): diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 0a97e03..db620d0 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -10,7 +10,7 @@ from pyinfra.storage.storages import get_s3_storage def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--bucket_name", "-b", required=True) - parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True) + parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True) args = parser.parse_args() return args @@ -47,6 +47,8 @@ def build_message_bodies(analyse_container_type, bucket_name): def update_message(message_dict): if analyse_container_type == "detr" or analyse_container_type == "image": message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"}) + if analyse_container_type == "dl_error": + message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"}) if analyse_container_type == "ner": message_dict.update( {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"} @@ -74,7 +76,9 @@ def main(args): channel.basic_publish("", CONFIG.rabbitmq.queues.input, body) print(f"Put {body} on {CONFIG.rabbitmq.queues.input}") - for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output): + for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1): + if not body: + break print(f"Received {json.loads(body)}") channel.basic_ack(method_frame.delivery_tag) channel.close()