diff --git a/config.yaml b/config.yaml
index 31c6571..a7f8efe 100755
--- a/config.yaml
+++ b/config.yaml
@@ -29,6 +29,7 @@ storage:
     endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000"
     access_key: $STORAGE_KEY|root
     secret_key: $STORAGE_SECRET|password
+    region: $STORAGE_REGION|"eu-west-1"
 
   azure:
     connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py
index 2d036f3..01f7286 100644
--- a/pyinfra/queue/queue_manager/pika_queue_manager.py
+++ b/pyinfra/queue/queue_manager/pika_queue_manager.py
@@ -5,7 +5,7 @@ from itertools import islice
 import pika
 
 from pyinfra.config import CONFIG
-from pyinfra.exceptions import ProcessingFailure
+from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure
 from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager
 from pyinfra.visitor import QueueVisitor
 
@@ -97,7 +97,7 @@ class PikaQueueManager(QueueManager):
         self.channel.basic_publish("", self._input_queue, json.dumps(request).encode())
 
     def reject(self, body, frame):
-        logger.exception(f"Adding to dead letter queue: {body}")
+        logger.error(f"Adding to dead letter queue: {body}")
         self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False)
 
     def publish_response(self, message, visitor: QueueVisitor, max_attempts=3):
@@ -120,8 +120,7 @@ class PikaQueueManager(QueueManager):
 
             self.channel.basic_publish("", self._output_queue, response_message)
             self.channel.basic_ack(frame.delivery_tag)
-
-        except ProcessingFailure:
+        except (ProcessingFailure, DataLoadingFailure):
             logger.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}")
 
diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/adapters/s3.py
index 1a67269..2fe5dcc 100644
--- a/pyinfra/storage/adapters/s3.py
+++ b/pyinfra/storage/adapters/s3.py
@@ -5,6 +5,7 @@ from operator import attrgetter
 
 from minio import Minio
 
+from pyinfra.exceptions import DataLoadingFailure
 from pyinfra.storage.adapters.adapter import StorageAdapter
 
 logger = logging.getLogger()
@@ -34,6 +35,8 @@ class S3StorageAdapter(StorageAdapter):
         try:
             response = self.__client.get_object(bucket_name, object_name)
             return response.data
+        except Exception as err:
+            raise DataLoadingFailure("Failed getting object from s3 client") from err
         finally:
             if response:
                 response.close()
diff --git a/pyinfra/storage/clients/s3.py b/pyinfra/storage/clients/s3.py
index 943d88c..f130989 100644
--- a/pyinfra/storage/clients/s3.py
+++ b/pyinfra/storage/clients/s3.py
@@ -32,4 +32,9 @@ def get_s3_client(params=None) -> Minio:
     if not params:
         params = CONFIG.storage.s3
 
-    return Minio(**parse_endpoint(params.endpoint), access_key=params.access_key, secret_key=params.secret_key)
+    return Minio(
+        **parse_endpoint(params.endpoint),
+        access_key=params.access_key,
+        secret_key=params.secret_key,
+        region=params.region,
+    )
diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py
index 827c914..c93ccf7 100644
--- a/pyinfra/storage/storage.py
+++ b/pyinfra/storage/storage.py
@@ -1,5 +1,14 @@
+import logging
+
+from retry import retry
+
+from pyinfra.config import CONFIG
+from pyinfra.exceptions import DataLoadingFailure
 from pyinfra.storage.adapters.adapter import StorageAdapter
 
+logger = logging.getLogger(__name__)
+logger.setLevel(CONFIG.service.logging_level)
+
 
 class Storage:
     def __init__(self, adapter: StorageAdapter):
@@ -15,7 +24,15 @@ class Storage:
         self.__adapter.put_object(bucket_name, object_name, data)
 
     def get_object(self, bucket_name, object_name):
-        return self.__adapter.get_object(bucket_name, object_name)
+        return self.__get_object(bucket_name, object_name)
+
+    @retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3))
+    def __get_object(self, bucket_name, object_name):
+        try:
+            return self.__adapter.get_object(bucket_name, object_name)
+        except Exception as err:
+            logger.error(err)
+            raise DataLoadingFailure from err
 
     def get_all_objects(self, bucket_name):
         return self.__adapter.get_all_objects(bucket_name)
diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py
index 03e83f2..5c5d14c 100644
--- a/pyinfra/visitor.py
+++ b/pyinfra/visitor.py
@@ -218,7 +218,7 @@ class QueueVisitor:
             data = self.storage.get_object(**object_descriptor)
         except Exception as err:
             logging.warning(f"Loading data from storage failed for {object_descriptor}.")
-            raise DataLoadingFailure() from err
+            raise DataLoadingFailure from err
 
         return data
 
diff --git a/scripts/mock_client.py b/scripts/mock_client.py
index c6ab80e..7b05c5d 100644
--- a/scripts/mock_client.py
+++ b/scripts/mock_client.py
@@ -11,7 +11,10 @@ def parse_args():
     parser = argparse.ArgumentParser()
     parser.add_argument("--bucket_name", "-b", required=True)
     parser.add_argument(
-        "--analysis_container", "-a", choices=["detr", "ner", "image", "conversion", "extraction"], required=True
+        "--analysis_container",
+        "-a",
+        choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
+        required=True,
     )
     args = parser.parse_args()
     return args
@@ -62,6 +65,8 @@ def build_message_bodies(analyse_container_type, bucket_name):
         message_dict.update(
             {"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
         )
+    if analyse_container_type == "dl_error":
+        message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
     if analyse_container_type == "ner":
         message_dict.update(
             {"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
         )
@@ -89,7 +94,9 @@ def main(args):
         channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
         print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
 
-    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
+    for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1):
+        if not body:
+            break
         print(f"Received {json.loads(body)}")
         channel.basic_ack(method_frame.delivery_tag)
     channel.close()
diff --git a/src/serve.py b/src/serve.py
index f38b9a8..097a6f1 100644
--- a/src/serve.py
+++ b/src/serve.py
@@ -9,18 +9,17 @@ from pyinfra.exceptions import ConsumerError
 from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
 from pyinfra.utils.banner import show_banner
 
-
 logger = logging.getLogger()
 
 
 @retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
 def consume():
-    consumer = get_consumer()
     try:
+        consumer = get_consumer()
         consumer.basic_consume_and_publish()
     except Exception as err:
         logger.exception(err)
-        raise ConsumerError() from err
+        raise ConsumerError from err
 
 
 def main():
diff --git a/test/config.yaml b/test/config.yaml
index 72bddfc..d404715 100644
--- a/test/config.yaml
+++ b/test/config.yaml
@@ -3,11 +3,13 @@ storage:
     endpoint: "http://127.0.0.1:9000"
     access_key: root
     secret_key: password
+    region: null
 
   aws:
     endpoint: https://s3.amazonaws.com
     access_key: AKIA4QVP6D4LCDAGYGN2
     secret_key: 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
+    region: $STORAGE_REGION|"eu-west-1"
 
   azure:
     connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
diff --git a/test/exploration_tests/data_json_request_test.py b/test/exploration_tests/data_json_request_test.py
index b588f91..b53ee42 100644
--- a/test/exploration_tests/data_json_request_test.py
+++ b/test/exploration_tests/data_json_request_test.py
@@ -1,37 +1,79 @@
-# import json
-# from operator import itemgetter
-#
-# import pytest
-# import requests
-#
-#
-# def test_server_ready_check(url):
-#     response = requests.get(f"{url}/ready")
-#     response.raise_for_status()
-#     return response.status_code == 200
-#
-#
-# @pytest.fixture
-# def client(client_maker):
-#     return client_maker(processor_fn)
-#
-#
-# def processor_fn(request):
-#     payload = json.loads(request.json)
-#     data = payload["data"].encode()
-#     metadata = payload["metadata"]
-#     response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))}
-#     return response_payload
-#
-#
-# @pytest.mark.parametrize("data_type", ["pdf", "bytestring"])
-# def test_sending_bytes_through_json(url, data):
-#     payload = {"data": data.decode("latin1"), "metadata": {"A": 1, "B": [2, 3]}}
-#
-#     response = requests.post(f"{url}/process", json=json.dumps(payload))
-#
-#     response_payload = response.json
-#     data_type, metadata_type = itemgetter("data_type", "metadata_type")(response_payload)
-#
-#     assert data_type == "<class 'bytes'>"
-#     assert metadata_type == "<class 'dict'>"
+import json
+from operator import itemgetter
+
+import pytest
+from flask import Flask, request, jsonify
+import fpdf
+
+
+def set_up_processing_server():
+    app = Flask(__name__)
+
+    @app.route("/ready", methods=["GET"])
+    def ready():
+        resp = jsonify("OK")
+        resp.status_code = 200
+        return resp
+
+    @app.route("/process", methods=["POST"])
+    def process():
+        payload = json.loads(request.json)
+        data = payload["data"].encode()
+        metadata = payload["metadata"]
+
+        response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))}
+
+        return jsonify(response_payload)
+
+    return app
+
+
+@pytest.fixture
+def server():
+    server = set_up_processing_server()
+    server.config.update({"TESTING": True})
+    return server
+
+
+@pytest.fixture
+def client(server):
+    return server.test_client()
+
+
+def test_server_ready_check(client):
+    response = client.get("/ready")
+    assert response.status_code == 200
+    assert response.json == "OK"
+
+
+@pytest.mark.parametrize("data_type", ["pdf", "bytestring"])
+def test_sending_bytes_through_json(client, data):
+    payload = {"data": data.decode("latin1"), "metadata": {"A": 1, "B": [2, 3]}}
+
+    response = client.post("/process", json=json.dumps(payload))
+
+    response_payload = response.json
+    data_type, metadata_type = itemgetter("data_type", "metadata_type")(response_payload)
+
+    assert data_type == "<class 'bytes'>"
+    assert metadata_type == "<class 'dict'>"
+
+
+@pytest.fixture
+def pdf():
+    pdf = fpdf.FPDF(unit="pt")
+    pdf.add_page()
+
+    return pdf_stream(pdf)
+
+
+def pdf_stream(pdf: fpdf.fpdf.FPDF):
+    return pdf.output(dest="S").encode("latin1")
+
+
+@pytest.fixture
+def data(data_type, pdf):
+    if data_type == "pdf":
+        return pdf
+    elif data_type == "bytestring":
+        return "content".encode("latin1")
diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py
index 3d8f1c3..d489185 100644
--- a/test/unit_tests/consumer_test.py
+++ b/test/unit_tests/consumer_test.py
@@ -3,13 +3,38 @@ import logging
 from operator import itemgetter
 
 import pytest
-from funcy import lmap, lmapcat
 
 from pyinfra.exceptions import ProcessingFailure
-from pyinfra.utils.encoding import pack_for_upload
+from pyinfra.queue.consumer import Consumer
 from pyinfra.visitor import get_object_descriptor, ForwardingStrategy
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+
+@pytest.fixture(scope="session")
+def consumer(queue_manager, callback):
+    return Consumer(callback, queue_manager)
+
+
+@pytest.fixture(scope="session")
+def access_callback():
+    return itemgetter("fileId")
+
+
+@pytest.fixture()
+def items():
+    def inner():
+        for i in range(3):
+            body = {
+                "dossierId": "folder",
+                "fileId": f"file{i}",
+                "targetFileExtension": "in.gz",
+                "responseFileExtension": "out.gz",
+            }
+            yield f"{i}".encode(), body
+
+    return list(inner())
 
 
 class TestConsumer:
@@ -19,66 +44,56 @@ class TestConsumer:
         consumer.consume()
         assert queue_manager.output_queue.empty()
 
-    # @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
-    # def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
-    #     self, consumer, queue_manager, callback
-    # ):
-    #     def produce_items():
-    #         return map(str, range(3))
-    #
-    #     def mock_visitor(callback):
-    #         def inner(data):
-    #             print("data", data)
-    #             return callback({"data": data.encode()})
-    #
-    #         return inner
-    #
-    #     callback = mock_visitor(callback)
-    #
-    #     print(11111111111)
-    #     queue_manager.clear()
-    #     print(22222222222)
-    #
-    #     for item in produce_items():
-    #         print("item", item)
-    #         queue_manager.publish_request(item)
-    #
-    #     requests = consumer.consume(n=3)
-    #     print(33333333333)
-    #
-    #     print(requests)
-    #     print(list(produce_items()))
-    #     print(44444444444)
-    #     for _, r in zip(produce_items(), requests):
-    #         print(1)
-    #         queue_manager.publish_response(r, callback)
-    #     print(55555555555)
-    #
-    #     assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
+    @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
+    def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
+        self, consumer, queue_manager, callback
+    ):
+        def produce_items():
+            return map(str, range(3))
 
-    # @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
-    # @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
-    # @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
-    # def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
-    #     self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
-    # ):
-    #
-    #     visitor.response_strategy = ForwardingStrategy()
-    #
-    #     queue_manager.clear()
-    #     storage.clear_bucket(bucket_name)
-    #
-    #     for data, message in items:
-    #         storage.put_object(**get_object_descriptor(message), data=pack_for_upload(data))
-    #         queue_manager.publish_request(message)
-    #
-    #     requests = consumer.consume(inactivity_timeout=5)
-    #
-    #     for itm, req in zip(items, requests):
-    #         logger.debug(f"Processing item {itm}")
-    #         queue_manager.publish_response(req, visitor)
-    #
-    #     assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"]
+        def mock_visitor(callback):
+            def inner(data):
+                return callback({"data": data.encode()})
+
+            return inner
+
+        callback = mock_visitor(callback)
+
+        queue_manager.clear()
+
+        for item in produce_items():
+            queue_manager.publish_request(item)
+
+        requests = consumer.consume()
+
+        for _, r in zip(produce_items(), requests):
+            queue_manager.publish_response(r, callback)
+
+        assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
+
+    @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
+    @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
+    @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
+    def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
+        self, consumer, queue_manager, visitor, bucket_name, storage, items
+    ):
+
+        visitor.response_strategy = ForwardingStrategy()
+
+        queue_manager.clear()
+        storage.clear_bucket(bucket_name)
+
+        for data, message in items:
+            storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
+            queue_manager.publish_request(message)
+
+        requests = consumer.consume(inactivity_timeout=5)
+
+        for itm, req in zip(items, requests):
+            logger.debug(f"Processing item {itm}")
+            queue_manager.publish_response(req, visitor)
+
+        assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"]
 
     @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
     def test_message_is_republished_when_callback_raises_processing_failure_exception(
@@ -88,10 +103,10 @@ class TestConsumer:
             pass
 
         def callback(_):
-            raise ProcessingFailure
+            raise ProcessingFailure()
 
         def reject_patch(*args, **kwargs):
-            raise DebugError
+            raise DebugError()
 
         queue_manager.reject = reject_patch
 
@@ -105,8 +120,6 @@ class TestConsumer:
         logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
         logger.addFilter(lambda record: False)
 
-        # TODO: for some reason this code now interferes with the commented out tests. all tests work on their own, but
-        # not together.
         with pytest.raises(DebugError):
             while True:
                 queue_manager.publish_response(next(requests), callback)
diff --git a/test/unit_tests/storage_test.py b/test/unit_tests/storage_test.py
index c1d2aba..b09a8be 100644
--- a/test/unit_tests/storage_test.py
+++ b/test/unit_tests/storage_test.py
@@ -2,7 +2,10 @@ import logging
 
 import pytest
 
-logger = logging.getLogger()
+from pyinfra.exceptions import DataLoadingFailure
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
 
 
 @pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
@@ -42,3 +45,8 @@ class TestStorage:
         storage.put_object(bucket_name, "file2", b"content 2")
         full_names_received = storage.get_all_object_names(bucket_name)
         assert {"file1", "file2"} == {*full_names_received}
+
+    def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name):
+        storage.clear_bucket(bucket_name)
+        with pytest.raises(DataLoadingFailure):
+            storage.get_object(bucket_name, "folder/file")