From 3c4739ad8bf7b2f753fd2b20eebe60271996a6e3 Mon Sep 17 00:00:00 2001 From: Julius Unverfehrt Date: Mon, 13 Mar 2023 15:11:25 +0100 Subject: [PATCH] Pull request #63: RED-6366 refactor Merge in RR/pyinfra from RED-6366-refactor to master Squashed commit of the following: commit 8807cda514b5cc24b1be208173283275d87dcb97 Author: Julius Unverfehrt Date: Fri Mar 10 13:15:15 2023 +0100 enable docker-compose autouse for automatic tests commit c4579581d3e9a885ef387ee97f3f3a5cf4731193 Author: Julius Unverfehrt Date: Fri Mar 10 12:35:49 2023 +0100 black commit ac2b754c5624ef37ce310fce7196c9ea11bbca03 Author: Julius Unverfehrt Date: Fri Mar 10 12:30:23 2023 +0100 refactor storage url parsing - move parsing and validation to config where the connection url is actually read in - improve readability of parsing fn commit 371802cc10b6d946c4939ff6839571002a2cb9f4 Author: Julius Unverfehrt Date: Fri Mar 10 10:48:00 2023 +0100 refactor commit e8c381c29deebf663e665920752c2965d7abce16 Author: Julius Unverfehrt Date: Fri Mar 10 09:57:34 2023 +0100 rename commit c8628a509316a651960dfa806d5fe6aacb7a91c1 Author: Julius Unverfehrt Date: Fri Mar 10 09:37:01 2023 +0100 renaming and refactoring commit 4974d4f56fd73bc55bd76aa7a9bbb16babee19f4 Author: Julius Unverfehrt Date: Fri Mar 10 08:53:09 2023 +0100 refactor payload processor - limit make_uploader and make_downloader cache - partially apply them when the class is initialized with storage and bucket to make the logic and behaviour more comprehensive - renaming functional pipeline steps to be more expressive commit f8d51bfcad2b815c8293ab27dd66b256255c5414 Author: Julius Unverfehrt Date: Thu Mar 9 15:30:32 2023 +0100 remove monitor and rename Payload commit 412ddaa207a08aff1229d7acd5d95402ac8cd578 Author: Julius Unverfehrt Date: Thu Mar 2 10:15:39 2023 +0100 remove azure connection string and disable respective test for now for security reasons commit 7922a2d9d325f3b9008ad4e3e56b241ba179f52c Author: Julius Unverfehrt Date: Wed Mar 1 13:30:58 
2023 +0100 make payload formatting function names more expressive commit 7517e544b0f5a434579cc9bada3a37e7ac04059f Author: Julius Unverfehrt Date: Wed Mar 1 13:24:57 2023 +0100 add some type hints commit 095410d3009f2dcbd374680dd0f7b55de94c9e76 Author: Matthias Bisping Date: Wed Mar 1 10:54:58 2023 +0100 Refactoring - Renaming - Docstring adjustments commit e992f0715fc2636eb13eb5ffc4de0bcc5d433fc8 Author: Matthias Bisping Date: Wed Mar 1 09:43:26 2023 +0100 Re-wording and typo fixes commit 3c2d698f9bf980bc4b378a44dc20c2badc407b3e Author: Julius Unverfehrt Date: Tue Feb 28 14:59:59 2023 +0100 enable auto startup for docker compose in tests commit 55773b4fb0b624ca4745e5b8aeafa6f6a0ae6436 Author: Julius Unverfehrt Date: Tue Feb 28 14:59:37 2023 +0100 Extended tests for queue manager commit 14f7f943f60b9bfb9fe77fa3cef99a1e7d094333 Author: Julius Unverfehrt Date: Tue Feb 28 13:39:00 2023 +0100 enable auto startup for docker compose in tests commit 7caf354491c84c6e0b0e09ad4d41cb5dfbfdb225 Merge: 49d47ba d0277b8 Author: Julius Unverfehrt Date: Tue Feb 28 13:32:52 2023 +0100 Merge branch 'RED-6205-prometheus' of ssh://git.iqser.com:2222/rr/pyinfra into RED-6205-prometheus commit 49d47baba8ccf11dee48a4c1cbddc3bbd12471e5 Author: Julius Unverfehrt Date: Tue Feb 28 13:32:42 2023 +0100 adjust Payload Processor signature commit d0277b86bc54994b6032774bf0ec2d7b19d7f517 Merge: 5184a18 f6b35d6 Author: Christoph Schabert Date: Tue Feb 28 11:07:16 2023 +0100 Pull request #61: Change Sec Trigger to PR Merge in RR/pyinfra from cschabert/PlanSpecjava-1677578703647 to RED-6205-prometheus * commit 'f6b35d648c88ddbce1856445c3b887bce669265c': Change Sec Trigger to PR commit f6b35d648c88ddbce1856445c3b887bce669265c Author: Christoph Schabert Date: Tue Feb 28 11:05:13 2023 +0100 Change Sec Trigger to PR ... 
and 20 more commits --- README.md | 96 ++++++++++---- .../src/main/java/buildjob/PlanSpec.java | 9 +- .../src/main/resources/scripts/config-keys.sh | 0 poetry.lock | 121 +++++++++++++++++- pyinfra/config.py | 21 ++- pyinfra/exception.py | 2 + pyinfra/payload_processing/__init__.py | 3 + pyinfra/payload_processing/payload.py | 56 ++++++++ pyinfra/payload_processing/processor.py | 80 ++++++++++++ pyinfra/queue/development_queue_manager.py | 40 ++++++ pyinfra/queue/queue_manager.py | 85 ++++++------ pyinfra/storage/__init__.py | 3 +- pyinfra/storage/storage.py | 44 ++++++- .../{adapters => storages}/__init__.py | 0 .../storage/{adapters => storages}/azure.py | 5 +- pyinfra/storage/storages/interface.py | 35 +++++ pyinfra/storage/{adapters => storages}/s3.py | 39 +----- pyinfra/utils/compressing.py | 22 ++++ pyinfra/utils/encoding.py | 28 ++++ pyinfra/utils/url_parsing.py | 40 ++++++ pyproject.toml | 4 + scripts/mock_process_request.py | 100 --------------- scripts/send_request.py | 72 +++++++++++ scripts/start_pyinfra.py | 41 +----- scripts/usage_pyinfra_dev_scripts.md | 18 --- tests/config.yml | 79 ------------ tests/conftest.py | 112 +++++++++++++--- {scripts => tests}/docker-compose.yml | 0 tests/processing_test.py | 69 ++++++++++ tests/queue_test.py | 120 +++++++++++++++++ tests/storage_test.py | 55 ++++++++ tests/test_storage.py | 5 - 32 files changed, 1028 insertions(+), 376 deletions(-) mode change 100644 => 100755 bamboo-specs/src/main/resources/scripts/config-keys.sh create mode 100644 pyinfra/exception.py create mode 100644 pyinfra/payload_processing/__init__.py create mode 100644 pyinfra/payload_processing/payload.py create mode 100644 pyinfra/payload_processing/processor.py create mode 100644 pyinfra/queue/development_queue_manager.py rename pyinfra/storage/{adapters => storages}/__init__.py (100%) rename pyinfra/storage/{adapters => storages}/azure.py (94%) create mode 100644 pyinfra/storage/storages/interface.py rename pyinfra/storage/{adapters => 
storages}/s3.py (67%) create mode 100644 pyinfra/utils/compressing.py create mode 100644 pyinfra/utils/encoding.py create mode 100644 pyinfra/utils/url_parsing.py delete mode 100644 scripts/mock_process_request.py create mode 100644 scripts/send_request.py delete mode 100644 scripts/usage_pyinfra_dev_scripts.md delete mode 100644 tests/config.yml rename {scripts => tests}/docker-compose.yml (100%) create mode 100644 tests/processing_test.py create mode 100644 tests/queue_test.py create mode 100644 tests/storage_test.py delete mode 100644 tests/test_storage.py diff --git a/README.md b/README.md index f0fb0f4..c1b0181 100755 --- a/README.md +++ b/README.md @@ -1,33 +1,41 @@ -# Infrastructure to deploy Research Projects +# PyInfra +1. [ About ](#about) +2. [ Configuration ](#configuration) +3. [ Response Format ](#response-format) +4. [ Usage & API ](#usage--api) +5. [ Scripts ](#scripts) +6. [ Tests ](#tests) + +## About + +Common Module with the infrastructure to deploy Research Projects. The Infrastructure expects to be deployed in the same Pod / local environment as the analysis container and handles all outbound communication. ## Configuration A configuration is located in `/config.yaml`. All relevant variables can be configured via exporting environment variables. 
-| Environment Variable | Default | Description | -| ----------------------------- | ------------------------------ | --------------------------------------------------------------------- | -| LOGGING_LEVEL_ROOT | DEBUG | Logging level for service logger | -| PROBING_WEBSERVER_HOST | "0.0.0.0" | Probe webserver address | -| PROBING_WEBSERVER_PORT | 8080 | Probe webserver port | -| PROBING_WEBSERVER_MODE | production | Webserver mode: {development, production} | -| RABBITMQ_HOST | localhost | RabbitMQ host address | -| RABBITMQ_PORT | 5672 | RabbitMQ host port | -| RABBITMQ_USERNAME | user | RabbitMQ username | -| RABBITMQ_PASSWORD | bitnami | RabbitMQ password | -| RABBITMQ_HEARTBEAT | 7200 | Controls AMQP heartbeat timeout in seconds | -| REQUEST_QUEUE | request_queue | Requests to service | -| RESPONSE_QUEUE | response_queue | Responses by service | -| DEAD_LETTER_QUEUE | dead_letter_queue | Messages that failed to process | -| ANALYSIS_ENDPOINT | "http://127.0.0.1:5000" | Endpoint for analysis container | -| STORAGE_BACKEND | s3 | The type of storage to use {s3, azure} | -| STORAGE_BUCKET | "redaction" | The bucket / container to pull files specified in queue requests from | -| STORAGE_ENDPOINT | "http://127.0.0.1:9000" | Endpoint for s3 storage | -| STORAGE_KEY | root | User for s3 storage | -| STORAGE_SECRET | password | Password for s3 storage | -| STORAGE_AZURECONNECTIONSTRING | "DefaultEndpointsProtocol=..." 
| Connection string for Azure storage | -| STORAGE_AZURECONTAINERNAME | "redaction" | AKS container | +| Environment Variable | Default | Description | +|-------------------------------|----------------------------------|--------------------------------------------------------------------------| +| LOGGING_LEVEL_ROOT | "DEBUG" | Logging level for service logger | +| RABBITMQ_HOST | "localhost" | RabbitMQ host address | +| RABBITMQ_PORT | "5672" | RabbitMQ host port | +| RABBITMQ_USERNAME | "user" | RabbitMQ username | +| RABBITMQ_PASSWORD | "bitnami" | RabbitMQ password | +| RABBITMQ_HEARTBEAT | 60 | Controls AMQP heartbeat timeout in seconds | +| RABBITMQ_CONNECTION_SLEEP | 5 | Controls AMQP connection sleep timer in seconds | +| REQUEST_QUEUE | "request_queue" | Requests to service | +| RESPONSE_QUEUE | "response_queue" | Responses by service | +| DEAD_LETTER_QUEUE | "dead_letter_queue" | Messages that failed to process | +| STORAGE_BACKEND | "s3" | The type of storage to use {s3, azure} | +| STORAGE_BUCKET | "redaction" | The bucket / container to pull files specified in queue requests from | +| STORAGE_ENDPOINT | "http://127.0.0.1:9000" | Endpoint for s3 storage | +| STORAGE_KEY | "root" | User for s3 storage | +| STORAGE_SECRET | "password" | Password for s3 storage | +| STORAGE_AZURECONNECTIONSTRING | "DefaultEndpointsProtocol=..." | Connection string for Azure storage | +| STORAGE_AZURECONTAINERNAME | "redaction" | AKS container | +| WRITE_CONSUMER_TOKEN | "False" | Value to see if we should write a consumer token to a file | ## Response Format @@ -49,11 +57,12 @@ Optionally, the input message can contain a field with the key `"operations"`. ```json { "dossierId": "", - "fileId": "", - ... 
+ "fileId": "" } ``` +## Usage & API + ### Setup Install project dependencies @@ -69,3 +78,40 @@ or install form another project ```bash poetry add git+ssh://git@git.iqser.com:2222/rr/pyinfra.git#TAG-NUMBER ``` + +### API + +```python +from pyinfra.config import get_config +from pyinfra.payload_processing.processor import make_payload_processor +from pyinfra.queue.queue_manager import QueueManager + +queue_manager = QueueManager(get_config()) +queue_manager.start_consuming(make_payload_processor(data_processor)) +``` +The data_processor should expect a dict or bytes (pdf) as input and should return a list of results. + +## Scripts + +### Run pyinfra locally + +**Shell 1**: Start minio and rabbitmq containers +```bash +$ cd tests && docker-compose up +``` + +**Shell 2**: Start pyinfra with callback mock +```bash +$ python scripts/start_pyinfra.py +``` + +**Shell 3**: Upload dummy content on storage and publish message +```bash +$ python scripts/mock_process_request.py +``` + +## Tests + +The tests take a bit longer than you are probably used to, because among other things the required startup times are +quite high. The test runtime can be accelerated by setting 'autouse' to 'False'. In that case, run 'docker-compose up' +in the tests dir manually before running the tests. 
\ No newline at end of file diff --git a/bamboo-specs/src/main/java/buildjob/PlanSpec.java b/bamboo-specs/src/main/java/buildjob/PlanSpec.java index dec2c07..f8590a8 100644 --- a/bamboo-specs/src/main/java/buildjob/PlanSpec.java +++ b/bamboo-specs/src/main/java/buildjob/PlanSpec.java @@ -199,11 +199,14 @@ public class PlanSpec { "/var/run/docker.sock")))) .linkedRepositories(REPOSITORY_KEY + " / " + SERVICE_NAME) .triggers( - new ScheduledTrigger() - .scheduleOnceDaily(LocalTime.of(23, 00))) + new BitbucketServerTrigger()) .planBranchManagement( new PlanBranchManagement() - .createForVcsBranchMatching("release.*") + .createForVcsBranch() + .delete( + new BranchCleanup() + .whenInactiveInRepositoryAfterDays( + 14)) .notificationForCommitters()); } diff --git a/bamboo-specs/src/main/resources/scripts/config-keys.sh b/bamboo-specs/src/main/resources/scripts/config-keys.sh old mode 100644 new mode 100755 diff --git a/poetry.lock b/poetry.lock index 78fddfa..6d86796 100644 --- a/poetry.lock +++ b/poetry.lock @@ -428,6 +428,70 @@ traitlets = ">=5.3" [package.extras] test = ["pytest"] +[[package]] +name = "coverage" +version = "7.2.0" +description = "Code coverage measurement for Python" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "coverage-7.2.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90e7a4cbbb7b1916937d380beb1315b12957b8e895d7d9fb032e2038ac367525"}, + {file = "coverage-7.2.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:34d7211be69b215ad92298a962b2cd5a4ef4b17c7871d85e15d3d1b6dc8d8c96"}, + {file = "coverage-7.2.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971b49dbf713044c3e5f6451b39f65615d4d1c1d9a19948fa0f41b0245a98765"}, + {file = "coverage-7.2.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0557289260125a6c453ad5673ba79e5b6841d9a20c9e101f758bfbedf928a77"}, + {file = 
"coverage-7.2.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:049806ae2df69468c130f04f0fab4212c46b34ba5590296281423bb1ae379df2"}, + {file = "coverage-7.2.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:875b03d92ac939fbfa8ae74a35b2c468fc4f070f613d5b1692f9980099a3a210"}, + {file = "coverage-7.2.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:c160e34e388277f10c50dc2c7b5e78abe6d07357d9fe7fcb2f3c156713fd647e"}, + {file = "coverage-7.2.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:32e6a730fd18b2556716039ab93278ccebbefa1af81e6aa0c8dba888cf659e6e"}, + {file = "coverage-7.2.0-cp310-cp310-win32.whl", hash = "sha256:f3ff4205aff999164834792a3949f82435bc7c7655c849226d5836c3242d7451"}, + {file = "coverage-7.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:93db11da6e728587e943dff8ae1b739002311f035831b6ecdb15e308224a4247"}, + {file = "coverage-7.2.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:cd38140b56538855d3d5722c6d1b752b35237e7ea3f360047ce57f3fade82d98"}, + {file = "coverage-7.2.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:9dbb21561b0e04acabe62d2c274f02df0d715e8769485353ddf3cf84727e31ce"}, + {file = "coverage-7.2.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:171dd3aa71a49274a7e4fc26f5bc167bfae5a4421a668bc074e21a0522a0af4b"}, + {file = "coverage-7.2.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4655ecd813f4ba44857af3e9cffd133ab409774e9d2a7d8fdaf4fdfd2941b789"}, + {file = "coverage-7.2.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1856a8c4aa77eb7ca0d42c996d0ca395ecafae658c1432b9da4528c429f2575c"}, + {file = "coverage-7.2.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:bd67df6b48db18c10790635060858e2ea4109601e84a1e9bfdd92e898dc7dc79"}, + {file = "coverage-7.2.0-cp311-cp311-musllinux_1_1_i686.whl", 
hash = "sha256:2d7daf3da9c7e0ed742b3e6b4de6cc464552e787b8a6449d16517b31bbdaddf5"}, + {file = "coverage-7.2.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:bf9e02bc3dee792b9d145af30db8686f328e781bd212fdef499db5e9e4dd8377"}, + {file = "coverage-7.2.0-cp311-cp311-win32.whl", hash = "sha256:3713a8ec18781fda408f0e853bf8c85963e2d3327c99a82a22e5c91baffcb934"}, + {file = "coverage-7.2.0-cp311-cp311-win_amd64.whl", hash = "sha256:88ae5929f0ef668b582fd7cad09b5e7277f50f912183cf969b36e82a1c26e49a"}, + {file = "coverage-7.2.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:5e29a64e9586194ea271048bc80c83cdd4587830110d1e07b109e6ff435e5dbc"}, + {file = "coverage-7.2.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8d5302eb84c61e758c9d68b8a2f93a398b272073a046d07da83d77b0edc8d76b"}, + {file = "coverage-7.2.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2c9fffbc39dc4a6277e1525cab06c161d11ee3995bbc97543dc74fcec33e045b"}, + {file = "coverage-7.2.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a6ceeab5fca62bca072eba6865a12d881f281c74231d2990f8a398226e1a5d96"}, + {file = "coverage-7.2.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:28563a35ef4a82b5bc5160a01853ce62b9fceee00760e583ffc8acf9e3413753"}, + {file = "coverage-7.2.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:bfa065307667f1c6e1f4c3e13f415b0925e34e56441f5fda2c84110a4a1d8bda"}, + {file = "coverage-7.2.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7f992b32286c86c38f07a8b5c3fc88384199e82434040a729ec06b067ee0d52c"}, + {file = "coverage-7.2.0-cp37-cp37m-win32.whl", hash = "sha256:2c15bd09fd5009f3a79c8b3682b52973df29761030b692043f9834fc780947c4"}, + {file = "coverage-7.2.0-cp37-cp37m-win_amd64.whl", hash = "sha256:f332d61fbff353e2ef0f3130a166f499c3fad3a196e7f7ae72076d41a6bfb259"}, + {file = "coverage-7.2.0-cp38-cp38-macosx_10_9_x86_64.whl", hash 
= "sha256:577a8bc40c01ad88bb9ab1b3a1814f2f860ff5c5099827da2a3cafc5522dadea"}, + {file = "coverage-7.2.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:9240a0335365c29c968131bdf624bb25a8a653a9c0d8c5dbfcabf80b59c1973c"}, + {file = "coverage-7.2.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:358d3bce1468f298b19a3e35183bdb13c06cdda029643537a0cc37e55e74e8f1"}, + {file = "coverage-7.2.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:932048364ff9c39030c6ba360c31bf4500036d4e15c02a2afc5a76e7623140d4"}, + {file = "coverage-7.2.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7efa21611ffc91156e6f053997285c6fe88cfef3fb7533692d0692d2cb30c846"}, + {file = "coverage-7.2.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:465ea431c3b78a87e32d7d9ea6d081a1003c43a442982375cf2c247a19971961"}, + {file = "coverage-7.2.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:0f03c229f1453b936916f68a47b3dfb5e84e7ad48e160488168a5e35115320c8"}, + {file = "coverage-7.2.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:40785553d68c61e61100262b73f665024fd2bb3c6f0f8e2cd5b13e10e4df027b"}, + {file = "coverage-7.2.0-cp38-cp38-win32.whl", hash = "sha256:b09dd7bef59448c66e6b490cc3f3c25c14bc85d4e3c193b81a6204be8dd355de"}, + {file = "coverage-7.2.0-cp38-cp38-win_amd64.whl", hash = "sha256:dc4f9a89c82faf6254d646180b2e3aa4daf5ff75bdb2c296b9f6a6cf547e26a7"}, + {file = "coverage-7.2.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c243b25051440386179591a8d5a5caff4484f92c980fb6e061b9559da7cc3f64"}, + {file = "coverage-7.2.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4b8fd32f85b256fc096deeb4872aeb8137474da0c0351236f93cbedc359353d6"}, + {file = "coverage-7.2.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7f2a7df523791e6a63b40360afa6792a11869651307031160dc10802df9a252"}, + {file = 
"coverage-7.2.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:da32526326e8da0effb452dc32a21ffad282c485a85a02aeff2393156f69c1c3"}, + {file = "coverage-7.2.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c1153a6156715db9d6ae8283480ae67fb67452aa693a56d7dae9ffe8f7a80da"}, + {file = "coverage-7.2.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:74cd60fa00f46f28bd40048d6ca26bd58e9bee61d2b0eb4ec18cea13493c003f"}, + {file = "coverage-7.2.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:59a427f8a005aa7254074719441acb25ac2c2f60c1f1026d43f846d4254c1c2f"}, + {file = "coverage-7.2.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:c3c4beddee01c8125a75cde3b71be273995e2e9ec08fbc260dd206b46bb99969"}, + {file = "coverage-7.2.0-cp39-cp39-win32.whl", hash = "sha256:08e3dd256b8d3e07bb230896c8c96ec6c5dffbe5a133ba21f8be82b275b900e8"}, + {file = "coverage-7.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:ad12c74c6ce53a027f5a5ecbac9be20758a41c85425c1bbab7078441794b04ee"}, + {file = "coverage-7.2.0-pp37.pp38.pp39-none-any.whl", hash = "sha256:ffa637a2d5883298449a5434b699b22ef98dd8e2ef8a1d9e60fa9cfe79813411"}, + {file = "coverage-7.2.0.tar.gz", hash = "sha256:9cc9c41aa5af16d845b53287051340c363dd03b7ef408e45eec3af52be77810d"}, +] + +[package.extras] +toml = ["tomli"] + [[package]] name = "cryptography" version = "39.0.1" @@ -1199,6 +1263,21 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "prometheus-client" +version = "0.16.0" +description = "Python client for the Prometheus monitoring system." 
+category = "main" +optional = false +python-versions = ">=3.6" +files = [ + {file = "prometheus_client-0.16.0-py3-none-any.whl", hash = "sha256:0836af6eb2c8f4fed712b2f279f6c0a8bbab29f9f4aa15276b91c7cb0d1616ab"}, + {file = "prometheus_client-0.16.0.tar.gz", hash = "sha256:a03e35b359f14dd1630898543e2120addfdeacd1a6069c1367ae90fd93ad3f48"}, +] + +[package.extras] +twisted = ["twisted"] + [[package]] name = "prompt-toolkit" version = "3.0.36" @@ -1334,6 +1413,46 @@ typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\"" spelling = ["pyenchant (>=3.2,<4.0)"] testutils = ["gitpython (>3)"] +[[package]] +name = "pymupdf" +version = "1.21.1" +description = "Python bindings for the PDF toolkit and renderer MuPDF" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "PyMuPDF-1.21.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e334e7baea701d4029faa034520c722c9c55e8315b7ccf4d1b686eba8e293f32"}, + {file = "PyMuPDF-1.21.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:19fe49966f3a072a258cdfbab3d8c5b11cb7c2fb7c8e6abf3398d9e55af6f1ca"}, + {file = "PyMuPDF-1.21.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:36181c4cb27740791d611d0224dd18ac78e23a1f06aa2151394c5b9194b4f885"}, + {file = "PyMuPDF-1.21.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e5273ff0c3bf08428ef956c4a5e5e0b475667cc3e1cb7b41d9f5131636996e59"}, + {file = "PyMuPDF-1.21.1-cp310-cp310-win32.whl", hash = "sha256:7e5a1ed49df5eee7834fb37adb5c94354e98b23fa997e67034b157790a31bbfc"}, + {file = "PyMuPDF-1.21.1-cp310-cp310-win_amd64.whl", hash = "sha256:c040c9ac98b7a1afc06fe55ffedbf6a305c24bf1c97597838f208c68035971f4"}, + {file = "PyMuPDF-1.21.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ac2f61632bcd4c9532a26b52684ca05a4c8b7b96d6be947134b3e01a420904fd"}, + {file = "PyMuPDF-1.21.1-cp311-cp311-macosx_11_0_arm64.whl", hash = 
"sha256:eb29cbfd34d5ef99c657539c4f48953ea15f4b1e4e162b48efcffa5c4101ce1d"}, + {file = "PyMuPDF-1.21.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:06860a8e60ba14f25aac5db90803d59b1a2fdac24c2b65dc6f9876ff36df586e"}, + {file = "PyMuPDF-1.21.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff7d01cb563c3ca18880ba6a2661351f07b8a54f6599272be2e3568524e5e721"}, + {file = "PyMuPDF-1.21.1-cp311-cp311-win32.whl", hash = "sha256:1fc4c8aee186326b3f138be8a4ac16a343e76c8ec45b5afab2d105a5e3b02d80"}, + {file = "PyMuPDF-1.21.1-cp311-cp311-win_amd64.whl", hash = "sha256:dab0459791175dea813e6130e08b9026d3b9d66854f0dbb7ab51ab1619875d24"}, + {file = "PyMuPDF-1.21.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2a870f02b2767e9371bbb3d0263c3994f59679a64b1b9dd1a6b0d9a15c963d68"}, + {file = "PyMuPDF-1.21.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f99e6c2109c38c17cd5f1b82c9d35f1e815a71a3464b765f41f3c8cf875dcf4c"}, + {file = "PyMuPDF-1.21.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bf204723782ca481cd120e1ab124b0d59fd387103e15675ae462415bfa22b3a2"}, + {file = "PyMuPDF-1.21.1-cp37-cp37m-win32.whl", hash = "sha256:b2b197bb0f00159f4584f51c7bf4d897aa5fb88de3c1f964347047a710c1f1be"}, + {file = "PyMuPDF-1.21.1-cp37-cp37m-win_amd64.whl", hash = "sha256:dddd3fe6fa20b3e2642c10e66c97cfd3727010d3dafe653f16dce52396ef0208"}, + {file = "PyMuPDF-1.21.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c09cbc7924ddf99452bb0d70438d64753815d75526d6d94293249f1665691d84"}, + {file = "PyMuPDF-1.21.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:fdba6bf77d1d3cd57ea93181c494f4b138523000ff74b25c523be7dce0058ac9"}, + {file = "PyMuPDF-1.21.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0b08b6afbb656f0e4582a8ee44c2ce0ab6235bb81830ec92d79f6e4893db639c"}, + {file = "PyMuPDF-1.21.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:0c85efbc08c83c8d91ae1cc7bb7e06c7cd7e203e9d0ed806ff22db173999f30c"}, + {file = "PyMuPDF-1.21.1-cp38-cp38-win32.whl", hash = "sha256:fa0334247549def667d9f5358e558ec93815c78cebdb927a81dda38d681bf550"}, + {file = "PyMuPDF-1.21.1-cp38-cp38-win_amd64.whl", hash = "sha256:d85c360abd72c14e302a2cefd02ebd01568f5c669700c7fc1ed056e9cdf3c285"}, + {file = "PyMuPDF-1.21.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:f3ffa9a8c643da39aba80e787d2c1771d5be07b0e15bf193ddd1fda888bbca88"}, + {file = "PyMuPDF-1.21.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:18fbdcb35c144b26a3a507bfa73f98401a27819f29dca496eb470b9a1ec09fad"}, + {file = "PyMuPDF-1.21.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2bb5f9ce33054ebfbf63c41e68606d51e3ad5d85def0af5ebb6f0dd276e61037"}, + {file = "PyMuPDF-1.21.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fbb96336c5cc065f6e5359f034a689e4d64ca7697dc5965b03f105b2ecb7ba1"}, + {file = "PyMuPDF-1.21.1-cp39-cp39-win32.whl", hash = "sha256:909afad284c24f25a831b37e433e444dc4cb5b3e38b3f761b1e4acad5c65327d"}, + {file = "PyMuPDF-1.21.1-cp39-cp39-win_amd64.whl", hash = "sha256:069ba56c28cb9b603d016c63f23a21bf22a00c5b2a92286bf3dd6a2759e119d4"}, + {file = "PyMuPDF-1.21.1.tar.gz", hash = "sha256:f815741a435c62a0036bbcbf5fa6c533567bd69c5338d413714fc57b22db93e0"}, +] + [[package]] name = "pynacl" version = "1.5.0" @@ -1962,4 +2081,4 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools" [metadata] lock-version = "2.0" python-versions = "~3.8" -content-hash = "2cc7ebd06eb0574420d0e0838a89e61e4d38332d7ed3896fb20aa9d17480a566" +content-hash = "faba30e3afa700d6f483022581e1813ec16c9bff0a49ebbcbbd166e293ae877e" diff --git a/pyinfra/config.py b/pyinfra/config.py index a4f2c4e..6536489 100644 --- a/pyinfra/config.py +++ b/pyinfra/config.py @@ -1,11 +1,18 @@ from os import environ +from typing import Union + +from pyinfra.utils.url_parsing import validate_and_parse_s3_endpoint def 
read_from_environment(environment_variable_name, default_value): return environ.get(environment_variable_name, default_value) -class Config(object): +def normalize_bool(value: Union[str, bool]): + return value if isinstance(value, bool) else value in ["True", "true"] + + +class Config: def __init__(self): # Logging level for service logger self.logging_level_root = read_from_environment("LOGGING_LEVEL_ROOT", "DEBUG") @@ -23,11 +30,11 @@ class Config(object): self.rabbitmq_password = read_from_environment("RABBITMQ_PASSWORD", "bitnami") # Controls AMQP heartbeat timeout in seconds - self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60") + self.rabbitmq_heartbeat = int(read_from_environment("RABBITMQ_HEARTBEAT", 60)) # Controls AMQP connection sleep timer in seconds # important for heartbeat to come through while main function runs on other thread - self.rabbitmq_connection_sleep = read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5) + self.rabbitmq_connection_sleep = int(read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5)) # Queue name for requests to the service self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue") @@ -47,8 +54,9 @@ class Config(object): else: self.storage_bucket = read_from_environment("STORAGE_AZURECONTAINERNAME", "redaction") - # Endpoint for s3 storage - self.storage_endpoint = read_from_environment("STORAGE_ENDPOINT", "http://127.0.0.1:9000") + # S3 connection security flag and endpoint + storage_address = read_from_environment("STORAGE_ENDPOINT", "http://127.0.0.1:9000") + self.storage_secure_connection, self.storage_endpoint = validate_and_parse_s3_endpoint(storage_address) # User for s3 storage self.storage_key = read_from_environment("STORAGE_KEY", "root") @@ -61,7 +69,8 @@ class Config(object): # Connection string for Azure storage self.storage_azureconnectionstring = read_from_environment( - "STORAGE_AZURECONNECTIONSTRING", "DefaultEndpointsProtocol=..." 
+ "STORAGE_AZURECONNECTIONSTRING", + "DefaultEndpointsProtocol=...", ) # Value to see if we should write a consumer token to a file diff --git a/pyinfra/exception.py b/pyinfra/exception.py new file mode 100644 index 0000000..74a9dcb --- /dev/null +++ b/pyinfra/exception.py @@ -0,0 +1,2 @@ +class ProcessingFailure(RuntimeError): + pass diff --git a/pyinfra/payload_processing/__init__.py b/pyinfra/payload_processing/__init__.py new file mode 100644 index 0000000..c350215 --- /dev/null +++ b/pyinfra/payload_processing/__init__.py @@ -0,0 +1,3 @@ +__all__ = ["make_payload_processor"] + +from pyinfra.payload_processing.processor import make_payload_processor diff --git a/pyinfra/payload_processing/payload.py b/pyinfra/payload_processing/payload.py new file mode 100644 index 0000000..79c63c2 --- /dev/null +++ b/pyinfra/payload_processing/payload.py @@ -0,0 +1,56 @@ +import os +from dataclasses import dataclass, field +from operator import itemgetter +from typing import List, Tuple + + +@dataclass +class QueueMessagePayload: + dossier_id: str + file_id: str + target_file_extension: str + response_file_extension: str + + target_file_name: str = field(init=False) + response_file_name: str = field(init=False) + + def __post_init__(self): + self.target_file_name = f"{self.dossier_id}/{self.file_id}.{self.target_file_extension}" + self.response_file_name = f"{self.dossier_id}/{self.file_id}.{self.response_file_extension}" + + self.target_file_type, self.target_compression_type = parse_file_extension(self.target_file_extension) + self.response_file_type, self.response_compression_type = parse_file_extension(self.response_file_extension) + + +def read_queue_message_payload(payload: dict) -> QueueMessagePayload: + return QueueMessagePayload( + *itemgetter( + "dossierId", + "fileId", + "targetFileExtension", + "responseFileExtension", + )(payload) + ) + + +def format_service_processing_result_for_storage( + queue_message_payload: QueueMessagePayload, service_processing_result: 
List[dict] +) -> dict: + return { + "dossierId": queue_message_payload.dossier_id, + "fileId": queue_message_payload.file_id, + "targetFileExtension": queue_message_payload.target_file_extension, + "responseFileExtension": queue_message_payload.response_file_extension, + "data": service_processing_result, + } + + +def format_to_queue_message_response_body(queue_message_payload: QueueMessagePayload) -> dict: + return {"dossierId": queue_message_payload.dossier_id, "fileId": queue_message_payload.file_id} + + +def parse_file_extension(file_extension: str) -> Tuple[str, str]: + file_type, compression_type = os.path.splitext(file_extension) + if "." in file_type: + raise ValueError(f"Number of file extensions exceeds allowed amount of two: {file_extension}") + return file_type, compression_type diff --git a/pyinfra/payload_processing/processor.py b/pyinfra/payload_processing/processor.py new file mode 100644 index 0000000..08fd333 --- /dev/null +++ b/pyinfra/payload_processing/processor.py @@ -0,0 +1,80 @@ +import logging +from dataclasses import asdict +from functools import partial +from typing import Callable, Union, List + +from funcy import compose + +from pyinfra.config import get_config, Config +from pyinfra.payload_processing.payload import ( + read_queue_message_payload, + format_service_processing_result_for_storage, + format_to_queue_message_response_body, +) +from pyinfra.storage import get_storage +from pyinfra.storage.storage import make_downloader, make_uploader +from pyinfra.storage.storages.interface import Storage + +logger = logging.getLogger() +logger.setLevel(get_config().logging_level_root) + + +class PayloadProcessor: + def __init__(self, storage: Storage, bucket: str, data_processor: Callable): + """Wraps an analysis function specified by a service (e.g. NER service) in pre- and post-processing steps. 
+ + Args: + storage: The storage to use for downloading and uploading files + bucket: The bucket to use for downloading and uploading files + data_processor: The analysis function to be called with the downloaded file + """ + self.process_data = data_processor + + self.partial_download_fn = partial(make_downloader, storage, bucket) + self.partial_upload_fn = partial(make_uploader, storage, bucket) + + def __call__(self, queue_message_payload: dict) -> dict: + """Processes a queue message payload. + + The steps executed are: + 1. Download the file specified in the message payload from the storage + 2. Process the file with the analysis function + 3. Upload the result to the storage + 4. Return the payload for a response queue message + + Args: + queue_message_payload: The payload of a queue message. The payload is expected to be a dict with the + following keys: dossierId, fileId, targetFileExtension, responseFileExtension + + Returns: + The payload for a response queue message. The payload is a dict with the following keys: dossierId, fileId + """ + return self._process(queue_message_payload) + + def _process(self, queue_message_payload: dict) -> dict: + payload = read_queue_message_payload(queue_message_payload) + logger.info(f"Processing {asdict(payload)} ...") + + download_file_to_process = self.partial_download_fn(payload.target_file_type, payload.target_compression_type) + upload_processing_result = self.partial_upload_fn(payload.response_file_type, payload.response_compression_type) + format_result_for_storage = partial(format_service_processing_result_for_storage, payload) + + processing_pipeline = compose(format_result_for_storage, self.process_data, download_file_to_process) + + result: List[dict] = processing_pipeline(payload.target_file_name) + + upload_processing_result(payload.response_file_name, result) + + return format_to_queue_message_response_body(payload) + + +def make_payload_processor(data_processor: Callable, config: Union[None, Config] = None) -> 
PayloadProcessor: + """Produces payload processor for queue manager.""" + config = config or get_config() + + storage: Storage = get_storage(config) + bucket: str = config.storage_bucket + + data_processor = compose(list, data_processor) + + return PayloadProcessor(storage, bucket, data_processor) diff --git a/pyinfra/queue/development_queue_manager.py b/pyinfra/queue/development_queue_manager.py new file mode 100644 index 0000000..a14df63 --- /dev/null +++ b/pyinfra/queue/development_queue_manager.py @@ -0,0 +1,40 @@ +import json + +import pika +import pika.exceptions + +from pyinfra.config import Config +from pyinfra.queue.queue_manager import QueueManager + + +class DevelopmentQueueManager(QueueManager): + """Extends the queue manger with additional functionality that is needed for tests and scripts, + but not in production, such as publishing messages. + """ + + def __init__(self, config: Config): + super().__init__(config) + self._open_channel() + + def publish_request(self, message: dict, properties: pika.BasicProperties = None): + message_encoded = json.dumps(message).encode("utf-8") + self._channel.basic_publish( + "", + self._input_queue, + properties=properties, + body=message_encoded, + ) + + def get_response(self): + return self._channel.basic_get(self._output_queue) + + def clear_queues(self): + """purge input & output queues""" + try: + self._channel.queue_purge(self._input_queue) + self._channel.queue_purge(self._output_queue) + except pika.exceptions.ChannelWrongStateError: + pass + + def close_channel(self): + self._channel.close() diff --git a/pyinfra/queue/queue_manager.py b/pyinfra/queue/queue_manager.py index f091acc..ec241a0 100644 --- a/pyinfra/queue/queue_manager.py +++ b/pyinfra/queue/queue_manager.py @@ -4,13 +4,14 @@ import json import logging import signal from pathlib import Path -from typing import Callable import pika import pika.exceptions from pika.adapters.blocking_connection import BlockingChannel from pyinfra.config import Config 
+from pyinfra.exception import ProcessingFailure +from pyinfra.payload_processing.processor import PayloadProcessor CONFIG = Config() @@ -32,7 +33,7 @@ def get_connection_params(config: Config) -> pika.ConnectionParameters: "host": config.rabbitmq_host, "port": config.rabbitmq_port, "credentials": credentials, - "heartbeat": int(config.rabbitmq_heartbeat), + "heartbeat": config.rabbitmq_heartbeat, } return pika.ConnectionParameters(**pika_connection_params) @@ -104,9 +105,10 @@ class QueueManager: self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True) self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True) - def start_consuming(self, process_message_callback: Callable): + def start_consuming(self, process_payload: PayloadProcessor): """consumption handling - - stanard callback handling is enforced through wrapping process_message_callback in _create_queue_callback (implements threading to support heartbeats) + - standard callback handling is enforced through wrapping process_message_callback in _create_queue_callback + (implements threading to support heartbeats) - initially sets consumer token to None - tries to - open channels @@ -115,9 +117,9 @@ class QueueManager: - catches all Exceptions & stops consuming + closes channels Args: - process_message_callback (Callable): function call passed into the queue manager from implementing service + process_payload (Callable): function passed to the queue manager, configured by implementing service """ - callback = self._create_queue_callback(process_message_callback) + callback = self._create_queue_callback(process_payload) self._set_consumer_token(None) self.logger.info("Consuming from queue") @@ -134,11 +136,10 @@ class QueueManager: raise finally: - self.stop_consuming() # stopping consumption also closes the channel + self.stop_consuming() self._connection.close() def stop_consuming(self): - """stop channel consumption & reset consumer 
token to None""" if self._consumer_token and self._connection: self.logger.info("Cancelling subscription for consumer-tag %s", self._consumer_token) self._channel.stop_consuming(self._consumer_token) @@ -148,71 +149,65 @@ class QueueManager: self.logger.info("Received signal %s", signal_number) self.stop_consuming() - def _create_queue_callback(self, process_message_callback: Callable): + def _create_queue_callback(self, process_payload: PayloadProcessor): def process_message_body_and_await_result(unpacked_message_body): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor: - self.logger.debug("opening thread for callback") - future = thread_pool_executor.submit(process_message_callback, unpacked_message_body) + self.logger.debug("Processing payload in separate thread") + future = thread_pool_executor.submit(process_payload, unpacked_message_body) while future.running(): - self.logger.debug("callback running in thread, processing data events in the meantime") + self.logger.debug("Waiting for payload processing to finish") self._connection.sleep(float(self._connection_sleep)) - self.logger.debug("fetching result from callback") - return future.result() + try: + return future.result() + except Exception as err: + raise ProcessingFailure("QueueMessagePayload processing failed") from err def acknowledge_message_and_publish_response(frame, properties, response_body): response_properties = pika.BasicProperties(headers=properties.headers) if properties.headers else None - self._channel.basic_publish( - "", self._output_queue, json.dumps(response_body).encode(), response_properties - ) + self._channel.basic_publish("", self._output_queue, json.dumps(response_body).encode(), response_properties) self.logger.info( - "Result published, acknowledging incoming message with delivery_tag %s", frame.delivery_tag + "Result published, acknowledging incoming message with delivery_tag %s", + frame.delivery_tag, ) self._channel.basic_ack(frame.delivery_tag) 
def callback(_channel, frame, properties, body): self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag) - self.logger.debug("Message headers: %s.", properties.headers) + self.logger.debug("Message headers: %s", properties.headers) - # Only try to process each message once. - # Requeueing will be handled by the dead-letter-exchange. - # This prevents endless retries on messages that are impossible to process. + # Only try to process each message once. Re-queueing will be handled by the dead-letter-exchange. This + # prevents endless retries on messages that are impossible to process. if frame.redelivered: self.logger.info( - "Aborting message processing for delivery_tag %s due to it being redelivered", frame.delivery_tag + "Aborting message processing for delivery_tag %s due to it being redelivered", + frame.delivery_tag, ) self._channel.basic_nack(frame.delivery_tag, requeue=False) - # returns nothing, so the function stops here return - self.logger.debug("Processing (%s, %s, %s).", frame, properties, body) - try: - callback_result = process_message_body_and_await_result(json.loads(body)) + self.logger.debug("Processing (%s, %s, %s)", frame, properties, body) + processing_result = process_message_body_and_await_result(json.loads(body)) + self.logger.info( + "Processed message with delivery_tag %s, publishing result to result-queue", + frame.delivery_tag, + ) + acknowledge_message_and_publish_response(frame, properties, processing_result) - if callback_result: - self.logger.info( - "Processed message with delivery_tag %s, publishing result to result-queue", frame.delivery_tag - ) - acknowledge_message_and_publish_response(frame, properties, callback_result) - else: - self.logger.info("Processed message with delivery_tag %s, declining message", frame.delivery_tag) - self._channel.basic_nack(frame.delivery_tag, requeue=False) + except ProcessingFailure: + self.logger.info( + "Processing message with delivery_tag %s failed, declining", 
+ frame.delivery_tag, + ) + self._channel.basic_nack(frame.delivery_tag, requeue=False) - except Exception as ex: + except Exception: n_attempts = _get_n_previous_attempts(properties) + 1 self.logger.warning("Failed to process message, %s attempts", n_attempts, exc_info=True) self._channel.basic_nack(frame.delivery_tag, requeue=False) - raise ex + raise return callback - - def clear(self): - """purge input & output queues""" - try: - self._channel.queue_purge(self._input_queue) - self._channel.queue_purge(self._output_queue) - except pika.exceptions.ChannelWrongStateError: - pass diff --git a/pyinfra/storage/__init__.py b/pyinfra/storage/__init__.py index f619722..f5d004f 100644 --- a/pyinfra/storage/__init__.py +++ b/pyinfra/storage/__init__.py @@ -1,4 +1,3 @@ -from pyinfra.storage import adapters, storage from pyinfra.storage.storage import get_storage -__all__ = ["adapters", "storage"] +__all__ = ["get_storage"] diff --git a/pyinfra/storage/storage.py b/pyinfra/storage/storage.py index 36a9f57..40128b8 100644 --- a/pyinfra/storage/storage.py +++ b/pyinfra/storage/storage.py @@ -1,9 +1,17 @@ +from functools import lru_cache, partial +from typing import Callable + +from funcy import compose + from pyinfra.config import Config -from pyinfra.storage.adapters.azure import get_azure_storage -from pyinfra.storage.adapters.s3 import get_s3_storage +from pyinfra.storage.storages.azure import get_azure_storage +from pyinfra.storage.storages.s3 import get_s3_storage +from pyinfra.storage.storages.interface import Storage +from pyinfra.utils.compressing import get_decompressor, get_compressor +from pyinfra.utils.encoding import get_decoder, get_encoder -def get_storage(config: Config): +def get_storage(config: Config) -> Storage: if config.storage_backend == "s3": storage = get_s3_storage(config) @@ -13,3 +21,33 @@ def get_storage(config: Config): raise Exception(f"Unknown storage backend '{config.storage_backend}'.") return storage + + +def verify_existence(storage: 
Storage, bucket: str, file_name: str) -> str: + if not storage.exists(bucket, file_name): + raise FileNotFoundError(f"{file_name=} name not found on storage in {bucket=}.") + return file_name + + +@lru_cache(maxsize=10) +def make_downloader(storage: Storage, bucket: str, file_type: str, compression_type: str) -> Callable: + + verify = partial(verify_existence, storage, bucket) + download = partial(storage.get_object, bucket) + decompress = get_decompressor(compression_type) + decode = get_decoder(file_type) + + return compose(decode, decompress, download, verify) + + +@lru_cache(maxsize=10) +def make_uploader(storage: Storage, bucket: str, file_type: str, compression_type: str) -> Callable: + + upload = partial(storage.put_object, bucket) + compress = get_compressor(compression_type) + encode = get_encoder(file_type) + + def inner(file_name, file_bytes): + upload(file_name, compose(compress, encode)(file_bytes)) + + return inner diff --git a/pyinfra/storage/adapters/__init__.py b/pyinfra/storage/storages/__init__.py similarity index 100% rename from pyinfra/storage/adapters/__init__.py rename to pyinfra/storage/storages/__init__.py diff --git a/pyinfra/storage/adapters/azure.py b/pyinfra/storage/storages/azure.py similarity index 94% rename from pyinfra/storage/adapters/azure.py rename to pyinfra/storage/storages/azure.py index a23334c..aaa9ba2 100644 --- a/pyinfra/storage/adapters/azure.py +++ b/pyinfra/storage/storages/azure.py @@ -6,6 +6,7 @@ from azure.storage.blob import BlobServiceClient, ContainerClient from retry import retry from pyinfra.config import Config, get_config +from pyinfra.storage.storages.interface import Storage CONFIG = get_config() logger = logging.getLogger(CONFIG.logging_level_root) @@ -14,7 +15,7 @@ logging.getLogger("azure").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) -class AzureStorageAdapter(object): +class AzureStorage(Storage): def __init__(self, client: BlobServiceClient): self._client: 
BlobServiceClient = client @@ -77,4 +78,4 @@ class AzureStorageAdapter(object): def get_azure_storage(config: Config): - return AzureStorageAdapter(BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring)) + return AzureStorage(BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring)) diff --git a/pyinfra/storage/storages/interface.py b/pyinfra/storage/storages/interface.py new file mode 100644 index 0000000..f5530d6 --- /dev/null +++ b/pyinfra/storage/storages/interface.py @@ -0,0 +1,35 @@ +from abc import ABC, abstractmethod + + +class Storage(ABC): + @abstractmethod + def make_bucket(self, bucket_name): + raise NotImplementedError + + @abstractmethod + def has_bucket(self, bucket_name): + raise NotImplementedError + + @abstractmethod + def put_object(self, bucket_name, object_name, data): + raise NotImplementedError + + @abstractmethod + def exists(self, bucket_name, object_name): + raise NotImplementedError + + @abstractmethod + def get_object(self, bucket_name, object_name): + raise NotImplementedError + + @abstractmethod + def get_all_objects(self, bucket_name): + raise NotImplementedError + + @abstractmethod + def clear_bucket(self, bucket_name): + raise NotImplementedError + + @abstractmethod + def get_all_object_names(self, bucket_name): + raise NotImplementedError diff --git a/pyinfra/storage/adapters/s3.py b/pyinfra/storage/storages/s3.py similarity index 67% rename from pyinfra/storage/adapters/s3.py rename to pyinfra/storage/storages/s3.py index df33542..5763353 100644 --- a/pyinfra/storage/adapters/s3.py +++ b/pyinfra/storage/storages/s3.py @@ -1,40 +1,19 @@ import io import logging -import re from itertools import repeat from operator import attrgetter -from urllib.parse import urlparse from minio import Minio from retry import retry from pyinfra.config import Config, get_config +from pyinfra.storage.storages.interface import Storage CONFIG = get_config() logger = 
logging.getLogger(CONFIG.logging_level_root) -ALLOWED_CONNECTION_SCHEMES = {"http", "https"} -URL_VALIDATOR = re.compile( - r"^((" - + r"([A-Za-z]{3,9}:(?:\/\/)?)" - + r"(?:[\-;:&=\+\$,\w]+@)?" - + r"[A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)" - + r"[A-Za-z0-9\.\-]+)" - + r"((?:\/[\+~%\/\.\w\-_]*)?" - + r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)" -) -# URL_VALIDATOR = re.compile( -# r"""^((([A-Za-z]{3,9}:(?:\/\/)?) -# (?:[\-;:&=\+\$,\w]+@)? -# [A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@) -# [A-Za-z0-9\.\-]+) -# ((?:\/[\+~%\/\.\w\-_]*)? -# \??(?:[\-\+=&;%@\.\w_]*) -# \#?(?:[\.\!\/\\\w]*))?)""" -# ) - -class S3StorageAdapter(object): +class S3Storage(Storage): def __init__(self, client: Minio): self._client = client @@ -88,21 +67,13 @@ class S3StorageAdapter(object): return zip(repeat(bucket_name), map(attrgetter("object_name"), objs)) -def _parse_endpoint(endpoint): - parsed_url = urlparse(endpoint) - if URL_VALIDATOR.match(endpoint) and parsed_url.netloc and parsed_url.scheme in ALLOWED_CONNECTION_SCHEMES: - return {"secure": parsed_url.scheme == "https", "endpoint": parsed_url.netloc} - else: - raise Exception(f"The configured storage endpoint is not a valid url: {endpoint}") - - def get_s3_storage(config: Config): - return S3StorageAdapter( + return S3Storage( Minio( - **_parse_endpoint(config.storage_endpoint), + secure=config.storage_secure_connection, + endpoint=config.storage_endpoint, access_key=config.storage_key, secret_key=config.storage_secret, - # This is relevant for running on s3 region=config.storage_region, ) ) diff --git a/pyinfra/utils/compressing.py b/pyinfra/utils/compressing.py new file mode 100644 index 0000000..df23f69 --- /dev/null +++ b/pyinfra/utils/compressing.py @@ -0,0 +1,22 @@ +import gzip +from typing import Union, Callable + +from funcy import identity + + +def get_decompressor(compression_type: Union[str, None]) -> Callable: + if not compression_type: + return identity + elif "gz" in compression_type: + return 
gzip.decompress + else: + raise ValueError(f"{compression_type=} is not supported.") + + +def get_compressor(compression_type: str) -> Callable: + if not compression_type: + return identity + elif "gz" in compression_type: + return gzip.compress + else: + raise ValueError(f"{compression_type=} is not supported.") diff --git a/pyinfra/utils/encoding.py b/pyinfra/utils/encoding.py new file mode 100644 index 0000000..8cbaa51 --- /dev/null +++ b/pyinfra/utils/encoding.py @@ -0,0 +1,28 @@ +import json +from typing import Callable + +from funcy import identity + + +def decode_json(data: bytes) -> dict: + return json.loads(data.decode("utf-8")) + + +def encode_json(data: dict) -> bytes: + return json.dumps(data).encode("utf-8") + + +def get_decoder(file_type: str) -> Callable: + if "json" in file_type: + return decode_json + elif "pdf" in file_type: + return identity + else: + raise ValueError(f"{file_type=} is not supported.") + + +def get_encoder(file_type: str) -> Callable: + if "json" in file_type: + return encode_json + else: + raise ValueError(f"{file_type=} is not supported.") diff --git a/pyinfra/utils/url_parsing.py b/pyinfra/utils/url_parsing.py new file mode 100644 index 0000000..f06def4 --- /dev/null +++ b/pyinfra/utils/url_parsing.py @@ -0,0 +1,40 @@ +import re +from operator import truth +from typing import Tuple +from urllib.parse import urlparse + + +def make_url_validator(allowed_connection_schemes: tuple = ("http", "https")): + pattern = re.compile( + r"^((" + + r"([A-Za-z]{3,9}:(?:\/\/)?)" + + r"(?:[\-;:&=\+\$,\w]+@)?" + + r"[A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)" + + r"[A-Za-z0-9\.\-]+)" + + r"((?:\/[\+~%\/\.\w\-_]*)?" 
+ + r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)" + ) + + def inner(url: str): + url_is_valid = pattern.match(url) + + parsed_url = urlparse(url) + endpoint_is_valid = truth(parsed_url.netloc) + protocol_is_valid = parsed_url.scheme in allowed_connection_schemes + + return url_is_valid and endpoint_is_valid and protocol_is_valid + + return inner + + +def validate_and_parse_s3_endpoint(endpoint: str) -> Tuple[bool, str]: + validate_url = make_url_validator() + + if not validate_url(endpoint): + raise Exception(f"The s3 storage endpoint is not a valid url: {endpoint}") + + parsed_url = urlparse(endpoint) + connection_is_secure = parsed_url.scheme == "https" + storage_endpoint = parsed_url.netloc + + return connection_is_secure, storage_endpoint diff --git a/pyproject.toml b/pyproject.toml index 54dd25e..4be65a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,12 +16,16 @@ azure-storage-blob = "12.9.0" testcontainers = "3.4.2" docker-compose = "1.29.2" funcy = "1.17" +prometheus-client = "^0.16.0" +pymupdf = "^1.21.1" [tool.poetry.group.dev.dependencies] pytest = "^7.1.3" ipykernel = "^6.16.0" black = {version = "^23.1a1", allow-prereleases = true} pylint = "^2.15.10" +coverage = "^7.2.0" +requests = "^2.28.2" [tool.pytest.ini_options] minversion = "6.0" diff --git a/scripts/mock_process_request.py b/scripts/mock_process_request.py deleted file mode 100644 index a4a3f1a..0000000 --- a/scripts/mock_process_request.py +++ /dev/null @@ -1,100 +0,0 @@ -import gzip -import json -import logging -from operator import itemgetter - -import pika - -from pyinfra.config import get_config -from pyinfra.storage.adapters.s3 import get_s3_storage - -CONFIG = get_config() -logging.basicConfig() -logger = logging.getLogger() -logger.setLevel(logging.INFO) - - -def read_connection_params(): - credentials = pika.PlainCredentials(CONFIG.rabbitmq_username, CONFIG.rabbitmq_password) - parameters = pika.ConnectionParameters( - host=CONFIG.rabbitmq_host, - 
port=CONFIG.rabbitmq_port, - heartbeat=int(CONFIG.rabbitmq_heartbeat), - credentials=credentials, - ) - return parameters - - -def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel: - channel = connection.channel() - channel.basic_qos(prefetch_count=1) - return channel - - -def declare_queue(channel, queue: str): - args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.dead_letter_queue} - return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args) - - -def make_connection() -> pika.BlockingConnection: - parameters = read_connection_params() - connection = pika.BlockingConnection(parameters) - return connection - - -def upload_and_make_message_body(): - bucket = CONFIG.storage_bucket - dossier_id, file_id, suffix = "dossier", "file", "json.gz" - content = {"key": "value"} - object_name = f"{dossier_id}/{file_id}.{suffix}" - data = gzip.compress(json.dumps(content).encode("utf-8")) - - storage = get_s3_storage(CONFIG) - if not storage.has_bucket(bucket): - storage.make_bucket(bucket) - storage.put_object(bucket, object_name, data) - - message_body = { - "dossierId": dossier_id, - "fileId": file_id, - "targetFileExtension": suffix, - "responseFileExtension": f"result.{suffix}", - } - return message_body - - -def main(): - connection = make_connection() - channel = make_channel(connection) - declare_queue(channel, CONFIG.request_queue) - declare_queue(channel, CONFIG.response_queue) - - message = upload_and_make_message_body() - message_encoded = json.dumps(message).encode("utf-8") - channel.basic_publish( - "", - CONFIG.request_queue, - # properties=pika.BasicProperties(headers=None), - properties=pika.BasicProperties(headers={"x-tenant-id": "redaction"}), - body=message_encoded, - ) - logger.info(f"Put {message} on {CONFIG.request_queue}") - - storage = get_s3_storage(CONFIG) - for method_frame, properties, body in channel.consume(queue=CONFIG.response_queue, inactivity_timeout=10): - if 
not body: - break - response = json.loads(body) - logger.info(f"Received {response}") - logger.info(f"Message headers: {properties.headers}") - channel.basic_ack(method_frame.delivery_tag) - dossier_id, file_id = itemgetter("dossierId", "fileId")(response) - suffix = message["responseFileExtension"] - result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}") - result = json.loads(gzip.decompress(result)) - logger.info(f"Contents of result on storage: {result}") - channel.close() - - -if __name__ == "__main__": - main() diff --git a/scripts/send_request.py b/scripts/send_request.py new file mode 100644 index 0000000..2b9c4b7 --- /dev/null +++ b/scripts/send_request.py @@ -0,0 +1,72 @@ +import gzip +import json +import logging +from operator import itemgetter + +import pika + +from pyinfra.config import get_config +from pyinfra.queue.development_queue_manager import DevelopmentQueueManager +from pyinfra.storage.storages.s3 import get_s3_storage + +CONFIG = get_config() +logging.basicConfig() +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def upload_json_and_make_message_body(): + bucket = CONFIG.storage_bucket + dossier_id, file_id, suffix = "dossier", "file", "json.gz" + content = { + "numberOfPages": 7, + "sectionTexts": "data", + } + + object_name = f"{dossier_id}/{file_id}.{suffix}" + data = gzip.compress(json.dumps(content).encode("utf-8")) + + storage = get_s3_storage(CONFIG) + if not storage.has_bucket(bucket): + storage.make_bucket(bucket) + storage.put_object(bucket, object_name, data) + + message_body = { + "dossierId": dossier_id, + "fileId": file_id, + "targetFileExtension": suffix, + "responseFileExtension": f"result.{suffix}", + } + return message_body + + +def main(): + development_queue_manager = DevelopmentQueueManager(CONFIG) + development_queue_manager.clear_queues() + + message = upload_json_and_make_message_body() + + development_queue_manager.publish_request(message, 
pika.BasicProperties(headers={"x-tenant-id": "redaction"})) + logger.info(f"Put {message} on {CONFIG.request_queue}") + + storage = get_s3_storage(CONFIG) + for method_frame, properties, body in development_queue_manager._channel.consume( + queue=CONFIG.response_queue, inactivity_timeout=15 + ): + if not body: + break + response = json.loads(body) + logger.info(f"Received {response}") + logger.info(f"Message headers: {properties.headers}") + development_queue_manager._channel.basic_ack(method_frame.delivery_tag) + dossier_id, file_id = itemgetter("dossierId", "fileId")(response) + suffix = message["responseFileExtension"] + print(f"{dossier_id}/{file_id}.{suffix}") + result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}") + result = json.loads(gzip.decompress(result)) + logger.info(f"Contents of result on storage: {result}") + development_queue_manager.close_channel() + + +if __name__ == "__main__": + main() diff --git a/scripts/start_pyinfra.py b/scripts/start_pyinfra.py index 9c151d2..31b3a3a 100644 --- a/scripts/start_pyinfra.py +++ b/scripts/start_pyinfra.py @@ -1,53 +1,24 @@ -import gzip -import json import logging -from typing import Callable +import time from pyinfra.config import get_config +from pyinfra.payload_processing.processor import make_payload_processor from pyinfra.queue.queue_manager import QueueManager -from pyinfra.storage import get_storage logging.basicConfig() logger = logging.getLogger() logger.setLevel(logging.INFO) -def make_callback(processor: Callable, config=get_config()): - bucket = config.storage_bucket - storage = get_storage(config) - - def callback(request_message): - dossier_id = request_message["dossierId"] - file_id = request_message["fileId"] - logger.info(f"Processing {dossier_id=} {file_id=} ...") - target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}" - response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}" - - if not 
storage.exists(bucket, target_file_name): - logger.warning(f"{target_file_name=} not present in {bucket=}, cancelling processing...") - return None - - object_bytes = storage.get_object(bucket, target_file_name) - object_bytes = gzip.decompress(object_bytes) - result_body = list(processor(object_bytes)) - - result = {**request_message, "data": result_body} - storage_bytes = gzip.compress(json.dumps(result).encode("utf-8")) - storage.put_object(bucket, response_file_name, storage_bytes) - - return {"dossierId": dossier_id, "fileId": file_id} - - return callback - - -def process(body): - return [{"response_key": "response_value"}] +def json_processor_mock(data: dict): + time.sleep(5) + return [{"result1": "result1"}, {"result2": "result2"}] def main(): logger.info("Start consuming...") queue_manager = QueueManager(get_config()) - queue_manager.start_consuming(make_callback(process)) + queue_manager.start_consuming(make_payload_processor(json_processor_mock)) if __name__ == "__main__": diff --git a/scripts/usage_pyinfra_dev_scripts.md b/scripts/usage_pyinfra_dev_scripts.md deleted file mode 100644 index 6d383d5..0000000 --- a/scripts/usage_pyinfra_dev_scripts.md +++ /dev/null @@ -1,18 +0,0 @@ -# Scripts Usage - -## Run pyinfra locally - -**Shell 1**: Start minio and rabbitmq containers -```bash -$ cd scripts && docker-compose up -``` - -**Shell 2**: Start pyinfra with callback mock -```bash -$ python scripts/start_pyinfra.py -``` - -**Shell 3**: Upload dummy content on storage and publish message -```bash -$ python scripts/mock_process_request.py -``` diff --git a/tests/config.yml b/tests/config.yml deleted file mode 100644 index 2a14dcb..0000000 --- a/tests/config.yml +++ /dev/null @@ -1,79 +0,0 @@ -service: - response_formatter: identity - operations: - upper: - input: - subdir: "" - extension: up_in.gz - multi: False - output: - subdir: "" - extension: up_out.gz - extract: - input: - subdir: "" - extension: extr_in.gz - multi: False - output: - subdir: 
"extractions" - extension: gz - rotate: - input: - subdir: "" - extension: rot_in.gz - multi: False - output: - subdir: "" - extension: rot_out.gz - classify: - input: - subdir: "" - extension: cls_in.gz - multi: True - output: - subdir: "" - extension: cls_out.gz - stream_pages: - input: - subdir: "" - extension: pgs_in.gz - multi: False - output: - subdir: "pages" - extension: pgs_out.gz - default: - input: - subdir: "" - extension: IN.gz - multi: False - output: - subdir: "" - extension: OUT.gz - -storage: - minio: - endpoint: "http://127.0.0.1:9000" - access_key: root - secret_key: password - region: null - - aws: - endpoint: https://s3.amazonaws.com - access_key: AKIA4QVP6D4LCDAGYGN2 - secret_key: 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 - region: $STORAGE_REGION|"eu-west-1" - - azure: - connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" - - bucket: "pyinfra-test-bucket" - -webserver: - host: $SERVER_HOST|"127.0.0.1" # webserver address - port: $SERVER_PORT|5000 # webserver port - mode: $SERVER_MODE|production # webserver mode: {development, production} - -mock_analysis_endpoint: "http://127.0.0.1:5000" - -use_docker_fixture: 1 -logging: 0 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 247b7ba..7b3fc33 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,19 +1,95 @@ -import pytest -from pyinfra.config import get_config, Config -import os +import logging +import time +from pathlib import Path -@pytest.fixture(params=["aws", "azure"]) -def storage_config(request) -> Config: - if request.param == "aws": - os.environ["STORAGE_BACKEND"] = "s3" - os.environ["STORAGE_BUCKET_NAME"] = "pyinfra-test-bucket" - os.environ["STORAGE_ENDPOINT"] = "https://s3.amazonaws.com" - os.environ["STORAGE_KEY"] = "AKIA4QVP6D4LCDAGYGN2" - os.environ["STORAGE_SECRET"] = 
"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" - os.environ["STORAGE_REGION"] = "eu-west-1" - else: - os.environ["STORAGE_BACKEND"] = "azure" - os.environ["STORAGE_AZURECONTAINERNAME"] = "pyinfra-test-bucket" - os.environ["STORAGE_AZURECONNECTIONSTRING"] = "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net" - - return get_config() +import pytest +import testcontainers.compose + +from pyinfra.config import get_config +from pyinfra.queue.queue_manager import QueueManager +from pyinfra.storage import get_storage + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +TESTS_DIR = Path(__file__).resolve().parents[0] + + +@pytest.fixture(scope="session", autouse=True) +def docker_compose(sleep_seconds=30): + """Note: `autouse` can be set to `False` while working on the code to speed up the testing. In that case, run + `docker-compose up` in the tests directory manually before running the tests. + """ + logger.info(f"Starting docker containers with {TESTS_DIR}/docker-compose.yml...") + compose = testcontainers.compose.DockerCompose(TESTS_DIR, compose_file_name="docker-compose.yml") + compose.start() + logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... 
@pytest.fixture(scope="session")
def storage_config(client_name):
    """Session-scoped pyinfra config pointed at the storage backend under test.

    ``client_name`` is injected via ``pytest.mark.parametrize`` in the test
    modules (e.g. ``"s3"`` or ``"azure"``).

    Returns:
        The project config object from ``get_config()`` with the storage
        backend selected for this test session.
    """
    import os  # local import: keeps the fixture self-contained

    config = get_config()
    config.storage_backend = client_name
    # SECURITY: never hard-code an Azure connection string (it embeds the
    # AccountKey) in source control — the previous value was a leaked secret
    # and must be rotated. Supply it via the environment instead; the empty
    # default keeps non-Azure (s3/minio) runs working without it.
    config.storage_azureconnectionstring = os.environ.get(
        "STORAGE_AZURECONNECTIONSTRING", ""
    )
    return config
@pytest.fixture
def file_names(request_payload):
    """Derive the (target, response) object names for the payload under test.

    Returns a 2-tuple ``(target_name, response_name)``, each of the form
    ``"<dossierId>/<fileId>.<extension>"``.
    """
    stem = f"{request_payload['dossierId']}/{request_payload['fileId']}"
    target_name = f"{stem}.{request_payload['targetFileExtension']}"
    response_name = f"{stem}.{request_payload['responseFileExtension']}"
    return target_name, response_name
    def test_catching_of_processing_failure(self, payload_processor, storage, bucket_name, request_payload):
        """Processing a payload whose target object is missing must raise.

        The bucket is emptied first, so the file referenced by
        ``request_payload`` does not exist and the download step inside the
        payload processor is expected to fail.
        """
        storage.clear_bucket(bucket_name)
        # NOTE(review): ``Exception`` is very broad; if the storage layer has a
        # stable error type, narrow this match so unrelated bugs cannot pass.
        with pytest.raises(Exception):
            payload_processor(request_payload)
@pytest.fixture(scope="session")
def payload_processor(response_payload, payload_processing_time, payload_processor_type):
    """Return the payload-processing callable selected by ``payload_processor_type``.

    - ``"mock"``: sleeps for ``payload_processing_time`` seconds (chosen in
      conftest to exceed the RabbitMQ heartbeat) and returns ``response_payload``.
    - ``"failing"``: raises ``MemoryError`` to exercise error handling.

    Raises:
        ValueError: for any other ``payload_processor_type`` value.
    """

    def process(payload):
        # Simulate long-running work so heartbeat handling is exercised.
        time.sleep(payload_processing_time)
        return response_payload

    def process_with_failure(payload):
        raise MemoryError

    if payload_processor_type == "mock":
        return process
    if payload_processor_type == "failing":
        return process_with_failure
    # Previously an unknown type silently returned None, which only surfaced
    # later as a confusing failure inside the consumer; fail fast instead.
    raise ValueError(f"Unknown payload_processor_type: {payload_processor_type!r}")
    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_message_processing_does_not_block_heartbeat(
        self, development_queue_manager, request_payload, response_payload, payload_processing_time
    ):
        """A request whose processing outlasts the heartbeat is still answered.

        ``payload_processing_time`` is built as heartbeat + offset (see its
        fixture), so if message processing blocked the connection's heartbeat
        handling, the broker would drop the consumer and no response would
        arrive.
        """
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(request_payload)
        # Wait until the consumer (which sleeps payload_processing_time
        # seconds) has certainly finished; the +10 s is slack, not tuned.
        time.sleep(payload_processing_time + 10)
        _, _, body = development_queue_manager.get_response()
        result = json.loads(body)
        assert result == response_payload
    @pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session")
    def test_failed_message_processing_is_handled(
        self,
        development_queue_manager,
        request_payload,
        response_payload,
        payload_processing_time,
    ):
        """A processor that raises must not produce a response message.

        The ``"failing"`` processor raises ``MemoryError``; after waiting out
        the processing window, the response body is expected to be empty
        (falsy), i.e. no response message was published.
        """
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(request_payload)
        time.sleep(payload_processing_time + 10)
        _, _, body = development_queue_manager.get_response()
        # NOTE(review): the ``response_payload`` fixture parameter is unused
        # here — presumably kept for signature symmetry; consider removing.
        assert not body
storage.put_object(bucket_name, "folder/file2", b"content 2") + data_received = storage.get_all_objects(bucket_name) + assert {b"content 1", b"content 2"} == {*data_received} + + def test_make_bucket_produces_bucket(self, storage, bucket_name): + storage.clear_bucket(bucket_name) + storage.make_bucket(bucket_name) + assert storage.has_bucket(bucket_name) + + def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name): + storage.clear_bucket(bucket_name) + storage.put_object(bucket_name, "file1", b"content 1") + storage.put_object(bucket_name, "file2", b"content 2") + full_names_received = storage.get_all_object_names(bucket_name) + assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received} + + def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name): + storage.clear_bucket(bucket_name) + with pytest.raises(Exception): + storage.get_object(bucket_name, "folder/file") diff --git a/tests/test_storage.py b/tests/test_storage.py deleted file mode 100644 index 830f9df..0000000 --- a/tests/test_storage.py +++ /dev/null @@ -1,5 +0,0 @@ -from pyinfra.storage import get_storage - -def test_storage(storage_config) -> None: - storage = get_storage(storage_config) - assert storage.has_bucket(storage_config.storage_bucket)