Compare commits

...

266 Commits

Author SHA1 Message Date
Julius Unverfehrt
a1bfec765c Pull request #43: Image prediction v2 support
Merge in RR/pyinfra from image-prediction-v2-support to 2.0.0

Squashed commit of the following:

commit 37c536324e847357e86dd9b72d1e07ad792ed90f
Merge: 77d1db8 01bfb1d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:53:56 2022 +0200

    Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into image-prediction-v2-support

commit 77d1db8e8630de8822c124eb39f4cd817ed1d3e1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:07:41 2022 +0200

    add operation assignment via config if operation is not defined by caller

commit 36c8ca48a8c6151f713c093a23de110901ba6b02
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:33:34 2022 +0200

    refactor nothing part 2

commit f6cd0ef986802554dd544b9b7a24073d3b3f05b5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:28:49 2022 +0200

    refactor nothing

commit 1e70d49531e89613c70903be49290b94ee014f65
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:42:12 2022 +0200

    enable docker-compose fixture

commit 9fee32cecdd120cfac3e065fb8ad2b4f37b49226
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:40:35 2022 +0200

    added 'multi' key to actual operation configurations

commit 4287f6d9878dd361489b8490eafd06f81df472ce
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:56:12 2022 +0200

    removed debug prints

commit 23a533e8f99222c7e598fb0864f65e9aa3508a3b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:31:50 2022 +0200

    completed correcting / cleaning upload and download logic with regard to operations and ids. next: remove debug code

commit 33246d1ff94989d2ea70242c7ae2e58afa4d35c1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 14:37:17 2022 +0200

    corrected / cleaned upload and download logic with regard to operations and ids

commit 7f2b4e882022c6843cb2f80df202caa495c54ee9
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 18:41:07 2022 +0200

    partially decomplected file descriptor manager from concrete and non-generic descriptor code

commit 40b892da17670dae3b8eba1700877c1dcf219852
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:53:46 2022 +0200

    typo

commit ec4fa8e6f4551ff1f8d4f78c484b7a260f274898
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:52:41 2022 +0200

    typo

commit 701b43403c328161fd96a73ce388a66035cca348
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 17:26:53 2022 +0200

    made adjustments for image classification with pyinfra 2.x; added related fixmes

commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 13:05:26 2022 +0200

    removed obsolete imports

commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:47:12 2022 +0200

    enable docker-compose fixture

commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:46:53 2022 +0200

    renaming

commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 30 12:47:57 2022 +0200

    refactoring: added cached pipeline factory

commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:47:08 2022 +0200

    renaming

commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:25:03 2022 +0200

    further refactored server setup code: moving and decomplecting

commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 12:53:09 2022 +0200

    refactored server setup code: factored out and decoupled operation registry and prometheus summary registry

... and 6 more commits
2022-07-11 14:17:59 +02:00
Julius Unverfehrt
01bfb1d668 Pull request #40: 2.0.0 documentation
Merge in RR/pyinfra from 2.0.0-documentation to 2.0.0

Squashed commit of the following:

commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 13:05:26 2022 +0200

    removed obsolete imports

commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:47:12 2022 +0200

    enable docker-compose fixture

commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:46:53 2022 +0200

    renaming

commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 30 12:47:57 2022 +0200

    refactoring: added cached pipeline factory

commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:47:08 2022 +0200

    renaming

commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:25:03 2022 +0200

    further refactored server setup code: moving and decomplecting

commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 12:53:09 2022 +0200

    refactored server setup code: factored out and decoupled operation registry and prometheus summary registry

commit da2dce762bdd6889165fbb320dc9ee8a0bd089b2
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jun 28 19:40:04 2022 +0200

    adjusted test target

commit 70df7911b9b92f4b72afd7d4b33ca2bbf136295e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jun 28 19:32:38 2022 +0200

    minor refactoring

commit 0937b63dc000346559bde353381304b273244109
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 13:59:59 2022 +0200

    support for empty operation suffix

commit 5e56917970962a2e69bbd66a324bdb4618c040bd
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 12:52:36 2022 +0200

    minor refactoring

commit 40665a7815ae5927b3877bda14fb77deef37d667
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 10:57:04 2022 +0200

    optimization: prefix filtering via storage API for get_all_object_names

commit af0892a899d09023eb0e61eecb63e03dc2fd3b60
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jun 27 10:55:47 2022 +0200

    topological sorting of definitions by caller hierarchy
2022-07-11 09:09:44 +02:00
Matthias Bisping
94254e1681 Pull request #38: 2.0.0 input output file pattern for download strategy
Merge in RR/pyinfra from 2.0.0-input-output-file-pattern-for-download-strategy to 2.0.0

Squashed commit of the following:

commit c7ce79ebbeace6a8cb7925ed69eda2d7cd2a4783
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Jun 24 12:35:29 2022 +0200

    refactor

commit 80f04e544962760adb2dc60c9dd03ccca22167d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Fri Jun 24 11:06:10 2022 +0200

    refactoring of component factory, callback and client-pipeline getter

commit 6c024e1a789e1d55f0739c6846e5c02e8b7c943d
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 20:04:10 2022 +0200

    operations section in config cleaned up; added upload formatter

commit c85800aefc224967cea591c1ec4cf1aaa3ac8215
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 19:22:51 2022 +0200

    refactoring; removed obsolete config entries and code

commit 4be125952d82dc868935c8c73ad87fd8f0bd1d6c
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 19:14:47 2022 +0200

    removed obsolete code

commit ac69a5c8e3f1e2fd7e828a17eeab97984f4f9746
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 18:58:41 2022 +0200

    refactoring: rm dl strat module

commit efd36d0fc4f8f36d267bfa9d35415811fe723ccc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 18:33:51 2022 +0200

    refactoring: multi dl strat -> downloader, rm single dl strat

commit afffdeb993500a6abdb6fe85a549e3d6e97e9ee7
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 16:39:22 2022 +0200

    operations section in config cleaned up

commit 671129af3e343490e0fb277a2b0329aa3027fd73
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Jun 23 16:09:16 2022 +0200

    rename prometheus metric name to include service name

commit 932a3e314b382315492aecab95b1f02f2916f8a6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 14:43:23 2022 +0200

    cleaned up file descr mngr

commit 79350b4ce71fcd095ed6a5e1d3a598ea246fae53
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 12:26:15 2022 +0200

    refactoring WIP: moving response strategy logic into storage strategy (needs to be refactored as well, later) and file descr mngr. Here the moved code needs to be cleaned up.

commit 7e48c66f0c378b25a433a4034eefdc8a0957e775
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 12:00:48 2022 +0200

    refactoring; removed operation / response folder from output path

commit 8e6cbdaf23c48f6eeb52512b7f382d5727e206d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 11:08:37 2022 +0200

    refactoring; added operation -> file pattern mapping to file descr mngr (mainly for self-documentation purposes)

commit 2c80d7cec0cc171e099e5b13aadd2ae0f9bf4f02
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 10:59:57 2022 +0200

    refactoring: introduced input- and output-file specific methods to file descr mngr

commit ecced37150eaac3008cc1b01b235e5f7135e504b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 10:43:26 2022 +0200

    refactoring

commit 3828341e98861ff8d63035ee983309ad5064bb30
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 23 10:42:46 2022 +0200

    refactoring

commit 9a7c412523d467af40feb6924823ca89e28aadfe
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Jun 22 17:04:54 2022 +0200

    add prometheus metric name for default operation

commit d207b2e274ba53b2a21a18c367bb130fb05ee1cd
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Jun 22 17:02:55 2022 +0200

    Merge config

commit d3fdf36b12d8def18810454765e731599b833bfc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 22 17:01:12 2022 +0200

    added fixmes / todos

commit f49d0b9cb7764473ef9d127bc5d88525a4a16a23
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Jun 22 16:28:25 2022 +0200

    update script

... and 47 more commits
2022-06-24 12:59:26 +02:00
Julius Unverfehrt
0d87c60fce parametrize download strategy 2022-06-20 11:43:33 +02:00
Julius Unverfehrt
2dff7d62aa remove duplicate pickup metrics 2022-06-20 10:27:36 +02:00
Julius Unverfehrt
e9424aee04 remove duplicate pickup metrics 2022-06-20 08:29:33 +02:00
Julius Unverfehrt
9d73f42982 remove duplicate pickup metrics 2022-06-20 08:19:51 +02:00
Matthias Bisping
4aef3316a3 renaming 2022-06-15 15:14:17 +02:00
Matthias Bisping
41172d6abb formatting 2022-06-15 15:13:46 +02:00
Matthias Bisping
71af6f703b using function local registry for prometheus 2022-06-15 15:13:10 +02:00
Julius Unverfehrt
a5ff59069a Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into 2.0.0 2022-06-15 14:56:06 +02:00
Julius Unverfehrt
965d79b08f add prometheus endpoint to analysis server 2022-06-15 14:52:22 +02:00
Matthias Bisping
ca6a2f8d32 fixed docker fixture 2022-06-15 14:14:38 +02:00
Matthias Bisping
86eb3a6f7e enabled docker fixture 2022-06-15 14:01:56 +02:00
Matthias Bisping
87cf1ad189 removed obsolete imports 2022-06-15 14:00:36 +02:00
Matthias Bisping
7865a767c7 added type hint 2022-06-15 14:00:09 +02:00
Matthias Bisping
3897e44378 Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into 2.0.0 2022-06-15 12:25:55 +02:00
Matthias Bisping
1558398c56 made object name construction logic part of download strategies 2022-06-15 12:25:27 +02:00
Matthias Bisping
8537d4af50 made object name construction logic part of download strategies 2022-06-15 12:02:41 +02:00
Matthias Bisping
116c2b8924 changed default target file extension 2022-06-15 10:31:05 +02:00
Matthias Bisping
45f04590cc removed obsolete code 2022-06-15 10:25:58 +02:00
Matthias Bisping
bb729b6c26 wrapped retry decorator, so retry behaviour can be controlled via config and set to a lower value for tests to save time 2022-06-15 10:25:53 +02:00
Matthias Bisping
24be8d3f13 test config options for logging and docker; changed object name construction 2022-06-15 09:59:47 +02:00
Matthias Bisping
147416bfad pin minio and rabbitmq again 2022-06-14 17:05:33 +02:00
Matthias Bisping
c8fb15b9f7 rm retry decorator on clear_bucket, unpin minio 2022-06-14 17:02:22 +02:00
Matthias Bisping
771df7c78d make bucket before running test; rabbitmq 3.9 again 2022-06-14 16:58:27 +02:00
Matthias Bisping
f9972a95a7 fixed minio version 2022-06-14 16:44:46 +02:00
Matthias Bisping
83e1b5f029 added retry to clear bucket 2022-06-14 16:34:10 +02:00
Matthias Bisping
c1b5cbeb51 logging setup changed 2022-06-14 16:28:45 +02:00
Matthias Bisping
4fcc89f938 s3 backend fixture no longer needs to not come last 2022-06-14 15:40:23 +02:00
Matthias Bisping
d1242aee6c enable docker-compose fixture 2022-06-14 15:33:34 +02:00
Matthias Bisping
0442ecd7b3 Merge branch 'file_extensions_and_index_handler_via_config' of ssh://git.iqser.com:2222/rr/pyinfra into file_extensions_and_index_handler_via_config 2022-06-14 15:33:14 +02:00
Matthias Bisping
e64ade3135 added comments to new config params 2022-06-14 15:33:11 +02:00
Julius Unverfehrt
ace919d078 set xfail for broken tests, set docker-compose rabbitmq version to version running on production server 2022-06-14 15:10:08 +02:00
Matthias Bisping
d179fdede6 consumer test runs again...? 2022-06-14 14:17:08 +02:00
Julius Unverfehrt
9b975b759b set xfail for broken tests 2022-06-14 14:06:06 +02:00
Julius Unverfehrt
c033d98acd adjust test for new return type of visitor, add download strategy parameter to config 2022-06-14 12:33:56 +02:00
Julius Unverfehrt
bb7e631f91 introduce flag to distinguish between server side tests and complete integration tests 2022-06-14 11:56:47 +02:00
Julius Unverfehrt
d8b5be9e72 refactoring 2022-06-14 11:26:46 +02:00
Julius Unverfehrt
2954bbc1ad refactoring 2022-06-14 11:21:31 +02:00
Matthias Bisping
a69f613fe6 completed multi download to single response logic. but broke pipeline test again, maybe? 2022-06-14 11:08:03 +02:00
Matthias Bisping
fa3b08aef5 added dependencies 2022-06-13 15:38:14 +02:00
Matthias Bisping
14ab23b2cc fixed bug in operation wrapper returning a tuple instead of a singleton-iterable with a tuple in one of the return cases. 2022-06-13 15:36:17 +02:00
Matthias Bisping
8a64e5d868 narrowed down the pytest bug: n_items interacts with s3_backend: when n_items has more than one entry, s3_backend must not be the last decorator 2022-06-13 10:36:26 +02:00
Matthias Bisping
051cea3ded found bug in pytest fixture setup causing serve_test to fail (s3 backend fixture function vs s3 backend decorator fixture) 2022-06-13 10:15:33 +02:00
Matthias Bisping
40bc8c2c2c debugging of queue problem, when queue is not consumed by skipping a test configuration WIP 2022-06-13 09:49:10 +02:00
Matthias Bisping
9962651d88 download strategy WIP: added 1 -> n upload logic 2022-06-10 14:06:19 +02:00
Matthias Bisping
249c6203b2 download strategy WIP 2022-06-10 13:26:43 +02:00
Matthias Bisping
3a3c497383 added logic for uploading response files to a folder, which defaults to the name of the operation used 2022-06-10 13:03:40 +02:00
Matthias Bisping
13b6388e5a fixed default identifier type 2022-06-09 16:24:11 +02:00
Matthias Bisping
1940b974b1 added id setting to operation mocks and thus removed need for random seed of randomly seeded hash in storage item identifier 2022-06-09 15:18:41 +02:00
Matthias Bisping
7e46a66698 fixed bug in stream_pages: return -> yield 2022-06-09 14:58:42 +02:00
Matthias Bisping
5b45f5fa15 added dependency 2022-06-08 14:27:34 +02:00
Matthias Bisping
e43504f08d commented out consumer tests. something is wrong with the fixtures, leading to the tests failing when run together with other tests. Consumer functionality is covered by serve_test.py, but dedicated tests should be restored at a later point. 2022-06-08 11:03:50 +02:00
Matthias Bisping
bffaa0786e readded correct consumer test code which git messed up 2022-06-07 15:48:54 +02:00
Matthias Bisping
8d209d63c7 Merge branch 'master' of ssh://git.iqser.com:2222/rr/pyinfra into fixing_consumer_tests 2022-06-07 15:44:39 +02:00
Matthias Bisping
0dee98b23d consumer test adjustment WIP 2022-06-07 15:11:38 +02:00
Matthias Bisping
91701929e5 adjusted stream buffer test for core-operations taking tuples now 2022-06-07 15:08:58 +02:00
Matthias Bisping
c55e41f2d8 refactoring; tweaked json-blob-parser; added standardization case for decodable strings as storage items 2022-06-07 14:55:40 +02:00
Matthias Bisping
f718b2f7ef parser composer checks for either-type 2022-06-03 16:34:37 +02:00
Matthias Bisping
730bdfb220 refactoring 2022-06-03 16:26:12 +02:00
Matthias Bisping
e48fa85784 replaced trial and error logic for parsing blobs in visitor with parser composer instance 2022-06-03 16:18:56 +02:00
Matthias Bisping
6e5af4092e added parsers and parser composer for clean handling of storage blobs in the context of interpreting downloaded blobs 2022-06-03 16:13:15 +02:00
Matthias Bisping
ea2d3223fb parsing strategy error handling for bytes as not an encoded string 2022-06-03 14:55:04 +02:00
Matthias Bisping
26573eeda3 introduced parsing strategy for storage blobs as part of the queue visitor 2022-06-03 14:49:19 +02:00
Matthias Bisping
7730950b50 cleaning up standardization method for downloaded storage items (WIP) 2022-06-03 14:08:40 +02:00
Matthias Bisping
9a47388017 adjusted param fixtures for serve test 2022-06-03 13:48:33 +02:00
Matthias Bisping
8a2b60a8f5 applied black 2022-06-03 13:42:52 +02:00
Matthias Bisping
9232385dea modified core operations to return metadata for better classification mock test 2022-06-03 13:40:59 +02:00
Matthias Bisping
eb81e96400 added extraction test case (with page index) to serving test 2022-06-03 13:13:45 +02:00
Julius Unverfehrt
e7ee0cda42 add compression for storage item before upload, update script for extraction 2022-06-02 15:49:28 +02:00
Julius Unverfehrt
90f8f9da36 update script for extraction 2022-06-02 15:12:33 +02:00
Julius Unverfehrt
c2d7127a84 add log for Consumer Error, fix page index hash function 2022-06-02 14:15:03 +02:00
Matthias Bisping
bfe8bbb8cb reorganized queue message metadata in request 2022-06-01 16:00:46 +02:00
Matthias Bisping
ecff50ae7c submit endpoint is now 'submit' or 'operation' 2022-06-01 11:46:51 +02:00
Matthias Bisping
7a1b215d69 removed obsolete code 2022-06-01 11:37:47 +02:00
Matthias Bisping
01ce914417 generalizing server setup from operations WIP 2022-06-01 10:58:44 +02:00
Matthias Bisping
2b72174605 Merge branch 'integration_test_in_order_develop_aggregation_stratgey' of ssh://git.iqser.com:2222/rr/pyinfra into integration_test_in_order_develop_aggregation_stratgey 2022-05-31 18:40:47 +02:00
Matthias Bisping
586871a26f added queue message body to analysis input dict 2022-05-31 18:40:40 +02:00
Julius Unverfehrt
187055e5eb adjust mock_client script for conversion 2022-05-31 18:19:41 +02:00
Matthias Bisping
3046b4dc26 misc minor fixes while integrating with pdf2image 2022-05-31 17:58:28 +02:00
Matthias Bisping
dd591bd24b removed obsolete code 2022-05-31 16:23:14 +02:00
Matthias Bisping
12fa52f38c removed obsolete code 2022-05-31 16:22:27 +02:00
Matthias Bisping
043fa1ee53 removed obsolete code 2022-05-31 16:22:11 +02:00
Matthias Bisping
1fa6bbdbc6 removed obsolete code 2022-05-31 16:19:37 +02:00
Matthias Bisping
93747d0f63 test param adjustment 2022-05-31 16:14:48 +02:00
Matthias Bisping
dc4f578e94 reorganized serve-test to use only default-objects instead of test-object 2022-05-31 16:13:47 +02:00
Matthias Bisping
93da0d12bb refactoring: added parameter 'n' to consume_and_publish 2022-05-31 15:30:23 +02:00
Matthias Bisping
a688cbd7bd refactoring 2022-05-31 13:57:20 +02:00
Matthias Bisping
0104395790 removed obsolete code 2022-05-31 13:50:44 +02:00
Matthias Bisping
18a9683ddb operation field in queue message WIP 2022-05-31 13:47:40 +02:00
Matthias Bisping
ae2509dc59 modified visitor and queue manager for 1 -> n (1 request to n response messages) 2022-05-30 13:04:12 +02:00
Matthias Bisping
bf9f6ba8e2 tweaked response upload related logic and repaired visitor tests that were broken by new visitor code written for accommodating the aggregation storage strategy 2022-05-30 12:10:14 +02:00
Matthias Bisping
868a53b23f response file path depending on response metadata and request page index complete 2022-05-25 17:37:18 +02:00
Matthias Bisping
2d1ec16714 modified serve test to use components from fixtures; response file path depending on response metadata and request page index WIP 2022-05-25 16:56:08 +02:00
Matthias Bisping
9e2ed6a9f9 fix: data was doubly encoded and hence always triggering the immediate upload path 2022-05-24 14:51:24 +02:00
Matthias Bisping
ab56c9a173 added todo comment for modifying acknowledging logic at some point to allow input buffering to take effect 2022-05-24 14:39:39 +02:00
Matthias Bisping
298d8d3e2c metadata as part of storage item test works 2022-05-23 15:59:56 +02:00
Matthias Bisping
0842ec0d91 metadata as part of storage item WIP 2022-05-23 15:36:18 +02:00
Matthias Bisping
c944cdb1a7 refactoring: splitting source data from encoded data in data fixture 2022-05-23 13:53:50 +02:00
Matthias Bisping
7b998cdaf6 refactoring 2022-05-23 13:14:18 +02:00
Matthias Bisping
426967ee46 refactoring 2022-05-23 11:39:30 +02:00
Matthias Bisping
54ca81d577 moved parameter combination based test skipping into operation factory 2022-05-23 11:28:35 +02:00
Matthias Bisping
13888524fb refactoring 2022-05-23 10:40:28 +02:00
Matthias Bisping
a7ffaeb18f changed return value of file name listing function for storages to return strings of filenames rather than tuples of bucket name and file name 2022-05-23 10:19:07 +02:00
Matthias Bisping
c97393f690 skipping undefined combinations for analysis_task 2022-05-23 10:07:28 +02:00
Matthias Bisping
7ff466e0ea added test for empty return-data operation, like classification 2022-05-23 09:58:43 +02:00
Matthias Bisping
02b0009219 data AND metadata is being uploaded instead of data only 2022-05-18 16:35:18 +02:00
Matthias Bisping
cf13f67394 refactoring: serve_test can now be run with input_data_items like image, pdf etc 2022-05-18 10:56:14 +02:00
Matthias Bisping
0ab86206ec fixed bug introduced by overwriting 'body' as explanatory variable within try-block, which resulted in republish() receiving the parsed body instead of the body as bytes 2022-05-18 10:32:54 +02:00
Matthias Bisping
35542f994c integration test for lazy pipeline 2022-05-18 09:24:12 +02:00
Matthias Bisping
fb712af7c6 storage aggregation strategy working in first version 2022-05-17 22:30:54 +02:00
Matthias Bisping
6cb13051eb fixed the following bugs:
- upper() did yield instead of return
- metadata was not repeated when zipping with results generator
- since test metadata was an empty dict, target data was always empty, since results were zipped with {}
- hence added check for target lengths > 0
- fixed return value of queued stream function dispatcher; only returned first item of 1 -> n results
2022-05-17 21:48:16 +02:00
Matthias Bisping
456cb4157d refactoring: move 2022-05-17 17:27:58 +02:00
Matthias Bisping
6945760045 refactoring: move 2022-05-17 15:59:04 +02:00
Matthias Bisping
47f1d77c03 renaming 2022-05-17 12:12:43 +02:00
Matthias Bisping
fb325ce43d renaming 2022-05-17 12:10:24 +02:00
Matthias Bisping
5590669939 pipeline laziness test works again 2022-05-17 10:44:43 +02:00
Matthias Bisping
9c262e7138 non-rest pipeline works again 2022-05-17 10:27:32 +02:00
Matthias Bisping
e5a4e7e994 applied black 2022-05-16 15:00:09 +02:00
Matthias Bisping
89f562aa71 refactoring: move 2022-05-16 14:58:19 +02:00
Matthias Bisping
1074f44b30 no buffer capacity test; commented out probably dead code -- removing next 2022-05-16 14:34:37 +02:00
Matthias Bisping
96bf831b00 refactoring: move 2022-05-16 13:31:11 +02:00
Matthias Bisping
5d2b71d647 target data fixture and test for flat stream buffer on different data 2022-05-16 13:21:31 +02:00
Matthias Bisping
d12124b2d5 refactoring 2022-05-16 13:04:37 +02:00
Matthias Bisping
7adbdefb4e refactoring: skipping invalid parameter combinations 2022-05-16 12:18:04 +02:00
Matthias Bisping
a1c292a485 refactoring: pulled core operation taking only data out from operation taking data and metadata 2022-05-16 11:53:52 +02:00
Matthias Bisping
948575d199 renaming 2022-05-16 11:43:48 +02:00
Matthias Bisping
2070f300c9 refactoring: queued stream function returns first of generator 2022-05-16 10:28:53 +02:00
Matthias Bisping
092a0e2964 renaming 2022-05-13 17:16:27 +02:00
Matthias Bisping
40777ae609 refactoring: simplifying lazy processor to queued function WIP 2022-05-13 16:59:50 +02:00
Matthias Bisping
2434e0ea55 refactoring; stream buffer tests 2022-05-13 16:38:20 +02:00
Matthias Bisping
08ad83b6a5 renaming 2022-05-13 15:04:01 +02:00
Matthias Bisping
9870aa38d1 renaming 2022-05-13 15:02:05 +02:00
Matthias Bisping
3b7605772e refactoring 2022-05-13 12:42:07 +02:00
Matthias Bisping
1acf16dc91 refactoring: flat stream buffer now takes over stream buffer flushing 2022-05-13 12:29:18 +02:00
Matthias Bisping
c09e5df23e refactoring: introduced flat stream buffer class 2022-05-13 12:13:59 +02:00
Matthias Bisping
bfdce62ccf refactoring; renaming 2022-05-12 19:46:29 +02:00
Matthias Bisping
1552cd10cc refactoring: further simplified queue consuming 2022-05-12 19:26:22 +02:00
Matthias Bisping
8b0c2d4e07 refactoring: further simplified queue consuming function; added one -> many test fixture param 2022-05-12 17:55:45 +02:00
Matthias Bisping
461c0fe6a6 refactoring: simplified queue consuming function 2022-05-12 17:29:20 +02:00
Matthias Bisping
9b5fc4ff77 refactoring: made queued buffer coupler into a function 2022-05-12 17:19:52 +02:00
Matthias Bisping
e3793e5c7c refactoring: split stream processor into two functions; moved queue streaming Nothing check from caller to stream function 2022-05-12 17:12:26 +02:00
Matthias Bisping
1a04dfb426 renaming 2022-05-12 14:42:42 +02:00
Matthias Bisping
e151d2005b refactoring 2022-05-11 17:01:56 +02:00
Matthias Bisping
da2572b8be refactoring 2022-05-11 16:59:41 +02:00
Matthias Bisping
eb8ace4ddd refactoring 2022-05-11 16:38:36 +02:00
Matthias Bisping
096068367f fixed recursion issue 2022-05-11 12:40:03 +02:00
Matthias Bisping
7bd35dce67 refactoring: introduced queue-buffer-coupler; introduced recursion depth issue -- fixing next 2022-05-11 12:39:36 +02:00
Matthias Bisping
1eb4dbc657 refactoring: split queue processor into output buffer and queue streamer 2022-05-11 10:54:27 +02:00
Matthias Bisping
ccf7a7379d refactoring: introduced queue wrapper 2022-05-11 10:44:50 +02:00
Matthias Bisping
b1a318872e refactoring 2022-05-11 10:16:36 +02:00
Matthias Bisping
abc56e6d9f refactoring: factored out queue processor from lazy bufferizer 2022-05-11 10:07:34 +02:00
Matthias Bisping
c8579a8ad0 renaming 2022-05-10 12:49:36 +02:00
Matthias Bisping
c68e19e6e4 refactoring rest stream processor 2022-05-10 12:40:19 +02:00
Matthias Bisping
de0deaa2f4 renaming 2022-05-10 12:19:42 +02:00
Matthias Bisping
83ce7692e6 renaming; adjusted and added tests for lazy bufferize (formerly on-demand processor) 2022-05-10 12:18:20 +02:00
Matthias Bisping
949413af4a fixed bug in compute_next of on demand processor that skipped all but the first return value of 1 -> n functions 2022-05-10 11:07:58 +02:00
Matthias Bisping
3dba896038 removed obsolete imports 2022-05-09 14:52:33 +02:00
Matthias Bisping
453281d48d re-order imports 2022-05-09 14:52:07 +02:00
Matthias Bisping
5b913983eb refactoring: move; added null value param to bufferize 2022-05-09 14:50:54 +02:00
Matthias Bisping
c2ed6d78b7 reintroduced buffering wrapper with Nothing item as flushing signal. buffering is controlled via chunking in the REST receiver on client side 2022-05-09 14:23:12 +02:00
Matthias Bisping
f29bd7d4d3 removed need for bufferize wrapper by composing with first(chunks(...)) and applying to on-demand processor execution chain; broke mock pipeline, fixing next 2022-05-09 11:06:14 +02:00
Matthias Bisping
ec620abf54 lazy pipeline test 2022-05-09 01:17:53 +02:00
Matthias Bisping
5b30a32fff added client pipeline without webserver backend 2022-05-09 00:43:21 +02:00
Matthias Bisping
c092e7bcab refactoring; server pipeline WIP 2022-05-08 18:36:15 +02:00
Matthias Bisping
1d09337378 endpoint suffixes passed to stream processor 2022-05-08 17:28:55 +02:00
Matthias Bisping
8c1ad64464 refactoring; additional buffer test 2022-05-07 00:29:58 +02:00
Matthias Bisping
1e21913e37 processor test; refactoring 2022-05-07 00:23:22 +02:00
Matthias Bisping
132a1a1b50 renaming 2022-05-06 23:46:21 +02:00
Matthias Bisping
f99d779c29 refactoring 2022-05-06 23:39:29 +02:00
Matthias Bisping
f428372511 renaming 2022-05-06 23:19:25 +02:00
Matthias Bisping
54359501f9 renaming 2022-05-06 23:19:03 +02:00
Matthias Bisping
cbba561116 replaced eager endpoint in sender test 2022-05-06 23:18:37 +02:00
Matthias Bisping
1daaf2b904 refactoring 2022-05-06 23:13:48 +02:00
Matthias Bisping
866df2dee3 renaming 2022-05-06 23:07:46 +02:00
Matthias Bisping
962b398a9c refactoring: added processor adapter and streamer 2022-05-06 22:49:59 +02:00
Matthias Bisping
3ae4fd8986 refactoring / formatting 2022-05-06 19:57:33 +02:00
Matthias Bisping
98af600787 fixed result validation case for Nothing value 2022-05-06 19:57:05 +02:00
Matthias Bisping
ec650464a8 bufferize test 2022-05-06 19:56:43 +02:00
Matthias Bisping
ed69011bf6 removed commented-out lines 2022-05-06 19:16:17 +02:00
Matthias Bisping
0fc3db2fae test params shown with names in pytest log 2022-05-06 19:15:44 +02:00
Matthias Bisping
53eee983c4 added result validation for processor 2022-05-06 19:15:17 +02:00
Matthias Bisping
5760a6f354 fixed buffer issue: buffer can overflow when called lazily, for some reason. looking into it next. 2022-05-06 19:15:02 +02:00
Matthias Bisping
dca3eaaa54 fixed buffering and result streaming: all items are yielded individually and computed on demand now 2022-05-06 12:14:09 +02:00
Matthias Bisping
1b04c46853 removed code related to eager endpoint 2022-05-05 17:25:42 +02:00
Matthias Bisping
68c24c863f removed eager endpoint (/process) 2022-05-05 17:21:47 +02:00
Matthias Bisping
4a3ac150cf fixed initial computation queue state 2022-05-05 16:42:43 +02:00
Matthias Bisping
373c38113f refactoring 2022-05-05 16:26:18 +02:00
Matthias Bisping
b58a9d11c3 refactoring: added processor class 2022-05-05 16:08:59 +02:00
Matthias Bisping
bd5fe82e06 refactoring: rename 2022-05-05 15:47:25 +02:00
Matthias Bisping
b62652957a refactoring: pipeline 2022-05-05 15:15:05 +02:00
Matthias Bisping
7a3bb9334b removed obsolete code 2022-05-05 14:26:46 +02:00
Matthias Bisping
7ccad043f4 refactoring: rename 2022-05-05 14:25:24 +02:00
Matthias Bisping
4fd6a5aa2a docstring update 2022-05-05 12:52:40 +02:00
Matthias Bisping
a7a44267f1 refactoring: added pipeline class 2022-05-05 12:49:49 +02:00
Matthias Bisping
685edaa62f removed obsolete imports 2022-05-05 11:57:29 +02:00
Matthias Bisping
82d7b7f8cb refactoring: simplify pickup endpoint extraction 2022-05-05 11:56:06 +02:00
Matthias Bisping
d4ffd75e26 added rest callback interpreter 2022-05-05 11:45:52 +02:00
Matthias Bisping
7a1db32c3b refactoring: rename 2022-05-05 11:22:17 +02:00
Matthias Bisping
456fb1db06 refactoring: move 2022-05-05 10:23:51 +02:00
Matthias Bisping
e221b00933 refactoring: Sender, Receiver 2022-05-05 10:17:38 +02:00
Matthias Bisping
24313241a8 sync 2022-05-04 17:46:35 +02:00
Matthias Bisping
ef0e805223 sender baseclass 2022-05-04 17:05:44 +02:00
Matthias Bisping
531ff8d3e0 refactoring; fixed sender 2022-05-04 16:57:08 +02:00
Matthias Bisping
14d83abd72 refactoring; rest sender 2022-05-04 16:46:24 +02:00
Matthias Bisping
00ea224379 refactoring; packer test; sender test 2022-05-04 16:41:29 +02:00
Matthias Bisping
625552ec7c packing and bundling test params 2022-05-04 15:19:52 +02:00
Matthias Bisping
8afd87e44f added packing and bundling test 2022-05-04 15:18:15 +02:00
Matthias Bisping
1d70fb628e removed debug prints 2022-05-04 14:58:22 +02:00
Matthias Bisping
f32004c3a4 added packer test 2022-05-04 14:57:40 +02:00
Matthias Bisping
a301876ab9 refactoring: metadata argument as iterable instead of dict 2022-05-04 14:55:04 +02:00
Matthias Bisping
35045128b4 renaming 2022-05-04 13:51:05 +02:00
Matthias Bisping
463e1b2024 corrected imports 2022-05-04 12:14:30 +02:00
Matthias Bisping
630ed51b27 refactoring 2022-05-04 10:52:19 +02:00
Matthias Bisping
a4079f6710 refactoring 2022-05-04 10:45:13 +02:00
Matthias Bisping
cf0a877569 refactoring 2022-05-04 10:37:46 +02:00
Matthias Bisping
7d8659f257 topological sorting of definitions by caller hierarchy 2022-05-03 18:21:51 +02:00
Matthias Bisping
51a6bf9875 refactoring 2022-05-03 18:06:28 +02:00
Matthias Bisping
85d7ad52dc refactoring 2022-05-03 18:02:53 +02:00
Matthias Bisping
c00973b676 removed debug prints 2022-05-03 17:56:33 +02:00
Matthias Bisping
16fa992cae pickup endpoint working 2022-05-03 17:52:13 +02:00
Matthias Bisping
c315247625 parametrized number of pages for pdf fixture 2022-05-03 15:47:11 +02:00
Matthias Bisping
29fb0dda30 improved 1 -> n test and explanation 2022-05-03 15:43:30 +02:00
Matthias Bisping
ae39ccc8e2 renaming 2022-05-03 14:32:35 +02:00
Matthias Bisping
8575567890 refactoring 2022-05-03 14:25:01 +02:00
Matthias Bisping
92190a42f0 refactoring: move 2022-05-03 09:56:12 +02:00
Matthias Bisping
ea9b405d2a signature harmonization for 1 -> 1 and 1 -> n completed 2022-05-02 15:50:14 +02:00
Matthias Bisping
77f23a2185 data nesting harmonized for 1 -> 1 and 1 -> n; pdf page extraction (1 -> n) working for non-batched usage 2022-04-29 17:22:54 +02:00
Matthias Bisping
3172a00aaa refactoring 2022-04-29 17:08:34 +02:00
Matthias Bisping
501f0bd5fc signature harmonization for 1 -> 1 and 1 -> n WIP: batched and not batched working again 2022-04-29 16:46:25 +02:00
Matthias Bisping
fd57261631 signature harmonization for 1 -> 1 and 1 -> n WIP 2022-04-29 15:43:20 +02:00
Matthias Bisping
23070f3480 changed operation signature to return iterables for 1 -> n compatibility 2022-04-29 13:34:27 +02:00
Matthias Bisping
2550a0eff2 refactoring 2022-04-29 12:34:54 +02:00
Matthias Bisping
f053a072d6 signatures for services updated 2022-04-29 12:01:13 +02:00
Matthias Bisping
00276dbcc7 added batching wrapper for internally batching functions 2022-04-29 10:35:55 +02:00
Matthias Bisping
bc0e9ed643 renaming 2022-04-29 09:51:41 +02:00
Matthias Bisping
940bc3a689 changed buffering function behaviour: applies function to buffer. function needs to be lifted from the outside if single items are to be processed. 2022-04-28 21:58:38 +02:00
Matthias Bisping
a999ce2c3b refactoring 2022-04-28 21:36:42 +02:00
Matthias Bisping
e8315ffea9 signature of process_fn changed. Now returns {'data': ..., 'metadata': ...} instead of {'data': ...} 2022-04-28 21:12:25 +02:00
Matthias Bisping
087b5af929 refactoring 2022-04-28 18:28:35 +02:00
Matthias Bisping
f47d458217 send json instead of data 2022-04-28 18:05:07 +02:00
Matthias Bisping
0d503d1c1d refactoring 2022-04-28 14:30:50 +02:00
Matthias Bisping
3b0d0868b9 refactoring: method dispatch via peekable rather than special empty data request 2022-04-28 13:01:01 +02:00
Matthias Bisping
da84ff5112 buffer size constrained by assertion 2022-04-27 18:45:14 +02:00
Matthias Bisping
c9f26000d7 test parametrization for number of input items and buffer size 2022-04-27 18:38:43 +02:00
Matthias Bisping
67c4bac4b7 sync 2022-04-27 17:45:48 +02:00
Matthias Bisping
ab5839a126 refactoring; extended partial posting by image payload data 2022-04-27 16:29:52 +02:00
Matthias Bisping
fa4e5e5e0e refactoring; made test dynamic relative to import 2022-04-27 14:15:52 +02:00
Matthias Bisping
e58addf8c4 refactoring 2022-04-27 14:09:18 +02:00
Matthias Bisping
00e21b00ba refactoring 2022-04-27 14:02:41 +02:00
Matthias Bisping
fabc78efce refactoring; formatting 2022-04-27 13:41:48 +02:00
Matthias Bisping
90af62ed2c refactoring 2022-04-27 13:38:55 +02:00
Matthias Bisping
2af648254e formatting 2022-04-27 10:13:46 +02:00
Matthias Bisping
9e8172427c refactoring 2022-04-27 10:08:57 +02:00
Matthias Bisping
e903c69a07 refactoring 2022-04-27 09:13:19 +02:00
Matthias Bisping
7419612c21 partial request by manual receiver buffering V1 2022-04-26 19:54:37 +02:00
Julius Unverfehrt
656bc7cd63 explore partial responses 2022-04-26 16:34:58 +02:00
Julius Unverfehrt
f6ca9c9ac5 explore partial responses 2022-04-26 16:17:50 +02:00
Matthias Bisping
5a948ef7ad partial response test WIP 2022-04-26 16:15:54 +02:00
Matthias Bisping
4078b3e4ec signatures for services 2022-04-26 14:37:12 +02:00
Matthias Bisping
b7882d4452 refactoring: introduced sub-conftest files 2022-04-26 13:13:26 +02:00
Matthias Bisping
01daa634ec refactoring: made docker-compose api call non-autousing 2022-04-26 13:01:15 +02:00
Matthias Bisping
64624a3fd3 refactoring: moved conftest up a dir 2022-04-26 12:52:24 +02:00
Matthias Bisping
afd67d87a6 updated test container dockerfile for new location of tests directory 2022-04-26 12:48:15 +02:00
Matthias Bisping
37881da08e restructuring: moved test out of module scope 2022-04-26 12:45:12 +02:00
125 changed files with 3287 additions and 279 deletions

View File

@ -36,8 +36,6 @@ A configuration is located in `/config.yaml`. All relevant variables can be conf
{
"dossierId": "",
"fileId": "",
"targetFileExtension": "",
"responseFileExtension": ""
}
```

View File

@ -1,5 +1,52 @@
service:
logging_level: $LOGGING_LEVEL_ROOT|DEBUG # Logging level for service logger
name: $SERVICE_NAME|research # Default service name for research service, used for prometheus metric name
response_formatter: default # formats analysis payloads of response messages
upload_formatter: projecting # formats analysis payloads of objects uploaded to storage
# Note: This is not really the right place for this. It should be configured on a per-service basis.
operation: $OPERATION|default
# operation needs to be specified in deployment config for services that are called without an operation specified
operations:
conversion:
input:
multi: False
subdir: ""
extension: ORIGIN.pdf.gz
output:
subdir: "pages_as_images"
extension: json.gz
extraction:
input:
multi: False
subdir: ""
extension: ORIGIN.pdf.gz
output:
subdir: "extracted_images"
extension: json.gz
table_parsing:
input:
multi: True
subdir: "pages_as_images"
extension: json.gz
output:
subdir: "table_parses"
extension: json.gz
image_classification:
input:
multi: True
subdir: "extracted_images"
extension: json.gz
output:
subdir: ""
extension: IMAGE_INFO.json.gz
default:
input:
multi: False
subdir: ""
extension: in.gz
output:
subdir: ""
extension: out.gz
probing_webserver:
host: $PROBING_WEBSERVER_HOST|"0.0.0.0" # Probe webserver address
@ -33,3 +80,8 @@ storage:
azure:
connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"
retry:
tries: 3
delay: 5
jitter: [1, 3]
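
The `operations` section above maps each operation to input/output file patterns (subdir, extension, and whether multiple input objects are expected). Below is a minimal illustrative sketch of such a lookup; the dict literal mirrors two entries of the YAML above, and the fallback to the `default` entry is an assumption for illustration, not taken from the repository.

```python
# Minimal sketch: resolving an operation's file pattern from the config above.
OPERATIONS = {
    "table_parsing": {
        "input": {"multi": True, "subdir": "pages_as_images", "extension": "json.gz"},
        "output": {"subdir": "table_parses", "extension": "json.gz"},
    },
    "default": {
        "input": {"multi": False, "subdir": "", "extension": "in.gz"},
        "output": {"subdir": "", "extension": "out.gz"},
    },
}

def file_pattern(operation: str, end: str = "input") -> dict:
    # Unknown operations fall back to the 'default' entry (assumed behaviour).
    return OPERATIONS.get(operation, OPERATIONS["default"])[end]

assert file_pattern("table_parsing")["subdir"] == "pages_as_images"
assert file_pattern("unknown", end="output")["extension"] == "out.gz"
```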

doc/signatures.txt Normal file (76 lines)
View File

@ -0,0 +1,76 @@
Processing service interface
image classification now : JSON (Mdat PDF) -> (Data PDF -> JSON [Mdat ImObj])
image classification future: JSON [Mdat FunkIm] | Mdat PDF -> (Data [FunkIm] -> JSON [Mdat FunkIm])
object detection : JSON [Mdat PagIm] | Mdat PDF -> (Data [PagIm] -> JSON [[Mdat SemIm]])
NER : JSON [Mdat Dict] -> (Data [Dict] -> JSON [Mdat])
table parsing : JSON [Mdat FunkIm] | Mdat PDF -> (Data [PagIm] -> JSON [[Mdat FunkIm]])
pdf2image : Mdat (fn, [Int], PDF) -> (JSON ([Int], Data PDF) -> [(FunkIm, Mdat)])
image classification now : Mdat (fn, [Int], file) -> (Data PDF -> JSON [Mdat ImObj])
image classification future: Mdat (fn, [Int], dir) -> (Data [FunkIm] -> JSON [Mdat FunkIm])
object detection : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat SemIm]])
table parsing : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat FunkIm]])
NER : Mdat (fn, [Int], file) -> (Data [Dict] -> JSON [Mdat])
pdf2image : Mdat (fn, [Int], file) -> (JSON ([Int], Data PDF) -> [(FunkIm, Mdat)])
from funcy import identity
access(mdat):
if mdat.path is file:
request = {"data": load(mdat.path), "metadata": mdat}
elif mdat.path is dir:
get_indexed = identity if not mdat.idx else itemgetter(*mdat.idx)
request = {"data": get_indexed(get_files(mdat.path)), "metadata": mdat}
else:
raise BadRequest
storage:
fileId: {
pages: [PagIm]
images: [FunkIm]
sections: gz
}
---------------
assert if targetPath is file then response list must be singleton
{index: [], dir: fileID.pdf.gz, targetPath: fileID.images.json.gz} -> [{data: pdf bytes, metadata: request: ...] -> [{data: null, metadata: request: null, response: {classification infos: ...}]
image classification now : Mdat (fn, [Int], file) -> [JSON (Data PDF, Mdat)] -> [JSON (Data null, Mdat [ImObj])] | 1 -> 1
assert if targetPath is file then response list must be singleton
{index: [], dir: fileID/images, targetPath: fileID.images.json.gz} -> [{data: image bytes, metadata: request: {image location...}] -> [{data: null, metadata: request: null, response: {classification infos: ...}]
image classification future: Mdat (fn, [Int], dir) -> JSON (Data [FunkIm], Mdat) -> [JSON (Data null, Mdat [FunkIm])] |
object detection : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat SemIm]])
table parsing : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat FunkIm]])
NER : Mdat (fn, [Int], file) -> (Data [Dict] -> JSON [Mdat])
pdf2image : Mdat (fn, [Int], file) -> (JSON ([Int], Data PDF) -> [(FunkIm, Mdat)])
aggregate <==> targetpath is file and index is empty

View File

@ -1,7 +1,7 @@
version: '2'
services:
minio:
image: minio/minio
image: minio/minio:RELEASE.2022-06-11T19-55-32Z
ports:
- "9000:9000"
environment:
@ -12,7 +12,7 @@ services:
command: server /data
network_mode: "bridge"
rabbitmq:
image: docker.io/bitnami/rabbitmq:3.9
image: docker.io/bitnami/rabbitmq:3.9.8
ports:
- '4369:4369'
- '5551:5551'

pyinfra/callback.py Normal file (65 lines)
View File

@ -0,0 +1,65 @@
import logging
from funcy import merge, omit, lmap
from pyinfra.exceptions import AnalysisFailure
from pyinfra.pipeline_factory import CachedPipelineFactory
logger = logging.getLogger(__name__)
class Callback:
"""This is the callback that is applied to items pulled from the storage. It forwards these items to an analysis
endpoint.
"""
def __init__(self, pipeline_factory: CachedPipelineFactory):
self.pipeline_factory = pipeline_factory
def __get_pipeline(self, endpoint):
return self.pipeline_factory.get_pipeline(endpoint)
@staticmethod
def __run_pipeline(pipeline, analysis_input: dict):
"""
TODO: Since data and metadata are passed as singletons, there is no buffering and hence no batching happening
within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate
passing non-singletons, to only ack a message once a response is pulled from the output queue of the
pipeline. Probably the pipeline return value needs to contain the queue message frame (or so), in order for
the queue manager to tell which message to ack.
TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline
operates on singletons ([data], [metadata]).
"""
def combine_storage_item_metadata_with_queue_message_metadata(analysis_input):
return merge(analysis_input["metadata"], omit(analysis_input, ["data", "metadata"]))
def remove_queue_message_metadata(analysis_result):
metadata = omit(analysis_result["metadata"], queue_message_keys(analysis_input))
return {**analysis_result, "metadata": metadata}
def queue_message_keys(analysis_input):
return {*analysis_input.keys()}.difference({"data", "metadata"})
try:
data = analysis_input["data"]
metadata = combine_storage_item_metadata_with_queue_message_metadata(analysis_input)
analysis_response_stream = pipeline([data], [metadata])
analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream)
return analysis_response_stream
except Exception as err:
logger.error(err)
raise AnalysisFailure from err
def __call__(self, analysis_input: dict):
"""data_metadata_pack: {'dossierId': ..., 'fileId': ..., 'pages': ..., 'operation': ...}"""
operation = analysis_input.get("operation", "")
pipeline = self.__get_pipeline(operation)
try:
logging.debug(f"Requesting analysis for operation '{operation}'...")
return self.__run_pipeline(pipeline, analysis_input)
except AnalysisFailure:
logging.warning(f"Exception caught when calling analysis endpoint for operation '{operation}'.")

View File

@ -0,0 +1,120 @@
import logging
from functools import lru_cache
from funcy import project, identity, rcompose
from pyinfra.callback import Callback
from pyinfra.config import parse_disjunction_string
from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.pipeline_factory import CachedPipelineFactory
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.server.client_pipeline import ClientPipeline
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.receiver.receivers.rest import RestReceiver
from pyinfra.storage import storages
from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.downloader import Downloader
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy, ProjectingUploadFormatter
logger = logging.getLogger(__name__)
class ComponentFactory:
def __init__(self, config):
self.config = config
@lru_cache(maxsize=None)
def get_consumer(self, callback=None):
callback = callback or self.get_callback()
return Consumer(self.get_visitor(callback), self.get_queue_manager())
@lru_cache(maxsize=None)
def get_callback(self, analysis_base_url=None):
analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint
callback = Callback(CachedPipelineFactory(base_url=analysis_base_url, pipeline_factory=self.get_pipeline))
def wrapped(body):
body_repr = project(body, ["dossierId", "fileId", "operation"])
logger.info(f"Processing {body_repr}...")
result = callback(body)
logger.info(f"Completed processing {body_repr}...")
return result
return wrapped
@lru_cache(maxsize=None)
def get_visitor(self, callback):
return QueueVisitor(
callback=callback,
data_loader=self.get_downloader(),
response_strategy=self.get_response_strategy(),
response_formatter=self.get_response_formatter(),
)
@lru_cache(maxsize=None)
def get_queue_manager(self):
return PikaQueueManager(self.config.rabbitmq.queues.input, self.config.rabbitmq.queues.output)
@staticmethod
@lru_cache(maxsize=None)
def get_pipeline(endpoint):
return ClientPipeline(
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
)
@lru_cache(maxsize=None)
def get_storage(self):
return storages.get_storage(self.config.storage.backend)
@lru_cache(maxsize=None)
def get_response_strategy(self, storage=None):
return AggregationStorageStrategy(
storage=storage or self.get_storage(),
file_descriptor_manager=self.get_file_descriptor_manager(),
upload_formatter=self.get_upload_formatter(),
)
@lru_cache(maxsize=None)
def get_file_descriptor_manager(self):
return FileDescriptorManager(
bucket_name=parse_disjunction_string(self.config.storage.bucket),
file_descriptor_builder=self.get_operation_file_descriptor_builder(),
)
@lru_cache(maxsize=None)
def get_upload_formatter(self):
return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter]
@lru_cache(maxsize=None)
def get_operation_file_descriptor_builder(self):
return RedFileDescriptorBuilder(
operation2file_patterns=self.get_operation2file_patterns(),
default_operation_name=self.config.service.operation,
)
@lru_cache(maxsize=None)
def get_response_formatter(self):
return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[
self.config.service.response_formatter
]
@lru_cache(maxsize=None)
def get_operation2file_patterns(self):
if self.config.service.operation != "default":
self.config.service.operations["default"] = self.config.service.operations[self.config.service.operation]
return self.config.service.operations
@lru_cache(maxsize=None)
def get_downloader(self, storage=None):
return Downloader(
storage=storage or self.get_storage(),
bucket_name=parse_disjunction_string(self.config.storage.bucket),
file_descriptor_manager=self.get_file_descriptor_manager(),
)
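
A hedged sketch of how the factory above might be wired at service start-up; only names shown in this diff are used, and what happens after `get_consumer()` is not shown here.

```python
# Sketch only: wiring the component factory with the module-level CONFIG
# defined in pyinfra/config.py further below.
from pyinfra.config import CONFIG
from pyinfra.component_factory import ComponentFactory

factory = ComponentFactory(CONFIG)
consumer = factory.get_consumer()  # Consumer(QueueVisitor(...), PikaQueueManager(...))
```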

View File

@ -1,10 +1,13 @@
"""Implements a config object with dot-indexing syntax."""
import os
from functools import partial
from itertools import chain
from operator import truth
from typing import Iterable
from envyaml import EnvYAML
from funcy import first, juxt, butlast, last
from frozendict import frozendict
from funcy import first, juxt, butlast, last, lmap
from pyinfra.locations import CONFIG_FILE
@ -27,6 +30,9 @@ class DotIndexable:
def __getitem__(self, item):
return self.__getattr__(item)
def __setitem__(self, key, value):
self.x[key] = value
class Config:
def __init__(self, config_path):
@ -39,6 +45,29 @@ class Config:
def __getitem__(self, item):
return self.__getattr__(item)
def __setitem__(self, key, value):
self.__config.key = value
def to_dict(self, frozen=True):
return to_dict(self.__config.export(), frozen=frozen)
def __hash__(self):
return hash(self.to_dict())
def to_dict(v, frozen=True):
def make_dict(*args, **kwargs):
return (frozendict if frozen else dict)(*args, **kwargs)
if isinstance(v, list):
return tuple(map(partial(to_dict, frozen=frozen), v))
elif isinstance(v, DotIndexable):
return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.x.items()})
elif isinstance(v, dict):
return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.items()})
else:
return v
CONFIG = Config(CONFIG_FILE)
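
A sketch of why `Config` gains `to_dict`/`__hash__` via `frozendict` (an assumption, consistent with `get_component_factory` further below): a hashable config can serve as an `lru_cache` key, so one factory is built per distinct configuration.

```python
# Assumption for illustration: hashable (frozen) configs as cache keys.
from functools import lru_cache
from frozendict import frozendict

@lru_cache(maxsize=None)
def get_component_factory(config):
    return object()  # stand-in for ComponentFactory(config)

cfg = frozendict({"storage": frozendict({"backend": "s3"})})
assert get_component_factory(cfg) is get_component_factory(cfg)  # one factory per config
```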

View File

@ -0,0 +1,8 @@
from functools import lru_cache
from pyinfra.component_factory import ComponentFactory
@lru_cache(maxsize=None)
def get_component_factory(config):
return ComponentFactory(config)

View File

@ -32,3 +32,19 @@ class NoSuchContainer(KeyError):
class IntentionalTestException(RuntimeError):
pass
class UnexpectedItemType(ValueError):
pass
class NoBufferCapacity(ValueError):
pass
class InvalidMessage(ValueError):
pass
class InvalidStorageItemFormat(ValueError):
pass

View File

@ -0,0 +1,99 @@
import abc
import os
from operator import itemgetter
from funcy import project
class FileDescriptorBuilder:
@abc.abstractmethod
def build_file_descriptor(self, queue_item_body, end="input"):
raise NotImplementedError
@abc.abstractmethod
def build_matcher(self, file_descriptor):
raise NotImplementedError
@staticmethod
@abc.abstractmethod
def build_storage_upload_info(analysis_payload, request_metadata):
raise NotImplementedError
@abc.abstractmethod
def get_path_prefix(self, queue_item_body):
raise NotImplementedError
class RedFileDescriptorBuilder(FileDescriptorBuilder):
"""Defines concrete descriptors for storage objects based on queue messages"""
def __init__(self, operation2file_patterns, default_operation_name):
self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()
self.default_operation_name = default_operation_name
@staticmethod
def get_default_operation2file_patterns():
return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}
def build_file_descriptor(self, queue_item_body, end="input"):
def pages():
if end == "input":
if "id" in queue_item_body:
return [queue_item_body["id"]]
else:
return queue_item_body["pages"] if file_pattern["multi"] else []
elif end == "output":
return [queue_item_body["id"]]
else:
raise ValueError(f"Invalid argument: {end=}") # TODO: use an enum for `end`
operation = queue_item_body.get("operation", self.default_operation_name)
file_pattern = self.operation2file_patterns[operation][end]
file_descriptor = {
**project(queue_item_body, ["dossierId", "fileId", "pages"]),
"pages": pages(),
"extension": file_pattern["extension"],
"subdir": file_pattern["subdir"],
}
return file_descriptor
def build_matcher(self, file_descriptor):
def make_filename(file_id, subdir, suffix):
return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
dossier_id, file_id, subdir, pages, extension = itemgetter(
"dossierId", "fileId", "subdir", "pages", "extension"
)(file_descriptor)
matcher = os.path.join(
dossier_id, make_filename(file_id, subdir, self.__build_page_regex(pages, subdir) + extension)
)
return matcher
@staticmethod
def __build_page_regex(pages, subdir):
n_pages = len(pages)
if n_pages > 1:
page_re = "id:(" + "|".join(map(str, pages)) + ")."
elif n_pages == 1:
page_re = f"id:{pages[0]}."
else: # no pages specified -> either all pages or no pages, depending on whether a subdir is specified
page_re = r"id:\d+." if subdir else ""
return page_re
@staticmethod
def build_storage_upload_info(analysis_payload, request_metadata):
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
return storage_upload_info
def get_path_prefix(self, queue_item_body):
prefix = "/".join(itemgetter("dossierId", "fileId")(self.build_file_descriptor(queue_item_body, end="input")))
return prefix
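
A worked example for the `RedFileDescriptorBuilder` above, using hypothetical dossier/file ids and the `table_parsing` pattern from the configuration shown earlier:

```python
# Illustrative values only; the builder class is the one defined above.
builder = RedFileDescriptorBuilder(
    operation2file_patterns={
        "table_parsing": {
            "input": {"multi": True, "subdir": "pages_as_images", "extension": "json.gz"},
            "output": {"subdir": "table_parses", "extension": "json.gz"},
        }
    },
    default_operation_name="default",
)
body = {"dossierId": "d1", "fileId": "f1", "pages": [1, 2], "operation": "table_parsing"}
descriptor = builder.build_file_descriptor(body, end="input")
builder.build_matcher(descriptor)  # 'd1/f1/pages_as_images/id:(1|2).json.gz' (POSIX join)
```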

View File

@ -0,0 +1,63 @@
from pyinfra.file_descriptor_builder import FileDescriptorBuilder
class FileDescriptorManager:
"""Decorates a file descriptor builder with additional convenience functionality and this way provides a
comprehensive interface for all file descriptor related operations, while the concrete descriptor logic is
implemented in a file descriptor builder.
TODO: This is supposed to be fully decoupled from the concrete file descriptor builder implementation, however some
bad coupling is still left.
"""
def __init__(self, bucket_name, file_descriptor_builder: FileDescriptorBuilder):
self.bucket_name = bucket_name
self.operation_file_descriptor_builder = file_descriptor_builder
def get_input_object_name(self, queue_item_body: dict):
return self.get_object_name(queue_item_body, end="input")
def get_output_object_name(self, queue_item_body: dict):
return self.get_object_name(queue_item_body, end="output")
def get_object_name(self, queue_item_body: dict, end):
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
object_name = self.__build_matcher(file_descriptor)
return object_name
def build_file_descriptor(self, queue_item_body, end="input"):
return self.operation_file_descriptor_builder.build_file_descriptor(queue_item_body, end=end)
def build_input_matcher(self, queue_item_body):
return self.build_matcher(queue_item_body, end="input")
def build_output_matcher(self, queue_item_body):
return self.build_matcher(queue_item_body, end="output")
def build_matcher(self, queue_item_body, end):
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
return self.__build_matcher(file_descriptor)
def __build_matcher(self, file_descriptor):
return self.operation_file_descriptor_builder.build_matcher(file_descriptor)
def get_input_object_descriptor(self, queue_item_body):
return self.get_object_descriptor(queue_item_body, end="input")
def get_output_object_descriptor(self, storage_upload_info):
return self.get_object_descriptor(storage_upload_info, end="output")
def get_object_descriptor(self, queue_item_body, end):
# TODO: this is complected with the Storage class API
# FIXME: bad coupling
return {
"bucket_name": self.bucket_name,
"object_name": self.get_object_name(queue_item_body, end=end),
}
def build_storage_upload_info(self, analysis_payload, request_metadata):
return self.operation_file_descriptor_builder.build_storage_upload_info(analysis_payload, request_metadata)
def get_path_prefix(self, queue_item_body):
return self.operation_file_descriptor_builder.get_path_prefix(queue_item_body)

View File

@ -6,8 +6,7 @@ from waitress import serve
from pyinfra.config import CONFIG
logger = logging.getLogger(__file__)
logger.setLevel(CONFIG.service.logging_level)
logger = logging.getLogger()
def run_probing_webserver(app, host=None, port=None, mode=None):

View File

View File

@ -0,0 +1,14 @@
import abc
class ParsingError(Exception):
pass
class BlobParser(abc.ABC):
@abc.abstractmethod
def parse(self, blob: bytes):
pass
def __call__(self, blob: bytes):
return self.parse(blob)

View File

@ -0,0 +1,67 @@
import logging
from funcy import rcompose
from pyinfra.parser.blob_parser import ParsingError
logger = logging.getLogger(__name__)
class Either:
def __init__(self, item):
self.item = item
def bind(self):
return self.item
class Left(Either):
pass
class Right(Either):
pass
class EitherParserWrapper:
def __init__(self, parser):
self.parser = parser
def __log(self, result):
if isinstance(result, Right):
logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}")
else:
logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} failed on {result.bind()}")
return result
def parse(self, item: Either):
if isinstance(item, Left):
try:
return Right(self.parser(item.bind()))
except ParsingError:
return item
elif isinstance(item, Right):
return item
else:
return self.parse(Left(item))
def __call__(self, item):
return self.__log(self.parse(item))
class EitherParserComposer:
def __init__(self, *parsers):
self.parser = rcompose(*map(EitherParserWrapper, parsers))
def parse(self, item):
result = self.parser(item)
if isinstance(result, Right):
return result.bind()
else:
raise ParsingError("All parsers failed.")
def __call__(self, item):
return self.parse(item)
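The composer tries each wrapped parser in turn: a Left carries a not-yet-parsed blob, a Right carries a successful result that later parsers simply forward. A minimal usage sketch, assuming the pyinfra.parser module paths imported elsewhere in this change set; the sample payloads are made up for illustration.
from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser

# Parsers are tried left to right; the first one that does not raise
# ParsingError produces a Right, which every later parser forwards untouched.
parse = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())

print(parse(b'{"data": "aGVsbG8=", "metadata": {}}'))  # handled by JsonBlobParser (base64 payload decoded)
print(parse(b"plain text"))                            # JSON parsing fails -> StringBlobParser decodes it
print(parse(b"\x89PNG"))                               # not valid UTF-8 -> IdentityBlobParser returns the bytes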

View File

View File

@ -0,0 +1,7 @@
from pyinfra.parser.blob_parser import BlobParser
class IdentityBlobParser(BlobParser):
def parse(self, data: bytes):
return data

View File

@ -0,0 +1,21 @@
import json
from pyinfra.parser.blob_parser import BlobParser, ParsingError
from pyinfra.server.packing import string_to_bytes
class JsonBlobParser(BlobParser):
def parse(self, data: bytes):
try:
data = data.decode()
data = json.loads(data)
except (UnicodeDecodeError, json.JSONDecodeError, AttributeError) as err:
raise ParsingError from err
try:
data["data"] = string_to_bytes(data["data"])
except (KeyError, TypeError) as err:
raise ParsingError from err
return data

View File

@ -0,0 +1,9 @@
from pyinfra.parser.blob_parser import BlobParser, ParsingError
class StringBlobParser(BlobParser):
def parse(self, data: bytes):
try:
return data.decode()
except Exception as err:
raise ParsingError from err

View File

@ -0,0 +1,18 @@
class CachedPipelineFactory:
def __init__(self, base_url, pipeline_factory):
self.base_url = base_url
self.operation2pipeline = {}
self.pipeline_factory = pipeline_factory
def get_pipeline(self, operation: str):
pipeline = self.operation2pipeline.get(operation, None) or self.__register_pipeline(operation)
return pipeline
def __register_pipeline(self, operation):
endpoint = self.__make_endpoint(operation)
pipeline = self.pipeline_factory(endpoint)
self.operation2pipeline[operation] = pipeline
return pipeline
def __make_endpoint(self, operation):
return f"{self.base_url}/{operation}"

View File

@ -2,15 +2,15 @@ from pyinfra.queue.queue_manager.queue_manager import QueueManager
class Consumer:
def __init__(self, callback, queue_manager: QueueManager):
def __init__(self, visitor, queue_manager: QueueManager):
self.queue_manager = queue_manager
self.callback = callback
self.visitor = visitor
def consume_and_publish(self):
self.queue_manager.consume_and_publish(self.callback)
def consume_and_publish(self, n=None):
self.queue_manager.consume_and_publish(self.visitor, n=n)
def basic_consume_and_publish(self):
self.queue_manager.basic_consume_and_publish(self.callback)
self.queue_manager.basic_consume_and_publish(self.visitor)
def consume(self, **kwargs):
return self.queue_manager.consume(**kwargs)

View File

@ -1,18 +1,18 @@
import json
import logging
import time
from itertools import islice
import pika
from pyinfra.config import CONFIG
from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure
from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager
from pyinfra.visitor import QueueVisitor
logger = logging.getLogger("pika")
logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.setLevel(CONFIG.service.logging_level)
logger = logging.getLogger()
def monkey_patch_queue_handle(channel, queue) -> QueueHandle:
@ -83,7 +83,7 @@ class PikaQueueManager(QueueManager):
self.channel.queue_declare(input_queue, arguments=args, auto_delete=False, durable=True)
self.channel.queue_declare(output_queue, arguments=args, auto_delete=False, durable=True)
def republish(self, body, n_current_attempts, frame):
def republish(self, body: bytes, n_current_attempts, frame):
self.channel.basic_publish(
exchange="",
routing_key=self._input_queue,
@ -100,7 +100,7 @@ class PikaQueueManager(QueueManager):
logger.error(f"Adding to dead letter queue: {body}")
self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False)
def publish_response(self, message, callback, max_attempts=3):
def publish_response(self, message, visitor: QueueVisitor, max_attempts=3):
logger.debug(f"Processing {message}.")
@ -109,8 +109,15 @@ class PikaQueueManager(QueueManager):
n_attempts = get_n_previous_attempts(properties) + 1
try:
response = json.dumps(callback(json.loads(body)))
self.channel.basic_publish("", self._output_queue, response.encode())
response_messages = visitor(json.loads(body))
if isinstance(response_messages, dict):
response_messages = [response_messages]
for response_message in response_messages:
response_message = json.dumps(response_message).encode()
self.channel.basic_publish("", self._output_queue, response_message)
self.channel.basic_ack(frame.delivery_tag)
except (ProcessingFailure, DataLoadingFailure):
@ -124,20 +131,21 @@ class PikaQueueManager(QueueManager):
def pull_request(self):
return self.channel.basic_get(self._input_queue)
def consume(self, inactivity_timeout=None):
def consume(self, inactivity_timeout=None, n=None):
logger.debug("Consuming")
return self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
gen = self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
yield from islice(gen, n)
def consume_and_publish(self, visitor):
def consume_and_publish(self, visitor: QueueVisitor, n=None):
logger.info(f"Consuming with callback {visitor.callback.__name__}")
logger.info(f"Consuming input queue.")
for message in self.consume():
for message in self.consume(n=n):
self.publish_response(message, visitor)
def basic_consume_and_publish(self, visitor):
def basic_consume_and_publish(self, visitor: QueueVisitor):
logger.info(f"Basic consuming with callback {visitor.callback.__name__}")
logger.info(f"Basic consuming input queue.")
def callback(channel, frame, properties, body):
message = (frame, properties, body)
@ -150,6 +158,8 @@ class PikaQueueManager(QueueManager):
try:
self.channel.queue_purge(self._input_queue)
self.channel.queue_purge(self._output_queue)
assert self.input_queue.to_list() == []
assert self.output_queue.to_list() == []
except pika.exceptions.ChannelWrongStateError:
pass

View File

@ -43,7 +43,7 @@ class QueueManager(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def consume_and_publish(self, callback):
def consume_and_publish(self, callback, n=None):
raise NotImplementedError
@abc.abstractmethod

View File

View File

View File

@ -0,0 +1,37 @@
import logging
from collections import deque
from funcy import repeatedly, identity
from pyinfra.exceptions import NoBufferCapacity
from pyinfra.server.nothing import Nothing
logger = logging.getLogger(__name__)
def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None):
def buffered_fn(item):
if item is not Nothing:
buffer.append(persist_fn(item))
response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing)))
return response_payload or null_value
def buffer_full(current_buffer_size):
if current_buffer_size > buffer_size:
logger.warning(f"Overfull buffer. size: {current_buffer_size}; intended capacity: {buffer_size}")
return current_buffer_size == buffer_size
def n_items_to_pop(buffer, final):
current_buffer_size = len(buffer)
return (final or buffer_full(current_buffer_size)) * current_buffer_size
if not buffer_size > 0:
raise NoBufferCapacity("Buffer size must be greater than zero.")
buffer = deque()
return buffered_fn
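In other words, bufferize turns a batch function into an item-wise one: items accumulate until the buffer is full or the Nothing sentinel arrives, and only then is the wrapped function applied to the drained buffer. A small behavioural sketch, assuming the import paths used elsewhere in this change set; batch_sum and the "buffering" null value are made up.
from pyinfra.server.buffering.bufferize import bufferize
from pyinfra.server.nothing import Nothing

def batch_sum(items):
    # Receives an iterator over the drained buffer; empty while the buffer is still filling.
    items = list(items)
    return sum(items) if items else None

buffered_sum = bufferize(batch_sum, buffer_size=3, null_value="buffering")

print(buffered_sum(1))        # "buffering"  (buffer: [1])
print(buffered_sum(2))        # "buffering"  (buffer: [1, 2])
print(buffered_sum(3))        # 6            (buffer full -> drained and summed)
print(buffered_sum(4))        # "buffering"  (buffer: [4])
print(buffered_sum(Nothing))  # 4            (Nothing marks the end -> final flush)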

View File

@ -0,0 +1,24 @@
from collections import deque
from itertools import takewhile
from funcy import repeatedly
from pyinfra.server.nothing import is_not_nothing, Nothing
def stream_queue(queue):
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))
class Queue:
def __init__(self):
self.__queue = deque()
def append(self, package) -> None:
self.__queue.append(package)
def popleft(self):
return self.__queue.popleft() if self.__queue else Nothing
def __bool__(self):
return bool(self.__queue)

View File

@ -0,0 +1,44 @@
from itertools import chain, takewhile
from typing import Iterable
from funcy import first, repeatedly, mapcat
from pyinfra.server.buffering.bufferize import bufferize
from pyinfra.server.nothing import Nothing, is_not_nothing
class FlatStreamBuffer:
"""Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable."""
def __init__(self, fn, buffer_size=3):
"""Function `fn` needs to be mappable and return an iterable; ideally `fn` returns a generator."""
self.stream_buffer = StreamBuffer(fn, buffer_size=buffer_size)
def __call__(self, items):
items = chain(items, [Nothing])
yield from mapcat(self.stream_buffer, items)
class StreamBuffer:
"""Puts a streaming function between an input and an output buffer."""
def __init__(self, fn, buffer_size=3):
"""Function `fn` needs to be mappable and return an iterable; ideally `fn` returns a generator."""
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])
def __call__(self, item) -> Iterable:
self.push(item)
yield from takewhile(is_not_nothing, repeatedly(self.pop))
def push(self, item):
self.result_stream = chain(self.result_stream, self.compute(item))
def compute(self, item):
try:
yield from self.fn(item)
except TypeError as err:
raise TypeError("Function failed with type-error. Is it mappable?") from err
def pop(self):
return first(chain(self.result_stream, [Nothing]))
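A behavioural sketch of FlatStreamBuffer, assuming the import path used by pyinfra/server/server.py in this change set; the doubling generator below is a hypothetical streamable function.
from pyinfra.server.buffering.stream import FlatStreamBuffer

def double_all(items):
    # Called with the drained buffer (an iterable); yields one result per item.
    for item in items:
        yield item * 2

stream = FlatStreamBuffer(double_all, buffer_size=2)

# Items are processed in pairs; the final odd item is still flushed because
# FlatStreamBuffer appends the Nothing sentinel to the input stream.
print(list(stream([1, 2, 3, 4, 5])))  # [2, 4, 6, 8, 10]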

View File

@ -0,0 +1,16 @@
from funcy import rcompose, flatten
# TODO: remove the dispatcher component from the pipeline; it no longer actually dispatches
class ClientPipeline:
def __init__(self, packer, dispatcher, receiver, interpreter):
self.pipe = rcompose(
packer,
dispatcher,
receiver,
interpreter,
flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten.
)
def __call__(self, *args, **kwargs):
yield from self.pipe(*args, **kwargs)

View File

@ -0,0 +1,27 @@
from itertools import tee
from typing import Iterable
def inspect(prefix="inspect", embed=False):
"""Can be used to inspect compositions of generator functions by placing inbetween two functions."""
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple):
x, y = tee(x)
y = list(y)
else:
y = x
l = f" {len(y)} items" if isinstance(y, list) else ""
print(f"{prefix}{l}:", y)
if embed:
import IPython
IPython.embed()
return x
return inner

View File

View File

@ -0,0 +1,30 @@
import abc
from typing import Iterable
from more_itertools import peekable
from pyinfra.server.nothing import Nothing
def has_next(peekable_iter):
return peekable_iter.peek(Nothing) is not Nothing
class Dispatcher:
def __call__(self, packages: Iterable[dict]):
yield from self.dispatch_methods(packages)
def dispatch_methods(self, packages):
packages = peekable(packages)
for package in packages:
method = self.patch if has_next(packages) else self.post
response = method(package)
yield response
@abc.abstractmethod
def patch(self, package):
raise NotImplementedError
@abc.abstractmethod
def post(self, package):
raise NotImplementedError
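The dispatch rule above sends every package except the last as PATCH and the final one as POST. A minimal sketch with a hypothetical recording subclass, assuming the import path used by the other dispatcher modules in this change set.
from pyinfra.server.dispatcher.dispatcher import Dispatcher

class RecordingDispatcher(Dispatcher):
    # Records which HTTP verb would have been used for each package.
    def patch(self, package):
        return ("PATCH", package)

    def post(self, package):
        return ("POST", package)

dispatcher = RecordingDispatcher()
print(list(dispatcher([{"page": 1}, {"page": 2}, {"page": 3}])))
# [('PATCH', {'page': 1}), ('PATCH', {'page': 2}), ('POST', {'page': 3})]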

View File

@ -0,0 +1,21 @@
from itertools import takewhile
from funcy import repeatedly, notnone
from pyinfra.server.dispatcher.dispatcher import Dispatcher
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
class QueuedStreamFunctionDispatcher(Dispatcher):
def __init__(self, queued_stream_function: QueuedStreamFunction):
self.queued_stream_function = queued_stream_function
def patch(self, package):
self.queued_stream_function.push(package)
# TODO: this is wonky and a result of the pipeline components' behaviour having shifted through previous
# refactorings. The analogous functionality for the REST pipeline lives in the interpreter. Correct this
# asymmetry!
yield from takewhile(notnone, repeatedly(self.queued_stream_function.pop))
def post(self, package):
yield from self.patch(package)

View File

@ -0,0 +1,14 @@
import requests
from pyinfra.server.dispatcher.dispatcher import Dispatcher
class RestDispatcher(Dispatcher):
def __init__(self, endpoint):
self.endpoint = endpoint
def patch(self, package):
return requests.patch(self.endpoint, json=package)
def post(self, package):
return requests.post(self.endpoint, json=package)

View File

View File

View File

@ -0,0 +1,8 @@
import abc
from typing import Iterable
class Interpreter(abc.ABC):
@abc.abstractmethod
def __call__(self, payloads: Iterable):
pass

View File

@ -0,0 +1,8 @@
from typing import Iterable
from pyinfra.server.interpreter.interpreter import Interpreter
class IdentityInterpreter(Interpreter):
def __call__(self, payloads: Iterable):
yield from payloads

View File

@ -0,0 +1,23 @@
from typing import Iterable
import requests
from funcy import takewhile, repeatedly, mapcat
from pyinfra.server.interpreter.interpreter import Interpreter
def stream_responses(endpoint):
def receive():
response = requests.get(endpoint)
return response
def more_is_coming(response):
return response.status_code == 206
response_stream = takewhile(more_is_coming, repeatedly(receive))
yield from response_stream
class RestPickupStreamer(Interpreter):
def __call__(self, payloads: Iterable):
yield from mapcat(stream_responses, payloads)

View File

@ -0,0 +1,39 @@
from functools import lru_cache
from funcy import identity
from prometheus_client import CollectorRegistry, Summary
from pyinfra.server.operation_dispatcher import OperationDispatcher
class OperationDispatcherMonitoringDecorator:
def __init__(self, operation_dispatcher: OperationDispatcher, naming_policy=identity):
self.operation_dispatcher = operation_dispatcher
self.operation2metric = {}
self.naming_policy = naming_policy
@property
@lru_cache(maxsize=None)
def registry(self):
return CollectorRegistry(auto_describe=True)
def make_summary_instance(self, op: str):
return Summary(f"{self.naming_policy(op)}_seconds", f"Time spent on {op}.", registry=self.registry)
def submit(self, operation, request):
return self.operation_dispatcher.submit(operation, request)
def pickup(self, operation):
with self.get_monitor(operation):
return self.operation_dispatcher.pickup(operation)
def get_monitor(self, operation):
monitor = self.operation2metric.get(operation, None) or self.register_operation(operation)
return monitor.time()
def register_operation(self, operation):
summary = self.make_summary_instance(operation)
self.operation2metric[operation] = summary
return summary

View File

@ -0,0 +1,17 @@
from itertools import chain
from typing import Iterable, Union, Tuple
from pyinfra.exceptions import UnexpectedItemType
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
if isinstance(itm, tuple):
return [itm]
elif isinstance(itm, Iterable):
return itm
else:
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
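normalize therefore accepts a mix of bare (data, metadata) tuples and iterables of such tuples and flattens them into one stream. A worked example, assuming the import path used by pyinfra/server/utils.py in this change set; the payloads are made up.
from pyinfra.server.normalization import normalize

single = (b"page-1", {"id": 1})                              # a bare (data, metadata) tuple
several = [(b"page-2", {"id": 2}), (b"page-3", {"id": 3})]   # an iterable of tuples

print(list(normalize([single, several])))
# [(b'page-1', {'id': 1}), (b'page-2', {'id': 2}), (b'page-3', {'id': 3})]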

View File

@ -0,0 +1,6 @@
class Nothing:
pass
def is_not_nothing(x):
return x is not Nothing

View File

@ -0,0 +1,33 @@
from itertools import starmap, tee
from typing import Dict
from funcy import juxt, zipdict, cat
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
from pyinfra.server.stream.rest import LazyRestProcessor
class OperationDispatcher:
def __init__(self, operation2function: Dict[str, QueuedStreamFunction]):
submit_suffixes, pickup_suffixes = zip(*map(juxt(submit_suffix, pickup_suffix), operation2function))
processors = starmap(LazyRestProcessor, zip(operation2function.values(), submit_suffixes, pickup_suffixes))
self.operation2processor = zipdict(submit_suffixes + pickup_suffixes, cat(tee(processors)))
@classmethod
@property
def pickup_suffix(cls):
return pickup_suffix("")
def submit(self, operation, request):
return self.operation2processor[operation].push(request)
def pickup(self, operation):
return self.operation2processor[operation].pop()
def submit_suffix(op: str):
return "" if not op else op
def pickup_suffix(op: str):
return "pickup" if not op else f"{op}_pickup"

View File

View File

@ -0,0 +1,8 @@
import abc
from typing import Iterable
class Packer(abc.ABC):
@abc.abstractmethod
def __call__(self, data: Iterable, metadata: Iterable):
pass

View File

@ -0,0 +1,14 @@
from itertools import starmap
from typing import Iterable
from pyinfra.server.packer.packer import Packer
def bundle(data: bytes, metadata: dict):
package = {"data": data, "metadata": metadata}
return package
class IdentityPacker(Packer):
def __call__(self, data: Iterable, metadata):
yield from starmap(bundle, zip(data, metadata))

View File

@ -0,0 +1,9 @@
from typing import Iterable
from pyinfra.server.packer.packer import Packer
from pyinfra.server.packing import pack_data_and_metadata_for_rest_transfer
class RestPacker(Packer):
def __call__(self, data: Iterable[bytes], metadata: Iterable[dict]):
yield from pack_data_and_metadata_for_rest_transfer(data, metadata)

pyinfra/server/packing.py (Normal file, 34 lines)
View File

@ -0,0 +1,34 @@
import base64
from _operator import itemgetter
from itertools import starmap
from typing import Iterable
from funcy import compose
from pyinfra.utils.func import starlift, lift
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
yield from starmap(pack, zip(data, metadata))
def unpack_fn_pack(fn):
return compose(starlift(pack), fn, lift(unpack))
def pack(data: bytes, metadata: dict):
package = {"data": bytes_to_string(data), "metadata": metadata}
return package
def unpack(package):
data, metadata = itemgetter("data", "metadata")(package)
return string_to_bytes(data), metadata
def bytes_to_string(data: bytes) -> str:
return base64.b64encode(data).decode()
def string_to_bytes(data: str) -> bytes:
return base64.b64decode(data.encode())
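A round-trip sketch for these helpers; the payload and fileId are made up. unpack_fn_pack wraps a stream function so that it sees raw (bytes, dict) tuples while its callers keep exchanging base64-encoded packages.
from pyinfra.server.packing import pack, unpack, unpack_fn_pack

package = pack(b"hello", {"fileId": "f-1"})
print(package)          # {'data': 'aGVsbG8=', 'metadata': {'fileId': 'f-1'}}
print(unpack(package))  # (b'hello', {'fileId': 'f-1'})

def upper_case(items):
    # Operates on unpacked (data, metadata) tuples.
    for data, metadata in items:
        yield data.upper(), metadata

wrapped = unpack_fn_pack(upper_case)
print(list(wrapped([package])))  # [{'data': 'SEVMTE8=', 'metadata': {'fileId': 'f-1'}}]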

View File

View File

@ -0,0 +1,8 @@
import abc
from typing import Iterable
class Receiver(abc.ABC):
@abc.abstractmethod
def __call__(self, package: Iterable):
pass

View File

@ -0,0 +1,11 @@
from typing import Iterable
from pyinfra.server.receiver.receiver import Receiver
from funcy import notnone
class QueuedStreamFunctionReceiver(Receiver):
def __call__(self, responses: Iterable):
for response in filter(notnone, responses):
yield response

View File

@ -0,0 +1,16 @@
from typing import Iterable
import requests
from funcy import chunks, flatten
from pyinfra.server.receiver.receiver import Receiver
class RestReceiver(Receiver):
def __init__(self, chunk_size=3):
self.chunk_size = chunk_size
def __call__(self, responses: Iterable[requests.Response]):
for response in flatten(chunks(self.chunk_size, responses)):
response.raise_for_status()
yield response.json()

pyinfra/server/server.py (Normal file, 100 lines)
View File

@ -0,0 +1,100 @@
from functools import singledispatch
from typing import Dict, Callable, Union
from flask import Flask, jsonify, request
from prometheus_client import generate_latest
from pyinfra.config import CONFIG
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.monitoring import OperationDispatcherMonitoringDecorator
from pyinfra.server.operation_dispatcher import OperationDispatcher
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
@singledispatch
def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
"""Produces a processing server given a streamable function or a mapping from operations to streamable functions.
Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a
function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata.
If the function doesn't produce data, data should be an empty byte string.
If the function doesn't produce metadata, metadata should be an empty dictionary.
Args:
arg: a streamable function, or a mapping from operation names (str) to streamable functions
buffer_size: If your function operates on batches, this parameter controls how many items are aggregated before
your function is applied.
TODO: buffer_size has to be controllable on a per-function basis.
Returns:
Processing server: flask app
"""
pass
@set_up_processing_server.register
def _(operation2stream_fn: dict, buffer_size=1):
return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
@set_up_processing_server.register
def _(stream_fn: object, buffer_size=1):
operation2stream_fn = {None: stream_fn}
return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)
def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
operation2stream_fn = {
op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items()
}
return __set_up_processing_server(operation2stream_fn)
def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
app = Flask(__name__)
dispatcher = OperationDispatcherMonitoringDecorator(
OperationDispatcher(operation2function),
naming_policy=naming_policy,
)
def ok():
resp = jsonify("OK")
resp.status_code = 200
return resp
@app.route("/ready", methods=["GET"])
def ready():
return ok()
@app.route("/health", methods=["GET"])
def healthy():
return ok()
@app.route("/prometheus", methods=["GET"])
def prometheus():
return generate_latest(registry=dispatcher.registry)
@app.route("/<operation>", methods=["POST", "PATCH"])
def submit(operation):
return dispatcher.submit(operation, request)
@app.route("/", methods=["POST", "PATCH"])
def submit_default():
return dispatcher.submit("", request)
@app.route("/<operation>", methods=["GET"])
def pickup(operation):
return dispatcher.pickup(operation)
return app
def naming_policy(op_name: str):
pop_suffix = OperationDispatcher.pickup_suffix
prefix = f"redactmanager_{CONFIG.service.name}"
op_display_name = op_name.replace(f"_{pop_suffix}", "") if op_name != pop_suffix else "default"
complete_display_name = f"{prefix}_{op_display_name}"
return complete_display_name
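A hedged end-to-end usage sketch of the construction the docstring describes, assuming the module paths pyinfra.server.server and pyinfra.server.utils from this change set; the upper-casing function and the "upper" operation name are made up for illustration.
from pyinfra.server.server import set_up_processing_server
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic

def to_upper(data: bytes, metadata: dict):
    # One (data, metadata) pair in, one (data, metadata) pair out.
    return data.upper(), {**metadata, "upper_cased": True}

stream_fn = make_streamable_and_wrap_in_packing_logic(to_upper, batched=False)

# A mapping registers one streamable function per operation; passing a single
# function instead serves it as the default ("/") operation.
app = set_up_processing_server({"upper": stream_fn}, buffer_size=1)

if __name__ == "__main__":
    app.run(port=5000)  # Flask development server, for local experimentation only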

View File

View File

@ -0,0 +1,21 @@
from funcy import first
from pyinfra.server.buffering.queue import stream_queue, Queue
class QueuedStreamFunction:
def __init__(self, stream_function):
"""Combines a stream function with a queue.
Args:
stream_function: Needs to operate on iterables.
"""
self.queue = Queue()
self.stream_function = stream_function
def push(self, item):
self.queue.append(item)
def pop(self):
items = stream_queue(self.queue)
return first(self.stream_function(items))
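A small push/pop sketch, assuming the import path used elsewhere in this change set; the doubling generator is hypothetical. Each pop drains the queue into the stream function and returns only the first available result, so remaining results surface on later pops.
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction

def doubled(items):
    for item in items:
        yield item * 2

qsf = QueuedStreamFunction(doubled)
qsf.push(1)
qsf.push(2)

print(qsf.pop())  # 2
print(qsf.pop())  # 4     (produced from the item still sitting in the queue)
print(qsf.pop())  # None  (queue exhausted)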

View File

@ -0,0 +1,51 @@
import logging
from flask import jsonify
from funcy import drop
from pyinfra.server.nothing import Nothing
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
logger = logging.getLogger(__name__)
class LazyRestProcessor:
def __init__(self, queued_stream_function: QueuedStreamFunction, submit_suffix="submit", pickup_suffix="pickup"):
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.queued_stream_function = queued_stream_function
def push(self, request):
self.queued_stream_function.push(request.json)
return jsonify(replace_suffix(request.base_url, self.submit_suffix, self.pickup_suffix))
def pop(self):
result = self.queued_stream_function.pop() or Nothing
if not valid(result):
logger.error(f"Received invalid result: {result}")
result = Nothing
if result is Nothing:
logger.info("Analysis completed successfully.")
resp = jsonify("No more items left")
resp.status_code = 204
else:
logger.debug("Partial analysis completed.")
resp = jsonify(result)
resp.status_code = 206
return resp
def valid(result):
return isinstance(result, dict) or result is Nothing
def replace_suffix(strn, suf, repl):
return remove_last_n(strn, len(suf)) + repl
def remove_last_n(strn, n):
return "".join(reversed(list(drop(n, reversed(strn)))))

pyinfra/server/utils.py (Normal file, 16 lines)
View File

@ -0,0 +1,16 @@
from funcy import compose, identity
from pyinfra.server.normalization import normalize
from pyinfra.server.packing import unpack_fn_pack
from pyinfra.utils.func import starlift
def make_streamable_and_wrap_in_packing_logic(fn, batched):
fn = make_streamable(fn, batched)
fn = unpack_fn_pack(fn)
return fn
def make_streamable(fn, batched):
# FIXME: something broken with batched == True
return compose(normalize, (identity if batched else starlift)(fn))

View File

@ -30,5 +30,5 @@ class StorageAdapter(ABC):
raise NotImplementedError
@abstractmethod
def get_all_object_names(self, bucket_name):
def get_all_object_names(self, bucket_name, prefix=None):
raise NotImplementedError

View File

@ -1,5 +1,4 @@
import logging
from itertools import repeat
from operator import attrgetter
from azure.storage.blob import ContainerClient, BlobServiceClient
@ -20,15 +19,15 @@ class AzureStorageAdapter(StorageAdapter):
container_client = self.__client.get_container_client(bucket_name)
return container_client.exists()
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def __provide_container_client(self, bucket_name) -> ContainerClient:
self.make_bucket(bucket_name)
container_client = self.__client.get_container_client(bucket_name)
return container_client
def make_bucket(self, bucket_name):
container_client = self.__client.get_container_client(bucket_name)
container_client if container_client.exists() else self.__client.create_container(bucket_name)
def put_object(self, bucket_name, object_name, data):
logger.debug(f"Uploading '{object_name}'...")
container_client = self.__provide_container_client(bucket_name)
@ -59,7 +58,7 @@ class AzureStorageAdapter(StorageAdapter):
blobs = container_client.list_blobs()
container_client.delete_blobs(*blobs)
def get_all_object_names(self, bucket_name):
def get_all_object_names(self, bucket_name, prefix=None):
container_client = self.__provide_container_client(bucket_name)
blobs = container_client.list_blobs()
return zip(repeat(bucket_name), map(attrgetter("name"), blobs))
blobs = container_client.list_blobs(name_starts_with=prefix)
return map(attrgetter("name"), blobs)

View File

@ -1,6 +1,6 @@
import io
from itertools import repeat
import logging
from itertools import repeat
from operator import attrgetter
from minio import Minio
@ -53,6 +53,6 @@ class S3StorageAdapter(StorageAdapter):
for obj in objects:
self.__client.remove_object(bucket_name, obj.object_name)
def get_all_object_names(self, bucket_name):
objs = self.__client.list_objects(bucket_name, recursive=True)
return zip(repeat(bucket_name), map(attrgetter("object_name"), objs))
def get_all_object_names(self, bucket_name, prefix=None):
objs = self.__client.list_objects(bucket_name, recursive=True, prefix=prefix)
return map(attrgetter("object_name"), objs)

View File

@ -1,10 +1,10 @@
import logging
from retry import retry
from pyinfra.config import CONFIG
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.adapters.adapter import StorageAdapter
from pyinfra.utils.retry import retry
logger = logging.getLogger(__name__)
logger.setLevel(CONFIG.service.logging_level)
@ -26,7 +26,7 @@ class Storage:
def get_object(self, bucket_name, object_name):
return self.__get_object(bucket_name, object_name)
@retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3))
@retry(DataLoadingFailure)
def __get_object(self, bucket_name, object_name):
try:
return self.__adapter.get_object(bucket_name, object_name)
@ -40,5 +40,5 @@ class Storage:
def clear_bucket(self, bucket_name):
return self.__adapter.clear_bucket(bucket_name)
def get_all_object_names(self, bucket_name):
return self.__adapter.get_all_object_names(bucket_name)
def get_all_object_names(self, bucket_name, prefix=None):
return self.__adapter.get_all_object_names(bucket_name, prefix=prefix)

pyinfra/utils/encoding.py (Normal file, 14 lines)
View File

@ -0,0 +1,14 @@
import gzip
import json
def pack_analysis_payload(analysis_payload):
return compress(json.dumps(analysis_payload).encode())
def compress(data: bytes):
return gzip.compress(data)
def decompress(data: bytes):
return gzip.decompress(data)

pyinfra/utils/func.py (Normal file, 47 lines)
View File

@ -0,0 +1,47 @@
from itertools import starmap, tee
from funcy import curry, compose, filter
def lift(fn):
return curry(map)(fn)
def llift(fn):
return compose(list, lift(fn))
def starlift(fn):
return curry(starmap)(fn)
def lstarlift(fn):
return compose(list, starlift(fn))
def parallel(*fs):
return lambda *args: (f(a) for f, a in zip(fs, args))
def star(f):
return lambda x: f(*x)
def duplicate_stream_and_apply(f1, f2):
return compose(star(parallel(f1, f2)), tee)
def foreach(fn, iterable):
for itm in iterable:
fn(itm)
def flift(pred):
return curry(filter)(pred)
def parallel_map(f1, f2):
"""Applies functions to a stream in parallel and yields a stream of tuples:
parallel_map :: a -> b, a -> c -> [a] -> [(b, c)]
"""
return compose(star(zip), duplicate_stream_and_apply(f1, f2))
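A couple of worked examples for these combinators, assuming the module path pyinfra/utils/func.py added here. lift turns an item-level function into a stream function; parallel_map runs two such stream functions over copies of one input stream and zips their results.
from pyinfra.utils.func import lift, parallel_map

doubles = lift(lambda x: x * 2)   # stream function: maps the lambda over an iterable
strings = lift(str)

pairs = parallel_map(doubles, strings)
print(list(pairs([1, 2, 3])))     # [(2, '1'), (4, '2'), (6, '3')]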

pyinfra/utils/retry.py (Normal file, 15 lines)
View File

@ -0,0 +1,15 @@
from pyinfra.config import CONFIG
from retry import retry as _retry
def retry(exception):
def decorator(fn):
@_retry(exception, tries=CONFIG.retry.tries, delay=CONFIG.retry.delay, jitter=tuple(CONFIG.retry.jitter))
def inner(*args, **kwargs):
return fn(*args, **kwargs)
return inner
return decorator
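Usage stays the same as with the plain retry decorator, but tries, delay and jitter now come from CONFIG.retry instead of being repeated at every call site. A short sketch, assuming the import paths from this change set; the flaky function is made up.
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.utils.retry import retry

@retry(DataLoadingFailure)
def download_or_fail(object_name):
    # Placeholder for a storage call that may raise DataLoadingFailure;
    # retried with the tries/delay/jitter configured under CONFIG.retry.
    raise DataLoadingFailure(f"could not load {object_name}")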

View File

@ -1,91 +0,0 @@
import abc
import gzip
import json
import logging
from operator import itemgetter
from typing import Callable
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.storage import Storage
def get_object_name(body):
dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body)
object_name = f"{dossier_id}/{file_id}.{target_file_extension}"
return object_name
def get_response_object_name(body):
dossier_id, file_id, response_file_extension = itemgetter("dossierId", "fileId", "responseFileExtension")(body)
object_name = f"{dossier_id}/{file_id}.{response_file_extension}"
return object_name
def get_object_descriptor(body):
return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": get_object_name(body)}
def get_response_object_descriptor(body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": get_response_object_name(body),
}
class ResponseStrategy(abc.ABC):
@abc.abstractmethod
def handle_response(self, body):
pass
def __call__(self, body):
return self.handle_response(body)
class StorageStrategy(ResponseStrategy):
def __init__(self, storage):
self.storage = storage
def handle_response(self, body):
self.storage.put_object(**get_response_object_descriptor(body), data=gzip.compress(json.dumps(body).encode()))
body.pop("data")
return body
class ForwardingStrategy(ResponseStrategy):
def handle_response(self, body):
return body
class QueueVisitor:
def __init__(self, storage: Storage, callback: Callable, response_strategy):
self.storage = storage
self.callback = callback
self.response_strategy = response_strategy
def load_data(self, body):
def download():
logging.debug(f"Downloading {object_descriptor}...")
data = self.storage.get_object(**object_descriptor)
logging.debug(f"Downloaded {object_descriptor}.")
return data
object_descriptor = get_object_descriptor(body)
try:
return gzip.decompress(download())
except Exception as err:
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
raise DataLoadingFailure from err
def process_data(self, data, body):
return self.callback({**body, "data": data})
def load_and_process(self, body):
data = self.process_data(self.load_data(body), body)
result_body = {**body, "data": data}
return result_body
def __call__(self, body):
result_body = self.load_and_process(body)
return self.response_strategy(result_body)

View File

@ -0,0 +1 @@
from .visitor import QueueVisitor

View File

@ -0,0 +1,58 @@
import logging
from functools import partial
from funcy import compose
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.storage.storage import Storage
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift
logger = logging.getLogger(__name__)
class Downloader:
def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
self.storage = storage
self.bucket_name = bucket_name
self.file_descriptor_manager = file_descriptor_manager
def __call__(self, queue_item_body):
return self.download(queue_item_body)
def download(self, queue_item_body):
names_of_relevant_objects = self.get_names_of_objects_by_pattern(queue_item_body)
objects = self.download_and_decompress_object(names_of_relevant_objects)
return objects
def get_names_of_objects_by_pattern(self, queue_item_body):
logger.debug(f"Filtering objects in bucket {self.bucket_name} by pattern...")
names_of_relevant_objects = compose(
list,
self.get_pattern_filter(queue_item_body),
self.get_names_of_all_associated_objects,
)(queue_item_body)
logger.debug(f"Found {len(names_of_relevant_objects)} objects matching filter.")
return names_of_relevant_objects
def download_and_decompress_object(self, object_names):
download = partial(self.storage.get_object, self.bucket_name)
return map(compose(decompress, download), object_names)
def get_names_of_all_associated_objects(self, queue_item_body):
prefix = self.file_descriptor_manager.get_path_prefix(queue_item_body)
# TODO: performance tests for the following situations:
# 1) dossier with very many files
# 2) prefix matches very many files, independent of dossier cardinality
yield from self.storage.get_all_object_names(self.bucket_name, prefix=prefix)
def get_pattern_filter(self, queue_item_body):
file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)
logger.debug(f"Filtering pattern: {file_pattern if len(file_pattern) <= 120 else (file_pattern[:120]+'...')}")
matches_pattern = flift(file_pattern)
return matches_pattern

View File

@ -0,0 +1,14 @@
import abc
from funcy import identity
from pyinfra.utils.func import lift
class ResponseFormatter(abc.ABC):
def __call__(self, message):
return (identity if isinstance(message, dict) else lift)(self.format)(message)
@abc.abstractmethod
def format(self, message):
pass

View File

@ -0,0 +1,13 @@
from funcy import first, omit
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
class DefaultResponseFormatter(ResponseFormatter):
"""
TODO: Extend by using enums throughout the codebase instead of strings.
See the enum formatter in the image-prediction service for reference.
"""
def format(self, message):
return {**omit(message, ["response_files"]), "responseFile": first(message["response_files"])}

View File

@ -0,0 +1,6 @@
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
class IdentityResponseFormatter(ResponseFormatter):
def format(self, message):
return message

View File

View File

@ -0,0 +1,14 @@
import abc
class BlobParsingStrategy(abc.ABC):
@abc.abstractmethod
def parse(self, data: bytes):
pass
@abc.abstractmethod
def parse_and_wrap(self, data: bytes):
pass
def __call__(self, data: bytes):
return self.parse_and_wrap(data)

View File

@ -0,0 +1,20 @@
from typing import Union
from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to find
# in storage. This class is only a temporary trial-and-error-with-fallback solution.
class DynamicParsingStrategy(BlobParsingStrategy):
def __init__(self):
self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())
def parse(self, data: bytes) -> Union[bytes, dict]:
return self.parser(data)
def parse_and_wrap(self, data):
return self.parse(data)

View File

@ -0,0 +1,92 @@
import abc
from collections import deque
from typing import Callable
from funcy import omit, filter, first, lpluck, identity
from more_itertools import peekable
from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.server.nothing import Nothing, is_not_nothing
from pyinfra.utils.encoding import pack_analysis_payload
from pyinfra.visitor.strategies.response.response import ResponseStrategy
class UploadFormatter(abc.ABC):
@abc.abstractmethod
def format(self, items):
raise NotImplementedError
def __call__(self, items):
return self.format(items)
class ProjectingUploadFormatter(UploadFormatter):
def format(self, items):
head = first(items)
if head["data"]:
assert len(items) == 1
return head
else:
items = lpluck("metadata", items)
return items
class AggregationStorageStrategy(ResponseStrategy):
def __init__(
self,
storage,
file_descriptor_manager: FileDescriptorManager,
merger: Callable = list,
upload_formatter: UploadFormatter = identity,
):
self.storage = storage
self.file_descriptor_manager = file_descriptor_manager
self.merger = merger
self.upload_formatter = upload_formatter
self.buffer = deque()
self.response_files = deque()
def handle_response(self, analysis_response, final=False):
def upload_or_aggregate(analysis_payload):
request_metadata = omit(analysis_response, ["analysis_payloads"])
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not analysis_payloads.peek(False))
analysis_payloads = peekable(analysis_response["analysis_payloads"])
yield from filter(is_not_nothing, map(upload_or_aggregate, analysis_payloads))
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
"""analysis_payload : {data: ..., metadata: ...}"""
storage_upload_info = self.file_descriptor_manager.build_storage_upload_info(analysis_payload, request_metadata)
object_descriptor = self.file_descriptor_manager.get_output_object_descriptor(storage_upload_info)
self.add_analysis_payload_to_buffer(analysis_payload)
if analysis_payload["data"] or last:
self.upload_aggregated_items(object_descriptor)
self.response_files.append(object_descriptor["object_name"])
return self.build_response_message(storage_upload_info) if last else Nothing
def add_analysis_payload_to_buffer(self, analysis_payload):
self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])})
def upload_aggregated_items(self, object_descriptor):
items_to_upload = self.upload_formatter(self.merge_queue_items())
self.upload_item(items_to_upload, object_descriptor)
def build_response_message(self, storage_upload_info):
response_files = [*self.response_files]
self.response_files.clear()
return {**storage_upload_info, "response_files": response_files}
def upload_item(self, analysis_payload, object_descriptor):
self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload))
def merge_queue_items(self):
merged_buffer_content = self.merger(self.buffer)
self.buffer.clear()
return merged_buffer_content

View File

@ -0,0 +1,6 @@
from pyinfra.visitor.strategies.response.response import ResponseStrategy
class ForwardingStrategy(ResponseStrategy):
def handle_response(self, analysis_response):
return analysis_response

View File

@ -0,0 +1,10 @@
import abc
class ResponseStrategy(abc.ABC):
@abc.abstractmethod
def handle_response(self, analysis_response: dict):
pass
def __call__(self, analysis_response: dict):
return self.handle_response(analysis_response)

View File

@ -0,0 +1,33 @@
import json
from operator import itemgetter
from pyinfra.config import parse_disjunction_string, CONFIG
from pyinfra.utils.encoding import compress
from pyinfra.visitor.strategies.response.response import ResponseStrategy
class StorageStrategy(ResponseStrategy):
def __init__(self, storage, response_file_extension="out"):
self.storage = storage
self.response_file_extension = response_file_extension
def handle_response(self, body: dict):
response_object_descriptor = self.get_response_object_descriptor(body)
self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode()))
body.pop("analysis_payloads")
body["response_files"] = [response_object_descriptor["object_name"]]
return body
def get_response_object_descriptor(self, body):
return {
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
"object_name": self.get_response_object_name(body),
}
def get_response_object_name(self, body):
dossier_id, file_id = itemgetter("dossierId", "fileId")(body)
object_name = f"{dossier_id}/{file_id}/.{self.response_file_extension}"
return object_name

pyinfra/visitor/utils.py (Normal file, 51 lines)
View File

@ -0,0 +1,51 @@
import logging
from typing import Dict
from pyinfra.exceptions import InvalidStorageItemFormat
from pyinfra.server.packing import string_to_bytes
logger = logging.getLogger()
def build_file_path(storage_upload_info, folder):
return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")
def standardize(storage_item) -> Dict:
"""Storage items can be a blob or a blob with metadata. Standardizes to the latter.
Cases:
1) backend upload: data as bytes
2) Some Python service's upload: data as the bytes of a JSON string "{'data': <str>, 'metadata': <dict>}",
where the value of key 'data' was encoded with bytes_to_string(...)
Returns:
{"data": bytes, "metadata": dict}
"""
def is_blob_without_metadata(storage_item):
return isinstance(storage_item, bytes)
def is_blob_with_metadata(storage_item: Dict):
return isinstance(storage_item, dict)
if is_blob_without_metadata(storage_item):
return wrap(storage_item)
elif is_blob_with_metadata(storage_item):
validate(storage_item)
return storage_item
else: # Fallback / used for testing with simple data
logger.warning("Encountered storage data in unexpected format.")
assert isinstance(storage_item, str)
return wrap(string_to_bytes(storage_item))
def wrap(data):
return {"data": data, "metadata": {}}
def validate(storage_item):
if not ("data" in storage_item and "metadata" in storage_item):
raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {storage_item}.")
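A worked example of the cases from the docstring, assuming the module paths from this change set; the payloads are made up.
from pyinfra.server.packing import bytes_to_string
from pyinfra.visitor.utils import standardize

# Case 1: a bare blob (e.g. a backend upload) is wrapped with empty metadata.
print(standardize(b"raw bytes"))
# {'data': b'raw bytes', 'metadata': {}}

# Case 2: a dict that already carries data and metadata is validated and passed through unchanged.
print(standardize({"data": b"raw bytes", "metadata": {"page": 1}}))
# {'data': b'raw bytes', 'metadata': {'page': 1}}

# Fallback: a plain base64 string (as used in simple tests) is decoded and wrapped, with a warning logged.
print(standardize(bytes_to_string(b"raw bytes")))
# {'data': b'raw bytes', 'metadata': {}}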

View File

@ -0,0 +1,84 @@
from typing import Callable
from funcy import lflatten, compose, itervalues, lfilter
from pyinfra.utils.func import lift
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.utils import standardize
class QueueVisitor:
def __init__(
self,
callback: Callable,
data_loader: Callable,
response_strategy: ResponseStrategy,
parsing_strategy: BlobParsingStrategy = None,
response_formatter: ResponseFormatter = None,
):
"""Processes queue messages with a given callback.
Args:
callback: callback to apply
data_loader: loads the data specified in the message and passes it to the callback
parsing_strategy: behaviour for interpreting loaded items
response_strategy: behaviour for response production
TODO: merge all dependencies into a single pipeline like: getter -> parser -> processor -> formatter -> putter
Returns:
depends on response strategy
"""
self.callback = callback
self.data_loader = data_loader
self.response_strategy = response_strategy
self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
self.response_formatter = response_formatter or IdentityResponseFormatter()
def __call__(self, queue_item_body):
analysis_response = compose(
self.response_formatter,
self.response_strategy,
self.load_items_from_storage_and_process_with_callback,
)(queue_item_body)
return analysis_response
def load_items_from_storage_and_process_with_callback(self, queue_item_body):
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
callback_results = compose(
self.remove_empty_results,
lflatten,
lift(self.get_item_processor(queue_item_body)),
self.load_data,
)(queue_item_body)
return {"analysis_payloads": callback_results, **queue_item_body}
def load_data(self, queue_item_body):
data = compose(
lift(standardize),
lift(self.parsing_strategy),
self.data_loader,
)(queue_item_body)
return data
def get_item_processor(self, queue_item_body):
def process_storage_item(storage_item):
analysis_input = {**storage_item, **queue_item_body}
return self.process_storage_item(analysis_input)
return process_storage_item
@staticmethod
def remove_empty_results(results):
return lfilter(compose(any, itervalues), results)
def process_storage_item(self, data_metadata_pack):
return self.callback(data_metadata_pack)

View File

@ -13,3 +13,9 @@ tqdm==4.62.3
pytest~=7.0.1
funcy==1.17
fpdf==1.7.2
PyMuPDF==1.19.6
more-itertools==8.12.0
numpy==1.22.3
Pillow==9.0.1
prometheus-client==0.13.1
frozendict==2.3.2

View File

@ -1,5 +1,4 @@
import argparse
import gzip
import os
from pathlib import Path
@ -7,6 +6,7 @@ from tqdm import tqdm
from pyinfra.config import CONFIG, parse_disjunction_string
from pyinfra.storage.storages import get_s3_storage
from pyinfra.utils.encoding import compress
def parse_args():
@ -31,7 +31,7 @@ def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension)
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
data = gzip.compress(result.encode())
data = compress(result.encode())
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
storage.put_object(bucket_name, path_gz, data)
@ -44,7 +44,7 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz)
with open(path, "rb") as f:
data = gzip.compress(f.read())
data = compress(f.read())
storage.put_object(bucket_name, path_gz, data)

View File

@ -9,8 +9,13 @@ from pyinfra.storage.storages import get_s3_storage
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--bucket_name", "-b", required=True)
parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True)
parser.add_argument("--bucket_name", "-b")
parser.add_argument(
"--analysis_container",
"-a",
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
required=True,
)
args = parser.parse_args()
return args
@ -43,20 +48,40 @@ def make_connection() -> pika.BlockingConnection:
return connection
def build_message_bodies(analyse_container_type, bucket_name):
def build_message_bodies(analysis_container, bucket_name):
def update_message(message_dict):
if analyse_container_type == "detr" or analyse_container_type == "image":
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
if analyse_container_type == "dl_error":
if analysis_container == "detr" or analysis_container == "image":
message_dict.update({"pages": []})
if analysis_container == "conversion":
message_dict.update(
{
"targetFileExtension": "ORIGIN.pdf.gz",
"responseFileExtension": "json.gz",
"operation": "conversion",
"pages": [1, 2, 3],
}
)
if analysis_container == "table_parsing":
message_dict.update(
{
"operation": "table_parsing",
"pages": [1, 2, 3],
}
)
if analysis_container == "extraction":
message_dict.update(
{"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
)
if analysis_container == "dl_error":
message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
if analyse_container_type == "ner":
if analysis_container == "ner":
message_dict.update(
{"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
)
return message_dict
storage = get_s3_storage()
for bucket_name, pdf_name in storage.get_all_object_names(bucket_name):
for pdf_name in storage.get_all_object_names(bucket_name):
if "pdf" not in pdf_name:
continue
file_id = pdf_name.split(".")[0]
@ -72,7 +97,9 @@ def main(args):
declare_queue(channel, CONFIG.rabbitmq.queues.input)
declare_queue(channel, CONFIG.rabbitmq.queues.output)
for body in build_message_bodies(args.analysis_container, args.bucket_name):
bucket_name = args.bucket_name or parse_disjunction_string(CONFIG.storage.bucket)
for body in build_message_bodies(args.analysis_container, bucket_name):
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")

View File

@ -0,0 +1,37 @@
import argparse
import gzip
import json
from funcy import lmap
from pyinfra.server.packing import string_to_bytes
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("compressed_json_path", help="Path to compressed JSON file")
return parser.parse_args()
def interpret(parsed):
try:
return {**parsed, "data": str(string_to_bytes(parsed["data"]))}
except KeyError:
return parsed
def main(fp):
with open(fp, "rb") as f:
compressed_json_path = f.read()
json_str = gzip.decompress(compressed_json_path)
parsed = json.loads(json_str)
parsed = [parsed] if isinstance(parsed, dict) else parsed
parsed = lmap(interpret, parsed)
print(json.dumps(parsed, indent=2))
if __name__ == "__main__":
args = parse_args()
main(args.compressed_json_path)

View File

@ -1,42 +1,25 @@
import logging
from multiprocessing import Process
import requests
from retry import retry
from pyinfra.utils.retry import retry
from pyinfra.config import CONFIG
from pyinfra.exceptions import AnalysisFailure, ConsumerError
from pyinfra.component_factory import ComponentFactory
from pyinfra.exceptions import ConsumerError
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.queue.consumer import Consumer
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
from pyinfra.storage.storages import get_storage
from pyinfra.utils.banner import show_banner
from pyinfra.visitor import QueueVisitor, StorageStrategy
logger = logging.getLogger()
def make_callback(analysis_endpoint):
def callback(message):
def perform_operation(operation):
endpoint = f"{analysis_endpoint}/{operation}"
try:
logging.debug(f"Requesting analysis from {endpoint}...")
analysis_response = requests.post(endpoint, data=message["data"])
analysis_response.raise_for_status()
analysis_response = analysis_response.json()
logging.debug(f"Received response.")
return analysis_response
except Exception as err:
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
raise AnalysisFailure() from err
operations = message.get("operations", ["/"])
results = map(perform_operation, operations)
result = dict(zip(operations, results))
if list(result.keys()) == ["/"]:
result = list(result.values())[0]
return result
return callback
@retry(ConsumerError)
def consume():
try:
consumer = ComponentFactory(CONFIG).get_consumer()
consumer.basic_consume_and_publish()
except Exception as err:
logger.exception(err)
raise ConsumerError from err
def main():
@ -46,20 +29,6 @@ def main():
logging.info("Starting webserver...")
webserver.start()
callback = make_callback(CONFIG.rabbitmq.callback.analysis_endpoint)
storage = get_storage(CONFIG.storage.backend)
response_strategy = StorageStrategy(storage)
visitor = QueueVisitor(storage, callback, response_strategy)
@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
def consume():
try: # RED-4049 queue manager needs to be in try scope to eventually throw Exception after connection loss.
queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)
consumer = Consumer(visitor, queue_manager)
consumer.basic_consume_and_publish()
except Exception as err:
raise ConsumerError from err
try:
consume()
except KeyboardInterrupt:

View File

@ -1,3 +1,55 @@
service:
response_formatter: identity
operations:
upper:
input:
subdir: ""
extension: up_in.gz
multi: False
output:
subdir: ""
extension: up_out.gz
extract:
input:
subdir: ""
extension: extr_in.gz
multi: False
output:
subdir: "extractions"
extension: gz
rotate:
input:
subdir: ""
extension: rot_in.gz
multi: False
output:
subdir: ""
extension: rot_out.gz
classify:
input:
subdir: ""
extension: cls_in.gz
multi: True
output:
subdir: ""
extension: cls_out.gz
stream_pages:
input:
subdir: ""
extension: pgs_in.gz
multi: False
output:
subdir: "pages"
extension: pgs_out.gz
default:
input:
subdir: ""
extension: IN.gz
multi: False
output:
subdir: ""
extension: OUT.gz
storage:
minio:
endpoint: "http://127.0.0.1:9000"
@ -21,5 +73,7 @@ webserver:
port: $SERVER_PORT|5000 # webserver port
mode: $SERVER_MODE|production # webserver mode: {development, production}
mock_analysis_endpoint: "http://127.0.0.1:5000"
mock_analysis_endpoint: "http://127.0.0.1:5000"
use_docker_fixture: 1
logging: 0

View File

@ -1,11 +1,20 @@
import json
from pyinfra.config import CONFIG as MAIN_CONFIG
MAIN_CONFIG["retry"]["delay"] = 0.1
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)
from pyinfra.default_objects import get_component_factory
from test.config import CONFIG as TEST_CONFIG
import logging
import time
from unittest.mock import Mock
import pika
import pytest
import testcontainers.compose
from testcontainers.compose import DockerCompose
from pyinfra.exceptions import UnknownClient
from pyinfra.locations import TEST_DIR, COMPOSE_PATH
@ -16,14 +25,36 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter
from pyinfra.storage.clients.azure import get_azure_client
from pyinfra.storage.clients.s3 import get_s3_client
from pyinfra.storage.storage import Storage
from test.config import CONFIG
from pyinfra.visitor import QueueVisitor
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
from pyinfra.visitor.strategies.response.storage import StorageStrategy
from test.queue.queue_manager_mock import QueueManagerMock
from test.storage.adapter_mock import StorageAdapterMock
from test.storage.client_mock import StorageClientMock
from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.basicConfig()
logger = logging.getLogger()
# TODO: refactor all fixtures into cleanly separated modules
pytest_plugins = [
"test.fixtures.consumer",
"test.fixtures.input",
"test.fixtures.pdf",
"test.fixtures.server",
"test.integration_tests.serve_test",
]
logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1)
logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1)
@pytest.fixture(autouse=True)
def mute_logger():
if not TEST_CONFIG.logging:
logger.setLevel(logging.CRITICAL + 1)
@pytest.fixture(scope="session")
@ -58,10 +89,14 @@ def mock_make_load_data():
return load_data
@pytest.fixture(params=["minio", "aws"], scope="session")
def storage(client_name, bucket_name, request):
# def pytest_make_parametrize_id(config, val, argname):
# return f"\n\t{argname}={val}\n"
@pytest.fixture
def storage(client_name, bucket_name, s3_backend, docker_compose):
logger.debug("Setup for storage")
storage = Storage(get_adapter(client_name, request.param))
storage = Storage(get_adapter(client_name, s3_backend))
storage.make_bucket(bucket_name)
storage.clear_bucket(bucket_name)
yield storage
@ -69,15 +104,23 @@ def storage(client_name, bucket_name, request):
storage.clear_bucket(bucket_name)
@pytest.fixture(params=["minio", "aws"])
def s3_backend(request):
return request.param
@pytest.fixture(scope="session", autouse=True)
def docker_compose(sleep_seconds=30):
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
compose.start()
logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
time.sleep(sleep_seconds)
yield compose
compose.stop()
if TEST_CONFIG.use_docker_fixture:
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
compose.start()
logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
time.sleep(sleep_seconds)
yield compose
compose.stop()
else:
yield None
def get_pika_connection_params():
@@ -86,7 +129,7 @@ def get_pika_connection_params():
def get_s3_params(s3_backend):
    params = CONFIG.storage[s3_backend]
    params = TEST_CONFIG.storage[s3_backend]
    return params
@@ -95,7 +138,7 @@ def get_adapter(client_name, s3_backend):
    if client_name == "mock":
        return StorageAdapterMock(StorageClientMock())
    if client_name == "azure":
        return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
        return AzureStorageAdapter(get_azure_client(TEST_CONFIG.storage.azure.connection_string))
    if client_name == "s3":
        return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend)))
    else:
@@ -110,7 +153,7 @@ def get_queue_manager(queue_manager_name) -> QueueManager:
@pytest.fixture(scope="session")
def queue_manager(queue_manager_name):
def queue_manager(queue_manager_name, docker_compose):
    def close_connections():
        if queue_manager_name == "pika":
            try:
@@ -134,7 +177,7 @@ def queue_manager(queue_manager_name):
@pytest.fixture(scope="session")
def callback():
    def inner(request):
        return request["data"].decode() * 2
        return [request["data"].decode() * 2]
    return inner
@@ -156,5 +199,22 @@ def response_strategy(response_strategy_name, storage):
@pytest.fixture()
def visitor(storage, analysis_callback, response_strategy):
    return QueueVisitor(storage, analysis_callback, response_strategy)
def visitor(storage, analysis_callback, response_strategy, component_factory):
    return QueueVisitor(
        callback=analysis_callback,
        data_loader=component_factory.get_downloader(storage),
        response_strategy=response_strategy,
    )
@pytest.fixture
def file_descriptor_manager(component_factory):
    return component_factory.get_file_descriptor_manager()
@pytest.fixture
def component_factory():
    MAIN_CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
    return get_component_factory(MAIN_CONFIG)
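
A note on the callback fixture change above (plain value to single-element list): one hedged reading is that analysis callbacks now return a list of results per request, so that one-to-many operations can emit several items for a single message. A minimal callback under that assumption, with the request shape mirrored from the test fixtures:

# Illustrative only; the list-returning contract is inferred from the diff,
# not from documented pyinfra API.
def echo_callback(request):
    return [request["data"].decode() * 2]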


@@ -0,0 +1,12 @@
# import pytest
# from funcy import lmap
#
# from pyinfra.server.rest import process_lazily
# from pyinfra.server.utils import unpack
#
#
# @pytest.mark.parametrize("batched", [True, False])
# @pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
# def test_pickup_endpoint(url, input_data_items, metadata, operation, targets, server_process):
#     output = lmap(unpack, process_lazily(f"{url}/submit", input_data_items, metadata))
#     assert output == targets


@@ -0,0 +1,32 @@
from functools import partial
from funcy import chunks, first, compose
def test_repeated_first_chunk_consumption():
    def f(chunk):
        return sum(chunk)

    def g():
        return f(first(chunks(3, items)))

    items = iter(range(10))
    assert g() == 3
    assert g() == 12
    assert g() == 21
    assert g() == 9


def test_repeated_first_chunk_consumption_passing():
    def f(chunk):
        return sum(chunk)

    g = compose(f, first, partial(chunks, 3))
    items = iter(range(10))
    assert g(items) == 3
    assert g(items) == 12
    assert g(items) == 21
    assert g(items) == 9
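
For context on the two tests above: funcy.chunks consumes an iterator lazily, so each call to g advances the shared iterator by one chunk, while a plain sequence is sliced and left untouched. A standalone illustration, assuming funcy's standard behavior:

from funcy import chunks, first

it = iter(range(10))
assert sum(first(chunks(3, it))) == 3    # consumes 0, 1, 2
assert sum(first(chunks(3, it))) == 12   # consumes 3, 4, 5

data = list(range(10))
assert sum(first(chunks(3, data))) == 3  # a list is sliced, not consumed
assert sum(first(chunks(3, data))) == 3  # so the first chunk never changes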

test/fixtures/__init__.py (new, empty file)

test/fixtures/consumer.py (new file, 37 lines)

@@ -0,0 +1,37 @@
from operator import itemgetter
from typing import Iterable
import pytest
from pyinfra.queue.consumer import Consumer
@pytest.fixture(scope="session")
def consumer(queue_manager, callback):
return Consumer(callback, queue_manager)
@pytest.fixture(scope="session")
def access_callback():
return itemgetter("fileId")
@pytest.fixture()
def items():
numbers = [f"{i}".encode() for i in range(3)]
return pair_data_with_queue_message(numbers)
def pair_data_with_queue_message(data: Iterable[bytes]):
def inner():
for i, d in enumerate(data):
body = {
"dossierId": "dossier_id",
"fileId": f"file_id_{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
"pages": [0, 2, 3],
}
yield d, body
return list(inner())
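
For orientation, the items fixture pairs each payload with a queue-message body; a minimal check of the first pair, derived directly from the code above:

data, body = pair_data_with_queue_message([b"0"])[0]
assert data == b"0"
assert body["fileId"] == "file_id_0"
assert body["pages"] == [0, 2, 3]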

test/fixtures/input.py (new file, 165 lines)

@@ -0,0 +1,165 @@
from functools import partial
from itertools import starmap, repeat
import numpy as np
import pytest
from PIL import Image
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip, merge
from pyinfra.server.normalization import normalize_item
from pyinfra.server.nothing import Nothing
from pyinfra.server.packing import pack, unpack
from pyinfra.utils.func import star, lift, lstarlift
from test.utils.image import image_to_bytes
from test.utils.pdf import pdf_stream
@pytest.fixture
def input_data_items(unencoded_input_data, input_data_encoder):
    return input_data_encoder(unencoded_input_data)


@pytest.fixture
def unencoded_input_data(item_type, unencoded_strings, unencoded_images, unencoded_pdfs):
    if item_type == "string":
        return unencoded_strings
    elif item_type == "image":
        return unencoded_images
    elif item_type == "pdf":
        return unencoded_pdfs
    else:
        raise ValueError(f"Unknown item type {item_type}")


@pytest.fixture
def input_data_encoder(item_type):
    if item_type == "string":
        return strings_to_bytes
    elif item_type == "image":
        return images_to_bytes
    elif item_type == "pdf":
        return pdfs_to_bytes
    else:
        raise ValueError(f"Unknown item type {item_type}")
@pytest.fixture
def unencoded_pdfs(n_items, unencoded_pdf):
    return [unencoded_pdf] * n_items


def pdfs_to_bytes(unencoded_pdfs):
    return [pdf_stream(pdf) for pdf in unencoded_pdfs]


@pytest.fixture
def target_data_items(input_data_items, core_operation, metadata):
    if core_operation is Nothing:
        return Nothing
    op = compose(normalize_item, core_operation)
    expected = lflatten(starmap(op, zip(input_data_items, metadata)))
    return expected


@pytest.fixture
def unencoded_strings(n_items):
    return [f"content{i}" for i in range(n_items)]


def strings_to_bytes(strings):
    return [bytes(s, encoding="utf8") for s in strings]
@pytest.fixture
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test, queue_message_metadata):
    """TODO: this has become super wonky"""
    metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)]
    if operation is Nothing:
        return Nothing
    op = compose(lift(star(pack)), normalize_item, operation)
    try:
        response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata)))))
        queue_message_keys = ["id"] * (not server_side_test) + [*first(queue_message_metadata).keys()]
        response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata)
        expected = lzip(response_data, response_metadata)
    except ValueError:
        expected = []
    return expected
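
The try/except ValueError above covers the parametrized n_items == 0 case: with no input items the starmap is empty, and unpacking an empty zip into two names raises ValueError. A minimal reproduction of that corner case:

pairs = []
try:
    response_data, response_metadata = zip(*pairs)
except ValueError:
    expected = []  # the fixture falls back to an empty expectation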
@pytest.fixture
def endpoint(url, operation_name):
    return f"{url}/{operation_name}"


@pytest.fixture(params=["rest", "basic"])
def client_pipeline_type(request):
    return request.param


@pytest.fixture(params=[1, 0, 5])
def n_items(request):
    return request.param


@pytest.fixture(params=[0, 100])
def n_pages(request):
    return request.param


@pytest.fixture(params=[1, 5])
def buffer_size(request):
    return request.param


def array_to_image(array) -> Image.Image:
    return Image.fromarray(np.uint8(array * 255), mode="RGB")


def input_batch(n_items):
    return np.random.random_sample(size=(n_items, 3, 30, 30))


@pytest.fixture
def unencoded_images(n_items):
    return lmap(array_to_image, input_batch(n_items))


def images_to_bytes(images):
    return lmap(image_to_bytes, images)


@pytest.fixture
def metadata(n_items, many_to_n):
    """storage metadata
    TODO: rename
    """
    return list(repeat({"key": "value"}, times=n_items))
@pytest.fixture
def queue_message_metadata(n_items, operation_name):
    def metadata(i):
        return merge(
            {
                "dossierId": "dossier_id",
                "fileId": f"file_id_{i}",
            },
            ({"operation": operation_name} if operation_name else {}),
            ({"pages": [0, 2, 3]} if n_items > 1 else {}),
        )

    return lmap(metadata, range(n_items))


@pytest.fixture
def packages(input_data_items, metadata):
    return lstarlift(pack)(zip(input_data_items, metadata))

test/fixtures/pdf.py (new file, 11 lines)

@@ -0,0 +1,11 @@
import fpdf
import pytest
@pytest.fixture
def unencoded_pdf(n_pages):
pdf = fpdf.FPDF(unit="pt")
for _ in range(n_pages):
pdf.add_page()
return pdf
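
The pdf_stream helper imported from test.utils.pdf in test/fixtures/input.py is not shown in this diff; a hedged guess at what such a helper might look like, where the signature and the fpdf version handling are assumptions:

import fpdf

def pdf_stream(pdf: fpdf.FPDF) -> bytes:
    out = pdf.output(dest="S")  # str in fpdf 1.x, bytes-like in fpdf2
    return out.encode("latin-1") if isinstance(out, str) else bytes(out)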

Some files were not shown because too many files have changed in this diff.