Pull request #63: RED-6366 refactor

Merge in RR/pyinfra from RED-6366-refactor to master

Squashed commit of the following:

commit 8807cda514b5cc24b1be208173283275d87dcb97
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 13:15:15 2023 +0100

    enable docker-compose autouse for automatic tests

commit c4579581d3e9a885ef387ee97f3f3a5cf4731193
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 12:35:49 2023 +0100

    black

commit ac2b754c5624ef37ce310fce7196c9ea11bbca03
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 12:30:23 2023 +0100

    refactor storage url parsing

    - move parsing and validation to config where the connection url is
    actually read in
    - improve readability of parsing fn

commit 371802cc10b6d946c4939ff6839571002a2cb9f4
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 10:48:00 2023 +0100

    refactor

commit e8c381c29deebf663e665920752c2965d7abce16
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 09:57:34 2023 +0100

    rename

commit c8628a509316a651960dfa806d5fe6aacb7a91c1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 09:37:01 2023 +0100

    renaming and refactoring

commit 4974d4f56fd73bc55bd76aa7a9bbb16babee19f4
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 08:53:09 2023 +0100

    refactor payload processor

    - limit make_uploader and make_downloader cache
    - partially apply them when the class is initialized with storage and
    bucket to make the logic and behaviour more comprehensible
    - renaming functional pipeline steps to be more expressive

commit f8d51bfcad2b815c8293ab27dd66b256255c5414
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Mar 9 15:30:32 2023 +0100

    remove monitor and rename Payload

commit 412ddaa207a08aff1229d7acd5d95402ac8cd578
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Mar 2 10:15:39 2023 +0100

    remove azure connection string and disable respective test for now for security reasons

commit 7922a2d9d325f3b9008ad4e3e56b241ba179f52c
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 1 13:30:58 2023 +0100

    make payload formatting function names more expressive

commit 7517e544b0f5a434579cc9bada3a37e7ac04059f
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 1 13:24:57 2023 +0100

    add some type hints

commit 095410d3009f2dcbd374680dd0f7b55de94c9e76
Author: Matthias Bisping <matthias.bisping@axbit.com>
Date:   Wed Mar 1 10:54:58 2023 +0100

    Refactoring

    - Renaming
    - Docstring adjustments

commit e992f0715fc2636eb13eb5ffc4de0bcc5d433fc8
Author: Matthias Bisping <matthias.bisping@axbit.com>
Date:   Wed Mar 1 09:43:26 2023 +0100

    Re-wording and typo fixes

commit 3c2d698f9bf980bc4b378a44dc20c2badc407b3e
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 14:59:59 2023 +0100

    enable auto startup for docker compose in tests

commit 55773b4fb0b624ca4745e5b8aeafa6f6a0ae6436
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 14:59:37 2023 +0100

    Extended tests for queue manager

commit 14f7f943f60b9bfb9fe77fa3cef99a1e7d094333
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 13:39:00 2023 +0100

    enable auto startup for docker compose in tests

commit 7caf354491c84c6e0b0e09ad4d41cb5dfbfdb225
Merge: 49d47ba d0277b8
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 13:32:52 2023 +0100

    Merge branch 'RED-6205-prometheus' of ssh://git.iqser.com:2222/rr/pyinfra into RED-6205-prometheus

commit 49d47baba8ccf11dee48a4c1cbddc3bbd12471e5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 13:32:42 2023 +0100

    adjust Payload Processor signature

commit d0277b86bc54994b6032774bf0ec2d7b19d7f517
Merge: 5184a18 f6b35d6
Author: Christoph Schabert <christoph.schabert@iqser.com>
Date:   Tue Feb 28 11:07:16 2023 +0100

    Pull request #61: Change Sec Trigger to PR

    Merge in RR/pyinfra from cschabert/PlanSpecjava-1677578703647 to RED-6205-prometheus

    * commit 'f6b35d648c88ddbce1856445c3b887bce669265c':
      Change Sec Trigger to PR

commit f6b35d648c88ddbce1856445c3b887bce669265c
Author: Christoph Schabert <christoph.schabert@iqser.com>
Date:   Tue Feb 28 11:05:13 2023 +0100

    Change Sec Trigger to PR

... and 20 more commits
This commit is contained in:
Julius Unverfehrt 2023-03-13 15:11:25 +01:00
parent 46157031b5
commit 3c4739ad8b
32 changed files with 1028 additions and 376 deletions

View File

@@ -1,33 +1,41 @@
# Infrastructure to deploy Research Projects
# PyInfra
1. [ About ](#about)
2. [ Configuration ](#configuration)
3. [ Response Format ](#response-format)
4. [ Usage & API ](#usage--api)
5. [ Scripts ](#scripts)
6. [ Tests ](#tests)
## About
Common module with the infrastructure to deploy Research Projects.
The infrastructure expects to be deployed in the same pod / local environment as the analysis container and handles all outbound communication.
## Configuration
A configuration is located in `/config.yaml`. All relevant variables can be configured by exporting environment variables.
| Environment Variable | Default | Description |
| ----------------------------- | ------------------------------ | --------------------------------------------------------------------- |
| LOGGING_LEVEL_ROOT | DEBUG | Logging level for service logger |
| PROBING_WEBSERVER_HOST | "0.0.0.0" | Probe webserver address |
| PROBING_WEBSERVER_PORT | 8080 | Probe webserver port |
| PROBING_WEBSERVER_MODE | production | Webserver mode: {development, production} |
| RABBITMQ_HOST | localhost | RabbitMQ host address |
| RABBITMQ_PORT | 5672 | RabbitMQ host port |
| RABBITMQ_USERNAME | user | RabbitMQ username |
| RABBITMQ_PASSWORD | bitnami | RabbitMQ password |
| RABBITMQ_HEARTBEAT | 7200 | Controls AMQP heartbeat timeout in seconds |
| REQUEST_QUEUE | request_queue | Requests to service |
| RESPONSE_QUEUE | response_queue | Responses by service |
| DEAD_LETTER_QUEUE | dead_letter_queue | Messages that failed to process |
| ANALYSIS_ENDPOINT | "http://127.0.0.1:5000" | Endpoint for analysis container |
| STORAGE_BACKEND | s3 | The type of storage to use {s3, azure} |
| STORAGE_BUCKET | "redaction" | The bucket / container to pull files specified in queue requests from |
| STORAGE_ENDPOINT | "http://127.0.0.1:9000" | Endpoint for s3 storage |
| STORAGE_KEY | root | User for s3 storage |
| STORAGE_SECRET | password | Password for s3 storage |
| STORAGE_AZURECONNECTIONSTRING | "DefaultEndpointsProtocol=..." | Connection string for Azure storage |
| STORAGE_AZURECONTAINERNAME | "redaction" | AKS container |
| Environment Variable | Default | Description |
|-------------------------------|----------------------------------|--------------------------------------------------------------------------|
| LOGGING_LEVEL_ROOT | "DEBUG" | Logging level for service logger |
| RABBITMQ_HOST | "localhost" | RabbitMQ host address |
| RABBITMQ_PORT | "5672" | RabbitMQ host port |
| RABBITMQ_USERNAME | "user" | RabbitMQ username |
| RABBITMQ_PASSWORD | "bitnami" | RabbitMQ password |
| RABBITMQ_HEARTBEAT | 60 | Controls AMQP heartbeat timeout in seconds |
| RABBITMQ_CONNECTION_SLEEP | 5 | Controls AMQP connection sleep timer in seconds |
| REQUEST_QUEUE | "request_queue" | Requests to service |
| RESPONSE_QUEUE | "response_queue" | Responses by service |
| DEAD_LETTER_QUEUE | "dead_letter_queue" | Messages that failed to process |
| STORAGE_BACKEND | "s3" | The type of storage to use {s3, azure} |
| STORAGE_BUCKET | "redaction" | The bucket / container to pull files specified in queue requests from |
| STORAGE_ENDPOINT | "http://127.0.0.1:9000" | Endpoint for s3 storage |
| STORAGE_KEY | "root" | User for s3 storage |
| STORAGE_SECRET | "password" | Password for s3 storage |
| STORAGE_AZURECONNECTIONSTRING | "DefaultEndpointsProtocol=..." | Connection string for Azure storage |
| STORAGE_AZURECONTAINERNAME | "redaction" | AKS container |
| WRITE_CONSUMER_TOKEN | "False" | Value to see if we should write a consumer token to a file |
## Response Format
@@ -49,11 +57,12 @@ Optionally, the input message can contain a field with the key `"operations"`.
```json
{
"dossierId": "",
"fileId": "",
...
"fileId": ""
}
```
## Usage & API
### Setup
Install project dependencies
@@ -69,3 +78,40 @@ or install from another project
```bash
poetry add git+ssh://git@git.iqser.com:2222/rr/pyinfra.git#TAG-NUMBER
```
### API
```python
from pyinfra.config import get_config
from pyinfra.payload_processing.processor import make_payload_processor
from pyinfra.queue.queue_manager import QueueManager
queue_manager = QueueManager(get_config())
queue_manager.start_consuming(make_payload_processor(data_processor))
```
The `data_processor` should accept a dict or bytes (PDF) as input and return a list of results.
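A minimal sketch of such a function (the name and returned fields are illustrative, not part of pyinfra):

```python
from typing import List, Union

def data_processor(payload: Union[dict, bytes]) -> List[dict]:
    # bytes input: a raw PDF; dict input: an already-parsed document
    if isinstance(payload, bytes):
        return [{"kind": "pdf", "size": len(payload)}]
    return [{"kind": "dict", "keys": sorted(payload)}]
```

Wrapped with `make_payload_processor`, this function is called on the downloaded file, and its result is uploaded to storage.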
## Scripts
### Run pyinfra locally
**Shell 1**: Start minio and rabbitmq containers
```bash
$ cd tests && docker-compose up
```
**Shell 2**: Start pyinfra with callback mock
```bash
$ python scripts/start_pyinfra.py
```
**Shell 3**: Upload dummy content on storage and publish message
```bash
$ python scripts/mock_process_request.py
```
## Tests
The tests take longer than you may be used to because, among other things, the required container startup times are
quite high. The test runtime can be reduced by setting `autouse` to `False`; in that case, run `docker-compose up`
in the tests directory manually before running the tests.

View File

@@ -199,11 +199,14 @@ public class PlanSpec {
"/var/run/docker.sock"))))
.linkedRepositories(REPOSITORY_KEY + " / " + SERVICE_NAME)
.triggers(
new ScheduledTrigger()
.scheduleOnceDaily(LocalTime.of(23, 00)))
new BitbucketServerTrigger())
.planBranchManagement(
new PlanBranchManagement()
.createForVcsBranchMatching("release.*")
.createForVcsBranch()
.delete(
new BranchCleanup()
.whenInactiveInRepositoryAfterDays(
14))
.notificationForCommitters());
}

bamboo-specs/src/main/resources/scripts/config-keys.sh Normal file → Executable file
View File

poetry.lock generated
View File

@@ -428,6 +428,70 @@ traitlets = ">=5.3"
[package.extras]
test = ["pytest"]
[[package]]
name = "coverage"
version = "7.2.0"
description = "Code coverage measurement for Python"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "coverage-7.2.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90e7a4cbbb7b1916937d380beb1315b12957b8e895d7d9fb032e2038ac367525"},
{file = "coverage-7.2.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:34d7211be69b215ad92298a962b2cd5a4ef4b17c7871d85e15d3d1b6dc8d8c96"},
{file = "coverage-7.2.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971b49dbf713044c3e5f6451b39f65615d4d1c1d9a19948fa0f41b0245a98765"},
{file = "coverage-7.2.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0557289260125a6c453ad5673ba79e5b6841d9a20c9e101f758bfbedf928a77"},
{file = "coverage-7.2.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:049806ae2df69468c130f04f0fab4212c46b34ba5590296281423bb1ae379df2"},
{file = "coverage-7.2.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:875b03d92ac939fbfa8ae74a35b2c468fc4f070f613d5b1692f9980099a3a210"},
{file = "coverage-7.2.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:c160e34e388277f10c50dc2c7b5e78abe6d07357d9fe7fcb2f3c156713fd647e"},
{file = "coverage-7.2.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:32e6a730fd18b2556716039ab93278ccebbefa1af81e6aa0c8dba888cf659e6e"},
{file = "coverage-7.2.0-cp310-cp310-win32.whl", hash = "sha256:f3ff4205aff999164834792a3949f82435bc7c7655c849226d5836c3242d7451"},
{file = "coverage-7.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:93db11da6e728587e943dff8ae1b739002311f035831b6ecdb15e308224a4247"},
{file = "coverage-7.2.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:cd38140b56538855d3d5722c6d1b752b35237e7ea3f360047ce57f3fade82d98"},
{file = "coverage-7.2.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:9dbb21561b0e04acabe62d2c274f02df0d715e8769485353ddf3cf84727e31ce"},
{file = "coverage-7.2.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:171dd3aa71a49274a7e4fc26f5bc167bfae5a4421a668bc074e21a0522a0af4b"},
{file = "coverage-7.2.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4655ecd813f4ba44857af3e9cffd133ab409774e9d2a7d8fdaf4fdfd2941b789"},
{file = "coverage-7.2.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1856a8c4aa77eb7ca0d42c996d0ca395ecafae658c1432b9da4528c429f2575c"},
{file = "coverage-7.2.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:bd67df6b48db18c10790635060858e2ea4109601e84a1e9bfdd92e898dc7dc79"},
{file = "coverage-7.2.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:2d7daf3da9c7e0ed742b3e6b4de6cc464552e787b8a6449d16517b31bbdaddf5"},
{file = "coverage-7.2.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:bf9e02bc3dee792b9d145af30db8686f328e781bd212fdef499db5e9e4dd8377"},
{file = "coverage-7.2.0-cp311-cp311-win32.whl", hash = "sha256:3713a8ec18781fda408f0e853bf8c85963e2d3327c99a82a22e5c91baffcb934"},
{file = "coverage-7.2.0-cp311-cp311-win_amd64.whl", hash = "sha256:88ae5929f0ef668b582fd7cad09b5e7277f50f912183cf969b36e82a1c26e49a"},
{file = "coverage-7.2.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:5e29a64e9586194ea271048bc80c83cdd4587830110d1e07b109e6ff435e5dbc"},
{file = "coverage-7.2.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8d5302eb84c61e758c9d68b8a2f93a398b272073a046d07da83d77b0edc8d76b"},
{file = "coverage-7.2.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2c9fffbc39dc4a6277e1525cab06c161d11ee3995bbc97543dc74fcec33e045b"},
{file = "coverage-7.2.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a6ceeab5fca62bca072eba6865a12d881f281c74231d2990f8a398226e1a5d96"},
{file = "coverage-7.2.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:28563a35ef4a82b5bc5160a01853ce62b9fceee00760e583ffc8acf9e3413753"},
{file = "coverage-7.2.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:bfa065307667f1c6e1f4c3e13f415b0925e34e56441f5fda2c84110a4a1d8bda"},
{file = "coverage-7.2.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7f992b32286c86c38f07a8b5c3fc88384199e82434040a729ec06b067ee0d52c"},
{file = "coverage-7.2.0-cp37-cp37m-win32.whl", hash = "sha256:2c15bd09fd5009f3a79c8b3682b52973df29761030b692043f9834fc780947c4"},
{file = "coverage-7.2.0-cp37-cp37m-win_amd64.whl", hash = "sha256:f332d61fbff353e2ef0f3130a166f499c3fad3a196e7f7ae72076d41a6bfb259"},
{file = "coverage-7.2.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:577a8bc40c01ad88bb9ab1b3a1814f2f860ff5c5099827da2a3cafc5522dadea"},
{file = "coverage-7.2.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:9240a0335365c29c968131bdf624bb25a8a653a9c0d8c5dbfcabf80b59c1973c"},
{file = "coverage-7.2.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:358d3bce1468f298b19a3e35183bdb13c06cdda029643537a0cc37e55e74e8f1"},
{file = "coverage-7.2.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:932048364ff9c39030c6ba360c31bf4500036d4e15c02a2afc5a76e7623140d4"},
{file = "coverage-7.2.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7efa21611ffc91156e6f053997285c6fe88cfef3fb7533692d0692d2cb30c846"},
{file = "coverage-7.2.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:465ea431c3b78a87e32d7d9ea6d081a1003c43a442982375cf2c247a19971961"},
{file = "coverage-7.2.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:0f03c229f1453b936916f68a47b3dfb5e84e7ad48e160488168a5e35115320c8"},
{file = "coverage-7.2.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:40785553d68c61e61100262b73f665024fd2bb3c6f0f8e2cd5b13e10e4df027b"},
{file = "coverage-7.2.0-cp38-cp38-win32.whl", hash = "sha256:b09dd7bef59448c66e6b490cc3f3c25c14bc85d4e3c193b81a6204be8dd355de"},
{file = "coverage-7.2.0-cp38-cp38-win_amd64.whl", hash = "sha256:dc4f9a89c82faf6254d646180b2e3aa4daf5ff75bdb2c296b9f6a6cf547e26a7"},
{file = "coverage-7.2.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c243b25051440386179591a8d5a5caff4484f92c980fb6e061b9559da7cc3f64"},
{file = "coverage-7.2.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4b8fd32f85b256fc096deeb4872aeb8137474da0c0351236f93cbedc359353d6"},
{file = "coverage-7.2.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7f2a7df523791e6a63b40360afa6792a11869651307031160dc10802df9a252"},
{file = "coverage-7.2.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:da32526326e8da0effb452dc32a21ffad282c485a85a02aeff2393156f69c1c3"},
{file = "coverage-7.2.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c1153a6156715db9d6ae8283480ae67fb67452aa693a56d7dae9ffe8f7a80da"},
{file = "coverage-7.2.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:74cd60fa00f46f28bd40048d6ca26bd58e9bee61d2b0eb4ec18cea13493c003f"},
{file = "coverage-7.2.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:59a427f8a005aa7254074719441acb25ac2c2f60c1f1026d43f846d4254c1c2f"},
{file = "coverage-7.2.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:c3c4beddee01c8125a75cde3b71be273995e2e9ec08fbc260dd206b46bb99969"},
{file = "coverage-7.2.0-cp39-cp39-win32.whl", hash = "sha256:08e3dd256b8d3e07bb230896c8c96ec6c5dffbe5a133ba21f8be82b275b900e8"},
{file = "coverage-7.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:ad12c74c6ce53a027f5a5ecbac9be20758a41c85425c1bbab7078441794b04ee"},
{file = "coverage-7.2.0-pp37.pp38.pp39-none-any.whl", hash = "sha256:ffa637a2d5883298449a5434b699b22ef98dd8e2ef8a1d9e60fa9cfe79813411"},
{file = "coverage-7.2.0.tar.gz", hash = "sha256:9cc9c41aa5af16d845b53287051340c363dd03b7ef408e45eec3af52be77810d"},
]
[package.extras]
toml = ["tomli"]
[[package]]
name = "cryptography"
version = "39.0.1"
@@ -1199,6 +1263,21 @@ files = [
dev = ["pre-commit", "tox"]
testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "prometheus-client"
version = "0.16.0"
description = "Python client for the Prometheus monitoring system."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "prometheus_client-0.16.0-py3-none-any.whl", hash = "sha256:0836af6eb2c8f4fed712b2f279f6c0a8bbab29f9f4aa15276b91c7cb0d1616ab"},
{file = "prometheus_client-0.16.0.tar.gz", hash = "sha256:a03e35b359f14dd1630898543e2120addfdeacd1a6069c1367ae90fd93ad3f48"},
]
[package.extras]
twisted = ["twisted"]
[[package]]
name = "prompt-toolkit"
version = "3.0.36"
@@ -1334,6 +1413,46 @@ typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\""
spelling = ["pyenchant (>=3.2,<4.0)"]
testutils = ["gitpython (>3)"]
[[package]]
name = "pymupdf"
version = "1.21.1"
description = "Python bindings for the PDF toolkit and renderer MuPDF"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "PyMuPDF-1.21.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e334e7baea701d4029faa034520c722c9c55e8315b7ccf4d1b686eba8e293f32"},
{file = "PyMuPDF-1.21.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:19fe49966f3a072a258cdfbab3d8c5b11cb7c2fb7c8e6abf3398d9e55af6f1ca"},
{file = "PyMuPDF-1.21.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:36181c4cb27740791d611d0224dd18ac78e23a1f06aa2151394c5b9194b4f885"},
{file = "PyMuPDF-1.21.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e5273ff0c3bf08428ef956c4a5e5e0b475667cc3e1cb7b41d9f5131636996e59"},
{file = "PyMuPDF-1.21.1-cp310-cp310-win32.whl", hash = "sha256:7e5a1ed49df5eee7834fb37adb5c94354e98b23fa997e67034b157790a31bbfc"},
{file = "PyMuPDF-1.21.1-cp310-cp310-win_amd64.whl", hash = "sha256:c040c9ac98b7a1afc06fe55ffedbf6a305c24bf1c97597838f208c68035971f4"},
{file = "PyMuPDF-1.21.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ac2f61632bcd4c9532a26b52684ca05a4c8b7b96d6be947134b3e01a420904fd"},
{file = "PyMuPDF-1.21.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:eb29cbfd34d5ef99c657539c4f48953ea15f4b1e4e162b48efcffa5c4101ce1d"},
{file = "PyMuPDF-1.21.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:06860a8e60ba14f25aac5db90803d59b1a2fdac24c2b65dc6f9876ff36df586e"},
{file = "PyMuPDF-1.21.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff7d01cb563c3ca18880ba6a2661351f07b8a54f6599272be2e3568524e5e721"},
{file = "PyMuPDF-1.21.1-cp311-cp311-win32.whl", hash = "sha256:1fc4c8aee186326b3f138be8a4ac16a343e76c8ec45b5afab2d105a5e3b02d80"},
{file = "PyMuPDF-1.21.1-cp311-cp311-win_amd64.whl", hash = "sha256:dab0459791175dea813e6130e08b9026d3b9d66854f0dbb7ab51ab1619875d24"},
{file = "PyMuPDF-1.21.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2a870f02b2767e9371bbb3d0263c3994f59679a64b1b9dd1a6b0d9a15c963d68"},
{file = "PyMuPDF-1.21.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f99e6c2109c38c17cd5f1b82c9d35f1e815a71a3464b765f41f3c8cf875dcf4c"},
{file = "PyMuPDF-1.21.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bf204723782ca481cd120e1ab124b0d59fd387103e15675ae462415bfa22b3a2"},
{file = "PyMuPDF-1.21.1-cp37-cp37m-win32.whl", hash = "sha256:b2b197bb0f00159f4584f51c7bf4d897aa5fb88de3c1f964347047a710c1f1be"},
{file = "PyMuPDF-1.21.1-cp37-cp37m-win_amd64.whl", hash = "sha256:dddd3fe6fa20b3e2642c10e66c97cfd3727010d3dafe653f16dce52396ef0208"},
{file = "PyMuPDF-1.21.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c09cbc7924ddf99452bb0d70438d64753815d75526d6d94293249f1665691d84"},
{file = "PyMuPDF-1.21.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:fdba6bf77d1d3cd57ea93181c494f4b138523000ff74b25c523be7dce0058ac9"},
{file = "PyMuPDF-1.21.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0b08b6afbb656f0e4582a8ee44c2ce0ab6235bb81830ec92d79f6e4893db639c"},
{file = "PyMuPDF-1.21.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0c85efbc08c83c8d91ae1cc7bb7e06c7cd7e203e9d0ed806ff22db173999f30c"},
{file = "PyMuPDF-1.21.1-cp38-cp38-win32.whl", hash = "sha256:fa0334247549def667d9f5358e558ec93815c78cebdb927a81dda38d681bf550"},
{file = "PyMuPDF-1.21.1-cp38-cp38-win_amd64.whl", hash = "sha256:d85c360abd72c14e302a2cefd02ebd01568f5c669700c7fc1ed056e9cdf3c285"},
{file = "PyMuPDF-1.21.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:f3ffa9a8c643da39aba80e787d2c1771d5be07b0e15bf193ddd1fda888bbca88"},
{file = "PyMuPDF-1.21.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:18fbdcb35c144b26a3a507bfa73f98401a27819f29dca496eb470b9a1ec09fad"},
{file = "PyMuPDF-1.21.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2bb5f9ce33054ebfbf63c41e68606d51e3ad5d85def0af5ebb6f0dd276e61037"},
{file = "PyMuPDF-1.21.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fbb96336c5cc065f6e5359f034a689e4d64ca7697dc5965b03f105b2ecb7ba1"},
{file = "PyMuPDF-1.21.1-cp39-cp39-win32.whl", hash = "sha256:909afad284c24f25a831b37e433e444dc4cb5b3e38b3f761b1e4acad5c65327d"},
{file = "PyMuPDF-1.21.1-cp39-cp39-win_amd64.whl", hash = "sha256:069ba56c28cb9b603d016c63f23a21bf22a00c5b2a92286bf3dd6a2759e119d4"},
{file = "PyMuPDF-1.21.1.tar.gz", hash = "sha256:f815741a435c62a0036bbcbf5fa6c533567bd69c5338d413714fc57b22db93e0"},
]
[[package]]
name = "pynacl"
version = "1.5.0"
@@ -1962,4 +2081,4 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools"
[metadata]
lock-version = "2.0"
python-versions = "~3.8"
content-hash = "2cc7ebd06eb0574420d0e0838a89e61e4d38332d7ed3896fb20aa9d17480a566"
content-hash = "faba30e3afa700d6f483022581e1813ec16c9bff0a49ebbcbbd166e293ae877e"

View File

@@ -1,11 +1,18 @@
from os import environ
from typing import Union
from pyinfra.utils.url_parsing import validate_and_parse_s3_endpoint
def read_from_environment(environment_variable_name, default_value):
return environ.get(environment_variable_name, default_value)
class Config(object):
def normalize_bool(value: Union[str, bool]):
return value if isinstance(value, bool) else value in ["True", "true"]
class Config:
def __init__(self):
# Logging level for service logger
self.logging_level_root = read_from_environment("LOGGING_LEVEL_ROOT", "DEBUG")
@@ -23,11 +30,11 @@ class Config(object):
self.rabbitmq_password = read_from_environment("RABBITMQ_PASSWORD", "bitnami")
# Controls AMQP heartbeat timeout in seconds
self.rabbitmq_heartbeat = read_from_environment("RABBITMQ_HEARTBEAT", "60")
self.rabbitmq_heartbeat = int(read_from_environment("RABBITMQ_HEARTBEAT", 60))
# Controls AMQP connection sleep timer in seconds
# important for heartbeat to come through while main function runs on other thread
self.rabbitmq_connection_sleep = read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5)
self.rabbitmq_connection_sleep = int(read_from_environment("RABBITMQ_CONNECTION_SLEEP", 5))
# Queue name for requests to the service
self.request_queue = read_from_environment("REQUEST_QUEUE", "request_queue")
@@ -47,8 +54,9 @@
else:
self.storage_bucket = read_from_environment("STORAGE_AZURECONTAINERNAME", "redaction")
# Endpoint for s3 storage
self.storage_endpoint = read_from_environment("STORAGE_ENDPOINT", "http://127.0.0.1:9000")
# S3 connection security flag and endpoint
storage_address = read_from_environment("STORAGE_ENDPOINT", "http://127.0.0.1:9000")
self.storage_secure_connection, self.storage_endpoint = validate_and_parse_s3_endpoint(storage_address)
# User for s3 storage
self.storage_key = read_from_environment("STORAGE_KEY", "root")
@@ -61,7 +69,8 @@ class Config(object):
# Connection string for Azure storage
self.storage_azureconnectionstring = read_from_environment(
"STORAGE_AZURECONNECTIONSTRING", "DefaultEndpointsProtocol=..."
"STORAGE_AZURECONNECTIONSTRING",
"DefaultEndpointsProtocol=...",
)
# Value to see if we should write a consumer token to a file
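The diff does not show `validate_and_parse_s3_endpoint` itself; judging from the call site above, it returns a `(secure, endpoint)` tuple. A plausible sketch using `urllib.parse` (an assumption, not the actual implementation):

```python
from typing import Tuple
from urllib.parse import urlsplit

def validate_and_parse_s3_endpoint(address: str) -> Tuple[bool, str]:
    # hypothetical sketch: split the configured URL into a security
    # flag and a host:port endpoint, as S3/MinIO clients expect
    parts = urlsplit(address)
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(f"Invalid storage endpoint: {address}")
    return parts.scheme == "https", parts.netloc

print(validate_and_parse_s3_endpoint("http://127.0.0.1:9000"))  # (False, '127.0.0.1:9000')
```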

pyinfra/exception.py Normal file
View File

@@ -0,0 +1,2 @@
class ProcessingFailure(RuntimeError):
pass

View File

@@ -0,0 +1,3 @@
__all__ = ["make_payload_processor"]
from pyinfra.payload_processing.processor import make_payload_processor

View File

@@ -0,0 +1,56 @@
import os
from dataclasses import dataclass, field
from operator import itemgetter
from typing import List, Tuple
@dataclass
class QueueMessagePayload:
dossier_id: str
file_id: str
target_file_extension: str
response_file_extension: str
target_file_name: str = field(init=False)
response_file_name: str = field(init=False)
def __post_init__(self):
self.target_file_name = f"{self.dossier_id}/{self.file_id}.{self.target_file_extension}"
self.response_file_name = f"{self.dossier_id}/{self.file_id}.{self.response_file_extension}"
self.target_file_type, self.target_compression_type = parse_file_extension(self.target_file_extension)
self.response_file_type, self.response_compression_type = parse_file_extension(self.response_file_extension)
def read_queue_message_payload(payload: dict) -> QueueMessagePayload:
return QueueMessagePayload(
*itemgetter(
"dossierId",
"fileId",
"targetFileExtension",
"responseFileExtension",
)(payload)
)
def format_service_processing_result_for_storage(
queue_message_payload: QueueMessagePayload, service_processing_result: List[dict]
) -> dict:
return {
"dossierId": queue_message_payload.dossier_id,
"fileId": queue_message_payload.file_id,
"targetFileExtension": queue_message_payload.target_file_extension,
"responseFileExtension": queue_message_payload.response_file_extension,
"data": service_processing_result,
}
def format_to_queue_message_response_body(queue_message_payload: QueueMessagePayload) -> dict:
return {"dossierId": queue_message_payload.dossier_id, "fileId": queue_message_payload.file_id}
def parse_file_extension(file_extension: str) -> Tuple[str, str]:
file_type, compression_type = os.path.splitext(file_extension)
if "." in file_type:
raise ValueError(f"Number of file extensions exceeds allowed amount of two: {file_extension}")
return file_type, compression_type
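The behaviour of `parse_file_extension` above can be illustrated in isolation (error message shortened here):

```python
import os

def parse_file_extension(file_extension: str):
    # splits a compound extension like "json.gz" into file type
    # and compression type; more than two parts is rejected
    file_type, compression_type = os.path.splitext(file_extension)
    if "." in file_type:
        raise ValueError(f"too many file extensions: {file_extension}")
    return file_type, compression_type

print(parse_file_extension("json"))     # ('json', '')
print(parse_file_extension("json.gz"))  # ('json', '.gz')
# parse_file_extension("tar.gz.bz2") raises ValueError
```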

View File

@@ -0,0 +1,80 @@
import logging
from dataclasses import asdict
from functools import partial
from typing import Callable, Union, List
from funcy import compose
from pyinfra.config import get_config, Config
from pyinfra.payload_processing.payload import (
read_queue_message_payload,
format_service_processing_result_for_storage,
format_to_queue_message_response_body,
)
from pyinfra.storage import get_storage
from pyinfra.storage.storage import make_downloader, make_uploader
from pyinfra.storage.storages.interface import Storage
logger = logging.getLogger()
logger.setLevel(get_config().logging_level_root)
class PayloadProcessor:
def __init__(self, storage: Storage, bucket: str, data_processor: Callable):
"""Wraps an analysis function specified by a service (e.g. NER service) in pre- and post-processing steps.
Args:
storage: The storage to use for downloading and uploading files
bucket: The bucket to use for downloading and uploading files
data_processor: The analysis function to be called with the downloaded file
"""
self.process_data = data_processor
self.partial_download_fn = partial(make_downloader, storage, bucket)
self.partial_upload_fn = partial(make_uploader, storage, bucket)
def __call__(self, queue_message_payload: dict) -> dict:
"""Processes a queue message payload.
The steps executed are:
1. Download the file specified in the message payload from the storage
2. Process the file with the analysis function
3. Upload the result to the storage
4. Return the payload for a response queue message
Args:
queue_message_payload: The payload of a queue message. The payload is expected to be a dict with the
following keys: dossierId, fileId, targetFileExtension, responseFileExtension
Returns:
The payload for a response queue message. The payload is a dict with the following keys: dossierId, fileId
"""
return self._process(queue_message_payload)
def _process(self, queue_message_payload: dict) -> dict:
payload = read_queue_message_payload(queue_message_payload)
logger.info(f"Processing {asdict(payload)} ...")
download_file_to_process = self.partial_download_fn(payload.target_file_type, payload.target_compression_type)
upload_processing_result = self.partial_upload_fn(payload.response_file_type, payload.response_compression_type)
format_result_for_storage = partial(format_service_processing_result_for_storage, payload)
processing_pipeline = compose(format_result_for_storage, self.process_data, download_file_to_process)
result: List[dict] = processing_pipeline(payload.target_file_name)
upload_processing_result(payload.response_file_name, result)
return format_to_queue_message_response_body(payload)
def make_payload_processor(data_processor: Callable, config: Union[None, Config] = None) -> PayloadProcessor:
"""Produces payload processor for queue manager."""
config = config or get_config()
storage: Storage = get_storage(config)
bucket: str = config.storage_bucket
data_processor = compose(list, data_processor)
return PayloadProcessor(storage, bucket, data_processor)
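`funcy.compose` applies its arguments right to left, so the pipeline in `_process` runs download, then analysis, then formatting. A self-contained sketch of that ordering, with stand-in lambdas instead of the real storage calls:

```python
from functools import reduce

def compose(*fns):
    # right-to-left composition, mirroring funcy.compose:
    # compose(f, g, h)(x) == f(g(h(x)))
    return reduce(lambda f, g: lambda x: f(g(x)), fns)

# read right to left: download, process, format for storage
pipeline = compose(
    lambda result: {"data": result},      # format_result_for_storage
    lambda blob: [blob.upper()],          # process_data
    lambda name: f"contents-of-{name}",   # download_file_to_process
)
print(pipeline("file.txt"))  # {'data': ['CONTENTS-OF-FILE.TXT']}
```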

View File

@@ -0,0 +1,40 @@
import json
import pika
import pika.exceptions
from pyinfra.config import Config
from pyinfra.queue.queue_manager import QueueManager
class DevelopmentQueueManager(QueueManager):
"""Extends the queue manger with additional functionality that is needed for tests and scripts,
but not in production, such as publishing messages.
"""
def __init__(self, config: Config):
super().__init__(config)
self._open_channel()
def publish_request(self, message: dict, properties: pika.BasicProperties = None):
message_encoded = json.dumps(message).encode("utf-8")
self._channel.basic_publish(
"",
self._input_queue,
properties=properties,
body=message_encoded,
)
def get_response(self):
return self._channel.basic_get(self._output_queue)
def clear_queues(self):
"""purge input & output queues"""
try:
self._channel.queue_purge(self._input_queue)
self._channel.queue_purge(self._output_queue)
except pika.exceptions.ChannelWrongStateError:
pass
def close_channel(self):
self._channel.close()

View File

@@ -4,13 +4,14 @@ import json
import logging
import signal
from pathlib import Path
from typing import Callable
import pika
import pika.exceptions
from pika.adapters.blocking_connection import BlockingChannel
from pyinfra.config import Config
from pyinfra.exception import ProcessingFailure
from pyinfra.payload_processing.processor import PayloadProcessor
CONFIG = Config()
@@ -32,7 +33,7 @@ def get_connection_params(config: Config) -> pika.ConnectionParameters:
"host": config.rabbitmq_host,
"port": config.rabbitmq_port,
"credentials": credentials,
"heartbeat": int(config.rabbitmq_heartbeat),
"heartbeat": config.rabbitmq_heartbeat,
}
return pika.ConnectionParameters(**pika_connection_params)
@@ -104,9 +105,10 @@ class QueueManager:
self._channel.queue_declare(self._input_queue, arguments=args, auto_delete=False, durable=True)
self._channel.queue_declare(self._output_queue, arguments=args, auto_delete=False, durable=True)
def start_consuming(self, process_message_callback: Callable):
def start_consuming(self, process_payload: PayloadProcessor):
"""consumption handling
- stanard callback handling is enforced through wrapping process_message_callback in _create_queue_callback (implements threading to support heartbeats)
- standard callback handling is enforced through wrapping process_message_callback in _create_queue_callback
(implements threading to support heartbeats)
- initially sets consumer token to None
- tries to
- open channels
@@ -115,9 +117,9 @@ class QueueManager:
- catches all Exceptions & stops consuming + closes channels
Args:
process_message_callback (Callable): function call passed into the queue manager from implementing service
process_payload (Callable): function passed to the queue manager, configured by implementing service
"""
callback = self._create_queue_callback(process_message_callback)
callback = self._create_queue_callback(process_payload)
self._set_consumer_token(None)
self.logger.info("Consuming from queue")
@@ -134,11 +136,10 @@ class QueueManager:
raise
finally:
self.stop_consuming() # stopping consumption also closes the channel
self.stop_consuming()
self._connection.close()
def stop_consuming(self):
"""stop channel consumption & reset consumer token to None"""
if self._consumer_token and self._connection:
self.logger.info("Cancelling subscription for consumer-tag %s", self._consumer_token)
self._channel.stop_consuming(self._consumer_token)
@@ -148,71 +149,65 @@
self.logger.info("Received signal %s", signal_number)
self.stop_consuming()
def _create_queue_callback(self, process_message_callback: Callable):
def _create_queue_callback(self, process_payload: PayloadProcessor):
def process_message_body_and_await_result(unpacked_message_body):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
self.logger.debug("opening thread for callback")
future = thread_pool_executor.submit(process_message_callback, unpacked_message_body)
self.logger.debug("Processing payload in separate thread")
future = thread_pool_executor.submit(process_payload, unpacked_message_body)
while future.running():
self.logger.debug("callback running in thread, processing data events in the meantime")
self.logger.debug("Waiting for payload processing to finish")
self._connection.sleep(float(self._connection_sleep))
self.logger.debug("fetching result from callback")
return future.result()
try:
return future.result()
except Exception as err:
raise ProcessingFailure("QueueMessagePayload processing failed") from err
def acknowledge_message_and_publish_response(frame, properties, response_body):
response_properties = pika.BasicProperties(headers=properties.headers) if properties.headers else None
self._channel.basic_publish(
"", self._output_queue, json.dumps(response_body).encode(), response_properties
)
self._channel.basic_publish("", self._output_queue, json.dumps(response_body).encode(), response_properties)
self.logger.info(
"Result published, acknowledging incoming message with delivery_tag %s", frame.delivery_tag
"Result published, acknowledging incoming message with delivery_tag %s",
frame.delivery_tag,
)
self._channel.basic_ack(frame.delivery_tag)
def callback(_channel, frame, properties, body):
self.logger.info("Received message from queue with delivery_tag %s", frame.delivery_tag)
self.logger.debug("Message headers: %s.", properties.headers)
self.logger.debug("Message headers: %s", properties.headers)
# Only try to process each message once.
# Requeueing will be handled by the dead-letter-exchange.
# This prevents endless retries on messages that are impossible to process.
# Only try to process each message once. Re-queueing will be handled by the dead-letter-exchange. This
# prevents endless retries on messages that are impossible to process.
if frame.redelivered:
self.logger.info(
"Aborting message processing for delivery_tag %s due to it being redelivered", frame.delivery_tag
"Aborting message processing for delivery_tag %s due to it being redelivered",
frame.delivery_tag,
)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
# returns nothing, so the function stops here
return
self.logger.debug("Processing (%s, %s, %s).", frame, properties, body)
try:
callback_result = process_message_body_and_await_result(json.loads(body))
self.logger.debug("Processing (%s, %s, %s)", frame, properties, body)
processing_result = process_message_body_and_await_result(json.loads(body))
self.logger.info(
"Processed message with delivery_tag %s, publishing result to result-queue",
frame.delivery_tag,
)
acknowledge_message_and_publish_response(frame, properties, processing_result)
if callback_result:
self.logger.info(
"Processed message with delivery_tag %s, publishing result to result-queue", frame.delivery_tag
)
acknowledge_message_and_publish_response(frame, properties, callback_result)
else:
self.logger.info("Processed message with delivery_tag %s, declining message", frame.delivery_tag)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
except ProcessingFailure:
self.logger.info(
"Processing message with delivery_tag %s failed, declining",
frame.delivery_tag,
)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
except Exception as ex:
except Exception:
n_attempts = _get_n_previous_attempts(properties) + 1
self.logger.warning("Failed to process message, %s attempts", n_attempts, exc_info=True)
self._channel.basic_nack(frame.delivery_tag, requeue=False)
raise ex
raise
return callback
def clear(self):
"""purge input & output queues"""
try:
self._channel.queue_purge(self._input_queue)
self._channel.queue_purge(self._output_queue)
except pika.exceptions.ChannelWrongStateError:
pass

View File

@@ -1,4 +1,3 @@
from pyinfra.storage import adapters, storage
from pyinfra.storage.storage import get_storage
__all__ = ["adapters", "storage"]
__all__ = ["get_storage"]

View File

@@ -1,9 +1,17 @@
from functools import lru_cache, partial
from typing import Callable
from funcy import compose
from pyinfra.config import Config
from pyinfra.storage.adapters.azure import get_azure_storage
from pyinfra.storage.adapters.s3 import get_s3_storage
from pyinfra.storage.storages.azure import get_azure_storage
from pyinfra.storage.storages.s3 import get_s3_storage
from pyinfra.storage.storages.interface import Storage
from pyinfra.utils.compressing import get_decompressor, get_compressor
from pyinfra.utils.encoding import get_decoder, get_encoder
def get_storage(config: Config):
def get_storage(config: Config) -> Storage:
if config.storage_backend == "s3":
storage = get_s3_storage(config)
@@ -13,3 +21,33 @@ def get_storage(config: Config):
raise Exception(f"Unknown storage backend '{config.storage_backend}'.")
return storage
def verify_existence(storage: Storage, bucket: str, file_name: str) -> str:
if not storage.exists(bucket, file_name):
raise FileNotFoundError(f"{file_name=} not found on storage in {bucket=}.")
return file_name
@lru_cache(maxsize=10)
def make_downloader(storage: Storage, bucket: str, file_type: str, compression_type: str) -> Callable:
verify = partial(verify_existence, storage, bucket)
download = partial(storage.get_object, bucket)
decompress = get_decompressor(compression_type)
decode = get_decoder(file_type)
return compose(decode, decompress, download, verify)
@lru_cache(maxsize=10)
def make_uploader(storage: Storage, bucket: str, file_type: str, compression_type: str) -> Callable:
upload = partial(storage.put_object, bucket)
compress = get_compressor(compression_type)
encode = get_encoder(file_type)
def inner(file_name, file_bytes):
upload(file_name, compose(compress, encode)(file_bytes))
return inner
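A minimal runnable sketch of the same partial-application-plus-composition idea, assuming a plain dict in place of the `Storage` backend and a local `compose` helper so the snippet has no funcy dependency:

```python
import gzip
import json
from functools import partial, reduce

def compose(*fns):
    """Right-to-left function composition, like funcy.compose."""
    return reduce(lambda f, g: lambda x: f(g(x)), fns)

objects = {}  # (bucket, object_name) -> bytes, standing in for a storage backend

def put_object(bucket, name, data):
    objects[(bucket, name)] = data

def get_object(bucket, name):
    return objects[(bucket, name)]

def verify_existence(bucket, name):
    if (bucket, name) not in objects:
        raise FileNotFoundError(f"{name} not found in {bucket}")
    return name

def make_downloader(bucket):
    # Partially apply the bucket, then chain: verify -> download -> decompress -> decode
    verify = partial(verify_existence, bucket)
    download = partial(get_object, bucket)
    decode = lambda raw: json.loads(raw.decode("utf-8"))
    return compose(decode, gzip.decompress, download, verify)

put_object("my-bucket", "file.json.gz", gzip.compress(json.dumps({"k": "v"}).encode("utf-8")))
downloader = make_downloader("my-bucket")
print(downloader("file.json.gz"))  # {'k': 'v'}
```

Partially applying the bucket up front is what lets the processor treat a downloader as a one-argument function of the file name.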

View File

@@ -6,6 +6,7 @@ from azure.storage.blob import BlobServiceClient, ContainerClient
from retry import retry
from pyinfra.config import Config, get_config
from pyinfra.storage.storages.interface import Storage
CONFIG = get_config()
logger = logging.getLogger(CONFIG.logging_level_root)
@@ -14,7 +15,7 @@ logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
class AzureStorageAdapter(object):
class AzureStorage(Storage):
def __init__(self, client: BlobServiceClient):
self._client: BlobServiceClient = client
@@ -77,4 +78,4 @@ class AzureStorageAdapter(object):
def get_azure_storage(config: Config):
return AzureStorageAdapter(BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring))
return AzureStorage(BlobServiceClient.from_connection_string(conn_str=config.storage_azureconnectionstring))

View File

@@ -0,0 +1,35 @@
from abc import ABC, abstractmethod
class Storage(ABC):
@abstractmethod
def make_bucket(self, bucket_name):
raise NotImplementedError
@abstractmethod
def has_bucket(self, bucket_name):
raise NotImplementedError
@abstractmethod
def put_object(self, bucket_name, object_name, data):
raise NotImplementedError
@abstractmethod
def exists(self, bucket_name, object_name):
raise NotImplementedError
@abstractmethod
def get_object(self, bucket_name, object_name):
raise NotImplementedError
@abstractmethod
def get_all_objects(self, bucket_name):
raise NotImplementedError
@abstractmethod
def clear_bucket(self, bucket_name):
raise NotImplementedError
@abstractmethod
def get_all_object_names(self, bucket_name):
raise NotImplementedError

View File

@@ -1,40 +1,19 @@
import io
import logging
import re
from itertools import repeat
from operator import attrgetter
from urllib.parse import urlparse
from minio import Minio
from retry import retry
from pyinfra.config import Config, get_config
from pyinfra.storage.storages.interface import Storage
CONFIG = get_config()
logger = logging.getLogger(CONFIG.logging_level_root)
ALLOWED_CONNECTION_SCHEMES = {"http", "https"}
URL_VALIDATOR = re.compile(
r"^(("
+ r"([A-Za-z]{3,9}:(?:\/\/)?)"
+ r"(?:[\-;:&=\+\$,\w]+@)?"
+ r"[A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)"
+ r"[A-Za-z0-9\.\-]+)"
+ r"((?:\/[\+~%\/\.\w\-_]*)?"
+ r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)"
)
# URL_VALIDATOR = re.compile(
# r"""^((([A-Za-z]{3,9}:(?:\/\/)?)
# (?:[\-;:&=\+\$,\w]+@)?
# [A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)
# [A-Za-z0-9\.\-]+)
# ((?:\/[\+~%\/\.\w\-_]*)?
# \??(?:[\-\+=&;%@\.\w_]*)
# \#?(?:[\.\!\/\\\w]*))?)"""
# )
class S3StorageAdapter(object):
class S3Storage(Storage):
def __init__(self, client: Minio):
self._client = client
@@ -88,21 +67,13 @@ class S3StorageAdapter(object):
return zip(repeat(bucket_name), map(attrgetter("object_name"), objs))
def _parse_endpoint(endpoint):
parsed_url = urlparse(endpoint)
if URL_VALIDATOR.match(endpoint) and parsed_url.netloc and parsed_url.scheme in ALLOWED_CONNECTION_SCHEMES:
return {"secure": parsed_url.scheme == "https", "endpoint": parsed_url.netloc}
else:
raise Exception(f"The configured storage endpoint is not a valid url: {endpoint}")
def get_s3_storage(config: Config):
return S3StorageAdapter(
return S3Storage(
Minio(
**_parse_endpoint(config.storage_endpoint),
secure=config.storage_secure_connection,
endpoint=config.storage_endpoint,
access_key=config.storage_key,
secret_key=config.storage_secret,
# This is relevant for running on s3
region=config.storage_region,
)
)

View File

@@ -0,0 +1,22 @@
import gzip
from typing import Union, Callable
from funcy import identity
def get_decompressor(compression_type: Union[str, None]) -> Callable:
if not compression_type:
return identity
elif "gz" in compression_type:
return gzip.decompress
else:
raise ValueError(f"{compression_type=} is not supported.")
def get_compressor(compression_type: str) -> Callable:
if not compression_type:
return identity
elif "gz" in compression_type:
return gzip.compress
else:
raise ValueError(f"{compression_type=} is not supported.")
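A quick round-trip check of the two factories (reimplemented locally so the snippet runs without funcy; `identity` is a one-liner):

```python
import gzip

def identity(x):
    return x

def get_compressor(compression_type):
    if not compression_type:
        return identity
    elif "gz" in compression_type:
        return gzip.compress
    raise ValueError(f"{compression_type=} is not supported.")

def get_decompressor(compression_type):
    if not compression_type:
        return identity
    elif "gz" in compression_type:
        return gzip.decompress
    raise ValueError(f"{compression_type=} is not supported.")

payload = b"some payload"
compressed = get_compressor("json.gz")(payload)   # "gz" substring selects gzip
assert get_decompressor("json.gz")(compressed) == payload
assert get_compressor(None)(payload) is payload   # falsy type -> identity passthrough
```

The substring check is what lets compound extensions like `json.gz` select gzip without extra parsing.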

pyinfra/utils/encoding.py Normal file
View File

@@ -0,0 +1,28 @@
import json
from typing import Callable
from funcy import identity
def decode_json(data: bytes) -> dict:
return json.loads(data.decode("utf-8"))
def encode_json(data: dict) -> bytes:
return json.dumps(data).encode("utf-8")
def get_decoder(file_type: str) -> Callable:
if "json" in file_type:
return decode_json
elif "pdf" in file_type:
return identity
else:
raise ValueError(f"{file_type=} is not supported.")
def get_encoder(file_type: str) -> Callable:
if "json" in file_type:
return encode_json
else:
raise ValueError(f"{file_type=} is not supported.")
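Note that the dispatch is by substring, so a compound extension like `json.gz` still selects the JSON codec. A self-contained round-trip sketch:

```python
import json

def decode_json(data: bytes) -> dict:
    return json.loads(data.decode("utf-8"))

def encode_json(data: dict) -> bytes:
    return json.dumps(data).encode("utf-8")

def get_decoder(file_type: str):
    if "json" in file_type:
        return decode_json
    elif "pdf" in file_type:
        return lambda data: data  # PDFs are passed through as raw bytes
    raise ValueError(f"{file_type=} is not supported.")

doc = {"dossierId": "dossier", "fileId": "file"}
assert get_decoder("json.gz")(encode_json(doc)) == doc  # substring match picks JSON
assert get_decoder("pdf")(b"%PDF-1.7") == b"%PDF-1.7"
```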

View File

@@ -0,0 +1,40 @@
import re
from operator import truth
from typing import Tuple
from urllib.parse import urlparse
def make_url_validator(allowed_connection_schemes: tuple = ("http", "https")):
pattern = re.compile(
r"^(("
+ r"([A-Za-z]{3,9}:(?:\/\/)?)"
+ r"(?:[\-;:&=\+\$,\w]+@)?"
+ r"[A-Za-z0-9\.\-]+|(?:www\.|[\-;:&=\+\$,\w]+@)"
+ r"[A-Za-z0-9\.\-]+)"
+ r"((?:\/[\+~%\/\.\w\-_]*)?"
+ r"\??(?:[\-\+=&;%@\.\w_]*)#?(?:[\.\!\/\\\w]*))?)"
)
def inner(url: str):
url_is_valid = pattern.match(url)
parsed_url = urlparse(url)
endpoint_is_valid = truth(parsed_url.netloc)
protocol_is_valid = parsed_url.scheme in allowed_connection_schemes
return url_is_valid and endpoint_is_valid and protocol_is_valid
return inner
def validate_and_parse_s3_endpoint(endpoint: str) -> Tuple[bool, str]:
validate_url = make_url_validator()
if not validate_url(endpoint):
raise Exception(f"The s3 storage endpoint is not a valid url: {endpoint}")
parsed_url = urlparse(endpoint)
connection_is_secure = parsed_url.scheme == "https"
storage_endpoint = parsed_url.netloc
return connection_is_secure, storage_endpoint
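The returned tuple feeds the Minio client: `secure` from the scheme, `endpoint` from the netloc. A simplified sketch of the same parsing using only `urlparse` (slightly more permissive than the regex validator above):

```python
from urllib.parse import urlparse

def parse_s3_endpoint(endpoint: str):
    """Return (connection_is_secure, storage_endpoint) or raise on invalid input."""
    parsed = urlparse(endpoint)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"The s3 storage endpoint is not a valid url: {endpoint}")
    return parsed.scheme == "https", parsed.netloc

assert parse_s3_endpoint("https://s3.amazonaws.com") == (True, "s3.amazonaws.com")
assert parse_s3_endpoint("http://127.0.0.1:9000") == (False, "127.0.0.1:9000")
```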

View File

@@ -16,12 +16,16 @@ azure-storage-blob = "12.9.0"
testcontainers = "3.4.2"
docker-compose = "1.29.2"
funcy = "1.17"
prometheus-client = "^0.16.0"
pymupdf = "^1.21.1"
[tool.poetry.group.dev.dependencies]
pytest = "^7.1.3"
ipykernel = "^6.16.0"
black = {version = "^23.1a1", allow-prereleases = true}
pylint = "^2.15.10"
coverage = "^7.2.0"
requests = "^2.28.2"
[tool.pytest.ini_options]
minversion = "6.0"

View File

@@ -1,100 +0,0 @@
import gzip
import json
import logging
from operator import itemgetter
import pika
from pyinfra.config import get_config
from pyinfra.storage.adapters.s3 import get_s3_storage
CONFIG = get_config()
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def read_connection_params():
credentials = pika.PlainCredentials(CONFIG.rabbitmq_username, CONFIG.rabbitmq_password)
parameters = pika.ConnectionParameters(
host=CONFIG.rabbitmq_host,
port=CONFIG.rabbitmq_port,
heartbeat=int(CONFIG.rabbitmq_heartbeat),
credentials=credentials,
)
return parameters
def make_channel(connection) -> pika.adapters.blocking_connection.BlockingChannel:
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
return channel
def declare_queue(channel, queue: str):
args = {"x-dead-letter-exchange": "", "x-dead-letter-routing-key": CONFIG.dead_letter_queue}
return channel.queue_declare(queue=queue, auto_delete=False, durable=True, arguments=args)
def make_connection() -> pika.BlockingConnection:
parameters = read_connection_params()
connection = pika.BlockingConnection(parameters)
return connection
def upload_and_make_message_body():
bucket = CONFIG.storage_bucket
dossier_id, file_id, suffix = "dossier", "file", "json.gz"
content = {"key": "value"}
object_name = f"{dossier_id}/{file_id}.{suffix}"
data = gzip.compress(json.dumps(content).encode("utf-8"))
storage = get_s3_storage(CONFIG)
if not storage.has_bucket(bucket):
storage.make_bucket(bucket)
storage.put_object(bucket, object_name, data)
message_body = {
"dossierId": dossier_id,
"fileId": file_id,
"targetFileExtension": suffix,
"responseFileExtension": f"result.{suffix}",
}
return message_body
def main():
connection = make_connection()
channel = make_channel(connection)
declare_queue(channel, CONFIG.request_queue)
declare_queue(channel, CONFIG.response_queue)
message = upload_and_make_message_body()
message_encoded = json.dumps(message).encode("utf-8")
channel.basic_publish(
"",
CONFIG.request_queue,
# properties=pika.BasicProperties(headers=None),
properties=pika.BasicProperties(headers={"x-tenant-id": "redaction"}),
body=message_encoded,
)
logger.info(f"Put {message} on {CONFIG.request_queue}")
storage = get_s3_storage(CONFIG)
for method_frame, properties, body in channel.consume(queue=CONFIG.response_queue, inactivity_timeout=10):
if not body:
break
response = json.loads(body)
logger.info(f"Received {response}")
logger.info(f"Message headers: {properties.headers}")
channel.basic_ack(method_frame.delivery_tag)
dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
suffix = message["responseFileExtension"]
result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}")
result = json.loads(gzip.decompress(result))
logger.info(f"Contents of result on storage: {result}")
channel.close()
if __name__ == "__main__":
main()

scripts/send_request.py Normal file
View File

@@ -0,0 +1,72 @@
import gzip
import json
import logging
from operator import itemgetter
import pika
from pyinfra.config import get_config
from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
from pyinfra.storage.storages.s3 import get_s3_storage
CONFIG = get_config()
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def upload_json_and_make_message_body():
bucket = CONFIG.storage_bucket
dossier_id, file_id, suffix = "dossier", "file", "json.gz"
content = {
"numberOfPages": 7,
"sectionTexts": "data",
}
object_name = f"{dossier_id}/{file_id}.{suffix}"
data = gzip.compress(json.dumps(content).encode("utf-8"))
storage = get_s3_storage(CONFIG)
if not storage.has_bucket(bucket):
storage.make_bucket(bucket)
storage.put_object(bucket, object_name, data)
message_body = {
"dossierId": dossier_id,
"fileId": file_id,
"targetFileExtension": suffix,
"responseFileExtension": f"result.{suffix}",
}
return message_body
def main():
development_queue_manager = DevelopmentQueueManager(CONFIG)
development_queue_manager.clear_queues()
message = upload_json_and_make_message_body()
development_queue_manager.publish_request(message, pika.BasicProperties(headers={"x-tenant-id": "redaction"}))
logger.info(f"Put {message} on {CONFIG.request_queue}")
storage = get_s3_storage(CONFIG)
for method_frame, properties, body in development_queue_manager._channel.consume(
queue=CONFIG.response_queue, inactivity_timeout=15
):
if not body:
break
response = json.loads(body)
logger.info(f"Received {response}")
logger.info(f"Message headers: {properties.headers}")
development_queue_manager._channel.basic_ack(method_frame.delivery_tag)
dossier_id, file_id = itemgetter("dossierId", "fileId")(response)
suffix = message["responseFileExtension"]
print(f"{dossier_id}/{file_id}.{suffix}")
result = storage.get_object(CONFIG.storage_bucket, f"{dossier_id}/{file_id}.{suffix}")
result = json.loads(gzip.decompress(result))
logger.info(f"Contents of result on storage: {result}")
development_queue_manager.close_channel()
if __name__ == "__main__":
main()

View File

@@ -1,53 +1,24 @@
import gzip
import json
import logging
from typing import Callable
import time
from pyinfra.config import get_config
from pyinfra.payload_processing.processor import make_payload_processor
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage import get_storage
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def make_callback(processor: Callable, config=get_config()):
bucket = config.storage_bucket
storage = get_storage(config)
def callback(request_message):
dossier_id = request_message["dossierId"]
file_id = request_message["fileId"]
logger.info(f"Processing {dossier_id=} {file_id=} ...")
target_file_name = f"{dossier_id}/{file_id}.{request_message['targetFileExtension']}"
response_file_name = f"{dossier_id}/{file_id}.{request_message['responseFileExtension']}"
if not storage.exists(bucket, target_file_name):
logger.warning(f"{target_file_name=} not present in {bucket=}, cancelling processing...")
return None
object_bytes = storage.get_object(bucket, target_file_name)
object_bytes = gzip.decompress(object_bytes)
result_body = list(processor(object_bytes))
result = {**request_message, "data": result_body}
storage_bytes = gzip.compress(json.dumps(result).encode("utf-8"))
storage.put_object(bucket, response_file_name, storage_bytes)
return {"dossierId": dossier_id, "fileId": file_id}
return callback
def process(body):
return [{"response_key": "response_value"}]
def json_processor_mock(data: dict):
time.sleep(5)
return [{"result1": "result1"}, {"result2": "result2"}]
def main():
logger.info("Start consuming...")
queue_manager = QueueManager(get_config())
queue_manager.start_consuming(make_callback(process))
queue_manager.start_consuming(make_payload_processor(json_processor_mock))
if __name__ == "__main__":

View File

@@ -1,18 +0,0 @@
# Scripts Usage
## Run pyinfra locally
**Shell 1**: Start minio and rabbitmq containers
```bash
$ cd scripts && docker-compose up
```
**Shell 2**: Start pyinfra with callback mock
```bash
$ python scripts/start_pyinfra.py
```
**Shell 3**: Upload dummy content on storage and publish message
```bash
$ python scripts/mock_process_request.py
```

View File

@@ -1,79 +0,0 @@
service:
response_formatter: identity
operations:
upper:
input:
subdir: ""
extension: up_in.gz
multi: False
output:
subdir: ""
extension: up_out.gz
extract:
input:
subdir: ""
extension: extr_in.gz
multi: False
output:
subdir: "extractions"
extension: gz
rotate:
input:
subdir: ""
extension: rot_in.gz
multi: False
output:
subdir: ""
extension: rot_out.gz
classify:
input:
subdir: ""
extension: cls_in.gz
multi: True
output:
subdir: ""
extension: cls_out.gz
stream_pages:
input:
subdir: ""
extension: pgs_in.gz
multi: False
output:
subdir: "pages"
extension: pgs_out.gz
default:
input:
subdir: ""
extension: IN.gz
multi: False
output:
subdir: ""
extension: OUT.gz
storage:
minio:
endpoint: "http://127.0.0.1:9000"
access_key: root
secret_key: password
region: null
aws:
endpoint: https://s3.amazonaws.com
access_key: AKIA4QVP6D4LCDAGYGN2
secret_key: 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
region: $STORAGE_REGION|"eu-west-1"
azure:
connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
bucket: "pyinfra-test-bucket"
webserver:
host: $SERVER_HOST|"127.0.0.1" # webserver address
port: $SERVER_PORT|5000 # webserver port
mode: $SERVER_MODE|production # webserver mode: {development, production}
mock_analysis_endpoint: "http://127.0.0.1:5000"
use_docker_fixture: 1
logging: 0

View File

@@ -1,19 +1,95 @@
import pytest
from pyinfra.config import get_config, Config
import os
import logging
import time
from pathlib import Path
@pytest.fixture(params=["aws", "azure"])
def storage_config(request) -> Config:
if request.param == "aws":
os.environ["STORAGE_BACKEND"] = "s3"
os.environ["STORAGE_BUCKET_NAME"] = "pyinfra-test-bucket"
os.environ["STORAGE_ENDPOINT"] = "https://s3.amazonaws.com"
os.environ["STORAGE_KEY"] = "AKIA4QVP6D4LCDAGYGN2"
os.environ["STORAGE_SECRET"] = "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"
os.environ["STORAGE_REGION"] = "eu-west-1"
else:
os.environ["STORAGE_BACKEND"] = "azure"
os.environ["STORAGE_AZURECONTAINERNAME"] = "pyinfra-test-bucket"
os.environ["STORAGE_AZURECONNECTIONSTRING"] = "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
return get_config()
import pytest
import testcontainers.compose
from pyinfra.config import get_config
from pyinfra.queue.queue_manager import QueueManager
from pyinfra.storage import get_storage
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
TESTS_DIR = Path(__file__).resolve().parents[0]
@pytest.fixture(scope="session", autouse=True)
def docker_compose(sleep_seconds=30):
"""Note: `autouse` can be set to `False` while working on the code to speed up the testing. In that case, run
`docker-compose up` in the tests directory manually before running the tests.
"""
logger.info(f"Starting docker containers with {TESTS_DIR}/docker-compose.yml...")
compose = testcontainers.compose.DockerCompose(TESTS_DIR, compose_file_name="docker-compose.yml")
compose.start()
logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
time.sleep(sleep_seconds)
yield compose
compose.stop()
@pytest.fixture(scope="session")
def storage_config(client_name):
config = get_config()
config.storage_backend = client_name
config.storage_azureconnectionstring = "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
return config
@pytest.fixture(scope="session")
def processing_config(storage_config, monitoring_enabled):
storage_config.monitoring_enabled = monitoring_enabled
return storage_config
@pytest.fixture(scope="session")
def bucket_name(storage_config):
return storage_config.storage_bucket
@pytest.fixture(scope="session")
def storage(storage_config):
logger.debug("Setup for storage")
storage = get_storage(storage_config)
storage.make_bucket(storage_config.storage_bucket)
storage.clear_bucket(storage_config.storage_bucket)
yield storage
logger.debug("Teardown for storage")
try:
storage.clear_bucket(storage_config.storage_bucket)
except Exception:
pass
@pytest.fixture(scope="session")
def queue_config(payload_processor_type):
config = get_config()
# FIXME: It looks like rabbitmq_heartbeat has to be greater than rabbitmq_connection_sleep. If this is expected, the
# user should not be able to insert non-working values.
config.rabbitmq_heartbeat = config.rabbitmq_connection_sleep + 1
return config
@pytest.fixture(scope="session")
def queue_manager(queue_config):
queue_manager = QueueManager(queue_config)
return queue_manager
@pytest.fixture
def request_payload():
return {
"dossierId": "test",
"fileId": "test",
"targetFileExtension": "json.gz",
"responseFileExtension": "json.gz",
}
@pytest.fixture(scope="session")
def response_payload():
return {
"dossierId": "test",
"fileId": "test",
}

tests/processing_test.py Normal file
View File

@@ -0,0 +1,69 @@
import gzip
import json
from operator import itemgetter
import pytest
from pyinfra.payload_processing.processor import make_payload_processor
@pytest.fixture(scope="session")
def file_processor_mock():
def inner(json_file: dict):
return [json_file]
return inner
@pytest.fixture
def target_file():
contents = {"numberOfPages": 10, "content1": "value1", "content2": "value2"}
return gzip.compress(json.dumps(contents).encode("utf-8"))
@pytest.fixture
def file_names(request_payload):
dossier_id, file_id, target_suffix, response_suffix = itemgetter(
"dossierId",
"fileId",
"targetFileExtension",
"responseFileExtension",
)(request_payload)
return f"{dossier_id}/{file_id}.{target_suffix}", f"{dossier_id}/{file_id}.{response_suffix}"
@pytest.fixture(scope="session")
def payload_processor(file_processor_mock, processing_config):
yield make_payload_processor(file_processor_mock, processing_config)
@pytest.mark.parametrize("client_name", ["s3"], scope="session")
@pytest.mark.parametrize("monitoring_enabled", [True, False], scope="session")
class TestPayloadProcessor:
def test_payload_processor_yields_correct_response_and_uploads_result(
self,
payload_processor,
storage,
bucket_name,
request_payload,
response_payload,
target_file,
file_names,
):
storage.clear_bucket(bucket_name)
storage.put_object(bucket_name, file_names[0], target_file)
response = payload_processor(request_payload)
assert response == response_payload
data_received = storage.get_object(bucket_name, file_names[1])
assert json.loads((gzip.decompress(data_received)).decode("utf-8")) == {
**request_payload,
"data": [json.loads(gzip.decompress(target_file).decode("utf-8"))],
}
def test_catching_of_processing_failure(self, payload_processor, storage, bucket_name, request_payload):
storage.clear_bucket(bucket_name)
with pytest.raises(Exception):
payload_processor(request_payload)

tests/queue_test.py Normal file
View File

@@ -0,0 +1,120 @@
import json
import logging
import time
from multiprocessing import Process
import pika
import pika.exceptions
import pytest
from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def development_queue_manager(queue_config):
queue_config.rabbitmq_heartbeat = 7200
development_queue_manager = DevelopmentQueueManager(queue_config)
yield development_queue_manager
logger.info("Tearing down development queue manager...")
try:
development_queue_manager.close_channel()
except pika.exceptions.ConnectionClosedByBroker:
pass
@pytest.fixture(scope="session")
def payload_processing_time(queue_config, offset=5):
# FIXME: this implicitly tests the heartbeat when running the end-to-end test. There should be another way to test
# this explicitly.
return queue_config.rabbitmq_heartbeat + offset
@pytest.fixture(scope="session")
def payload_processor(response_payload, payload_processing_time, payload_processor_type):
def process(payload):
time.sleep(payload_processing_time)
return response_payload
def process_with_failure(payload):
raise MemoryError
if payload_processor_type == "mock":
return process
elif payload_processor_type == "failing":
return process_with_failure
@pytest.fixture(scope="session", autouse=True)
def start_queue_consumer(queue_manager, payload_processor, sleep_seconds=5):
def consume_queue():
queue_manager.start_consuming(payload_processor)
p = Process(target=consume_queue)
p.start()
logger.info(f"Setting up consumer, waiting for {sleep_seconds}...")
time.sleep(sleep_seconds)
yield
logger.info("Tearing down consumer...")
p.terminate()
@pytest.fixture
def message_properties(message_headers):
if not message_headers:
return pika.BasicProperties(headers=None)
elif message_headers == "x-tenant-id":
return pika.BasicProperties(headers={"x-tenant-id": "redaction"})
else:
raise Exception(f"Invalid {message_headers=}.")


class TestQueueManager:
    # FIXME: All tests here are wonky: the process-blocking queue_manager runs in a subprocess,
    # which makes it very hard to interact with it directly from the test. If you have a better
    # idea, please refactor; the tests here are insufficient to ensure the functionality of the
    # queue manager!

    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_message_processing_does_not_block_heartbeat(
        self, development_queue_manager, request_payload, response_payload, payload_processing_time
    ):
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(request_payload)
        time.sleep(payload_processing_time + 10)
        _, _, body = development_queue_manager.get_response()
        result = json.loads(body)
        assert result == response_payload

    @pytest.mark.parametrize("message_headers", [None, "x-tenant-id"])
    @pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
    def test_queue_manager_forwards_message_headers(
        self,
        development_queue_manager,
        request_payload,
        response_payload,
        payload_processing_time,
        message_properties,
    ):
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(request_payload, message_properties)
        time.sleep(payload_processing_time + 10)
        _, properties, _ = development_queue_manager.get_response()
        assert properties.headers == message_properties.headers

    # FIXME: It is not possible to test the behavior of the queue manager directly, since it runs
    # in a separate process; you need to inspect the logs to see whether the exception is handled
    # correctly. Hence, this test is only useful during development and insufficient to guarantee
    # correct behavior.
    @pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session")
    def test_failed_message_processing_is_handled(
        self,
        development_queue_manager,
        request_payload,
        response_payload,
        payload_processing_time,
    ):
        development_queue_manager.clear_queues()
        development_queue_manager.publish_request(request_payload)
        time.sleep(payload_processing_time + 10)
        _, _, body = development_queue_manager.get_response()
        assert not body
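The FIXME notes above point at the core difficulty: the consumer runs in a subprocess, so the test can neither know when it is ready nor observe how it handled a failure, and falls back to `time.sleep`. Below is a minimal sketch (toy worker with hypothetical names, not the project's `queue_manager`) of one way to make such tests deterministic: the child signals readiness via an `Event` and reports outcomes over a `Queue`:

```python
from multiprocessing import get_context

# The "fork" start method keeps this snippet runnable even outside an
# importable module; the same pattern works with the default start method
# in a real test file.
ctx = get_context("fork")


def worker(ready, results, payload):
    # Hypothetical stand-in for queue_manager.start_consuming(payload_processor).
    ready.set()  # replaces the fixed time.sleep(sleep_seconds) in the fixture
    try:
        if payload == "fail":
            raise MemoryError
        results.put(("ok", payload.upper()))
    except MemoryError:
        # The handled failure is now observable from the parent process.
        results.put(("handled", None))


def run(payload):
    ready, results = ctx.Event(), ctx.Queue()
    p = ctx.Process(target=worker, args=(ready, results, payload))
    p.start()
    assert ready.wait(timeout=5)  # deterministic startup instead of sleeping
    outcome = results.get(timeout=5)
    p.join(timeout=5)
    return outcome


print(run("hello"))  # ('ok', 'HELLO')
print(run("fail"))   # ('handled', None)
```

With this shape, both the heartbeat test and the failure-handling test could assert on the reported outcome instead of inferring behavior from an empty response body.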

tests/storage_test.py (new file)
@@ -0,0 +1,55 @@
import logging

import pytest

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@pytest.mark.parametrize("client_name", ["azure", "s3"], scope="session")
class TestStorage:
    def test_clearing_bucket_yields_empty_bucket(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        data_received = storage.get_all_objects(bucket_name)
        assert not {*data_received}

    def test_getting_object_put_in_bucket_is_object(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        storage.put_object(bucket_name, "file", b"content")
        data_received = storage.get_object(bucket_name, "file")
        assert b"content" == data_received

    def test_object_put_in_bucket_exists_on_storage(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        storage.put_object(bucket_name, "file", b"content")
        assert storage.exists(bucket_name, "file")

    def test_getting_nested_object_put_in_bucket_is_nested_object(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        storage.put_object(bucket_name, "folder/file", b"content")
        data_received = storage.get_object(bucket_name, "folder/file")
        assert b"content" == data_received

    def test_getting_objects_put_in_bucket_are_objects(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        storage.put_object(bucket_name, "file1", b"content 1")
        storage.put_object(bucket_name, "folder/file2", b"content 2")
        data_received = storage.get_all_objects(bucket_name)
        assert {b"content 1", b"content 2"} == {*data_received}

    def test_make_bucket_produces_bucket(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        storage.make_bucket(bucket_name)
        assert storage.has_bucket(bucket_name)

    def test_listing_bucket_files_yields_all_files_in_bucket(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        storage.put_object(bucket_name, "file1", b"content 1")
        storage.put_object(bucket_name, "file2", b"content 2")
        full_names_received = storage.get_all_object_names(bucket_name)
        assert {(bucket_name, "file1"), (bucket_name, "file2")} == {*full_names_received}

    def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name):
        storage.clear_bucket(bucket_name)
        with pytest.raises(Exception):
            storage.get_object(bucket_name, "folder/file")
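The last storage test catches a bare `Exception`, which would also pass if, say, `clear_bucket` itself blew up. A small sketch (all names hypothetical, using an in-memory fake rather than the real storage client) of asserting a specific error type instead:

```python
import pytest


class ObjectNotFoundError(KeyError):
    """Hypothetical error a storage backend might raise for a missing key."""


class FakeStorage:
    # Minimal in-memory stand-in for the storage interface used in the tests.
    def __init__(self):
        self._objects = {}

    def put_object(self, bucket, name, data):
        self._objects[(bucket, name)] = data

    def get_object(self, bucket, name):
        try:
            return self._objects[(bucket, name)]
        except KeyError:
            raise ObjectNotFoundError(f"{bucket}/{name}") from None


def test_missing_object_raises_specific_error():
    storage = FakeStorage()
    # Narrow assertion: only the expected error type makes the test pass.
    with pytest.raises(ObjectNotFoundError):
        storage.get_object("bucket", "missing")
```

If the real clients raise backend-specific errors (e.g. different types for Azure and S3), mapping them to one shared exception type at the storage layer would let this test stay backend-agnostic.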

(deleted file)
@@ -1,5 +0,0 @@
from pyinfra.storage import get_storage


def test_storage(storage_config) -> None:
    storage = get_storage(storage_config)
    assert storage.has_bucket(storage_config.storage_bucket)
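The commit messages in this PR mention moving storage-URL parsing and validation into the config, where the connection URL is actually read. As a hedged sketch (hypothetical function name and accepted schemes, not the project's actual parser), that parse-and-validate step can be done with only the standard library:

```python
from urllib.parse import urlsplit


def parse_storage_url(url):
    """Split a storage connection URL and validate it up front.

    Returns (scheme, host, port); port is None when the URL omits it.
    The accepted schemes below are illustrative, not the project's list.
    """
    parts = urlsplit(url)
    if parts.scheme not in {"s3", "azure", "http", "https"}:
        raise ValueError(f"unsupported storage scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError(f"storage URL has no host: {url!r}")
    return parts.scheme, parts.hostname, parts.port


print(parse_storage_url("s3://minio.local:9000"))  # ('s3', 'minio.local', 9000)
```

Validating at config-read time means a malformed URL fails fast at startup rather than surfacing later as an opaque connection error inside the storage client.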