Compare commits

...

172 Commits

Author SHA1 Message Date
Julius Unverfehrt
3ef4246d1e chore: fuzzy pin kn-utils to allow for future updates 2025-01-22 12:36:38 +01:00
Julius Unverfehrt
841c492639 Merge branch 'chore/RES-871-update-callback' into 'master'
feat:BREAKING CHANGE: download callback now forwards all files as bytes

See merge request knecon/research/pyinfra!108
2025-01-16 11:11:59 +01:00
Julius Unverfehrt
ead069d3a7 chore: adjust docstrings 2025-01-16 10:35:06 +01:00
Julius Unverfehrt
044ea6cf0a feat: streamline download to always include the filename of the downloaded file 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
ff7547e2c6 fix: remove faulty import 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
fbf79ef758 chore: regenerate BOM 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
f382887d40 chore: seek and destroy proto in code 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
5c4400aa8b feat:BREAKING CHANGE: download callback now forwards all files as bytes 2025-01-16 10:29:46 +01:00
Jonathan Kössler
5ce66f18a0 Merge branch 'bugfix/RED-10722' into 'master'
fix: dlq init

See merge request knecon/research/pyinfra!109
2025-01-15 10:56:12 +01:00
Jonathan Kössler
ea0c55930a chore: remove test nack 2025-01-15 10:00:50 +01:00
Jonathan Kössler
87f57e2244 fix: dlq init 2025-01-14 16:39:47 +01:00
Jonathan Kössler
3fb8c4e641 fix: do not use groups for packages 2024-12-18 16:33:35 +01:00
Jonathan Kössler
e23f63acf0 Merge branch 'chore/nexus-package-registry' into 'master'
RES-914: move package registry to nexus

See merge request knecon/research/pyinfra!106
2024-11-20 10:02:52 +01:00
Jonathan Kössler
d3fecc518e chore: move integration tests to own subfolder 2024-11-18 17:31:15 +01:00
Jonathan Kössler
341500d463 chore: set lower bound for opentelemetry dependencies 2024-11-18 17:28:11 +01:00
Jonathan Kössler
e002f77fd5 Revert "chore: update opentelemetry for proto v5 support"
This reverts commit 3c6d8f2dcc73b17f329f9cecb8d4d301f848dc1e.
2024-11-18 17:19:37 +01:00
Jonathan Kössler
3c6d8f2dcc chore: update opentelemetry for proto v5 support 2024-11-18 15:14:34 +01:00
Jonathan Kössler
f6d6ba40bb chore: add pytest-cov 2024-11-18 13:57:39 +01:00
Jonathan Kössler
6a0bbad108 ops: update CI 2024-11-18 13:53:11 +01:00
Jonathan Kössler
527a671a75 feat: move package registry to nexus 2024-11-18 13:49:48 +01:00
Jonathan Kössler
cf91189728 Merge branch 'feature/RED-10441' into 'master'
RED-10441: separate queue and webserver shutdown

See merge request knecon/research/pyinfra!105
2024-11-13 17:17:13 +01:00
Jonathan Kössler
61a6d0eeed feat: separate queue and webserver shutdown 2024-11-13 17:02:21 +01:00
Jonathan Kössler
bc0b355ff9 Merge branch 'feature/RED-10441' into 'master'
RED-10441: ensure queue manager shutdown

See merge request knecon/research/pyinfra!104
2024-11-13 16:34:25 +01:00
Jonathan Kössler
235e27b74c chore: bump version 2024-11-13 16:31:48 +01:00
Jonathan Kössler
1540c2894e feat: ensure shutdown of queue manager 2024-11-13 16:30:18 +01:00
Jonathan Kössler
9b60594ce1 Merge branch 'feature/RED-10441' into 'master'
RED-10441: Fix graceful shutdown

See merge request knecon/research/pyinfra!103
2024-11-13 14:48:34 +01:00
Jonathan Kössler
3d3c76b466 chore: bump version 2024-11-13 13:55:15 +01:00
Jonathan Kössler
9d4ec84b49 fix: use signals for graceful shutdown 2024-11-13 13:54:41 +01:00
Jonathan Kössler
8891249d7a Merge branch 'feature/RED-10441' into 'master'
RED-10441: fix abandoned queues

See merge request knecon/research/pyinfra!102
2024-11-13 09:35:36 +01:00
Jonathan Kössler
e51e5c33eb chore: cleanup 2024-11-12 17:24:57 +01:00
Jonathan Kössler
04c90533b6 refactor: fetch active tenants before start 2024-11-12 17:11:33 +01:00
Jonathan Kössler
86af05c12c feat: add logger to retry 2024-11-12 16:50:23 +01:00
Jonathan Kössler
c6e336cb35 refactor: tenant queues init 2024-11-12 15:55:11 +01:00
Jonathan Kössler
bf6f95f3e0 feat: exit on ClientResponseError 2024-11-12 15:32:11 +01:00
Jonathan Kössler
ed2bd1ec86 refactor: raise error if tenant service is not available 2024-11-12 13:30:21 +01:00
Julius Unverfehrt
9906f68e0a chore: bump versions to enable package rebuild (current package has the wrong hash due to backup issues) 2024-11-11 12:47:27 +01:00
Julius Unverfehrt
0af648d66c fix: rebuild since mia and update rebuild kn_utils 2024-11-08 13:52:08 +01:00
Jonathan Kössler
46dc1fdce4 Merge branch 'feature/RES-809' into 'master'
RES-809: update kn_utils

See merge request knecon/research/pyinfra!101
2024-10-23 18:01:25 +02:00
Jonathan Kössler
bd2f0b9b9a feat: switch out tenacity retry with kn_utils 2024-10-23 16:06:06 +02:00
Jonathan Kössler
131afd7d3e chore: update kn_utils 2024-10-23 16:04:08 +02:00
Jonathan Kössler
98532c60ed Merge branch 'feature/RES-858-fix-graceful-shutdown' into 'master'
RES-858: fix graceful shutdown for unexpected broker disconnects

See merge request knecon/research/pyinfra!100
2024-09-30 09:54:25 +02:00
Jonathan Kössler
45377ba172 feat: improve on close callback and simplify exception handling 2024-09-27 17:11:10 +02:00
Jonathan Kössler
f855224e29 feat: add on close callback 2024-09-27 10:00:41 +02:00
Jonathan Kössler
541219177f feat: add error handling to shutdown logic 2024-09-26 12:28:55 +02:00
Jonathan Kössler
4119a7d7d7 chore: bump version 2024-09-26 11:05:12 +02:00
Jonathan Kössler
e2edfa7260 fix: simplify webserver shutdown 2024-09-26 10:33:05 +02:00
Jonathan Kössler
b70b16c541 Merge branch 'feature/RES-856-test-proto-format' into 'master'
RES-856: add type tests for proto format

See merge request knecon/research/pyinfra!99
2024-09-26 10:07:29 +02:00
Jonathan Kössler
e8d9326e48 chore: rewrite lock and bump version 2024-09-26 09:45:42 +02:00
Jonathan Kössler
9669152e14 Merge branch 'master' into feature/RES-856-test-proto-format 2024-09-26 09:39:28 +02:00
Jonathan Kössler
ed3f8088e1 Merge branch 'feature/RES-844-fix-tracing' into 'master'
RES-844: fix opentelemetry tracing

See merge request knecon/research/pyinfra!98
2024-09-26 09:13:52 +02:00
Jonathan Kössler
66eaa9a748 feat: set range for protobuf version 2024-09-25 14:16:40 +02:00
Jonathan Kössler
3a04359320 chore: bump pyinfra version 2024-09-25 11:59:52 +02:00
Jonathan Kössler
b46fcbd977 feat: add AioPikaInstrumentor 2024-09-25 11:58:51 +02:00
Jonathan Kössler
e75df42bec feat: skip keys in int conversion 2024-09-25 11:07:20 +02:00
Jonathan Kössler
3bab86fe83 chore: update test files 2024-09-24 11:59:08 +02:00
Jonathan Kössler
c5d53b8665 feat: add file comparison 2024-09-24 11:57:33 +02:00
Jonathan Kössler
09d39930e7 chore: cleanup test 2024-09-23 16:43:59 +02:00
Jonathan Kössler
a81f1bf31a chore: update protobuf to 25.5 2024-09-23 16:41:57 +02:00
Francisco Schulz
0783e95d22 Merge branch 'RED-10017-investigate-crashing-py-services-when-upload-large-number-of-files' into 'master'
fix: add semaphore to AsyncQueueManager to limit concurrent tasks

See merge request knecon/research/pyinfra!97
2024-09-23 15:19:40 +02:00
Francisco Schulz
8ec13502a9 fix: add semaphore to AsyncQueueManager to limit concurrent tasks 2024-09-23 15:19:40 +02:00
Jonathan Kössler
43881de526 feat: add tests for types of documentreader 2024-09-20 16:42:55 +02:00
Julius Unverfehrt
67c30a5620 fix: recompile proto schemas with experimental schema update 2024-09-20 15:23:13 +02:00
Francisco Schulz
8e21b2144c Merge branch 'fix-poetry-version' into 'master'
chore: update package version

See merge request knecon/research/pyinfra!96
2024-09-02 16:56:58 +02:00
francisco.schulz
5b45cae9a0 chore: update package version 2024-09-02 10:53:09 -04:00
Francisco Schulz
f2a5a2ea0e Merge branch 'custom-build-image-classification-service-protobuf' into 'master'
fix(temp): set protobuf version range to >=v3,<v4 so image-classification model keeps working

See merge request knecon/research/pyinfra!95
2024-09-02 16:48:56 +02:00
francisco.schulz
2133933d25 chore: update dependencies 2024-08-30 08:42:19 -04:00
francisco.schulz
4c8dc6ccc0 fix(temp): set protobuf version range to >=v3,<v4 so image-classification model keeps working 2024-08-30 08:37:31 -04:00
Julius Unverfehrt
5f31e2b15f Merge branch 'RES-842-pyinfra-fix-rabbit-mq-handler-shuts-down-when-queues-not-available-yet' into 'master'
fix(queuemanager): add retries to prevent container from shutting down when queues are not available yet

See merge request knecon/research/pyinfra!94
2024-08-30 13:59:02 +02:00
francisco.schulz
88aef57c5f chore: version increase 2024-08-29 11:18:36 -04:00
francisco.schulz
2b129b35f4 fix(queuemanager): add retries to prevent container from shutting down when queues are not available yet 2024-08-29 11:17:11 -04:00
Jonathan Kössler
facb9726f9 Merge branch 'feature/RES-840-add-client-connector-error' into 'master'
feat: add ClientConnectorError

See merge request knecon/research/pyinfra!93
2024-08-28 14:39:40 +02:00
Jonathan Kössler
b6a2069a6a feat: add ClientConnectorError 2024-08-28 10:28:12 +02:00
Jonathan Kössler
f626ef2e6f Merge branch 'bugfix/RES-834-service-disconnects' into 'master'
fix: pod restarts due to health check

See merge request knecon/research/pyinfra!92
2024-08-26 15:10:51 +02:00
Jonathan Kössler
318779413a fix: add signal to webserver 2024-08-23 17:23:53 +02:00
Jonathan Kössler
f27b1fbba1 chore: bump version 2024-08-23 16:56:54 +02:00
Jonathan Kössler
f2018f9c86 fix: process message in thread in event loop 2024-08-23 16:56:24 +02:00
Julius Unverfehrt
a5167d1230 Merge branch 'bugfix/RES-826-fix-initial-startup' into 'master'
fix: add async webserver for probes

See merge request knecon/research/pyinfra!91
2024-08-21 17:25:35 +02:00
Jonathan Kössler
1e939febc2 refactor: function naming 2024-08-21 17:02:04 +02:00
Jonathan Kössler
564f2cbb43 chore: bump version 2024-08-21 16:25:17 +02:00
Jonathan Kössler
fa44f36088 feat: add async webserver for probes 2024-08-21 16:24:20 +02:00
Jonathan Kössler
2970823cc1 Merge branch 'refactor/tenant_queue_settings' into 'master'
refactor: tenant queues settings

See merge request knecon/research/pyinfra!90
2024-08-19 14:43:24 +02:00
Jonathan Kössler
dba348a621 refactor: tenant queues settings 2024-08-19 14:37:48 +02:00
Jonathan Kössler
5020e54dcc Merge branch 'fix/RES-820-channel-opening' into 'master'
fix: use is_initialized instead of is_open

See merge request knecon/research/pyinfra!89
2024-08-16 14:23:46 +02:00
Jonathan Kössler
2bc332831e fix: use is_initialized instead of is_open 2024-08-16 12:37:28 +02:00
Jonathan Kössler
b3f1529be2 chore: bump version 2024-08-06 09:48:09 +02:00
Jonathan Kössler
789f6a7f7c Merge branch 'feat/RES-757-protobuffer' into 'master'
feat: add protobuffer

See merge request knecon/research/pyinfra!87
2024-08-06 09:44:01 +02:00
Jonathan Kössler
06ce8bbb22 Merge branch 'master' into feat/RES-757-protobuffer 2024-08-05 11:01:40 +02:00
Jonathan Kössler
fdde56991b Merge branch 'refactor/RES-780-graceful-shutdown' into 'master'
refactor: graceful shutdown

See merge request knecon/research/pyinfra!88
2024-08-02 13:57:04 +02:00
Jonathan Kössler
cb8509b120 refactor: message counter 2024-08-01 17:42:59 +02:00
Jonathan Kössler
47b42e95e2 refactor: graceful shutdown 2024-08-01 15:31:58 +02:00
Jonathan Kössler
536284ed84 chore: update readme 2024-08-01 09:56:13 +02:00
Jonathan Kössler
aeac1c58f9 chore: bump pyinfra version 2024-07-31 16:05:42 +02:00
Jonathan Kössler
b12b1ce42b refactor: use protoc 4.25.x as compiler to avoid dependency issues 2024-07-31 16:04:43 +02:00
Jonathan Kössler
50b7a877e9 fix: poetry lock 2024-07-30 10:45:37 +02:00
Jonathan Kössler
f3d0f24ea6 Merge branch 'master' into feat/RES-757-protobuffer 2024-07-30 10:40:56 +02:00
Jonathan Kössler
8f1ad1a4bd Merge branch 'feature/RES-731-add-queues-per-tenant' into 'master'
feat: refactor to work asynchronously

See merge request knecon/research/pyinfra!86
2024-07-29 15:06:05 +02:00
Jonathan Kössler
2a2028085e feat: add async retry for tenant server calls 2024-07-25 14:45:19 +02:00
Jonathan Kössler
66aaeca928 fix: async queue test 2024-07-24 17:28:13 +02:00
Jonathan Kössler
23aaaf68b1 refactor: simplify rabbitmq config 2024-07-23 18:34:50 +02:00
Jonathan Kössler
c7e0df758e feat: add async health endpoint 2024-07-23 15:42:48 +02:00
Jonathan Kössler
13d670091c chore: update readme 2024-07-22 17:31:32 +02:00
Jonathan Kössler
1520e96287 refactor: cleanup codebase 2024-07-22 16:57:02 +02:00
Jonathan Kössler
28451e8f8f chore: bump pyinfra version 2024-07-22 16:54:28 +02:00
Jonathan Kössler
596d4a9bd0 feat: add expiration for tenant event queue and retry to tenant api call 2024-07-22 16:48:31 +02:00
Julius Unverfehrt
70d3a210a1 feat: update data loader tests
We now compare the output proto json conversion to expected json files.
This revealed multiple differences between the files.

FIXED: int64 type was cast into string in python. We now get proper
integers

TODO: Empty fields are omitted by proto, but the jsons have them and the
pyinfra implementing services might expect them. We have to test this
behaviour and adjust the tests accordingly.
2024-07-18 12:36:29 +02:00
Jonathan Kössler
f935056fa9 refactor: dataloader to not crash on unknown file formats 2024-07-17 13:54:50 +02:00
Jonathan Kössler
eeb4c3ce29 fix: add await to is_ready 2024-07-17 11:41:31 +02:00
Jonathan Kössler
b8833c7560 fix: settings mapping 2024-07-17 10:51:14 +02:00
Julius Unverfehrt
f175633f30 chore: track proto buf test data with dvc 2024-07-16 17:36:50 +02:00
Julius Unverfehrt
ceac21c1ef deps: add dvc 2024-07-16 17:35:03 +02:00
Julius Unverfehrt
0d232226fd feat: integrate proto data loader in pipeline 2024-07-16 17:34:39 +02:00
Julius Unverfehrt
9d55b3be89 feat: implement proto data loader 2024-07-16 16:32:58 +02:00
Julius Unverfehrt
edba6fc4da feat: track proto schemata & add compilations to package 2024-07-16 16:31:48 +02:00
Julius Unverfehrt
c5d8a6ed84 feat: add proto requirements and instructions to readme for compiling the schemata 2024-07-16 16:30:32 +02:00
Julius Unverfehrt
c16000c774 fix(tracing test): make test work in case azure connection string is missing 2024-07-15 16:13:41 +02:00
Jonathan Kössler
02665a5ef8 feat: align async queue manager 2024-07-12 15:14:13 +02:00
Jonathan Kössler
9c28498d8a feat: rollback testing logic for send_request 2024-07-12 15:12:46 +02:00
Jonathan Kössler
3c3580d3bc feat: add backwards compatibility 2024-07-12 12:26:56 +02:00
Jonathan Kössler
8ac16de0fa feat: add backwards compatibility 2024-07-12 12:23:45 +02:00
Jonathan Kössler
8844df44ce feat: add async_v2 2024-07-12 12:12:55 +02:00
Jonathan Kössler
a5162d5bf0 chore: update poetry deps 2024-07-12 12:10:31 +02:00
francisco.schulz
f9aec74d55 chore: clean up + improve robustness 2024-07-11 15:54:21 -04:00
francisco.schulz
7559118822 fix: remove sleep commands 2024-07-11 14:50:11 -04:00
francisco.schulz
5ff65f2cf4 feat(tests): add RabbitMQHandler class tests 2024-07-11 14:46:41 -04:00
francisco.schulz
cc25a20c24 feat(process_input_message): add message processing logic with support to pass in external message processor 2024-07-11 12:21:48 -04:00
francisco.schulz
f723bcb9b1 fix(fetch_active_tenants): proper async API call 2024-07-11 12:06:59 -04:00
francisco.schulz
abde776cd1 feat(RabbitMQHandler): add async test class 2024-07-11 11:55:52 -04:00
francisco.schulz
aa23894858 chore(dependencies): update 2024-07-11 11:55:17 -04:00
Jonathan Kössler
2da4f37620 feat: wip for multiple tenants - for pkg build 2024-07-11 12:49:07 +02:00
Jonathan Kössler
9b20a67ace feat: wip for multiple tenants - for pkg build 2024-07-11 11:41:09 +02:00
Jonathan Kössler
7b6408e0de feat: wip for multiple tenants - for pkg build 2024-07-11 11:04:02 +02:00
Jonathan Kössler
6e7c4ccb7b feat: wip for multiple tenants - for pkg build 2024-07-10 11:45:47 +02:00
Jonathan Kössler
b2e3ae092f feat: wip for multiple tenants 2024-07-09 18:20:55 +02:00
Jonathan Kössler
de41030e69 feat: wip for multiple tenants 2024-07-05 13:27:16 +02:00
Jonathan Kössler
c81d967aee feat: wip for multiple tenants 2024-07-03 17:51:47 +02:00
Jonathan Kössler
30330937ce feat: wip for multiple tenants 2024-07-02 18:07:23 +02:00
Jonathan Kössler
7624208188 feat: wip for multiple tenants 2024-07-01 18:15:04 +02:00
Jonathan Kössler
6fabe1ae8c feat: wip for multiple tenants 2024-06-28 15:41:53 +02:00
Jonathan Kössler
3532f949a9 refactor: remove second trace setup 2024-06-26 18:15:51 +02:00
Jonathan Kössler
65cc1c9aad fix: improve error handling for tracing settings 2024-06-26 18:02:52 +02:00
Jonathan Kössler
2484a5e9f7 chore: bump pyinfra version 2024-06-17 13:53:42 +02:00
Julius Unverfehrt
88fe7383f3 Merge branch 'feature/RES-718-add-azure-monitoring' into 'master'
RES-718: add azure tracing

See merge request knecon/research/pyinfra!85
2024-06-17 12:25:09 +02:00
Jonathan Kössler
18a0ddc2d3 feat: add tracing settings to validator 2024-06-13 08:47:50 +02:00
Jonathan Kössler
5328e8de03 refactor: streamline tracing types 2024-06-12 10:41:52 +02:00
Jonathan Kössler
9661d75d8a refactor: update tracing info for Azure Monitor 2024-06-11 14:31:06 +02:00
Jonathan Kössler
7dbcdf1650 feat: add azure opentelemetry monitoring 2024-06-11 12:00:18 +02:00
Julius Unverfehrt
4536f9d35b Merge branch 'RES-671-multi-file-dl' into 'master'
feat: add multiple file download

See merge request knecon/research/pyinfra!84
2024-04-18 16:47:00 +02:00
Julius Unverfehrt
a1e7b3b565 build: add SBOM and increment package version 2024-04-18 16:39:46 +02:00
Julius Unverfehrt
b810449bba feat: add multiple file download
The download function is now overloaded and additionally supports a
dict with file paths as values, in addition to the existing single file
path string. In the dict case, the data is forwarded as a dict of the
same structure.
2024-04-18 16:35:55 +02:00
Julius Unverfehrt
f67813702a Merge branch 'RED-8978-no-crash-on-non-existing-files' into 'master'
fix: add error handling for file not found error

See merge request knecon/research/pyinfra!83
2024-04-16 16:28:25 +02:00
Julius Unverfehrt
ed4f912acf build: increment service version 2024-04-16 16:21:57 +02:00
Julius Unverfehrt
021222475b fix: add error handling for file not found error
When a file couldn't be downloaded from storage, the queue consumer now
informs the operator with a log and rejects the message, without crashing
but continuing its honest work.
2024-04-16 16:20:08 +02:00
Julius Unverfehrt
876253b3fb tests: add test for file not found error 2024-04-16 16:19:45 +02:00
Julius Unverfehrt
1689cd762b fix(CI): fix CI 2024-01-31 12:03:07 +01:00
Julius Unverfehrt
dc413cea82 Merge branch 'opentel' into 'master'
RES-506, RES-507, RES-499, RES-434, RES-398

See merge request knecon/research/pyinfra!82
2024-01-31 11:21:17 +01:00
Julius Unverfehrt
bfb27383e4 fix(settings): change precedence to ENV ROOT_PATH > root_path arg 2024-01-31 10:24:29 +01:00
Julius Unverfehrt
af914ab3ae fix(argparse): automatically output settings path 2024-01-31 10:12:32 +01:00
Julius Unverfehrt
7093e01925 feat(opentelemetry): add webserver tracing to default pipeline 2024-01-31 09:09:13 +01:00
Julius Unverfehrt
88cfb2b1c1 fix(settings): add debug log 2024-01-30 14:52:35 +01:00
Julius Unverfehrt
c1301d287f fix(dependencies): move opentel deps to main since groups are not packaged with CI script 2024-01-30 14:31:08 +01:00
Julius Unverfehrt
f1b8e5a25f refac(arg parse): rename settings parsing fn for clarity 2024-01-30 13:27:19 +01:00
Julius Unverfehrt
fff5be2e50 feat(settings): improve config loading logic
Load settings from .toml files, .env and environment variables. Also ensures a ROOT_PATH environment variable is
set. If ROOT_PATH is not set and no root_path argument is passed, the current working directory is used as root.
Settings paths can be a single .toml file, a folder containing .toml files or a list of .toml files and folders.
If a folder is passed, all .toml files in the folder are loaded. If settings path is None, only .env and
environment variables are loaded. If settings_path are relative paths, they are joined with the root_path argument.
2024-01-30 12:56:58 +01:00
Julius Unverfehrt
ec9ab21198 package: increment major version and update kn-utils 2024-01-25 11:08:50 +01:00
Julius Unverfehrt
b2f073e0c5 refactor: IoC for callback, update readme 2024-01-25 10:41:48 +01:00
Julius Unverfehrt
f6f56b8d8c refactor: simplify storage connection logic 2024-01-25 09:08:51 +01:00
Isaac Riley
8ff637d6ba chore: add opentelemetry subsection to README.md; formatting 2024-01-25 08:25:19 +01:00
Julius Unverfehrt
c18475a77d feat(opentelemetry): improve readability 2024-01-24 17:46:54 +01:00
Julius Unverfehrt
e0b32fa448 feat(opentelemetry): fastAPI tracing
The tests don't work yet since the webserver has to run in a thread and
the traces don't get exported from the thread with local json exporting.
However, with an export to an external server this should still work.
WIP
2024-01-24 15:52:42 +01:00
Julius Unverfehrt
da163897c4 feat(opentelemetry): add fastapi instrumentation 2024-01-24 14:26:10 +01:00
Julius Unverfehrt
a415666830 feat(opentelemetry): put logic in own module 2024-01-24 14:00:11 +01:00
Julius Unverfehrt
739a7c0731 feat(opentelemetry): add queue instrumenting test 2024-01-24 13:26:01 +01:00
Isaac Riley
936bb4fe80 feat: add opentelemetry on top of newly refactored pyinfra 2024-01-24 08:09:42 +01:00
39 changed files with 34924 additions and 1251 deletions

2
.dvc/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/config.local
/cache

5
.dvc/config Normal file
View File

@ -0,0 +1,5 @@
[core]
remote = azure
['remote "azure"']
url = azure://pyinfra-dvc
connection_string =

3
.dvcignore Normal file
View File

@ -0,0 +1,3 @@
# Add patterns of files dvc should ignore, which could improve
# the performance. Learn more at
# https://dvc.org/doc/user-guide/dvcignore

1
.gitignore vendored
View File

@ -31,7 +31,6 @@ __pycache__/
# file extensions
*.log
*.csv
*.json
*.pkl
*.profile
*.cbm

View File

@ -1,11 +1,23 @@
# CI for services, check gitlab repo for python package CI
include:
- project: "Gitlab/gitlab"
ref: 0.3.0
file: "/ci-templates/research/python_pkg_venv_test_build_release_gitlab-ci.yml"
ref: main
file: "/ci-templates/research/python_pkg-test-build-release.gitlab-ci.yml"
default:
image: python:3.10
# set project variables here
variables:
NEXUS_PROJECT_DIR: research # subfolder in Nexus docker-gin where your container will be stored
IMAGENAME: $CI_PROJECT_NAME # if the project URL is gitlab.example.com/group-name/project-1, CI_PROJECT_NAME is project-1
REPORTS_DIR: reports
FF_USE_FASTZIP: "true" # enable fastzip - a faster zip implementation that also supports level configuration.
ARTIFACT_COMPRESSION_LEVEL: default # can also be set to fastest, fast, slow and slowest. If just enabling fastzip is not enough try setting this to fastest or fast.
CACHE_COMPRESSION_LEVEL: default # same as above, but for caches
# TRANSFER_METER_FREQUENCY: 5s # will display transfer progress every 5 seconds for artifacts and remote caches. For debugging purposes.
run-tests:
script:
- echo "Disabled until we have an automated way to run docker compose before tests."
############
# UNIT TESTS
unit-tests:
variables:
###### UPDATE/EDIT ######
UNIT_TEST_DIR: "tests/unit_test"

View File

@ -1,42 +1,55 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
exclude: ^(docs/|notebooks/|data/|src/secrets/|src/static/|src/templates/|tests)
exclude: ^(docs/|notebooks/|data/|src/configs/|tests/|.hooks/)
default_language_version:
python: python3.8
python: python3.10
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
exclude: bamboo-specs/bamboo.yml
name: Check Gitlab CI (unsafe)
args: [--unsafe]
files: .gitlab-ci.yml
- id: check-yaml
exclude: .gitlab-ci.yml
- id: check-toml
- id: detect-private-key
- id: check-added-large-files
args: ['--maxkb=10000']
- id: check-case-conflict
- id: mixed-line-ending
# - repo: https://github.com/pycqa/pylint
# rev: v2.16.1
# hooks:
# - id: pylint
# args:
# ["--max-line-length=120", "--errors-only", "--ignore-imports=true", ]
- repo: https://github.com/pre-commit/mirrors-pylint
rev: v3.0.0a5
hooks:
- id: pylint
language: system
args:
- --disable=C0111,R0903
- --max-line-length=120
- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.10.1
hooks:
- id: isort
args: ["--profile", "black"]
args:
- --profile black
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 24.10.0
hooks:
- id: black
# exclude: ^(docs/|notebooks/|data/|src/secrets/)
args:
- --line-length=120
# - repo: local
# hooks:
# - id: system
# name: PyLint
# entry: poetry run pylint
# language: system
# exclude: ^alembic/
# files: \.py$
- repo: https://github.com/compilerla/conventional-pre-commit
rev: v3.6.0
hooks:
- id: conventional-pre-commit
pass_filenames: false
stages: [commit-msg]
# args: [] # optional: list of Conventional Commits types to allow e.g. [feat, fix, ci, chore, test]

View File

@ -1 +1 @@
3.10.12
3.10

130
README.md
View File

@ -6,13 +6,13 @@
4. [ Module Installation ](#module-installation)
5. [ Scripts ](#scripts)
6. [ Tests ](#tests)
7. [ Opentelemetry protobuf dependency hell ](#opentelemetry-protobuf-dependency-hell)
## About
Shared library for the research team, containing code related to infrastructure and communication with other services.
Offers a simple interface for processing data and sending responses via AMQP, monitoring via Prometheus and storage
access via S3 or Azure.
access via S3 or Azure. Also exports traces via OpenTelemetry for queue messages and webserver requests.
To start, see the [complete example](pyinfra/examples.py) which shows how to use all features of the service and can be
imported and used directly for default research service pipelines (data ID in message, download data from storage,
@ -31,33 +31,68 @@ The following table shows all necessary settings. You can find a preconfigured s
bitbucket. These are the complete settings, you only need all if using all features of the service as described in
the [complete example](pyinfra/examples.py).
| Environment Variable | Internal / .toml Name | Description |
|------------------------------------|----------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| LOGGING__LEVEL | logging.level | Log level |
| METRICS__PROMETHEUS__ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
| METRICS__PROMETHEUS__PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
| WEBSERVER__HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
| WEBSERVER__PORT | webserver.port | Port of the webserver |
| RABBITMQ__HOST | rabbitmq.host | Host of the RabbitMQ server |
| RABBITMQ__PORT | rabbitmq.port | Port of the RabbitMQ server |
| RABBITMQ__USERNAME | rabbitmq.username | Username for the RabbitMQ server |
| RABBITMQ__PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
| RABBITMQ__HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
| RABBITMQ__CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep time intervals during message processing. Has to be a divider of heartbeat, and shouldn't be too big, since only in these intervals queue interactions happen (like receiving new messages) This is also the minimum time the service needs to process a message. |
| RABBITMQ__INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue |
| RABBITMQ__OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue |
| RABBITMQ__DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue |
| STORAGE__BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
| STORAGE__CACHE_SIZE | storage.cache_size | Number of cached storage connection (to reduce connection stops and reconnects for multi tenancy). |
| STORAGE__S3__BUCKET_NAME | storage.s3.bucket_name | Name of the S3 bucket |
| STORAGE__S3__ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
| STORAGE__S3__KEY | storage.s3.key | Access key for the S3 server |
| STORAGE__S3__SECRET | storage.s3.secret | Secret key for the S3 server |
| STORAGE__S3__REGION | storage.s3.region | Region of the S3 server |
| STORAGE__AZURE__CONTAINER | storage.azure.container_name | Name of the Azure container |
| STORAGE__AZURE__CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
| STORAGE__TENANT_SERVER__PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
| STORAGE__TENANT_SERVER__ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
| Environment Variable | Internal / .toml Name | Description |
| ------------------------------------------ | --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| LOGGING\_\_LEVEL | logging.level | Log level |
| DYNAMIC_TENANT_QUEUES\_\_ENABLED | dynamic_tenant_queues.enabled | Enable the mode with dynamically created per-tenant queues |
| METRICS\_\_PROMETHEUS\_\_ENABLED | metrics.prometheus.enabled | Enable Prometheus metrics collection |
| METRICS\_\_PROMETHEUS\_\_PREFIX | metrics.prometheus.prefix | Prefix for Prometheus metrics (e.g. {product}-{service}) |
| WEBSERVER\_\_HOST | webserver.host | Host of the webserver (offering e.g. /prometheus, /ready and /health endpoints) |
| WEBSERVER\_\_PORT | webserver.port | Port of the webserver |
| RABBITMQ\_\_HOST | rabbitmq.host | Host of the RabbitMQ server |
| RABBITMQ\_\_PORT | rabbitmq.port | Port of the RabbitMQ server |
| RABBITMQ\_\_USERNAME | rabbitmq.username | Username for the RabbitMQ server |
| RABBITMQ\_\_PASSWORD | rabbitmq.password | Password for the RabbitMQ server |
| RABBITMQ\_\_HEARTBEAT | rabbitmq.heartbeat | Heartbeat for the RabbitMQ server |
| RABBITMQ\_\_CONNECTION_SLEEP | rabbitmq.connection_sleep | Sleep interval during message processing. Must be a divisor of the heartbeat and should not be too large, since queue interactions (like receiving new messages) only happen at these intervals. This is also the minimum time the service needs to process a message. |
| RABBITMQ\_\_INPUT_QUEUE | rabbitmq.input_queue | Name of the input queue in single queue setting |
| RABBITMQ\_\_OUTPUT_QUEUE | rabbitmq.output_queue | Name of the output queue in single queue setting |
| RABBITMQ\_\_DEAD_LETTER_QUEUE | rabbitmq.dead_letter_queue | Name of the dead letter queue in single queue setting |
| RABBITMQ\_\_TENANT_EVENT_QUEUE_SUFFIX | rabbitmq.tenant_event_queue_suffix | Suffix for the tenant event queue in multi tenant/queue setting |
| RABBITMQ\_\_TENANT_EVENT_DLQ_SUFFIX | rabbitmq.tenant_event_dlq_suffix | Suffix for the dead letter queue in multi tenant/queue setting |
| RABBITMQ\_\_TENANT_EXCHANGE_NAME | rabbitmq.tenant_exchange_name | Name of tenant exchange in multi tenant/queue setting |
| RABBITMQ\_\_QUEUE_EXPIRATION_TIME | rabbitmq.queue_expiration_time | Time until queue expiration in multi tenant/queue setting |
| RABBITMQ\_\_SERVICE_REQUEST_QUEUE_PREFIX | rabbitmq.service_request_queue_prefix | Service request queue prefix in multi tenant/queue setting |
| RABBITMQ\_\_SERVICE_REQUEST_EXCHANGE_NAME | rabbitmq.service_request_exchange_name | Service request exchange name in multi tenant/queue setting |
| RABBITMQ\_\_SERVICE_RESPONSE_EXCHANGE_NAME | rabbitmq.service_response_exchange_name | Service response exchange name in multi tenant/queue setting |
| RABBITMQ\_\_SERVICE_DLQ_NAME | rabbitmq.service_dlq_name | Service dead letter queue name in multi tenant/queue setting |
| STORAGE\_\_BACKEND | storage.backend | Storage backend to use (currently only "s3" and "azure" are supported) |
| STORAGE\_\_S3\_\_BUCKET | storage.s3.bucket | Name of the S3 bucket |
| STORAGE\_\_S3\_\_ENDPOINT | storage.s3.endpoint | Endpoint of the S3 server |
| STORAGE\_\_S3\_\_KEY | storage.s3.key | Access key for the S3 server |
| STORAGE\_\_S3\_\_SECRET | storage.s3.secret | Secret key for the S3 server |
| STORAGE\_\_S3\_\_REGION | storage.s3.region | Region of the S3 server |
| STORAGE\_\_AZURE\_\_CONTAINER | storage.azure.container_name | Name of the Azure container |
| STORAGE\_\_AZURE\_\_CONNECTION_STRING | storage.azure.connection_string | Connection string for the Azure server |
| STORAGE\_\_TENANT_SERVER\_\_PUBLIC_KEY | storage.tenant_server.public_key | Public key of the tenant server |
| STORAGE\_\_TENANT_SERVER\_\_ENDPOINT | storage.tenant_server.endpoint | Endpoint of the tenant server |
| TRACING\_\_ENABLED | tracing.enabled | Enable tracing |
| TRACING\_\_TYPE | tracing.type | Tracing mode - possible values: "opentelemetry", "azure_monitor" (Expects the APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.) |
| TRACING\_\_OPENTELEMETRY\_\_ENDPOINT | tracing.opentelemetry.endpoint | Endpoint to which OpenTelemetry traces are exported |
| TRACING\_\_OPENTELEMETRY\_\_SERVICE_NAME | tracing.opentelemetry.service_name | Name of the service as displayed in the traces collected |
| TRACING\_\_OPENTELEMETRY\_\_EXPORTER | tracing.opentelemetry.exporter | Name of exporter |
| KUBERNETES\_\_POD_NAME | kubernetes.pod_name | Service pod name |
## Setup
**IMPORTANT** you need to set the following environment variables before running the setup script:
- ``$NEXUS_USER`` your Nexus user (usually equal to firstname.lastname@knecon.com)
- ``$NEXUS_PASSWORD`` your Nexus password (usually equal to your Azure Login)
```shell
# create venv and activate it
source ./scripts/setup/devenvsetup.sh {{ cookiecutter.python_version }} $NEXUS_USER $NEXUS_PASSWORD
source .venv/bin/activate
```
### OpenTelemetry
OpenTelemetry (via its Python SDK) is set up to be as unobtrusive as possible; for typical use cases it can be
configured from environment variables, without additional work in the microservice app, although additional
configuration is possible.
`TRACING__OPENTELEMETRY__ENDPOINT` should typically be set
to `http://otel-collector-opentelemetry-collector.otel-collector:4318/v1/traces`.
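As a purely illustrative sketch (the service name and exporter value below are assumptions, not required settings), the tracing options from the table above could be supplied entirely through environment variables before the service starts:

```python
# Illustrative only: tracing configured via environment variables, using the
# ENV__NESTED__KEY naming shown in the settings table above.
import os

os.environ["TRACING__ENABLED"] = "true"
os.environ["TRACING__TYPE"] = "opentelemetry"
os.environ["TRACING__OPENTELEMETRY__ENDPOINT"] = (
    "http://otel-collector-opentelemetry-collector.otel-collector:4318/v1/traces"
)
os.environ["TRACING__OPENTELEMETRY__SERVICE_NAME"] = "my-research-service"  # hypothetical name
os.environ["TRACING__OPENTELEMETRY__EXPORTER"] = "otlp"  # hypothetical exporter value
```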
## Queue Manager
@ -66,7 +101,7 @@ to the output queue. The default callback also downloads data from the storage a
The response message does not contain the data itself, but the identifiers from the input message (including headers
beginning with "X-").
Usage:
### Standalone Usage
```python
from pyinfra.queue.manager import QueueManager
@ -77,7 +112,32 @@ settings = load_settings("path/to/settings")
processing_function: DataProcessor # function should expect a dict (json) or bytes (pdf) as input and should return a json serializable object.
queue_manager = QueueManager(settings)
queue_manager.start_consuming(make_download_process_upload_callback(processing_function, settings))
callback = make_download_process_upload_callback(processing_function, settings)
queue_manager.start_consuming(callback)
```
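
For illustration, a `DataProcessor` compatible with the callback above could be as small as the following sketch; the function name and returned fields are hypothetical, and the only contract (per the comment above) is that it accepts a dict or bytes and returns a JSON-serializable object:

```python
# Hypothetical processing function: receives a dict (JSON) or bytes (e.g. a PDF)
# and returns a JSON-serializable result, as the DataProcessor type expects.
def processing_function(data):
    if isinstance(data, bytes):
        return {"num_bytes": len(data)}  # raw file payload
    return {"num_keys": len(data)}       # parsed JSON payload
```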
### Usage in a Service
This is the recommended way to use the module. This includes the webserver, Prometheus metrics and health endpoints.
Custom endpoints can be added by adding a new route to the `app` object beforehand. Settings are loaded from files
specified as CLI arguments (e.g. `--settings-path path/to/settings.toml`). The values can also be set or overridden via
environment variables (e.g. `LOGGING__LEVEL=DEBUG`).
The callback can be replaced with a custom one, for example if the data to process is contained in the message itself
and not on the storage.
```python
from pyinfra.config.loader import load_settings, parse_settings_path
from pyinfra.examples import start_standard_queue_consumer
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
processing_function: DataProcessor
arguments = parse_settings_path()
settings = load_settings(arguments.settings_path)
callback = make_download_process_upload_callback(processing_function, settings)
start_standard_queue_consumer(callback, settings) # optionally also pass a fastAPI app object with preconfigured routes
```
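
As a hedged sketch of adding a custom endpoint before handing the app over (the `/version` route is purely illustrative):

```python
# Illustrative sketch: preconfigure the FastAPI app with an extra route,
# then pass it to the standard consumer.
from fastapi import FastAPI

app = FastAPI()

@app.get("/version")  # hypothetical custom endpoint
def version():
    return {"version": "0.0.0"}

start_standard_queue_consumer(callback, settings, app=app)
```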
### AMQP input message:
@ -139,7 +199,7 @@ $ cd tests && docker compose up
**Shell 2**: Start pyinfra with callback mock
```bash
$ python scripts/start_pyinfra.py
$ python scripts/start_pyinfra.py
```
**Shell 3**: Upload dummy content on storage and publish message
@ -152,3 +212,9 @@ $ python scripts/send_request.py
Tests require a running minio and rabbitmq container, meaning you have to run `docker compose up` in the tests folder
before running the tests.
## OpenTelemetry Protobuf Dependency Hell
**Note**: Status 2025/01/09: the currently used `opentelemetry-exporter-otlp-proto-http` version `1.25.0` requires
a `protobuf` version < `5.x.x` and is not compatible with the latest protobuf version `5.27.x`. This is an [open issue](https://github.com/open-telemetry/opentelemetry-python/issues/3958) in opentelemetry, because [support for 4.25.x ends in Q2 '25](https://protobuf.dev/support/version-support/#python).
Therefore, we should keep this in mind and update the dependency once opentelemetry includes support for `protobuf 5.27.x`.

27459
bom.json Normal file

File diff suppressed because it is too large

6779
poetry.lock generated

File diff suppressed because it is too large

View File

@ -1,5 +1,6 @@
import argparse
import os
from functools import partial
from pathlib import Path
from typing import Union
@ -7,20 +8,28 @@ from dynaconf import Dynaconf, ValidationError, Validator
from funcy import lflatten
from kn_utils.logging import logger
# This path is meant for testing purposes and convenience. It probably won't reflect the actual root path when pyinfra is
# installed as a package, so don't use it in production code, but define your own root path as described in load config.
local_pyinfra_root_path = Path(__file__).parents[2]
def load_settings(settings_path: Union[str, Path] = None, validators: list[Validator] = None):
settings_path = Path(settings_path) if settings_path else None
validators = validators or get_all_validators()
if not settings_path:
logger.info("No settings path specified, only loading .env end ENVs.")
settings_files = []
elif os.path.isdir(settings_path):
logger.info(f"Settings path is a directory, loading all .toml files in the directory: {settings_path}")
settings_files = list(settings_path.glob("*.toml"))
else:
logger.info(f"Settings path is a file, loading only the specified file: {settings_path}")
settings_files = [settings_path]
def load_settings(
settings_path: Union[str, Path, list] = "config/",
root_path: Union[str, Path] = None,
validators: list[Validator] = None,
):
"""Load settings from .toml files, .env and environment variables. Also ensures a ROOT_PATH environment variable is
set. If ROOT_PATH is not set and no root_path argument is passed, the current working directory is used as root.
Settings paths can be a single .toml file, a folder containing .toml files or a list of .toml files and folders.
If a ROOT_PATH environment variable is set, it is not overwritten by the root_path argument.
If a folder is passed, all .toml files in the folder are loaded. If settings_path is None, only .env and
environment variables are loaded. If settings_path contains relative paths, they are joined with the root_path argument.
"""
root_path = get_or_set_root_path(root_path)
validators = validators or get_pyinfra_validators()
settings_files = normalize_to_settings_files(settings_path, root_path)
settings = Dynaconf(
load_dotenv=True,
@ -34,10 +43,63 @@ def load_settings(settings_path: Union[str, Path] = None, validators: list[Valid
return settings
pyinfra_config_path = Path(__file__).resolve().parents[2] / "config/"
def normalize_to_settings_files(settings_path: Union[str, Path, list], root_path: Union[str, Path]):
if settings_path is None:
logger.info("No settings path specified, only loading .env end ENVs.")
settings_files = []
elif isinstance(settings_path, str) or isinstance(settings_path, Path):
settings_files = [settings_path]
elif isinstance(settings_path, list):
settings_files = settings_path
else:
raise ValueError(f"Invalid settings path: {settings_path=}")
settings_files = lflatten(map(partial(_normalize_and_verify, root_path=root_path), settings_files))
logger.debug(f"Normalized settings files: {settings_files}")
return settings_files
def get_all_validators():
def _normalize_and_verify(settings_path: Path, root_path: Path):
settings_path = Path(settings_path)
root_path = Path(root_path)
if not settings_path.is_absolute():
logger.debug(f"Settings path is not absolute, joining with root path: {root_path}")
settings_path = root_path / settings_path
if settings_path.is_dir():
logger.debug(f"Settings path is a directory, loading all .toml files in the directory: {settings_path}")
settings_files = list(settings_path.glob("*.toml"))
elif settings_path.is_file():
logger.debug(f"Settings path is a file, loading specified file: {settings_path}")
settings_files = [settings_path]
else:
raise ValueError(f"Invalid settings path: {settings_path=}, {root_path=}")
return settings_files
def get_or_set_root_path(root_path: Union[str, Path] = None):
env_root_path = os.environ.get("ROOT_PATH")
if env_root_path:
root_path = env_root_path
logger.debug(f"'ROOT_PATH' environment variable is set to {root_path}.")
elif root_path:
logger.info(f"'ROOT_PATH' environment variable is not set, setting to {root_path}.")
os.environ["ROOT_PATH"] = str(root_path)
else:
root_path = Path.cwd()
logger.info(f"'ROOT_PATH' environment variable is not set, defaulting to working directory {root_path}.")
os.environ["ROOT_PATH"] = str(root_path)
return root_path
def get_pyinfra_validators():
import pyinfra.config.validators
return lflatten(
@ -61,13 +123,11 @@ def validate_settings(settings: Dynaconf, validators):
logger.debug("Settings validated.")
def parse_args():
def parse_settings_path():
parser = argparse.ArgumentParser()
parser.add_argument(
"--settings_path",
"-s",
type=Path,
default=pyinfra_config_path,
help="Path to settings file or folder. Must be a .toml file or a folder containing .toml files.",
"settings_path",
help="Path to settings file(s) or folder(s). Must be .toml file(s) or a folder(s) containing .toml files.",
nargs="+",
)
return parser.parse_args()
return parser.parse_args().settings_path
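
Taken together, the loader above accepts a single path, a folder, a list of both, or nothing at all; a brief usage sketch follows (the paths are hypothetical):

```python
# Usage sketch for the loader above; paths are hypothetical examples.
from pyinfra.config.loader import load_settings, parse_settings_path

settings = load_settings("config/")                        # all .toml files in a folder
settings = load_settings(["config/base.toml", "config/"])  # a file plus a folder
settings = load_settings(None)                             # only .env and environment variables

# Or take the path(s) from the CLI (positional argument, one or more values):
settings = load_settings(parse_settings_path())
```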

View File

@ -44,3 +44,14 @@ webserver_validators = [
Validator("webserver.host", must_exist=True, is_type_of=str),
Validator("webserver.port", must_exist=True, is_type_of=int),
]
tracing_validators = [
Validator("tracing.enabled", must_exist=True, is_type_of=bool),
Validator("tracing.type", must_exist=True, is_type_of=str)
]
opentelemetry_validators = [
Validator("tracing.opentelemetry.endpoint", must_exist=True, is_type_of=str),
Validator("tracing.opentelemetry.service_name", must_exist=True, is_type_of=str),
Validator("tracing.opentelemetry.exporter", must_exist=True, is_type_of=str)
]
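
Since `load_settings` takes a `validators` argument, a service can combine these defaults with its own; a hedged sketch (the `myservice.model_path` key is hypothetical):

```python
# Sketch: appending a service-specific validator to the pyinfra defaults.
from dynaconf import Validator
from pyinfra.config.loader import get_pyinfra_validators, load_settings

extra_validators = [
    Validator("myservice.model_path", must_exist=True, is_type_of=str),  # hypothetical key
]
settings = load_settings("config/", validators=get_pyinfra_validators() + extra_validators)
```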

View File

@ -1,38 +1,169 @@
import asyncio
import signal
import sys
import aiohttp
from aiormq.exceptions import AMQPConnectionError
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from pyinfra.queue.callback import make_download_process_upload_callback, DataProcessor
from pyinfra.config.loader import get_pyinfra_validators, validate_settings
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
from pyinfra.queue.callback import Callback
from pyinfra.queue.manager import QueueManager
from pyinfra.webserver.prometheus import add_prometheus_endpoint, \
make_prometheus_processing_time_decorator_from_settings
from pyinfra.webserver.utils import add_health_check_endpoint, create_webserver_thread_from_settings
from pyinfra.utils.opentelemetry import instrument_app, instrument_pika, setup_trace
from pyinfra.webserver.prometheus import (
add_prometheus_endpoint,
make_prometheus_processing_time_decorator_from_settings,
)
from pyinfra.webserver.utils import (
add_health_check_endpoint,
create_webserver_thread_from_settings,
run_async_webserver,
)
shutdown_flag = False
def start_queue_consumer_with_prometheus_and_health_endpoints(process_fn: DataProcessor, settings: Dynaconf):
async def graceful_shutdown(manager: AsyncQueueManager, queue_task, webserver_task):
global shutdown_flag
shutdown_flag = True
logger.info("SIGTERM received, shutting down gracefully...")
if queue_task and not queue_task.done():
queue_task.cancel()
# await queue manager shutdown
await asyncio.gather(queue_task, manager.shutdown(), return_exceptions=True)
if webserver_task and not webserver_task.done():
webserver_task.cancel()
# await webserver shutdown
await asyncio.gather(webserver_task, return_exceptions=True)
logger.info("Shutdown complete.")
async def run_async_queues(manager: AsyncQueueManager, app, port, host):
"""Run the async webserver and the async queue manager concurrently."""
queue_task = None
webserver_task = None
tenant_api_available = True
# add signal handler for SIGTERM and SIGINT
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGTERM, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
)
loop.add_signal_handler(
signal.SIGINT, lambda: asyncio.create_task(graceful_shutdown(manager, queue_task, webserver_task))
)
try:
active_tenants = await manager.fetch_active_tenants()
queue_task = asyncio.create_task(manager.run(active_tenants=active_tenants), name="queues")
webserver_task = asyncio.create_task(run_async_webserver(app, port, host), name="webserver")
await asyncio.gather(queue_task, webserver_task)
except asyncio.CancelledError:
logger.info("Main task was cancelled, initiating shutdown.")
except AMQPConnectionError as e:
logger.warning(f"AMQPConnectionError: {e} - shutting down.")
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
logger.warning("Tenant server did not answer - shutting down.")
tenant_api_available = False
except Exception as e:
logger.error(f"An error occurred while running async queues: {e}", exc_info=True)
sys.exit(1)
finally:
if shutdown_flag:
logger.debug("Graceful shutdown already in progress.")
else:
logger.warning("Initiating shutdown due to error or manual interruption.")
if not tenant_api_available:
sys.exit(0)
if queue_task and not queue_task.done():
queue_task.cancel()
if webserver_task and not webserver_task.done():
webserver_task.cancel()
await asyncio.gather(queue_task, manager.shutdown(), webserver_task, return_exceptions=True)
logger.info("Shutdown complete.")
def start_standard_queue_consumer(
callback: Callback,
settings: Dynaconf,
app: FastAPI = None,
):
"""Default serving logic for research services.
Supplies /health, /ready and /prometheus endpoints. The process_fn is monitored for processing time per call.
Workload is only received via queue messages. The message contains a file path to the data to be processed, which
gets downloaded from the storage. The data and the message are then passed to the process_fn. The process_fn should
return a json serializable object. This object is then uploaded to the storage. The response message is just the
original message.
Adapt as needed.
Supplies /health, /ready and /prometheus endpoints (if enabled). The callback is monitored for processing time per
message. Also traces the queue messages via OpenTelemetry (if enabled).
Workload is received via queue messages and processed by the callback function (see pyinfra.queue.callback for
callbacks).
"""
logger.info(f"Starting webserver and queue consumer...")
validate_settings(settings, get_pyinfra_validators())
app = FastAPI()
logger.info("Starting webserver and queue consumer...")
app = add_prometheus_endpoint(app)
process_fn = make_prometheus_processing_time_decorator_from_settings(settings)(process_fn)
app = app or FastAPI()
queue_manager = QueueManager(settings)
if settings.metrics.prometheus.enabled:
logger.info("Prometheus metrics enabled.")
app = add_prometheus_endpoint(app)
callback = make_prometheus_processing_time_decorator_from_settings(settings)(callback)
app = add_health_check_endpoint(app, queue_manager.is_ready)
if settings.tracing.enabled:
setup_trace(settings)
webserver_thread = create_webserver_thread_from_settings(app, settings)
webserver_thread.start()
instrument_pika(dynamic_queues=settings.dynamic_tenant_queues.enabled)
instrument_app(app)
callback = make_download_process_upload_callback(process_fn, settings)
queue_manager.start_consuming(callback)
if settings.dynamic_tenant_queues.enabled:
logger.info("Dynamic tenant queues enabled. Running async queues.")
config = RabbitMQConfig(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
username=settings.rabbitmq.username,
password=settings.rabbitmq.password,
heartbeat=settings.rabbitmq.heartbeat,
input_queue_prefix=settings.rabbitmq.service_request_queue_prefix,
tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix,
tenant_exchange_name=settings.rabbitmq.tenant_exchange_name,
service_request_exchange_name=settings.rabbitmq.service_request_exchange_name,
service_response_exchange_name=settings.rabbitmq.service_response_exchange_name,
service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name,
queue_expiration_time=settings.rabbitmq.queue_expiration_time,
pod_name=settings.kubernetes.pod_name,
)
manager = AsyncQueueManager(
config=config,
tenant_service_url=settings.storage.tenant_server.endpoint,
message_processor=callback,
max_concurrent_tasks=(
settings.asyncio.max_concurrent_tasks if hasattr(settings.asyncio, "max_concurrent_tasks") else 10
),
)
else:
logger.info("Dynamic tenant queues disabled. Running sync queues.")
manager = QueueManager(settings)
app = add_health_check_endpoint(app, manager.is_ready)
if isinstance(manager, AsyncQueueManager):
asyncio.run(run_async_queues(manager, app, port=settings.webserver.port, host=settings.webserver.host))
elif isinstance(manager, QueueManager):
webserver = create_webserver_thread_from_settings(app, settings)
webserver.start()
try:
manager.start_consuming(callback)
except Exception as e:
logger.error(f"An error occurred while consuming messages: {e}", exc_info=True)
sys.exit(1)
else:
logger.warning(f"Behavior for type {type(manager)} is not defined")

View File

@ -0,0 +1,329 @@
import asyncio
import concurrent.futures
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Set
import aiohttp
from aio_pika import ExchangeType, IncomingMessage, Message, connect
from aio_pika.abc import (
AbstractChannel,
AbstractConnection,
AbstractExchange,
AbstractIncomingMessage,
AbstractQueue,
)
from aio_pika.exceptions import (
ChannelClosed,
ChannelInvalidStateError,
ConnectionClosed,
)
from aiormq.exceptions import AMQPConnectionError
from kn_utils.logging import logger
from kn_utils.retry import retry
@dataclass
class RabbitMQConfig:
host: str
port: int
username: str
password: str
heartbeat: int
input_queue_prefix: str
tenant_event_queue_suffix: str
tenant_exchange_name: str
service_request_exchange_name: str
service_response_exchange_name: str
service_dead_letter_queue_name: str
queue_expiration_time: int
pod_name: str
connection_params: Dict[str, object] = field(init=False)
def __post_init__(self):
self.connection_params = {
"host": self.host,
"port": self.port,
"login": self.username,
"password": self.password,
"client_properties": {"heartbeat": self.heartbeat},
}
class AsyncQueueManager:
def __init__(
self,
config: RabbitMQConfig,
tenant_service_url: str,
message_processor: Callable[[Dict[str, Any]], Dict[str, Any]],
max_concurrent_tasks: int = 10,
):
self.config = config
self.tenant_service_url = tenant_service_url
self.message_processor = message_processor
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
self.connection: AbstractConnection | None = None
self.channel: AbstractChannel | None = None
self.tenant_exchange: AbstractExchange | None = None
self.input_exchange: AbstractExchange | None = None
self.output_exchange: AbstractExchange | None = None
self.tenant_exchange_queue: AbstractQueue | None = None
self.tenant_queues: Dict[str, AbstractChannel] = {}
self.consumer_tags: Dict[str, str] = {}
self.message_count: int = 0
@retry(tries=5, exceptions=AMQPConnectionError, reraise=True, logger=logger)
async def connect(self) -> None:
logger.info("Attempting to connect to RabbitMQ...")
self.connection = await connect(**self.config.connection_params)
self.connection.close_callbacks.add(self.on_connection_close)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=1)
logger.info("Successfully connected to RabbitMQ")
async def on_connection_close(self, sender, exc):
"""This is a callback for unexpected connection closures."""
logger.debug(f"Sender: {sender}")
if isinstance(exc, ConnectionClosed):
logger.warning("Connection to RabbitMQ lost. Attempting to reconnect...")
try:
active_tenants = await self.fetch_active_tenants()
await self.run(active_tenants=active_tenants)
logger.debug("Reconnected to RabbitMQ successfully")
except Exception as e:
logger.warning(f"Failed to reconnect to RabbitMQ: {e}")
# cancel queue manager and webserver to shutdown service
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
[task.cancel() for task in tasks if task.get_name() in ["queues", "webserver"]]
else:
logger.debug("Connection closed on purpose.")
async def is_ready(self) -> bool:
if self.connection is None or self.connection.is_closed:
try:
await self.connect()
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
return False
return True
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
async def setup_exchanges(self) -> None:
self.tenant_exchange = await self.channel.declare_exchange(
self.config.tenant_exchange_name, ExchangeType.TOPIC, durable=True
)
self.input_exchange = await self.channel.declare_exchange(
self.config.service_request_exchange_name, ExchangeType.DIRECT, durable=True
)
self.output_exchange = await self.channel.declare_exchange(
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
)
# we must declare DLQ to handle error messages
self.dead_letter_queue = await self.channel.declare_queue(
self.config.service_dead_letter_queue_name, durable=True
)
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
async def setup_tenant_queue(self) -> None:
self.tenant_exchange_queue = await self.channel.declare_queue(
f"{self.config.pod_name}_{self.config.tenant_event_queue_suffix}",
durable=True,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
"x-expires": self.config.queue_expiration_time,
},
)
await self.tenant_exchange_queue.bind(self.tenant_exchange, routing_key="tenant.*")
self.consumer_tags["tenant_exchange_queue"] = await self.tenant_exchange_queue.consume(
self.process_tenant_message
)
async def process_tenant_message(self, message: AbstractIncomingMessage) -> None:
try:
async with message.process():
message_body = json.loads(message.body.decode())
logger.debug(f"Tenant message received: {message_body}")
tenant_id = message_body["tenantId"]
routing_key = message.routing_key
if routing_key == "tenant.created":
await self.create_tenant_queues(tenant_id)
elif routing_key == "tenant.delete":
await self.delete_tenant_queues(tenant_id)
except Exception as e:
logger.error(e, exc_info=True)
async def create_tenant_queues(self, tenant_id: str) -> None:
queue_name = f"{self.config.input_queue_prefix}_{tenant_id}"
logger.info(f"Declaring queue: {queue_name}")
try:
input_queue = await self.channel.declare_queue(
queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
},
)
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)
self.tenant_queues[tenant_id] = input_queue
logger.info(f"Created and started consuming queue for tenant {tenant_id}")
except Exception as e:
logger.error(e, exc_info=True)
async def delete_tenant_queues(self, tenant_id: str) -> None:
if tenant_id in self.tenant_queues:
# somehow queue.delete() does not work here
await self.channel.queue_delete(f"{self.config.input_queue_prefix}_{tenant_id}")
del self.tenant_queues[tenant_id]
del self.consumer_tags[tenant_id]
logger.info(f"Deleted queues for tenant {tenant_id}")
async def process_input_message(self, message: IncomingMessage) -> None:
async def process_message_body_and_await_result(unpacked_message_body):
async with self.semaphore:
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_pool_executor:
logger.info("Processing payload in a separate thread.")
result = await loop.run_in_executor(
thread_pool_executor, self.message_processor, unpacked_message_body
)
return result
async with message.process(ignore_processed=True):
if message.redelivered:
logger.warning(f"Declining message with {message.delivery_tag=} due to it being redelivered.")
await message.nack(requeue=False)
return
if message.body.decode("utf-8") == "STOP":
logger.info("Received stop signal, stopping consumption...")
await message.ack()
# TODO: shutdown is probably not the right call here - align w/ Dev what should happen on stop signal
await self.shutdown()
return
self.message_count += 1
try:
tenant_id = message.routing_key
filtered_message_headers = (
{k: v for k, v in message.headers.items() if k.lower().startswith("x-")} if message.headers else {}
)
logger.debug(f"Processing message with {filtered_message_headers=}.")
result: dict = await (
process_message_body_and_await_result({**json.loads(message.body), **filtered_message_headers})
or {}
)
if result:
await self.publish_to_output_exchange(tenant_id, result, filtered_message_headers)
await message.ack()
logger.debug(f"Message with {message.delivery_tag=} acknowledged.")
else:
raise ValueError(f"Could not process message with {message.body=}.")
except json.JSONDecodeError:
await message.nack(requeue=False)
logger.error(f"Invalid JSON in input message: {message.body}", exc_info=True)
except FileNotFoundError as e:
logger.warning(f"{e}, declining message with {message.delivery_tag=}.", exc_info=True)
await message.nack(requeue=False)
except Exception as e:
await message.nack(requeue=False)
logger.error(f"Error processing input message: {e}", exc_info=True)
finally:
self.message_count -= 1
async def publish_to_output_exchange(self, tenant_id: str, result: Dict[str, Any], headers: Dict[str, Any]) -> None:
await self.output_exchange.publish(
Message(body=json.dumps(result).encode(), headers=headers),
routing_key=tenant_id,
)
logger.info(f"Published result to queue {tenant_id}.")
@retry(tries=5, exceptions=(aiohttp.ClientResponseError, aiohttp.ClientConnectorError), reraise=True, logger=logger)
async def fetch_active_tenants(self) -> Set[str]:
async with aiohttp.ClientSession() as session:
async with session.get(self.tenant_service_url) as response:
response.raise_for_status()
if response.headers["content-type"].lower() == "application/json":
data = await response.json()
return {tenant["tenantId"] for tenant in data}
else:
logger.error(
f"Failed to fetch active tenants. Content type is not JSON: {response.headers['content-type'].lower()}"
)
return set()
@retry(
tries=5,
exceptions=(
AMQPConnectionError,
ChannelInvalidStateError,
),
reraise=True,
logger=logger,
)
async def initialize_tenant_queues(self, active_tenants: set) -> None:
for tenant_id in active_tenants:
await self.create_tenant_queues(tenant_id)
async def run(self, active_tenants: set) -> None:
await self.connect()
await self.setup_exchanges()
await self.initialize_tenant_queues(active_tenants=active_tenants)
await self.setup_tenant_queue()
logger.info("RabbitMQ handler is running. Press CTRL+C to exit.")
async def close_channels(self) -> None:
try:
if self.channel and not self.channel.is_closed:
# Cancel queues to stop fetching messages
logger.debug("Cancelling queues...")
for tenant, queue in self.tenant_queues.items():
await queue.cancel(self.consumer_tags[tenant])
if self.tenant_exchange_queue:
await self.tenant_exchange_queue.cancel(self.consumer_tags["tenant_exchange_queue"])
while self.message_count != 0:
logger.debug(f"Messages are still being processed: {self.message_count=} ")
await asyncio.sleep(2)
await self.channel.close(exc=asyncio.CancelledError)
logger.debug("Channel closed.")
else:
logger.debug("No channel to close.")
except ChannelClosed:
logger.warning("Channel was already closed.")
except ConnectionClosed:
logger.warning("Connection was lost, unable to close channel.")
except Exception as e:
logger.error(f"Error during channel shutdown: {e}")
async def close_connection(self) -> None:
try:
if self.connection and not self.connection.is_closed:
await self.connection.close(exc=asyncio.CancelledError)
logger.debug("Connection closed.")
else:
logger.debug("No connection to close.")
except ConnectionClosed:
logger.warning("Connection was already closed.")
except Exception as e:
logger.error(f"Error closing connection: {e}")
async def shutdown(self) -> None:
logger.info("Shutting down RabbitMQ handler...")
await self.close_channels()
await self.close_connection()
logger.info("RabbitMQ handler shut down successfully.")

View File

@ -1,23 +1,27 @@
from typing import Callable, Union
from typing import Callable
from dynaconf import Dynaconf
from kn_utils.logging import logger
from pyinfra.storage.connection import get_storage
from pyinfra.storage.utils import download_data_as_specified_in_message, upload_data_as_specified_in_message
from pyinfra.storage.utils import (
download_data_bytes_as_specified_in_message,
upload_data_as_specified_in_message,
DownloadedData,
)
DataProcessor = Callable[[Union[dict, bytes], dict], dict]
DataProcessor = Callable[[dict[str, DownloadedData] | DownloadedData, dict], dict | list | str]
Callback = Callable[[dict], dict]
def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf):
def make_download_process_upload_callback(data_processor: DataProcessor, settings: Dynaconf) -> Callback:
"""Default callback for processing queue messages.
Data will be downloaded from the storage as specified in the message. If a tenant id is specified, the storage
will be configured to use that tenant id, otherwise the storage is configured as specified in the settings.
The data is then passed to the data processor, together with the message. The data processor should return a
json serializable object. This object is then uploaded to the storage as specified in the message.
The response message is just the original message.
Adapt as needed.
json serializable object. This object is then uploaded to the storage as specified in the message. The response
message is just the original message.
"""
def inner(queue_message_payload: dict) -> dict:
@ -25,7 +29,9 @@ def make_download_process_upload_callback(data_processor: DataProcessor, setting
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
data = download_data_as_specified_in_message(storage, queue_message_payload)
data: dict[str, DownloadedData] | DownloadedData = download_data_bytes_as_specified_in_message(
storage, queue_message_payload
)
result = data_processor(data, queue_message_payload)
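For orientation, a data processor compatible with the widened DataProcessor signature receives either a single DownloadedData mapping or a dict of them, plus the raw message. A hedged sketch; the processor below is illustrative and not part of pyinfra:
def byte_count_processor(data, message):
    # data is one DownloadedData ({"data": bytes, "file_path": str}) or a dict of them for multi-file downloads
    files = {"only": data} if "data" in data and "file_path" in data else data
    return {name: {"file_path": item["file_path"], "size": len(item["data"])} for name, item in files.items()}
# callback = make_download_process_upload_callback(byte_count_processor, settings)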

View File

@ -4,17 +4,17 @@ import json
import logging
import signal
import sys
from typing import Union, Callable
from typing import Callable, Union
import pika
import pika.exceptions
from dynaconf import Dynaconf
from kn_utils.logging import logger
from kn_utils.retry import retry
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from retry import retry
from pyinfra.config.validators import queue_manager_validators
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import queue_manager_validators
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.WARNING) # disables non-informative pika log clutter
@ -35,11 +35,16 @@ class QueueManager:
self.connection: Union[BlockingConnection, None] = None
self.channel: Union[BlockingChannel, None] = None
self.connection_sleep = settings.rabbitmq.connection_sleep
self.processing_callback = False
self.received_signal = False
atexit.register(self.stop_consuming)
signal.signal(signal.SIGTERM, self._handle_stop_signal)
signal.signal(signal.SIGINT, self._handle_stop_signal)
self.max_retries = settings.rabbitmq.max_retries or 5
self.max_delay = settings.rabbitmq.max_delay or 60
@staticmethod
def create_connection_parameters(settings: Dynaconf):
credentials = pika.PlainCredentials(username=settings.rabbitmq.username, password=settings.rabbitmq.password)
@ -52,9 +57,12 @@ class QueueManager:
return pika.ConnectionParameters(**pika_connection_params)
@retry(tries=3, delay=5, jitter=(1, 3), logger=logger)
@retry(
tries=5,
exceptions=(pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker),
reraise=True,
)
def establish_connection(self):
# TODO: set sensible retry parameters
if self.connection and self.connection.is_open:
logger.debug("Connection to RabbitMQ already established.")
return
@ -77,19 +85,31 @@ class QueueManager:
logger.info("Connection to RabbitMQ established, channel open.")
def is_ready(self):
self.establish_connection()
return self.channel.is_open
try:
self.establish_connection()
return self.channel.is_open
except Exception as e:
logger.error(f"Failed to establish connection: {e}")
return False
@retry(exceptions=pika.exceptions.AMQPConnectionError, tries=3, delay=5, jitter=(1, 3), logger=logger)
@retry(
tries=5,
exceptions=pika.exceptions.AMQPConnectionError,
reraise=True,
)
def start_consuming(self, message_processor: Callable):
on_message_callback = self._make_on_message_callback(message_processor)
try:
self.establish_connection()
self.channel.basic_consume(self.input_queue, on_message_callback)
logger.info("Starting to consume messages...")
self.channel.start_consuming()
except Exception:
logger.error("An unexpected error occurred while consuming messages. Consuming will stop.", exc_info=True)
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"AMQP Connection Error: {e}")
raise
except Exception as e:
logger.error(f"An unexpected error occurred while consuming messages: {e}", exc_info=True)
raise
finally:
self.stop_consuming()
@ -141,17 +161,17 @@ class QueueManager:
logger.info("Processing payload in separate thread.")
future = thread_pool_executor.submit(message_processor, unpacked_message_body)
# FIXME: This block is probably not necessary, but kept since the implications of removing it are
# TODO: This block is probably not necessary, but kept since the implications of removing it are
# unclear. Remove it in a future iteration where fewer changes are being made to the code base.
while future.running():
logger.debug("Waiting for payload processing to finish...")
self.connection.process_data_events()
self.connection.sleep(self.connection_sleep)
return future.result()
def on_message_callback(channel, method, properties, body):
logger.info(f"Received message from queue with delivery_tag {method.delivery_tag}.")
self.processing_callback = True
if method.redelivered:
logger.warning(f"Declining message with {method.delivery_tag=} due to it being redelivered.")
@ -185,14 +205,25 @@ class QueueManager:
channel.basic_ack(delivery_tag=method.delivery_tag)
logger.debug(f"Message with {method.delivery_tag=} acknowledged.")
except FileNotFoundError as e:
logger.warning(f"{e}, declining message with {method.delivery_tag=}.")
channel.basic_nack(method.delivery_tag, requeue=False)
except Exception:
logger.warning(f"Failed to process message with {method.delivery_tag=}, declining...", exc_info=True)
channel.basic_nack(method.delivery_tag, requeue=False)
raise
finally:
self.processing_callback = False
if self.received_signal:
self.stop_consuming()
sys.exit(0)
return on_message_callback
def _handle_stop_signal(self, signum, *args, **kwargs):
logger.info(f"Received signal {signum}, stopping consuming...")
self.stop_consuming()
sys.exit(0)
self.received_signal = True
if not self.processing_callback:
self.stop_consuming()
sys.exit(0)

View File

@ -4,92 +4,86 @@ import requests
from dynaconf import Dynaconf
from kn_utils.logging import logger
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import (
multi_tenant_storage_validators,
storage_validators,
)
from pyinfra.storage.storages.azure import get_azure_storage_from_settings
from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
from pyinfra.storage.storages.storage import Storage
from pyinfra.utils.cipher import decrypt
from pyinfra.config.validators import storage_validators, multi_tenant_storage_validators
from pyinfra.config.loader import validate_settings
def get_storage(settings: Dynaconf, tenant_id: str = None) -> Storage:
"""Get storage connection based on settings.
If tenant_id is provided, gets storage connection information from tenant server instead.
The connections are cached based on the settings.cache_size value.
In the future, when the default storage from config is no longer needed (only multi-tenant storage will be used),
get_storage_from_tenant_id can replace this function directly.
"""Establishes a storage connection.
If tenant_id is provided, gets storage connection information from tenant server. These connections are cached.
Otherwise, gets storage connection information from settings.
"""
logger.info("Establishing storage connection...")
if tenant_id:
logger.info(f"Using tenant storage for {tenant_id}.")
return get_storage_from_tenant_id(tenant_id, settings)
else:
logger.info("Using default storage.")
return get_storage_from_settings(settings)
validate_settings(settings, multi_tenant_storage_validators)
return get_storage_for_tenant(
tenant_id,
settings.storage.tenant_server.endpoint,
settings.storage.tenant_server.public_key,
)
def get_storage_from_settings(settings: Dynaconf) -> Storage:
logger.info("Using default storage.")
validate_settings(settings, storage_validators)
@lru_cache(maxsize=settings.storage.cache_size)
def _get_storage(backend: str) -> Storage:
return storage_dispatcher[backend](settings)
return _get_storage(settings.storage.backend)
def get_storage_from_tenant_id(tenant_id: str, settings: Dynaconf) -> Storage:
validate_settings(settings, multi_tenant_storage_validators)
@lru_cache(maxsize=settings.storage.cache_size)
def _get_storage(tenant: str, endpoint: str, public_key: str) -> Storage:
response = requests.get(f"{endpoint}/{tenant}").json()
maybe_azure = response.get("azureStorageConnection")
maybe_s3 = response.get("s3StorageConnection")
assert (maybe_azure or maybe_s3) and not (maybe_azure and maybe_s3), "Only one storage backend can be used."
if maybe_azure:
connection_string = decrypt(public_key, maybe_azure["connectionString"])
backend = "azure"
storage_info = {
"storage": {
"azure": {
"connection_string": connection_string,
"container": maybe_azure["containerName"],
},
}
}
elif maybe_s3:
secret = decrypt(public_key, maybe_s3["secret"])
backend = "s3"
storage_info = {
"storage": {
"s3": {
"endpoint": maybe_s3["endpoint"],
"key": maybe_s3["key"],
"secret": secret,
"region": maybe_s3["region"],
"bucket": maybe_s3["bucketName"],
},
}
}
else:
raise Exception(f"Unknown storage backend in {response}.")
storage_settings = Dynaconf()
storage_settings.update(storage_info)
storage = storage_dispatcher[backend](storage_settings)
return storage
return _get_storage(tenant_id, settings.storage.tenant_server.endpoint, settings.storage.tenant_server.public_key)
return storage_dispatcher[settings.storage.backend](settings)
storage_dispatcher = {
"azure": get_azure_storage_from_settings,
"s3": get_s3_storage_from_settings,
}
@lru_cache(maxsize=10)
def get_storage_for_tenant(tenant: str, endpoint: str, public_key: str) -> Storage:
response = requests.get(f"{endpoint}/{tenant}").json()
maybe_azure = response.get("azureStorageConnection")
maybe_s3 = response.get("s3StorageConnection")
assert (maybe_azure or maybe_s3) and not (maybe_azure and maybe_s3), "Only one storage backend can be used."
if maybe_azure:
connection_string = decrypt(public_key, maybe_azure["connectionString"])
backend = "azure"
storage_info = {
"storage": {
"azure": {
"connection_string": connection_string,
"container": maybe_azure["containerName"],
},
}
}
elif maybe_s3:
secret = decrypt(public_key, maybe_s3["secret"])
backend = "s3"
storage_info = {
"storage": {
"s3": {
"endpoint": maybe_s3["endpoint"],
"key": maybe_s3["key"],
"secret": secret,
"region": maybe_s3["region"],
"bucket": maybe_s3["bucketName"],
},
}
}
else:
raise Exception(f"Unknown storage backend in {response}.")
storage_settings = Dynaconf()
storage_settings.update(storage_info)
storage = storage_dispatcher[backend](storage_settings)
return storage
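Call sites only need the settings object and, for multi-tenant setups, the tenant id taken from the message. A hedged usage sketch; the settings path and tenant id are placeholders:
from pyinfra.config.loader import load_settings
from pyinfra.storage.connection import get_storage
settings = load_settings("config/")
default_storage = get_storage(settings)              # backend and credentials read from settings.storage
tenant_storage = get_storage(settings, "tenant-42")  # resolved via the tenant server and cached per tenant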

View File

@ -7,9 +7,9 @@ from dynaconf import Dynaconf
from kn_utils.logging import logger
from retry import retry
from pyinfra.storage.storages.storage import Storage
from pyinfra.config.validators import azure_storage_validators
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import azure_storage_validators
from pyinfra.storage.storages.storage import Storage
logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

View File

@ -7,9 +7,9 @@ from kn_utils.logging import logger
from minio import Minio
from retry import retry
from pyinfra.storage.storages.storage import Storage
from pyinfra.config.validators import s3_storage_validators
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import s3_storage_validators
from pyinfra.storage.storages.storage import Storage
from pyinfra.utils.url_parsing import validate_and_parse_s3_endpoint

View File

@ -1,6 +1,7 @@
import gzip
import json
from typing import Union
from functools import singledispatch
from typing import TypedDict
from kn_utils.logging import logger
from pydantic import BaseModel, ValidationError
@ -18,6 +19,17 @@ class DossierIdFileIdDownloadPayload(BaseModel):
return f"{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
class TenantIdDossierIdFileIdDownloadPayload(BaseModel):
tenantId: str
dossierId: str
fileId: str
targetFileExtension: str
@property
def targetFilePath(self):
return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.targetFileExtension}"
class DossierIdFileIdUploadPayload(BaseModel):
dossierId: str
fileId: str
@ -28,50 +40,79 @@ class DossierIdFileIdUploadPayload(BaseModel):
return f"{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
class TenantIdDossierIdFileIdUploadPayload(BaseModel):
tenantId: str
dossierId: str
fileId: str
responseFileExtension: str
@property
def responseFilePath(self):
return f"{self.tenantId}/{self.dossierId}/{self.fileId}.{self.responseFileExtension}"
class TargetResponseFilePathDownloadPayload(BaseModel):
targetFilePath: str
targetFilePath: str | dict[str, str]
class TargetResponseFilePathUploadPayload(BaseModel):
responseFilePath: str
def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -> Union[dict, bytes]:
class DownloadedData(TypedDict):
data: bytes
file_path: str
def download_data_bytes_as_specified_in_message(
storage: Storage, raw_payload: dict
) -> dict[str, DownloadedData] | DownloadedData:
"""Convenience function to download a file specified in a message payload.
Supports both legacy and new payload formats.
If the content is compressed with gzip (.gz), it will be decompressed (-> bytes).
If the content is a json file, it will be decoded (-> dict).
If no file is specified in the payload or the file does not exist in storage, an exception will be raised.
In all other cases, the content will be returned as is (-> bytes).
This function can be extended in the future as needed (e.g. handling of more file types), but since further
requirements are not specified at this point in time, and it is unclear what these would entail, the code is kept
simple for now to improve readability, maintainability and avoid refactoring efforts of generic solutions that
weren't as generic as they seemed.
Supports both legacy and new payload formats. Also supports downloading multiple files at once; in that case the
'targetFilePath' key holds a dictionary mapping an arbitrary name to each file path.
The data is downloaded as bytes and returned as a DownloadedData dictionary with 'data' and 'file_path' keys.
In case of several download targets, a nested dictionary is returned, mapping each name from the payload to such a
DownloadedData dictionary.
"""
try:
if "dossierId" in raw_payload:
if "tenantId" in raw_payload and "dossierId" in raw_payload:
payload = TenantIdDossierIdFileIdDownloadPayload(**raw_payload)
elif "tenantId" not in raw_payload and "dossierId" in raw_payload:
payload = DossierIdFileIdDownloadPayload(**raw_payload)
else:
payload = TargetResponseFilePathDownloadPayload(**raw_payload)
except ValidationError:
raise ValueError("No download file path found in payload, nothing to download.")
if not storage.exists(payload.targetFilePath):
raise FileNotFoundError(f"File '{payload.targetFilePath}' does not exist in storage.")
data = storage.get_object(payload.targetFilePath)
data = gzip.decompress(data) if ".gz" in payload.targetFilePath else data
data = json.loads(data.decode("utf-8")) if ".json" in payload.targetFilePath else data
logger.info(f"Downloaded {payload.targetFilePath} from storage.")
data = _download(payload.targetFilePath, storage)
return data
@singledispatch
def _download(
file_path_or_file_path_dict: str | dict[str, str], storage: Storage
) -> dict[str, DownloadedData] | DownloadedData:
pass
@_download.register(str)
def _download_single_file(file_path: str, storage: Storage) -> DownloadedData:
if not storage.exists(file_path):
raise FileNotFoundError(f"File '{file_path}' does not exist in storage.")
data = storage.get_object(file_path)
logger.info(f"Downloaded {file_path} from storage.")
return DownloadedData(data=data, file_path=file_path)
@_download.register(dict)
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict[str, DownloadedData]:
return {key: _download(value, storage) for key, value in file_path_dict.items()}
def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, data):
"""Convenience function to upload a file specified in a message payload. For now, only json serializable data is
supported. The storage json consists of the raw_payload, which is extended with a 'data' key, containing the
@ -87,7 +128,9 @@ def upload_data_as_specified_in_message(storage: Storage, raw_payload: dict, dat
"""
try:
if "dossierId" in raw_payload:
if "tenantId" in raw_payload and "dossierId" in raw_payload:
payload = TenantIdDossierIdFileIdUploadPayload(**raw_payload)
elif "tenantId" not in raw_payload and "dossierId" in raw_payload:
payload = DossierIdFileIdUploadPayload(**raw_payload)
else:
payload = TargetResponseFilePathUploadPayload(**raw_payload)
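To illustrate the payload shapes accepted by the two helpers above, a hedged example; the paths and the storage object are placeholders, and a multi-file download nests a dict under 'targetFilePath':
payload = {
    "targetFilePath": {"text": "dossier/file.txt.gz", "meta": "dossier/meta.json"},
    "responseFilePath": "dossier/result.json.gz",
}
downloaded = download_data_bytes_as_specified_in_message(storage, payload)
# -> {"text": {"data": b"...", "file_path": "dossier/file.txt.gz"},
#     "meta": {"data": b"...", "file_path": "dossier/meta.json"}}
upload_data_as_specified_in_message(storage, payload, {"status": "ok"})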

View File

@ -0,0 +1,96 @@
import json
from azure.monitor.opentelemetry import configure_azure_monitor
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
SpanExporter,
SpanExportResult,
)
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import opentelemetry_validators
class JsonSpanExporter(SpanExporter):
def __init__(self):
self.traces = []
def export(self, spans):
for span in spans:
self.traces.append(json.loads(span.to_json()))
return SpanExportResult.SUCCESS
def shutdown(self):
pass
def setup_trace(settings: Dynaconf, service_name: str = None, exporter: SpanExporter = None):
tracing_type = settings.tracing.type
if tracing_type == "azure_monitor":
# Configure OpenTelemetry to use Azure Monitor with the
# APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.
try:
configure_azure_monitor()
logger.info("Azure Monitor tracing enabled.")
except Exception as exception:
logger.warning(f"Azure Monitor tracing could not be enabled: {exception}")
elif tracing_type == "opentelemetry":
configure_opentelemtry_tracing(settings, service_name, exporter)
logger.info("OpenTelemetry tracing enabled.")
else:
logger.warning(f"Unknown tracing type: {tracing_type}. Tracing could not be enabled.")
def configure_opentelemtry_tracing(settings: Dynaconf, service_name: str = None, exporter: SpanExporter = None):
service_name = service_name or settings.tracing.opentelemetry.service_name
exporter = exporter or get_exporter(settings)
resource = Resource(attributes={"service.name": service_name})
provider = TracerProvider(resource=resource, shutdown_on_exit=True)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
# TODO: trace.set_tracer_provider produces a warning if trying to set the provider twice.
# "WARNING opentelemetry.trace:__init__.py:521 Overriding of current TracerProvider is not allowed"
# This doesn't seem to affect the functionality since we only want to use the tracer provider set in the beginning.
# We work around the log message by using the protected method with log=False.
trace._set_tracer_provider(provider, log=False)
def get_exporter(settings: Dynaconf):
validate_settings(settings, validators=opentelemetry_validators)
if settings.tracing.opentelemetry.exporter == "json":
return JsonSpanExporter()
elif settings.tracing.opentelemetry.exporter == "otlp":
return OTLPSpanExporter(endpoint=settings.tracing.opentelemetry.endpoint)
elif settings.tracing.opentelemetry.exporter == "console":
return ConsoleSpanExporter()
else:
raise ValueError(
f"Invalid OpenTelemetry exporter {settings.tracing.opentelemetry.exporter}. "
f"Valid values are 'json', 'otlp' and 'console'."
)
def instrument_pika(dynamic_queues: bool):
if dynamic_queues:
AioPikaInstrumentor().instrument()
else:
PikaInstrumentor().instrument()
def instrument_app(app: FastAPI, excluded_urls: str = "/health,/ready,/prometheus"):
FastAPIInstrumentor().instrument_app(app, excluded_urls=excluded_urls)
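A service would typically combine these helpers at startup. A rough sketch, assuming settings has already been loaded and contains the tracing keys referenced above; the service name is a placeholder:
from fastapi import FastAPI
app = FastAPI()
setup_trace(settings, service_name="my-service")  # "my-service" is a placeholder
instrument_pika(dynamic_queues=True)              # aio-pika based services; use False for blocking pika
instrument_app(app)                               # /health, /ready and /prometheus are excluded by default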

View File

@ -4,11 +4,11 @@ from typing import Callable, TypeVar
from dynaconf import Dynaconf
from fastapi import FastAPI
from funcy import identity
from prometheus_client import generate_latest, CollectorRegistry, REGISTRY, Summary
from prometheus_client import REGISTRY, CollectorRegistry, Summary, generate_latest
from starlette.responses import Response
from pyinfra.config.validators import prometheus_validators
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import prometheus_validators
def add_prometheus_endpoint(app: FastAPI, registry: CollectorRegistry = REGISTRY) -> FastAPI:
@ -36,16 +36,11 @@ def make_prometheus_processing_time_decorator_from_settings(
postfix: str = "processing_time",
registry: CollectorRegistry = REGISTRY,
) -> Decorator:
"""Make a decorator for monitoring the processing time of a function. The decorator is only applied if the
prometheus metrics are enabled in the settings.
This, and other metrics should follow the convention
{product name}_{service name}_{processing step / parameter to monitor}.
"""Make a decorator for monitoring the processing time of a function. This, and other metrics should follow the
convention {product name}_{service name}_{processing step / parameter to monitor}.
"""
validate_settings(settings, validators=prometheus_validators)
if not settings.metrics.prometheus.enabled:
return identity
processing_time_sum = Summary(
f"{settings.metrics.prometheus.prefix}_{postfix}",
"Summed up processing time per call.",

View File

@ -1,18 +1,35 @@
import asyncio
import inspect
import logging
import signal
import threading
import time
from typing import Callable
import uvicorn
from dynaconf import Dynaconf
from fastapi import FastAPI
from kn_utils.logging import logger
from kn_utils.retry import retry
from pyinfra.config.validators import webserver_validators
from pyinfra.config.loader import validate_settings
from pyinfra.config.validators import webserver_validators
class PyInfraUvicornServer(uvicorn.Server):
# this is a workaround to enable custom signal handlers
# https://github.com/encode/uvicorn/issues/1579
def install_signal_handlers(self):
pass
@retry(
tries=5,
exceptions=Exception,
reraise=True,
)
def create_webserver_thread_from_settings(app: FastAPI, settings: Dynaconf) -> threading.Thread:
validate_settings(settings, validators=webserver_validators)
return create_webserver_thread(app=app, port=settings.webserver.port, host=settings.webserver.host)
@ -20,11 +37,43 @@ def create_webserver_thread(app: FastAPI, port: int, host: str) -> threading.Thr
"""Creates a thread that runs a FastAPI webserver. Start with thread.start(), and join with thread.join().
Note that the thread is a daemon thread, so it will be terminated when the main thread is terminated.
"""
thread = threading.Thread(target=lambda: uvicorn.run(app, port=port, host=host, log_level=logging.WARNING))
def run_server():
retries = 5
for attempt in range(retries):
try:
uvicorn.run(app, port=port, host=host, log_level=logging.WARNING)
break
except Exception as e:
if attempt < retries - 1: # if it's not the last attempt
logger.warning(f"Attempt {attempt + 1} failed to start the server: {e}. Retrying...")
time.sleep(2**attempt) # exponential backoff
else:
logger.error(f"Failed to start the server after {retries} attempts: {e}")
raise
thread = threading.Thread(target=run_server)
thread.daemon = True
return thread
async def run_async_webserver(app: FastAPI, port: int, host: str):
"""Run the FastAPI web server async."""
config = uvicorn.Config(app, host=host, port=port, log_level=logging.WARNING)
server = PyInfraUvicornServer(config)
try:
await server.serve()
except asyncio.CancelledError:
logger.debug("Webserver was cancelled.")
server.should_exit = True
await server.shutdown()
except Exception as e:
logger.error(f"Error while running the webserver: {e}", exc_info=True)
finally:
logger.info("Webserver has been shut down.")
HealthFunction = Callable[[], bool]
@ -32,13 +81,23 @@ def add_health_check_endpoint(app: FastAPI, health_function: HealthFunction) ->
"""Add a health check endpoint to the app. The health function should return True if the service is healthy,
and False otherwise. The health function is called when the endpoint is hit.
"""
if inspect.iscoroutinefunction(health_function):
@app.get("/health")
@app.get("/ready")
def check_health():
if health_function():
return {"status": "OK"}, 200
else:
@app.get("/health")
@app.get("/ready")
async def async_check_health():
alive = await health_function()
if alive:
return {"status": "OK"}, 200
return {"status": "Service Unavailable"}, 503
else:
@app.get("/health")
@app.get("/ready")
def check_health():
if health_function():
return {"status": "OK"}, 200
return {"status": "Service Unavailable"}, 503
return app
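As a usage sketch, the async branch above pairs naturally with the is_ready coroutine of the async queue manager, while a plain callable suits the blocking manager; port and host below are placeholders:
from fastapi import FastAPI
async def serve(handler):  # handler: an AsyncQueueManager instance
    app = FastAPI()
    add_health_check_endpoint(app, handler.is_ready)  # coroutine health function -> async endpoint branch
    await run_async_webserver(app, port=8080, host="0.0.0.0")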

View File

@ -1,10 +1,9 @@
[tool.poetry]
name = "pyinfra"
version = "1.10.0"
version = "4.1.0"
description = ""
authors = ["Team Research <research@knecon.com>"]
license = "All rights reseverd"
readme = "README.md"
[tool.poetry.dependencies]
python = ">=3.10,<3.11"
@ -19,18 +18,43 @@ azure-storage-blob = "^12.13"
# misc utils
funcy = "^2"
pycryptodome = "^3.19"
# research shared packages
kn-utils = { version = "^0.2.4.dev112", source = "gitlab-research" }
fastapi = "^0.109.0"
uvicorn = "^0.26.0"
# DONT USE GROUPS BECAUSE THEY ARE NOT INSTALLED FOR PACKAGES
# [tool.poetry.group.internal.dependencies] <<< THIS IS NOT WORKING
kn-utils = { version = ">=0.4.0", source = "nexus" }
# We set a lower bound for all opentelemetry dependencies because the image classification service depends on a protobuf version <4, but does not use proto files.
# Therefore, we allow the latest possible protobuf version in the services which use proto files. As soon as the dependency issue is fixed, set this to the latest possible opentelemetry version.
opentelemetry-instrumentation-pika = ">=0.46b0,<0.50"
opentelemetry-exporter-otlp = ">=1.25.0,<1.29"
opentelemetry-instrumentation = ">=0.46b0,<0.50"
opentelemetry-api = ">=1.25.0,<1.29"
opentelemetry-sdk = ">=1.25.0,<1.29"
opentelemetry-exporter-otlp-proto-http = ">=1.25.0,<1.29"
opentelemetry-instrumentation-flask = ">=0.46b0,<0.50"
opentelemetry-instrumentation-requests = ">=0.46b0,<0.50"
opentelemetry-instrumentation-fastapi = ">=0.46b0,<0.50"
opentelemetry-instrumentation-aio-pika = ">=0.46b0,<0.50"
wcwidth = "<=0.2.12"
azure-monitor-opentelemetry = "^1.6.0"
aio-pika = "^9.4.2"
aiohttp = "^3.9.5"
# THIS IS NOT AVAILABLE FOR SERVICES THAT IMPLEMENT PYINFRA
[tool.poetry.group.dev.dependencies]
pytest = "^7"
ipykernel = "^6.26.0"
black = "^23.10"
black = "^24.10"
pylint = "^3"
coverage = "^7.3"
requests = "^2.31"
pre-commit = "^3.6.0"
cyclonedx-bom = "^4.1.1"
dvc = "^3.51.2"
dvc-azure = "^3.1.0"
deepdiff = "^7.0.1"
pytest-cov = "^5.0.0"
[tool.pytest.ini_options]
minversion = "6.0"
@ -39,13 +63,39 @@ testpaths = ["tests", "integration"]
log_cli = 1
log_cli_level = "DEBUG"
[tool.mypy]
exclude = ['.venv']
[tool.black]
line-length = 120
target-version = ["py310"]
[tool.isort]
profile = "black"
[tool.pylint.format]
max-line-length = 120
disable = [
"C0114",
"C0325",
"R0801",
"R0902",
"R0903",
"R0904",
"R0913",
"R0914",
"W0511",
]
docstring-min-length = 3
[[tool.poetry.source]]
name = "PyPI"
name = "pypi-proxy"
url = "https://nexus.knecon.com/repository/pypi-proxy/simple"
priority = "primary"
[[tool.poetry.source]]
name = "gitlab-research"
url = "https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi/simple"
name = "nexus"
url = "https://nexus.knecon.com/repository/python/simple"
priority = "explicit"
[build-system]

View File

@ -0,0 +1,150 @@
import asyncio
import gzip
import json
from operator import itemgetter
from typing import Any, Dict
from aio_pika import Message
from aio_pika.abc import AbstractIncomingMessage
from kn_utils.logging import logger
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.async_manager import AsyncQueueManager, RabbitMQConfig
from pyinfra.storage.storages.s3 import S3Storage, get_s3_storage_from_settings
settings = load_settings(local_pyinfra_root_path / "config/")
async def dummy_message_processor(message: Dict[str, Any]) -> Dict[str, Any]:
logger.info(f"Processing message: {message}")
# await asyncio.sleep(1) # Simulate processing time
storage = get_s3_storage_from_settings(settings)
tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(message)
suffix = message["responseFileExtension"]
object_name = f"{tenant_id}/{dossier_id}/{file_id}.{message['targetFileExtension']}"
original_content = json.loads(gzip.decompress(storage.get_object(object_name)))
processed_content = {
"processedPages": original_content["numberOfPages"],
"processedSectionTexts": f"Processed: {original_content['sectionTexts']}",
}
processed_object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
processed_data = gzip.compress(json.dumps(processed_content).encode("utf-8"))
storage.put_object(processed_object_name, processed_data)
processed_message = message.copy()
processed_message["processed"] = True
processed_message["processor_message"] = "This message was processed by the dummy processor"
logger.info(f"Finished processing message. Result: {processed_message}")
return processed_message
async def on_response_message_callback(storage: S3Storage):
async def on_message(message: AbstractIncomingMessage) -> None:
async with message.process(ignore_processed=True):
if not message.body:
raise ValueError
response = json.loads(message.body)
logger.info(f"Received {response}")
logger.info(f"Message headers: {message.properties.headers}")
await message.ack()
tenant_id, dossier_id, file_id = itemgetter("tenantId", "dossierId", "fileId")(response)
suffix = response["responseFileExtension"]
result = storage.get_object(f"{tenant_id}/{dossier_id}/{file_id}.{suffix}")
result = json.loads(gzip.decompress(result))
logger.info(f"Contents of result on storage: {result}")
return on_message
def upload_json_and_make_message_body(tenant_id: str):
dossier_id, file_id, suffix = "dossier", "file", "json.gz"
content = {
"numberOfPages": 7,
"sectionTexts": "data",
}
object_name = f"{tenant_id}/{dossier_id}/{file_id}.{suffix}"
data = gzip.compress(json.dumps(content).encode("utf-8"))
storage = get_s3_storage_from_settings(settings)
if not storage.has_bucket():
storage.make_bucket()
storage.put_object(object_name, data)
message_body = {
"tenantId": tenant_id,
"dossierId": dossier_id,
"fileId": file_id,
"targetFileExtension": suffix,
"responseFileExtension": f"result.{suffix}",
}
return message_body, storage
async def test_rabbitmq_handler() -> None:
tenant_service_url = settings.storage.tenant_server.endpoint
config = RabbitMQConfig(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
username=settings.rabbitmq.username,
password=settings.rabbitmq.password,
heartbeat=settings.rabbitmq.heartbeat,
input_queue_prefix=settings.rabbitmq.service_request_queue_prefix,
tenant_event_queue_suffix=settings.rabbitmq.tenant_event_queue_suffix,
tenant_exchange_name=settings.rabbitmq.tenant_exchange_name,
service_request_exchange_name=settings.rabbitmq.service_request_exchange_name,
service_response_exchange_name=settings.rabbitmq.service_response_exchange_name,
service_dead_letter_queue_name=settings.rabbitmq.service_dlq_name,
queue_expiration_time=settings.rabbitmq.queue_expiration_time,
pod_name=settings.kubernetes.pod_name,
)
handler = AsyncQueueManager(config, tenant_service_url, dummy_message_processor)
await handler.connect()
await handler.setup_exchanges()
tenant_id = "test_tenant"
# Test tenant creation
create_message = {"tenantId": tenant_id}
await handler.tenant_exchange.publish(
Message(body=json.dumps(create_message).encode()), routing_key="tenant.created"
)
logger.info(f"Sent create tenant message for {tenant_id}")
await asyncio.sleep(0.5) # Wait for queue creation
# Prepare service request
service_request, storage = upload_json_and_make_message_body(tenant_id)
# Test service request
await handler.input_exchange.publish(Message(body=json.dumps(service_request).encode()), routing_key=tenant_id)
logger.info(f"Sent service request for {tenant_id}")
await asyncio.sleep(5) # Wait for message processing
# Consume service request
response_queue = await handler.channel.declare_queue(name=f"response_queue_{tenant_id}")
await response_queue.bind(exchange=handler.output_exchange, routing_key=tenant_id)
callback = await on_response_message_callback(storage)
await response_queue.consume(callback=callback)
await asyncio.sleep(5) # Wait for message processing
# Test tenant deletion
delete_message = {"tenantId": tenant_id}
await handler.tenant_exchange.publish(
Message(body=json.dumps(delete_message).encode()), routing_key="tenant.delete"
)
logger.info(f"Sent delete tenant message for {tenant_id}")
await asyncio.sleep(0.5) # Wait for queue deletion
await handler.connection.close()
if __name__ == "__main__":
asyncio.run(test_rabbitmq_handler())

View File

@ -4,11 +4,11 @@ from operator import itemgetter
from kn_utils.logging import logger
from pyinfra.config.loader import load_settings, pyinfra_config_path
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.storages.s3 import get_s3_storage_from_settings
settings = load_settings(pyinfra_config_path)
settings = load_settings(local_pyinfra_root_path / "config/")
def upload_json_and_make_message_body():

scripts/send_sigterm.py Normal file
View File

@ -0,0 +1,17 @@
import os
import signal
import time
# BE CAREFUL WITH THIS SCRIPT - THIS SIMULATES A SIGTERM FROM KUBERNETES
target_pid = int(input("Enter the PID of the target script: "))
print(f"Sending SIGTERM to PID {target_pid}...")
time.sleep(1)
try:
os.kill(target_pid, signal.SIGTERM)
print("SIGTERM sent.")
except ProcessLookupError:
print("Process not found.")
except PermissionError:
print("Permission denied. Are you trying to signal a process you don't own?")

View File

@ -0,0 +1,39 @@
#!/bin/bash
python_version=$1
nexus_user=$2
nexus_password=$3
# cookiecutter https://gitlab.knecon.com/knecon/research/template-python-project.git --checkout master
# latest_dir=$(ls -td -- */ | head -n 1) # should be the dir cookiecutter just created
# cd $latest_dir
pyenv install $python_version
pyenv local $python_version
pyenv shell $python_version
# install poetry globally (PREFERRED), only need to install it once
# curl -sSL https://install.python-poetry.org | python3 -
# remember to update poetry once in a while
poetry self update
# install poetry in current python environment, can lead to multiple instances of poetry being installed on one system (DISPREFERRED)
# pip install --upgrade pip
# pip install poetry
poetry config virtualenvs.in-project true
poetry config installer.max-workers 10
poetry config repositories.pypi-proxy "https://nexus.knecon.com/repository/pypi-proxy/simple"
poetry config http-basic.pypi-proxy ${nexus_user} ${nexus_password}
poetry config repositories.nexus https://nexus.knecon.com/repository/python/simple
poetry config http-basic.nexus ${nexus_user} ${nexus_password}
poetry env use $(pyenv which python)
poetry install --with=dev
poetry update
source .venv/bin/activate
pre-commit install
pre-commit autoupdate

View File

@ -1,7 +1,8 @@
import time
from pyinfra.config.loader import load_settings, parse_args
from pyinfra.examples import start_queue_consumer_with_prometheus_and_health_endpoints
from pyinfra.config.loader import load_settings, parse_settings_path
from pyinfra.examples import start_standard_queue_consumer
from pyinfra.queue.callback import make_download_process_upload_callback
def processor_mock(_data: dict, _message: dict) -> dict:
@ -10,5 +11,8 @@ def processor_mock(_data: dict, _message: dict) -> dict:
if __name__ == "__main__":
settings = load_settings(parse_args().settings_path)
start_queue_consumer_with_prometheus_and_health_endpoints(processor_mock, settings)
arguments = parse_settings_path()
settings = load_settings(arguments)
callback = make_download_process_upload_callback(processor_mock, settings)
start_standard_queue_consumer(callback, settings)

View File

@ -1,20 +1,48 @@
import json
import pytest
from pyinfra.config.loader import load_settings, pyinfra_config_path
from pyinfra.storage.connection import get_storage_from_settings
from pyinfra.config.loader import load_settings, local_pyinfra_root_path
from pyinfra.queue.manager import QueueManager
from pyinfra.storage.connection import get_storage
@pytest.fixture(scope="session")
def settings():
return load_settings(pyinfra_config_path)
return load_settings(local_pyinfra_root_path / "config/")
@pytest.fixture(scope="class")
def storage(storage_backend, settings):
settings.storage.backend = storage_backend
storage = get_storage_from_settings(settings)
storage = get_storage(settings)
storage.make_bucket()
yield storage
storage.clear_bucket()
@pytest.fixture(scope="session")
def queue_manager(settings):
settings.rabbitmq_heartbeat = 10
settings.connection_sleep = 5
settings.rabbitmq.max_retries = 3
settings.rabbitmq.max_delay = 10
queue_manager = QueueManager(settings)
yield queue_manager
@pytest.fixture
def input_message():
return json.dumps(
{
"targetFilePath": "test/target.json.gz",
"responseFilePath": "test/response.json.gz",
}
)
@pytest.fixture
def stop_message():
return "STOP"

tests/data.dvc Normal file
View File

@ -0,0 +1,6 @@
outs:
- md5: 75cc98b7c8fcf782a7d4941594e6bc12.dir
size: 134913
nfiles: 9
hash: md5
path: data

View File

@ -1,31 +1,41 @@
version: '2'
version: '3.8'
services:
minio:
image: minio/minio:RELEASE.2022-06-11T19-55-32Z
image: minio/minio:latest
container_name: minio
ports:
- "9000:9000"
environment:
- MINIO_ROOT_PASSWORD=password
- MINIO_ROOT_USER=root
volumes:
- /tmp/minio_store:/data
- /tmp/data/minio_store:/data
command: server /data
network_mode: "bridge"
network_mode: "bridge"
extra_hosts:
- "host.docker.internal:host-gateway"
rabbitmq:
image: docker.io/bitnami/rabbitmq:3.9.8
image: docker.io/bitnami/rabbitmq:latest
container_name: rabbitmq
ports:
- '4369:4369'
- '5551:5551'
- '5552:5552'
# - '4369:4369'
# - '5551:5551'
# - '5552:5552'
- '5672:5672'
- '25672:25672'
- '15672:15672'
# - '25672:25672'
environment:
- RABBITMQ_SECURE_PASSWORD=yes
- RABBITMQ_VM_MEMORY_HIGH_WATERMARK=100%
- RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT=20Gi
- RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS=true
network_mode: "bridge"
volumes:
- /opt/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
volumes:
mdata:
- /tmp/bitnami/rabbitmq/.rabbitmq/:/data/bitnami
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:15672" ]
interval: 30s
timeout: 10s
retries: 5
extra_hosts:
- "host.docker.internal:host-gateway"

View File

@ -0,0 +1,41 @@
from time import sleep
import pytest
from pyinfra.utils.opentelemetry import get_exporter, instrument_pika, setup_trace
@pytest.fixture(scope="session")
def exporter(settings):
settings.tracing.opentelemetry.exporter = "json"
return get_exporter(settings)
@pytest.fixture(autouse=True)
def setup_test_trace(settings, exporter, tracing_type):
settings.tracing.type = tracing_type
setup_trace(settings, exporter=exporter)
class TestOpenTelemetry:
@pytest.mark.xfail(
reason="Azure Monitor requires a connection string. Therefore the test is allowed to fail in this case."
)
@pytest.mark.parametrize("tracing_type", ["opentelemetry", "azure_monitor"])
def test_queue_messages_are_traced(self, queue_manager, input_message, stop_message, settings, exporter):
instrument_pika(dynamic_queues=False)  # blocking QueueManager uses plain pika, not aio-pika
queue_manager.purge_queues()
queue_manager.publish_message_to_input_queue(input_message)
queue_manager.publish_message_to_input_queue(stop_message)
def callback(_):
sleep(2)
return {"flat": "earth"}
queue_manager.start_consuming(callback)
for exported_trace in exporter.traces:
assert (
exported_trace["resource"]["attributes"]["service.name"] == settings.tracing.opentelemetry.service_name
)

View File

@ -5,7 +5,10 @@ import pytest
import requests
from fastapi import FastAPI
from pyinfra.webserver.prometheus import add_prometheus_endpoint, make_prometheus_processing_time_decorator_from_settings
from pyinfra.webserver.prometheus import (
add_prometheus_endpoint,
make_prometheus_processing_time_decorator_from_settings,
)
from pyinfra.webserver.utils import create_webserver_thread_from_settings

View File

@ -3,11 +3,8 @@ from sys import stdout
from time import sleep
import pika
import pytest
from kn_utils.logging import logger
from pyinfra.queue.manager import QueueManager
logger.remove()
logger.add(sink=stdout, level="DEBUG")
@ -20,28 +17,21 @@ def make_callback(process_time):
return callback
@pytest.fixture(scope="session")
def queue_manager(settings):
settings.rabbitmq_heartbeat = 10
settings.connection_sleep = 5
queue_manager = QueueManager(settings)
yield queue_manager
@pytest.fixture
def input_message():
return json.dumps({
"targetFilePath": "test/target.json.gz",
"responseFilePath": "test/response.json.gz",
})
@pytest.fixture
def stop_message():
return "STOP"
def file_not_found_callback(x):
raise FileNotFoundError("File not found")
class TestQueueManager:
def test_not_available_file_leads_to_message_rejection_without_crashing(
self, queue_manager, input_message, stop_message
):
queue_manager.purge_queues()
queue_manager.publish_message_to_input_queue(input_message)
queue_manager.publish_message_to_input_queue(stop_message)
queue_manager.start_consuming(file_not_found_callback)
def test_processing_of_several_messages(self, queue_manager, input_message, stop_message):
queue_manager.purge_queues()

View File

@ -5,8 +5,11 @@ from time import sleep
import pytest
from fastapi import FastAPI
from pyinfra.storage.connection import get_storage_from_tenant_id
from pyinfra.storage.utils import download_data_as_specified_in_message, upload_data_as_specified_in_message
from pyinfra.storage.connection import get_storage_for_tenant
from pyinfra.storage.utils import (
download_data_bytes_as_specified_in_message,
upload_data_as_specified_in_message,
)
from pyinfra.utils.cipher import encrypt
from pyinfra.webserver.utils import create_webserver_thread
@ -103,7 +106,11 @@ class TestMultiTenantStorage:
self, tenant_id, tenant_server_mock, settings, tenant_server_host, tenant_server_port
):
settings["storage"]["tenant_server"]["endpoint"] = f"http://{tenant_server_host}:{tenant_server_port}"
storage = get_storage_from_tenant_id(tenant_id, settings)
storage = get_storage_for_tenant(
tenant_id,
settings["storage"]["tenant_server"]["endpoint"],
settings["storage"]["tenant_server"]["public_key"],
)
storage.put_object("file", b"content")
data_received = storage.get_object("file")
@ -125,23 +132,35 @@ def payload(payload_type):
"targetFileExtension": "target.json.gz",
"responseFileExtension": "response.json.gz",
}
elif payload_type == "target_file_dict":
return {
"targetFilePath": {"file_1": "test/file.target.json.gz", "file_2": "test/file.target.json.gz"},
"responseFilePath": "test/file.response.json.gz",
}
@pytest.mark.parametrize("payload_type", ["target_response_file_path", "dossier_id_file_id"], scope="class")
@pytest.mark.parametrize(
"payload_type",
[
"target_response_file_path",
"dossier_id_file_id",
"target_file_dict",
],
scope="class",
)
@pytest.mark.parametrize("storage_backend", ["azure", "s3"], scope="class")
class TestDownloadAndUploadFromMessage:
def test_download_and_upload_from_message(self, storage, payload):
def test_download_and_upload_from_message(self, storage, payload, payload_type):
storage.clear_bucket()
input_data = {"data": "success"}
result = {"process_result": "success"}
storage_data = {**payload, "data": result}
packed_data = gzip.compress(json.dumps(storage_data).encode())
storage.put_object("test/file.target.json.gz", gzip.compress(json.dumps(input_data).encode()))
storage.put_object("test/file.target.json.gz", packed_data)
data = download_data_as_specified_in_message(storage, payload)
assert data == input_data
upload_data_as_specified_in_message(storage, payload, input_data)
_ = download_data_bytes_as_specified_in_message(storage, payload)
upload_data_as_specified_in_message(storage, payload, result)
data = json.loads(gzip.decompress(storage.get_object("test/file.response.json.gz")).decode())
assert data == {**payload, "data": input_data}
assert data == storage_data

View File

@ -1,9 +1,10 @@
import os
from pathlib import Path
import pytest
from dynaconf import Validator
from pyinfra.config.loader import load_settings
from pyinfra.config.loader import load_settings, local_pyinfra_root_path, normalize_to_settings_files
from pyinfra.config.validators import webserver_validators
@ -22,7 +23,7 @@ class TestConfig:
validators = webserver_validators
test_settings = load_settings(validators=validators)
test_settings = load_settings(root_path=local_pyinfra_root_path, validators=validators)
assert test_settings.webserver.host == "localhost"
@ -30,7 +31,25 @@ class TestConfig:
os.environ["TEST__VALUE__INT"] = "1"
os.environ["TEST__VALUE__STR"] = "test"
test_settings = load_settings(validators=test_validators)
test_settings = load_settings(root_path=local_pyinfra_root_path, validators=test_validators)
assert test_settings.test.value.int == 1
assert test_settings.test.value.str == "test"
@pytest.mark.parametrize(
"settings_path,expected_file_paths",
[
(None, []),
("config", [f"{local_pyinfra_root_path}/config/settings.toml"]),
("config/settings.toml", [f"{local_pyinfra_root_path}/config/settings.toml"]),
(f"{local_pyinfra_root_path}/config", [f"{local_pyinfra_root_path}/config/settings.toml"]),
],
)
def test_normalize_settings_files(self, settings_path, expected_file_paths):
files = normalize_to_settings_files(settings_path, local_pyinfra_root_path)
print(files)
assert len(files) == len(expected_file_paths)
for path, expected in zip(files, expected_file_paths):
assert path == Path(expected).absolute()

View File

@ -4,7 +4,7 @@ from kn_utils.logging import logger
def test_necessary_log_levels_are_supported_by_kn_utils():
logger.setLevel("TRACE")
logger.trace("trace")
logger.debug("debug")
logger.info("info")
@ -13,6 +13,7 @@ def test_necessary_log_levels_are_supported_by_kn_utils():
logger.exception("exception", exc_info="this is an exception")
logger.error("error", exc_info="this is an error")
def test_setlevel_warn():
logger.setLevel("WARN")
logger.warning("warn")

View File

@ -0,0 +1,83 @@
import json
import pytest
from unittest.mock import patch
from pyinfra.storage.utils import (
download_data_bytes_as_specified_in_message,
upload_data_as_specified_in_message,
DownloadedData,
)
from pyinfra.storage.storages.storage import Storage
@pytest.fixture
def mock_storage():
with patch("pyinfra.storage.utils.Storage") as MockStorage:
yield MockStorage()
@pytest.fixture(
params=[
{
"raw_payload": {
"tenantId": "tenant1",
"dossierId": "dossier1",
"fileId": "file1",
"targetFileExtension": "txt",
"responseFileExtension": "json",
},
"expected_result": {
"data": b'{"key": "value"}',
"file_path": "tenant1/dossier1/file1.txt"
}
},
{
"raw_payload": {
"targetFilePath": "some/path/to/file.txt.gz",
"responseFilePath": "some/path/to/file.json"
},
"expected_result": {
"data": b'{"key": "value"}',
"file_path": "some/path/to/file.txt.gz"
}
},
{
"raw_payload": {
"targetFilePath": {
"file1": "some/path/to/file1.txt.gz",
"file2": "some/path/to/file2.txt.gz"
},
"responseFilePath": "some/path/to/file.json"
},
"expected_result": {
"file1": {
"data": b'{"key": "value"}',
"file_path": "some/path/to/file1.txt.gz"
},
"file2": {
"data": b'{"key": "value"}',
"file_path": "some/path/to/file2.txt.gz"
}
}
},
]
)
def payload_and_expected_result(request):
return request.param
def test_download_data_bytes_as_specified_in_message(mock_storage, payload_and_expected_result):
raw_payload = payload_and_expected_result["raw_payload"]
expected_result = payload_and_expected_result["expected_result"]
mock_storage.get_object.return_value = b'{"key": "value"}'
result = download_data_bytes_as_specified_in_message(mock_storage, raw_payload)
assert isinstance(result, dict)
assert result == expected_result
mock_storage.get_object.assert_called()
def test_upload_data_as_specified_in_message(mock_storage, payload_and_expected_result):
raw_payload = payload_and_expected_result["raw_payload"]
data = {"key": "value"}
upload_data_as_specified_in_message(mock_storage, raw_payload, data)
mock_storage.put_object.assert_called_once()