Compare commits
266 Commits (a1bfec765c … 37881da08e)
@@ -36,8 +36,6 @@ A configuration is located in `/config.yaml`. All relevant variables can be conf
{
    "dossierId": "",
    "fileId": "",
    "targetFileExtension": "",
    "responseFileExtension": ""
}
```
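For context, the queue message consumed by the new code path is documented in the data_metadata_pack docstring of pyinfra/callback.py further down; a minimal, purely hypothetical instance (all ids and values made up) would look like:

message = {
    "dossierId": "d-123",         # storage prefix, hypothetical
    "fileId": "f-456",            # storage prefix, hypothetical
    "pages": [1, 2],              # optional page indices
    "operation": "table_parsing"  # selects the matching operation block in config.yaml
}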
52 config.yaml
@@ -1,5 +1,52 @@
service:
  logging_level: $LOGGING_LEVEL_ROOT|DEBUG  # Logging level for service logger
  name: $SERVICE_NAME|research  # Default service name for research service, used for prometheus metric name
  response_formatter: default  # formats analysis payloads of response messages
  upload_formatter: projecting  # formats analysis payloads of objects uploaded to storage

  # Note: This is not really the right place for this. It should be configured on a per-service basis.
  operation: $OPERATION|default
  # operation needs to be specified in deployment config for services that are called without an operation specified

  operations:
    conversion:
      input:
        multi: False
        subdir: ""
        extension: ORIGIN.pdf.gz
      output:
        subdir: "pages_as_images"
        extension: json.gz
    extraction:
      input:
        multi: False
        subdir: ""
        extension: ORIGIN.pdf.gz
      output:
        subdir: "extracted_images"
        extension: json.gz
    table_parsing:
      input:
        multi: True
        subdir: "pages_as_images"
        extension: json.gz
      output:
        subdir: "table_parses"
        extension: json.gz
    image_classification:
      input:
        multi: True
        subdir: "extracted_images"
        extension: json.gz
      output:
        subdir: ""
        extension: IMAGE_INFO.json.gz
    default:
      input:
        multi: False
        subdir: ""
        extension: in.gz
      output:
        subdir: ""
        extension: out.gz

probing_webserver:
  host: $PROBING_WEBSERVER_HOST|"0.0.0.0"  # Probe webserver address

@@ -33,3 +80,8 @@ storage:

  azure:
    connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"

  retry:
    tries: 3
    delay: 5
    jitter: [1, 3]
76 doc/signatures.txt Normal file
@@ -0,0 +1,76 @@
Processing service interface

image classification now   : JSON (Mdat PDF) -> (Data PDF -> JSON [Mdat ImObj])
image classification future: JSON [Mdat FunkIm] | Mdat PDF -> (Data [FunkIm] -> JSON [Mdat FunkIm])
object detection           : JSON [Mdat PagIm] | Mdat PDF -> (Data [PagIm] -> JSON [[Mdat SemIm]])
NER                        : JSON [Mdat Dict] -> (Data [Dict] -> JSON [Mdat])
table parsing              : JSON [Mdat FunkIm] | Mdat PDF -> (Data [PagIm] -> JSON [[Mdat FunkIm]])
pdf2image                  : Mdat (fn, [Int], PDF) -> (JSON ([Int], Data PDF) -> [(FunkIm, Mdat)])


image classification now   : Mdat (fn, [Int], file) -> (Data PDF -> JSON [Mdat ImObj])
image classification future: Mdat (fn, [Int], dir) -> (Data [FunkIm] -> JSON [Mdat FunkIm])
object detection           : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat SemIm]])
table parsing              : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat FunkIm]])
NER                        : Mdat (fn, [Int], file) -> (Data [Dict] -> JSON [Mdat])
pdf2image                  : Mdat (fn, [Int], file) -> (JSON ([Int], Data PDF) -> [(FunkIm, Mdat)])


from operator import itemgetter
from funcy import identity

def access(mdat):
    if mdat.path is file:
        request = {"data": load(mdat.path), "metadata": mdat}
    elif mdat.path is dir:
        get_indexed = identity if not mdat.idx else itemgetter(*mdat.idx)
        request = {"data": get_indexed(get_files(mdat.path)), "metadata": mdat}
    else:
        raise BadRequest


storage:

    fileId: {
        pages: [PagIm]
        images: [FunkIm]
        sections: gz
    }


---------------


assert if targetPath is file then response list must be singleton
{index: [], dir: fileID.pdf.gz, targetPath: fileID.images.json.gz} -> [{data: pdf bytes, metadata: request: ...}] -> [{data: null, metadata: request: null, response: {classification infos: ...}}]
image classification now   : Mdat (fn, [Int], file) -> [JSON (Data PDF, Mdat)] -> [JSON (Data null, Mdat [ImObj])] | 1 -> 1

assert if targetPath is file then response list must be singleton
{index: [], dir: fileID/images, targetPath: fileID.images.json.gz} -> [{data: image bytes, metadata: request: {image location...}}] -> [{data: null, metadata: request: null, response: {classification infos: ...}}]
image classification future: Mdat (fn, [Int], dir) -> JSON (Data [FunkIm], Mdat) -> [JSON (Data null, Mdat [FunkIm])] |
object detection           : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat SemIm]])
table parsing              : Mdat (fn, [Int], dir) -> (Data [PagIm] -> JSON [[Mdat FunkIm]])
NER                        : Mdat (fn, [Int], file) -> (Data [Dict] -> JSON [Mdat])
pdf2image                  : Mdat (fn, [Int], file) -> (JSON ([Int], Data PDF) -> [(FunkIm, Mdat)])

aggregate <==> targetPath is file and index is empty
@@ -1,7 +1,7 @@
version: '2'
services:
  minio:
-   image: minio/minio
+   image: minio/minio:RELEASE.2022-06-11T19-55-32Z
    ports:
      - "9000:9000"
    environment:
@@ -12,7 +12,7 @@ services:
    command: server /data
    network_mode: "bridge"
  rabbitmq:
-   image: docker.io/bitnami/rabbitmq:3.9
+   image: docker.io/bitnami/rabbitmq:3.9.8
    ports:
      - '4369:4369'
      - '5551:5551'
65 pyinfra/callback.py Normal file
@ -0,0 +1,65 @@
|
||||
import logging
|
||||
|
||||
from funcy import merge, omit, lmap
|
||||
|
||||
from pyinfra.exceptions import AnalysisFailure
|
||||
from pyinfra.pipeline_factory import CachedPipelineFactory
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Callback:
|
||||
"""This is the callback that is applied to items pulled from the storage. It forwards these items to an analysis
|
||||
endpoint.
|
||||
"""
|
||||
|
||||
def __init__(self, pipeline_factory: CachedPipelineFactory):
|
||||
self.pipeline_factory = pipeline_factory
|
||||
|
||||
def __get_pipeline(self, endpoint):
|
||||
return self.pipeline_factory.get_pipeline(endpoint)
|
||||
|
||||
@staticmethod
|
||||
def __run_pipeline(pipeline, analysis_input: dict):
|
||||
"""
|
||||
TODO: Since data and metadata are passed as singletons, there is no buffering and hence no batching happening
|
||||
within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate
|
||||
passing non-singletons, to only ack a message, once a response is pulled from the output queue of the
|
||||
pipeline. Probably the pipeline return value needs to contain the queue message frame (or so), in order for
|
||||
the queue manager to tell which message to ack.
|
||||
|
||||
TODO: casting list (lmap) on `analysis_response_stream` is a temporary solution, while the client pipeline
|
||||
operates on singletons ([data], [metadata]).
|
||||
"""
|
||||
|
||||
def combine_storage_item_metadata_with_queue_message_metadata(analysis_input):
|
||||
return merge(analysis_input["metadata"], omit(analysis_input, ["data", "metadata"]))
|
||||
|
||||
def remove_queue_message_metadata(analysis_result):
|
||||
metadata = omit(analysis_result["metadata"], queue_message_keys(analysis_input))
|
||||
return {**analysis_result, "metadata": metadata}
|
||||
|
||||
def queue_message_keys(analysis_input):
|
||||
return {*analysis_input.keys()}.difference({"data", "metadata"})
|
||||
|
||||
try:
|
||||
data = analysis_input["data"]
|
||||
metadata = combine_storage_item_metadata_with_queue_message_metadata(analysis_input)
|
||||
analysis_response_stream = pipeline([data], [metadata])
|
||||
analysis_response_stream = lmap(remove_queue_message_metadata, analysis_response_stream)
|
||||
return analysis_response_stream
|
||||
|
||||
except Exception as err:
|
||||
logger.error(err)
|
||||
raise AnalysisFailure from err
|
||||
|
||||
def __call__(self, analysis_input: dict):
|
||||
"""data_metadata_pack: {'dossierId': ..., 'fileId': ..., 'pages': ..., 'operation': ...}"""
|
||||
operation = analysis_input.get("operation", "")
|
||||
pipeline = self.__get_pipeline(operation)
|
||||
|
||||
try:
|
||||
logging.debug(f"Requesting analysis for operation '{operation}'...")
|
||||
return self.__run_pipeline(pipeline, analysis_input)
|
||||
except AnalysisFailure:
|
||||
logging.warning(f"Exception caught when calling analysis endpoint for operation '{operation}'.")
|
||||
120 pyinfra/component_factory.py Normal file
@ -0,0 +1,120 @@
|
||||
import logging
|
||||
from functools import lru_cache
|
||||
|
||||
from funcy import project, identity, rcompose
|
||||
|
||||
from pyinfra.callback import Callback
|
||||
from pyinfra.config import parse_disjunction_string
|
||||
from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder
|
||||
from pyinfra.file_descriptor_manager import FileDescriptorManager
|
||||
from pyinfra.pipeline_factory import CachedPipelineFactory
|
||||
from pyinfra.queue.consumer import Consumer
|
||||
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
|
||||
from pyinfra.server.client_pipeline import ClientPipeline
|
||||
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
|
||||
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
|
||||
from pyinfra.server.packer.packers.rest import RestPacker
|
||||
from pyinfra.server.receiver.receivers.rest import RestReceiver
|
||||
from pyinfra.storage import storages
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.downloader import Downloader
|
||||
from pyinfra.visitor.response_formatter.formatters.default import DefaultResponseFormatter
|
||||
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
|
||||
from pyinfra.visitor.strategies.response.aggregation import AggregationStorageStrategy, ProjectingUploadFormatter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ComponentFactory:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_consumer(self, callback=None):
|
||||
callback = callback or self.get_callback()
|
||||
return Consumer(self.get_visitor(callback), self.get_queue_manager())
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_callback(self, analysis_base_url=None):
|
||||
analysis_base_url = analysis_base_url or self.config.rabbitmq.callback.analysis_endpoint
|
||||
|
||||
callback = Callback(CachedPipelineFactory(base_url=analysis_base_url, pipeline_factory=self.get_pipeline))
|
||||
|
||||
def wrapped(body):
|
||||
body_repr = project(body, ["dossierId", "fileId", "operation"])
|
||||
logger.info(f"Processing {body_repr}...")
|
||||
result = callback(body)
|
||||
logger.info(f"Completed processing {body_repr}...")
|
||||
return result
|
||||
|
||||
return wrapped
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_visitor(self, callback):
|
||||
return QueueVisitor(
|
||||
callback=callback,
|
||||
data_loader=self.get_downloader(),
|
||||
response_strategy=self.get_response_strategy(),
|
||||
response_formatter=self.get_response_formatter(),
|
||||
)
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_queue_manager(self):
|
||||
return PikaQueueManager(self.config.rabbitmq.queues.input, self.config.rabbitmq.queues.output)
|
||||
|
||||
@staticmethod
|
||||
@lru_cache(maxsize=None)
|
||||
def get_pipeline(endpoint):
|
||||
return ClientPipeline(
|
||||
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
|
||||
)
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_storage(self):
|
||||
return storages.get_storage(self.config.storage.backend)
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_response_strategy(self, storage=None):
|
||||
return AggregationStorageStrategy(
|
||||
storage=storage or self.get_storage(),
|
||||
file_descriptor_manager=self.get_file_descriptor_manager(),
|
||||
upload_formatter=self.get_upload_formatter(),
|
||||
)
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_file_descriptor_manager(self):
|
||||
return FileDescriptorManager(
|
||||
bucket_name=parse_disjunction_string(self.config.storage.bucket),
|
||||
file_descriptor_builder=self.get_operation_file_descriptor_builder(),
|
||||
)
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_upload_formatter(self):
|
||||
return {"identity": identity, "projecting": ProjectingUploadFormatter()}[self.config.service.upload_formatter]
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_operation_file_descriptor_builder(self):
|
||||
return RedFileDescriptorBuilder(
|
||||
operation2file_patterns=self.get_operation2file_patterns(),
|
||||
default_operation_name=self.config.service.operation,
|
||||
)
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_response_formatter(self):
|
||||
return {"default": DefaultResponseFormatter(), "identity": IdentityResponseFormatter()}[
|
||||
self.config.service.response_formatter
|
||||
]
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_operation2file_patterns(self):
|
||||
if self.config.service.operation != "default":
|
||||
self.config.service.operations["default"] = self.config.service.operations[self.config.service.operation]
|
||||
return self.config.service.operations
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_downloader(self, storage=None):
|
||||
return Downloader(
|
||||
storage=storage or self.get_storage(),
|
||||
bucket_name=parse_disjunction_string(self.config.storage.bucket),
|
||||
file_descriptor_manager=self.get_file_descriptor_manager(),
|
||||
)
|
||||
@ -1,10 +1,13 @@
|
||||
"""Implements a config object with dot-indexing syntax."""
|
||||
import os
|
||||
from functools import partial
|
||||
from itertools import chain
|
||||
from operator import truth
|
||||
from typing import Iterable
|
||||
|
||||
from envyaml import EnvYAML
|
||||
from funcy import first, juxt, butlast, last
|
||||
from frozendict import frozendict
|
||||
from funcy import first, juxt, butlast, last, lmap
|
||||
|
||||
from pyinfra.locations import CONFIG_FILE
|
||||
|
||||
@ -27,6 +30,9 @@ class DotIndexable:
|
||||
def __getitem__(self, item):
|
||||
return self.__getattr__(item)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.x[key] = value
|
||||
|
||||
|
||||
class Config:
|
||||
def __init__(self, config_path):
|
||||
@ -39,6 +45,29 @@ class Config:
|
||||
def __getitem__(self, item):
|
||||
return self.__getattr__(item)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.__config.key = value
|
||||
|
||||
def to_dict(self, frozen=True):
|
||||
return to_dict(self.__config.export(), frozen=frozen)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.to_dict())
|
||||
|
||||
|
||||
def to_dict(v, frozen=True):
|
||||
def make_dict(*args, **kwargs):
|
||||
return (frozendict if frozen else dict)(*args, **kwargs)
|
||||
|
||||
if isinstance(v, list):
|
||||
return tuple(map(partial(to_dict, frozen=frozen), v))
|
||||
elif isinstance(v, DotIndexable):
|
||||
return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.x.items()})
|
||||
elif isinstance(v, dict):
|
||||
return make_dict({k: to_dict(v, frozen=frozen) for k, v in v.items()})
|
||||
else:
|
||||
return v
|
||||
|
||||
|
||||
CONFIG = Config(CONFIG_FILE)
|
||||
|
||||
|
||||
8 pyinfra/default_objects.py Normal file
@ -0,0 +1,8 @@
|
||||
from functools import lru_cache
|
||||
|
||||
from pyinfra.component_factory import ComponentFactory
|
||||
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_component_factory(config):
|
||||
return ComponentFactory(config)
|
||||
@ -32,3 +32,19 @@ class NoSuchContainer(KeyError):
|
||||
|
||||
class IntentionalTestException(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class UnexpectedItemType(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class NoBufferCapacity(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidMessage(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidStorageItemFormat(ValueError):
|
||||
pass
|
||||
|
||||
99 pyinfra/file_descriptor_builder.py Normal file
@ -0,0 +1,99 @@
|
||||
import abc
|
||||
import os
|
||||
from operator import itemgetter
|
||||
|
||||
from funcy import project
|
||||
|
||||
|
||||
class FileDescriptorBuilder:
|
||||
@abc.abstractmethod
|
||||
def build_file_descriptor(self, queue_item_body, end="input"):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def build_matcher(self, file_descriptor):
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def build_storage_upload_info(analysis_payload, request_metadata):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_path_prefix(self, queue_item_body):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class RedFileDescriptorBuilder(FileDescriptorBuilder):
|
||||
"""Defines concrete descriptors for storage objects based on queue messages"""
|
||||
|
||||
def __init__(self, operation2file_patterns, default_operation_name):
|
||||
|
||||
self.operation2file_patterns = operation2file_patterns or self.get_default_operation2file_patterns()
|
||||
self.default_operation_name = default_operation_name
|
||||
|
||||
@staticmethod
|
||||
def get_default_operation2file_patterns():
|
||||
return {"default": {"input": {"subdir": "", "extension": ".in"}, "output": {"subdir": "", "extension": ".out"}}}
|
||||
|
||||
def build_file_descriptor(self, queue_item_body, end="input"):
|
||||
|
||||
def pages():
|
||||
if end == "input":
|
||||
if "id" in queue_item_body:
|
||||
return [queue_item_body["id"]]
|
||||
else:
|
||||
return queue_item_body["pages"] if file_pattern["multi"] else []
|
||||
elif end == "output":
|
||||
return [queue_item_body["id"]]
|
||||
else:
|
||||
raise ValueError(f"Invalid argument: {end=}") # TODO: use an enum for `end`
|
||||
|
||||
operation = queue_item_body.get("operation", self.default_operation_name)
|
||||
|
||||
file_pattern = self.operation2file_patterns[operation][end]
|
||||
|
||||
file_descriptor = {
|
||||
**project(queue_item_body, ["dossierId", "fileId", "pages"]),
|
||||
"pages": pages(),
|
||||
"extension": file_pattern["extension"],
|
||||
"subdir": file_pattern["subdir"],
|
||||
}
|
||||
|
||||
return file_descriptor
|
||||
|
||||
def build_matcher(self, file_descriptor):
|
||||
def make_filename(file_id, subdir, suffix):
|
||||
return os.path.join(file_id, subdir, suffix) if subdir else f"{file_id}.{suffix}"
|
||||
|
||||
dossier_id, file_id, subdir, pages, extension = itemgetter(
|
||||
"dossierId", "fileId", "subdir", "pages", "extension"
|
||||
)(file_descriptor)
|
||||
|
||||
matcher = os.path.join(
|
||||
dossier_id, make_filename(file_id, subdir, self.__build_page_regex(pages, subdir) + extension)
|
||||
)
|
||||
|
||||
return matcher
|
||||
|
||||
@staticmethod
|
||||
def __build_page_regex(pages, subdir):
|
||||
|
||||
n_pages = len(pages)
|
||||
if n_pages > 1:
|
||||
page_re = "id:(" + "|".join(map(str, pages)) + ")."
|
||||
elif n_pages == 1:
|
||||
page_re = f"id:{pages[0]}."
|
||||
else: # no pages specified -> either all pages or no pages, depending on whether a subdir is specified
|
||||
page_re = r"id:\d+." if subdir else ""
|
||||
|
||||
return page_re
|
||||
|
||||
@staticmethod
|
||||
def build_storage_upload_info(analysis_payload, request_metadata):
|
||||
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", 0)}
|
||||
return storage_upload_info
|
||||
|
||||
def get_path_prefix(self, queue_item_body):
|
||||
prefix = "/".join(itemgetter("dossierId", "fileId")(self.build_file_descriptor(queue_item_body, end="input")))
|
||||
return prefix
|
||||
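A usage sketch for the new RedFileDescriptorBuilder, with made-up ids and an operation pattern mirroring the table_parsing entry in config.yaml; it shows how a queue message is turned into a storage object matcher:

from pyinfra.file_descriptor_builder import RedFileDescriptorBuilder

# hypothetical pattern table; the real one comes from config.yaml via the component factory
patterns = {
    "table_parsing": {
        "input": {"multi": True, "subdir": "pages_as_images", "extension": "json.gz"},
        "output": {"subdir": "table_parses", "extension": "json.gz"},
    }
}
builder = RedFileDescriptorBuilder(operation2file_patterns=patterns, default_operation_name="table_parsing")

body = {"dossierId": "d-123", "fileId": "f-456", "pages": [3], "operation": "table_parsing"}
descriptor = builder.build_file_descriptor(body, end="input")
print(builder.build_matcher(descriptor))  # d-123/f-456/pages_as_images/id:3.json.gz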
63 pyinfra/file_descriptor_manager.py Normal file
@ -0,0 +1,63 @@
|
||||
from pyinfra.file_descriptor_builder import FileDescriptorBuilder
|
||||
|
||||
|
||||
class FileDescriptorManager:
|
||||
"""Decorates a file descriptor builder with additional convenience functionality and this way provides a
|
||||
comprehensive interface for all file descriptor related operations, while the concrete descriptor logic is
|
||||
implemented in a file descriptor builder.
|
||||
|
||||
TODO: This is supposed to be fully decoupled from the concrete file descriptor builder implementation, however some
|
||||
bad coupling is still left.
|
||||
"""
|
||||
|
||||
def __init__(self, bucket_name, file_descriptor_builder: FileDescriptorBuilder):
|
||||
self.bucket_name = bucket_name
|
||||
self.operation_file_descriptor_builder = file_descriptor_builder
|
||||
|
||||
def get_input_object_name(self, queue_item_body: dict):
|
||||
return self.get_object_name(queue_item_body, end="input")
|
||||
|
||||
def get_output_object_name(self, queue_item_body: dict):
|
||||
return self.get_object_name(queue_item_body, end="output")
|
||||
|
||||
def get_object_name(self, queue_item_body: dict, end):
|
||||
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
|
||||
object_name = self.__build_matcher(file_descriptor)
|
||||
|
||||
return object_name
|
||||
|
||||
def build_file_descriptor(self, queue_item_body, end="input"):
|
||||
return self.operation_file_descriptor_builder.build_file_descriptor(queue_item_body, end=end)
|
||||
|
||||
def build_input_matcher(self, queue_item_body):
|
||||
return self.build_matcher(queue_item_body, end="input")
|
||||
|
||||
def build_output_matcher(self, queue_item_body):
|
||||
return self.build_matcher(queue_item_body, end="output")
|
||||
|
||||
def build_matcher(self, queue_item_body, end):
|
||||
file_descriptor = self.build_file_descriptor(queue_item_body, end=end)
|
||||
return self.__build_matcher(file_descriptor)
|
||||
|
||||
def __build_matcher(self, file_descriptor):
|
||||
return self.operation_file_descriptor_builder.build_matcher(file_descriptor)
|
||||
|
||||
def get_input_object_descriptor(self, queue_item_body):
|
||||
return self.get_object_descriptor(queue_item_body, end="input")
|
||||
|
||||
def get_output_object_descriptor(self, storage_upload_info):
|
||||
return self.get_object_descriptor(storage_upload_info, end="output")
|
||||
|
||||
def get_object_descriptor(self, queue_item_body, end):
|
||||
# TODO: this is complected with the Storage class API
|
||||
# FIXME: bad coupling
|
||||
return {
|
||||
"bucket_name": self.bucket_name,
|
||||
"object_name": self.get_object_name(queue_item_body, end=end),
|
||||
}
|
||||
|
||||
def build_storage_upload_info(self, analysis_payload, request_metadata):
|
||||
return self.operation_file_descriptor_builder.build_storage_upload_info(analysis_payload, request_metadata)
|
||||
|
||||
def get_path_prefix(self, queue_item_body):
|
||||
return self.operation_file_descriptor_builder.get_path_prefix(queue_item_body)
|
||||
@ -6,8 +6,7 @@ from waitress import serve
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
logger.setLevel(CONFIG.service.logging_level)
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def run_probing_webserver(app, host=None, port=None, mode=None):
|
||||
|
||||
0 pyinfra/parser/__init__.py Normal file
14 pyinfra/parser/blob_parser.py Normal file
@ -0,0 +1,14 @@
|
||||
import abc
|
||||
|
||||
|
||||
class ParsingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BlobParser(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def parse(self, blob: bytes):
|
||||
pass
|
||||
|
||||
def __call__(self, blob: bytes):
|
||||
return self.parse(blob)
|
||||
67 pyinfra/parser/parser_composer.py Normal file
@ -0,0 +1,67 @@
|
||||
import logging
|
||||
|
||||
from funcy import rcompose
|
||||
|
||||
from pyinfra.parser.blob_parser import ParsingError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Either:
|
||||
def __init__(self, item):
|
||||
self.item = item
|
||||
|
||||
def bind(self):
|
||||
return self.item
|
||||
|
||||
|
||||
class Left(Either):
|
||||
pass
|
||||
|
||||
|
||||
class Right(Either):
|
||||
pass
|
||||
|
||||
|
||||
class EitherParserWrapper:
|
||||
def __init__(self, parser):
|
||||
self.parser = parser
|
||||
|
||||
def __log(self, result):
|
||||
if isinstance(result, Right):
|
||||
logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} succeeded or forwarded on {result.bind()}")
|
||||
else:
|
||||
logger.log(logging.DEBUG - 5, f"{self.parser.__class__.__name__} failed on {result.bind()}")
|
||||
return result
|
||||
|
||||
def parse(self, item: Either):
|
||||
if isinstance(item, Left):
|
||||
|
||||
try:
|
||||
return Right(self.parser(item.bind()))
|
||||
except ParsingError:
|
||||
return item
|
||||
|
||||
elif isinstance(item, Right):
|
||||
return item
|
||||
|
||||
else:
|
||||
return self.parse(Left(item))
|
||||
|
||||
def __call__(self, item):
|
||||
return self.__log(self.parse(item))
|
||||
|
||||
|
||||
class EitherParserComposer:
|
||||
def __init__(self, *parsers):
|
||||
self.parser = rcompose(*map(EitherParserWrapper, parsers))
|
||||
|
||||
def parse(self, item):
|
||||
result = self.parser(item)
|
||||
if isinstance(result, Right):
|
||||
return result.bind()
|
||||
else:
|
||||
raise ParsingError("All parsers failed.")
|
||||
|
||||
def __call__(self, item):
|
||||
return self.parse(item)
|
||||
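A usage sketch for the parser composition above: parsers are tried left to right and the first one that does not raise ParsingError wins (the byte string here is made up):

from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser

parse = EitherParserComposer(JsonBlobParser(), StringBlobParser())
print(parse(b"plain text"))  # JSON parsing fails, string parsing succeeds -> 'plain text'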
0 pyinfra/parser/parsers/__init__.py Normal file
7 pyinfra/parser/parsers/identity.py Normal file
@ -0,0 +1,7 @@
|
||||
from pyinfra.parser.blob_parser import BlobParser
|
||||
|
||||
|
||||
class IdentityBlobParser(BlobParser):
|
||||
|
||||
def parse(self, data: bytes):
|
||||
return data
|
||||
21 pyinfra/parser/parsers/json.py Normal file
@ -0,0 +1,21 @@
|
||||
import json
|
||||
|
||||
from pyinfra.parser.blob_parser import BlobParser, ParsingError
|
||||
from pyinfra.server.packing import string_to_bytes
|
||||
|
||||
|
||||
class JsonBlobParser(BlobParser):
|
||||
|
||||
def parse(self, data: bytes):
|
||||
try:
|
||||
data = data.decode()
|
||||
data = json.loads(data)
|
||||
except (UnicodeDecodeError, json.JSONDecodeError, AttributeError) as err:
|
||||
raise ParsingError from err
|
||||
|
||||
try:
|
||||
data["data"] = string_to_bytes(data["data"])
|
||||
except (KeyError, TypeError) as err:
|
||||
raise ParsingError from err
|
||||
|
||||
return data
|
||||
9 pyinfra/parser/parsers/string.py Normal file
@ -0,0 +1,9 @@
|
||||
from pyinfra.parser.blob_parser import BlobParser, ParsingError
|
||||
|
||||
|
||||
class StringBlobParser(BlobParser):
|
||||
def parse(self, data: bytes):
|
||||
try:
|
||||
return data.decode()
|
||||
except Exception as err:
|
||||
raise ParsingError from err
|
||||
18 pyinfra/pipeline_factory.py Normal file
@ -0,0 +1,18 @@
|
||||
class CachedPipelineFactory:
|
||||
def __init__(self, base_url, pipeline_factory):
|
||||
self.base_url = base_url
|
||||
self.operation2pipeline = {}
|
||||
self.pipeline_factory = pipeline_factory
|
||||
|
||||
def get_pipeline(self, operation: str):
|
||||
pipeline = self.operation2pipeline.get(operation, None) or self.__register_pipeline(operation)
|
||||
return pipeline
|
||||
|
||||
def __register_pipeline(self, operation):
|
||||
endpoint = self.__make_endpoint(operation)
|
||||
pipeline = self.pipeline_factory(endpoint)
|
||||
self.operation2pipeline[operation] = pipeline
|
||||
return pipeline
|
||||
|
||||
def __make_endpoint(self, operation):
|
||||
return f"{self.base_url}/{operation}"
|
||||
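A usage sketch for CachedPipelineFactory: the pipeline for an operation is built once from the operation's endpoint and then reused (the base URL and the trivial factory below are hypothetical):

from pyinfra.pipeline_factory import CachedPipelineFactory

# pipeline_factory just echoes the endpoint here, purely for illustration
factory = CachedPipelineFactory(base_url="http://analysis:5000", pipeline_factory=lambda endpoint: endpoint)

assert factory.get_pipeline("conversion") == "http://analysis:5000/conversion"
assert factory.get_pipeline("conversion") is factory.get_pipeline("conversion")  # cached, built only once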
@ -2,15 +2,15 @@ from pyinfra.queue.queue_manager.queue_manager import QueueManager
|
||||
|
||||
|
||||
class Consumer:
|
||||
def __init__(self, callback, queue_manager: QueueManager):
|
||||
def __init__(self, visitor, queue_manager: QueueManager):
|
||||
self.queue_manager = queue_manager
|
||||
self.callback = callback
|
||||
self.visitor = visitor
|
||||
|
||||
def consume_and_publish(self):
|
||||
self.queue_manager.consume_and_publish(self.callback)
|
||||
def consume_and_publish(self, n=None):
|
||||
self.queue_manager.consume_and_publish(self.visitor, n=n)
|
||||
|
||||
def basic_consume_and_publish(self):
|
||||
self.queue_manager.basic_consume_and_publish(self.callback)
|
||||
self.queue_manager.basic_consume_and_publish(self.visitor)
|
||||
|
||||
def consume(self, **kwargs):
|
||||
return self.queue_manager.consume(**kwargs)
|
||||
|
||||
@ -1,18 +1,18 @@
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from itertools import islice
|
||||
|
||||
import pika
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure
|
||||
from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
|
||||
logger = logging.getLogger("pika")
|
||||
logger.setLevel(logging.WARNING)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(CONFIG.service.logging_level)
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def monkey_patch_queue_handle(channel, queue) -> QueueHandle:
|
||||
@ -83,7 +83,7 @@ class PikaQueueManager(QueueManager):
|
||||
self.channel.queue_declare(input_queue, arguments=args, auto_delete=False, durable=True)
|
||||
self.channel.queue_declare(output_queue, arguments=args, auto_delete=False, durable=True)
|
||||
|
||||
def republish(self, body, n_current_attempts, frame):
|
||||
def republish(self, body: bytes, n_current_attempts, frame):
|
||||
self.channel.basic_publish(
|
||||
exchange="",
|
||||
routing_key=self._input_queue,
|
||||
@ -100,7 +100,7 @@ class PikaQueueManager(QueueManager):
|
||||
logger.error(f"Adding to dead letter queue: {body}")
|
||||
self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False)
|
||||
|
||||
def publish_response(self, message, callback, max_attempts=3):
|
||||
def publish_response(self, message, visitor: QueueVisitor, max_attempts=3):
|
||||
|
||||
logger.debug(f"Processing {message}.")
|
||||
|
||||
@ -109,8 +109,15 @@ class PikaQueueManager(QueueManager):
|
||||
n_attempts = get_n_previous_attempts(properties) + 1
|
||||
|
||||
try:
|
||||
response = json.dumps(callback(json.loads(body)))
|
||||
self.channel.basic_publish("", self._output_queue, response.encode())
|
||||
response_messages = visitor(json.loads(body))
|
||||
|
||||
if isinstance(response_messages, dict):
|
||||
response_messages = [response_messages]
|
||||
|
||||
for response_message in response_messages:
|
||||
response_message = json.dumps(response_message).encode()
|
||||
self.channel.basic_publish("", self._output_queue, response_message)
|
||||
|
||||
self.channel.basic_ack(frame.delivery_tag)
|
||||
except (ProcessingFailure, DataLoadingFailure):
|
||||
|
||||
@ -124,20 +131,21 @@ class PikaQueueManager(QueueManager):
|
||||
def pull_request(self):
|
||||
return self.channel.basic_get(self._input_queue)
|
||||
|
||||
def consume(self, inactivity_timeout=None):
|
||||
def consume(self, inactivity_timeout=None, n=None):
|
||||
logger.debug("Consuming")
|
||||
return self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
|
||||
gen = self.channel.consume(self._input_queue, inactivity_timeout=inactivity_timeout)
|
||||
yield from islice(gen, n)
|
||||
|
||||
def consume_and_publish(self, visitor):
|
||||
def consume_and_publish(self, visitor: QueueVisitor, n=None):
|
||||
|
||||
logger.info(f"Consuming with callback {visitor.callback.__name__}")
|
||||
logger.info(f"Consuming input queue.")
|
||||
|
||||
for message in self.consume():
|
||||
for message in self.consume(n=n):
|
||||
self.publish_response(message, visitor)
|
||||
|
||||
def basic_consume_and_publish(self, visitor):
|
||||
def basic_consume_and_publish(self, visitor: QueueVisitor):
|
||||
|
||||
logger.info(f"Basic consuming with callback {visitor.callback.__name__}")
|
||||
logger.info(f"Basic consuming input queue.")
|
||||
|
||||
def callback(channel, frame, properties, body):
|
||||
message = (frame, properties, body)
|
||||
@ -150,6 +158,8 @@ class PikaQueueManager(QueueManager):
|
||||
try:
|
||||
self.channel.queue_purge(self._input_queue)
|
||||
self.channel.queue_purge(self._output_queue)
|
||||
assert self.input_queue.to_list() == []
|
||||
assert self.output_queue.to_list() == []
|
||||
except pika.exceptions.ChannelWrongStateError:
|
||||
pass
|
||||
|
||||
|
||||
@ -43,7 +43,7 @@ class QueueManager(abc.ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def consume_and_publish(self, callback):
|
||||
def consume_and_publish(self, callback, n=None):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
|
||||
0 pyinfra/server/__init__.py Normal file
0 pyinfra/server/buffering/__init__.py Normal file
37 pyinfra/server/buffering/bufferize.py Normal file
@ -0,0 +1,37 @@
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
from funcy import repeatedly, identity
|
||||
|
||||
from pyinfra.exceptions import NoBufferCapacity
|
||||
from pyinfra.server.nothing import Nothing
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None):
|
||||
def buffered_fn(item):
|
||||
|
||||
if item is not Nothing:
|
||||
buffer.append(persist_fn(item))
|
||||
|
||||
response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing)))
|
||||
|
||||
return response_payload or null_value
|
||||
|
||||
def buffer_full(current_buffer_size):
|
||||
if current_buffer_size > buffer_size:
|
||||
logger.warning(f"Overfull buffer. size: {current_buffer_size}; intended capacity: {buffer_size}")
|
||||
|
||||
return current_buffer_size == buffer_size
|
||||
|
||||
def n_items_to_pop(buffer, final):
|
||||
current_buffer_size = len(buffer)
|
||||
return (final or buffer_full(current_buffer_size)) * current_buffer_size
|
||||
|
||||
if not buffer_size > 0:
|
||||
raise NoBufferCapacity("Buffer size must be greater than zero.")
|
||||
|
||||
buffer = deque()
|
||||
|
||||
return buffered_fn
|
||||
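A usage sketch for bufferize: items are collected until the buffer is full, then the wrapped function is applied to the whole batch; passing Nothing flushes the remainder (the summing function is made up for illustration):

from pyinfra.server.buffering.bufferize import bufferize
from pyinfra.server.nothing import Nothing

batched_sum = bufferize(lambda items: sum(items), buffer_size=2, null_value=0)

print(batched_sum(1))        # 0 -> buffer not full yet, null_value is returned
print(batched_sum(2))        # 3 -> buffer full, sum of [1, 2]
print(batched_sum(5))        # 0 -> buffering again
print(batched_sum(Nothing))  # 5 -> flush of the remaining [5]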
24 pyinfra/server/buffering/queue.py Normal file
@ -0,0 +1,24 @@
|
||||
from collections import deque
|
||||
from itertools import takewhile
|
||||
|
||||
from funcy import repeatedly
|
||||
|
||||
from pyinfra.server.nothing import is_not_nothing, Nothing
|
||||
|
||||
|
||||
def stream_queue(queue):
|
||||
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))
|
||||
|
||||
|
||||
class Queue:
|
||||
def __init__(self):
|
||||
self.__queue = deque()
|
||||
|
||||
def append(self, package) -> None:
|
||||
self.__queue.append(package)
|
||||
|
||||
def popleft(self):
|
||||
return self.__queue.popleft() if self.__queue else Nothing
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.__queue)
|
||||
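A usage sketch for the queue helpers above: popping an empty Queue yields Nothing, which stream_queue uses as its stop signal:

from pyinfra.server.buffering.queue import Queue, stream_queue

queue = Queue()
queue.append("a")
queue.append("b")
print(list(stream_queue(queue)))  # ['a', 'b']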
44 pyinfra/server/buffering/stream.py Normal file
@ -0,0 +1,44 @@
|
||||
from itertools import chain, takewhile
|
||||
from typing import Iterable
|
||||
|
||||
from funcy import first, repeatedly, mapcat
|
||||
|
||||
from pyinfra.server.buffering.bufferize import bufferize
|
||||
from pyinfra.server.nothing import Nothing, is_not_nothing
|
||||
|
||||
|
||||
class FlatStreamBuffer:
|
||||
"""Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable."""
|
||||
|
||||
def __init__(self, fn, buffer_size=3):
|
||||
"""Function `fn` needs to be mappable and return an iterable; ideally `fn` returns a generator."""
|
||||
self.stream_buffer = StreamBuffer(fn, buffer_size=buffer_size)
|
||||
|
||||
def __call__(self, items):
|
||||
items = chain(items, [Nothing])
|
||||
yield from mapcat(self.stream_buffer, items)
|
||||
|
||||
|
||||
class StreamBuffer:
|
||||
"""Puts a streaming function between an input and an output buffer."""
|
||||
|
||||
def __init__(self, fn, buffer_size=3):
|
||||
"""Function `fn` needs to be mappable and return an iterable; ideally `fn` returns a generator."""
|
||||
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
|
||||
self.result_stream = chain([])
|
||||
|
||||
def __call__(self, item) -> Iterable:
|
||||
self.push(item)
|
||||
yield from takewhile(is_not_nothing, repeatedly(self.pop))
|
||||
|
||||
def push(self, item):
|
||||
self.result_stream = chain(self.result_stream, self.compute(item))
|
||||
|
||||
def compute(self, item):
|
||||
try:
|
||||
yield from self.fn(item)
|
||||
except TypeError as err:
|
||||
raise TypeError("Function failed with type-error. Is it mappable?") from err
|
||||
|
||||
def pop(self):
|
||||
return first(chain(self.result_stream, [Nothing]))
|
||||
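A usage sketch for FlatStreamBuffer: a mappable, iterable-returning function is applied in batches over a stream and the per-batch results are flattened back into one stream (the doubling function is made up):

from pyinfra.server.buffering.stream import FlatStreamBuffer

def double_all(items):
    return [2 * item for item in items]

buffered = FlatStreamBuffer(double_all, buffer_size=2)
print(list(buffered([1, 2, 3])))  # [2, 4, 6]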
16 pyinfra/server/client_pipeline.py Normal file
@ -0,0 +1,16 @@
|
||||
from funcy import rcompose, flatten
|
||||
|
||||
|
||||
# TODO: remove the dispatcher component from the pipeline; it no longer actually dispatches
|
||||
class ClientPipeline:
|
||||
def __init__(self, packer, dispatcher, receiver, interpreter):
|
||||
self.pipe = rcompose(
|
||||
packer,
|
||||
dispatcher,
|
||||
receiver,
|
||||
interpreter,
|
||||
flatten, # each analysis call returns an iterable. Can be empty, singleton or multi item. Hence, flatten.
|
||||
)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
yield from self.pipe(*args, **kwargs)
|
||||
27 pyinfra/server/debugging.py Normal file
@ -0,0 +1,27 @@
|
||||
from itertools import tee
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
def inspect(prefix="inspect", embed=False):
"""Can be used to inspect compositions of generator functions by placing it in between two functions."""
|
||||
|
||||
def inner(x):
|
||||
|
||||
if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple):
|
||||
x, y = tee(x)
|
||||
y = list(y)
|
||||
else:
|
||||
y = x
|
||||
|
||||
l = f" {len(y)} items" if isinstance(y, list) else ""
|
||||
|
||||
print(f"{prefix}{l}:", y)
|
||||
|
||||
if embed:
|
||||
import IPython
|
||||
|
||||
IPython.embed()
|
||||
|
||||
return x
|
||||
|
||||
return inner
|
||||
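A usage sketch for inspect(): dropped between two stages of a composed generator pipeline it prints the intermediate items without consuming them (the pipeline itself is made up):

from funcy import rcompose

from pyinfra.server.debugging import inspect

pipeline = rcompose(lambda xs: (x + 1 for x in xs), inspect("after increment"), list)
print(pipeline([1, 2, 3]))  # prints "after increment 3 items: [2, 3, 4]", then [2, 3, 4]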
0 pyinfra/server/dispatcher/__init__.py Normal file
30 pyinfra/server/dispatcher/dispatcher.py Normal file
@ -0,0 +1,30 @@
|
||||
import abc
|
||||
from typing import Iterable
|
||||
|
||||
from more_itertools import peekable
|
||||
|
||||
from pyinfra.server.nothing import Nothing
|
||||
|
||||
|
||||
def has_next(peekable_iter):
|
||||
return peekable_iter.peek(Nothing) is not Nothing
|
||||
|
||||
|
||||
class Dispatcher:
|
||||
def __call__(self, packages: Iterable[dict]):
|
||||
yield from self.dispatch_methods(packages)
|
||||
|
||||
def dispatch_methods(self, packages):
|
||||
packages = peekable(packages)
|
||||
for package in packages:
|
||||
method = self.patch if has_next(packages) else self.post
|
||||
response = method(package)
|
||||
yield response
|
||||
|
||||
@abc.abstractmethod
|
||||
def patch(self, package):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def post(self, package):
|
||||
raise NotImplementedError
|
||||
0 pyinfra/server/dispatcher/dispatchers/__init__.py Normal file
21 pyinfra/server/dispatcher/dispatchers/queue.py Normal file
@ -0,0 +1,21 @@
|
||||
from itertools import takewhile
|
||||
|
||||
from funcy import repeatedly, notnone
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Dispatcher
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
|
||||
|
||||
class QueuedStreamFunctionDispatcher(Dispatcher):
|
||||
def __init__(self, queued_stream_function: QueuedStreamFunction):
|
||||
self.queued_stream_function = queued_stream_function
|
||||
|
||||
def patch(self, package):
|
||||
self.queued_stream_function.push(package)
|
||||
# TODO: this is wonky and a result of the pipeline components having shifted behaviour through previous
|
||||
# refactorings. The analogous functionality for the rest pipeline is in the interpreter. Correct this
|
||||
# asymmetry!
|
||||
yield from takewhile(notnone, repeatedly(self.queued_stream_function.pop))
|
||||
|
||||
def post(self, package):
|
||||
yield from self.patch(package)
|
||||
14 pyinfra/server/dispatcher/dispatchers/rest.py Normal file
@ -0,0 +1,14 @@
|
||||
import requests
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Dispatcher
|
||||
|
||||
|
||||
class RestDispatcher(Dispatcher):
|
||||
def __init__(self, endpoint):
|
||||
self.endpoint = endpoint
|
||||
|
||||
def patch(self, package):
|
||||
return requests.patch(self.endpoint, json=package)
|
||||
|
||||
def post(self, package):
|
||||
return requests.post(self.endpoint, json=package)
|
||||
0 pyinfra/server/exceptions.py Normal file
0 pyinfra/server/interpreter/__init__.py Normal file
8 pyinfra/server/interpreter/interpreter.py Normal file
@ -0,0 +1,8 @@
|
||||
import abc
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
class Interpreter(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def __call__(self, payloads: Iterable):
|
||||
pass
|
||||
0 pyinfra/server/interpreter/interpreters/__init__.py Normal file
8 pyinfra/server/interpreter/interpreters/identity.py Normal file
@ -0,0 +1,8 @@
|
||||
from typing import Iterable
|
||||
|
||||
from pyinfra.server.interpreter.interpreter import Interpreter
|
||||
|
||||
|
||||
class IdentityInterpreter(Interpreter):
|
||||
def __call__(self, payloads: Iterable):
|
||||
yield from payloads
|
||||
23 pyinfra/server/interpreter/interpreters/rest_callback.py Normal file
@ -0,0 +1,23 @@
|
||||
from typing import Iterable
|
||||
|
||||
import requests
|
||||
from funcy import takewhile, repeatedly, mapcat
|
||||
|
||||
from pyinfra.server.interpreter.interpreter import Interpreter
|
||||
|
||||
|
||||
def stream_responses(endpoint):
|
||||
def receive():
|
||||
response = requests.get(endpoint)
|
||||
return response
|
||||
|
||||
def more_is_coming(response):
|
||||
return response.status_code == 206
|
||||
|
||||
response_stream = takewhile(more_is_coming, repeatedly(receive))
|
||||
yield from response_stream
|
||||
|
||||
|
||||
class RestPickupStreamer(Interpreter):
|
||||
def __call__(self, payloads: Iterable):
|
||||
yield from mapcat(stream_responses, payloads)
|
||||
39 pyinfra/server/monitoring.py Normal file
@ -0,0 +1,39 @@
|
||||
from functools import lru_cache
|
||||
|
||||
from funcy import identity
|
||||
from prometheus_client import CollectorRegistry, Summary
|
||||
|
||||
from pyinfra.server.operation_dispatcher import OperationDispatcher
|
||||
|
||||
|
||||
class OperationDispatcherMonitoringDecorator:
|
||||
def __init__(self, operation_dispatcher: OperationDispatcher, naming_policy=identity):
|
||||
self.operation_dispatcher = operation_dispatcher
|
||||
self.operation2metric = {}
|
||||
self.naming_policy = naming_policy
|
||||
|
||||
@property
|
||||
@lru_cache(maxsize=None)
|
||||
def registry(self):
|
||||
return CollectorRegistry(auto_describe=True)
|
||||
|
||||
def make_summary_instance(self, op: str):
|
||||
return Summary(f"{self.naming_policy(op)}_seconds", f"Time spent on {op}.", registry=self.registry)
|
||||
|
||||
def submit(self, operation, request):
|
||||
return self.operation_dispatcher.submit(operation, request)
|
||||
|
||||
def pickup(self, operation):
|
||||
with self.get_monitor(operation):
|
||||
return self.operation_dispatcher.pickup(operation)
|
||||
|
||||
def get_monitor(self, operation):
|
||||
monitor = self.operation2metric.get(operation, None) or self.register_operation(operation)
|
||||
return monitor.time()
|
||||
|
||||
def register_operation(self, operation):
|
||||
summary = self.make_summary_instance(operation)
|
||||
self.operation2metric[operation] = summary
|
||||
return summary
|
||||
|
||||
|
||||
17 pyinfra/server/normalization.py Normal file
@ -0,0 +1,17 @@
|
||||
from itertools import chain
|
||||
from typing import Iterable, Union, Tuple
|
||||
|
||||
from pyinfra.exceptions import UnexpectedItemType
|
||||
|
||||
|
||||
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
|
||||
return chain.from_iterable(map(normalize_item, normalize_item(itr)))
|
||||
|
||||
|
||||
def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
|
||||
if isinstance(itm, tuple):
|
||||
return [itm]
|
||||
elif isinstance(itm, Iterable):
|
||||
return itm
|
||||
else:
|
||||
raise UnexpectedItemType("Encountered an item that could not be normalized to a list.")
|
||||
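A usage sketch for normalize: it accepts a mix of bare (data, metadata) tuples and iterables of such tuples and returns one flat stream of tuples (the items are made up):

from pyinfra.server.normalization import normalize

items = [("data", {"id": 1}), [("a", {}), ("b", {})]]
print(list(normalize(items)))  # [('data', {'id': 1}), ('a', {}), ('b', {})]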
6 pyinfra/server/nothing.py Normal file
@ -0,0 +1,6 @@
|
||||
class Nothing:
|
||||
pass
|
||||
|
||||
|
||||
def is_not_nothing(x):
|
||||
return x is not Nothing
|
||||
33 pyinfra/server/operation_dispatcher.py Normal file
@ -0,0 +1,33 @@
|
||||
from itertools import starmap, tee
|
||||
from typing import Dict
|
||||
|
||||
from funcy import juxt, zipdict, cat
|
||||
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
from pyinfra.server.stream.rest import LazyRestProcessor
|
||||
|
||||
|
||||
class OperationDispatcher:
|
||||
def __init__(self, operation2function: Dict[str, QueuedStreamFunction]):
|
||||
submit_suffixes, pickup_suffixes = zip(*map(juxt(submit_suffix, pickup_suffix), operation2function))
|
||||
processors = starmap(LazyRestProcessor, zip(operation2function.values(), submit_suffixes, pickup_suffixes))
|
||||
self.operation2processor = zipdict(submit_suffixes + pickup_suffixes, cat(tee(processors)))
|
||||
|
||||
@classmethod
|
||||
@property
|
||||
def pickup_suffix(cls):
|
||||
return pickup_suffix("")
|
||||
|
||||
def submit(self, operation, request):
|
||||
return self.operation2processor[operation].push(request)
|
||||
|
||||
def pickup(self, operation):
|
||||
return self.operation2processor[operation].pop()
|
||||
|
||||
|
||||
def submit_suffix(op: str):
|
||||
return "" if not op else op
|
||||
|
||||
|
||||
def pickup_suffix(op: str):
|
||||
return "pickup" if not op else f"{op}_pickup"
|
||||
0 pyinfra/server/packer/__init__.py Normal file
8 pyinfra/server/packer/packer.py Normal file
@ -0,0 +1,8 @@
|
||||
import abc
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
class Packer(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def __call__(self, data: Iterable, metadata: Iterable):
|
||||
pass
|
||||
0 pyinfra/server/packer/packers/__init__.py Normal file
14 pyinfra/server/packer/packers/identity.py Normal file
@ -0,0 +1,14 @@
|
||||
from itertools import starmap
|
||||
from typing import Iterable
|
||||
|
||||
from pyinfra.server.packer.packer import Packer
|
||||
|
||||
|
||||
def bundle(data: bytes, metadata: dict):
|
||||
package = {"data": data, "metadata": metadata}
|
||||
return package
|
||||
|
||||
|
||||
class IdentityPacker(Packer):
|
||||
def __call__(self, data: Iterable, metadata):
|
||||
yield from starmap(bundle, zip(data, metadata))
|
||||
9 pyinfra/server/packer/packers/rest.py Normal file
@ -0,0 +1,9 @@
|
||||
from typing import Iterable
|
||||
|
||||
from pyinfra.server.packer.packer import Packer
|
||||
from pyinfra.server.packing import pack_data_and_metadata_for_rest_transfer
|
||||
|
||||
|
||||
class RestPacker(Packer):
|
||||
def __call__(self, data: Iterable[bytes], metadata: Iterable[dict]):
|
||||
yield from pack_data_and_metadata_for_rest_transfer(data, metadata)
|
||||
34 pyinfra/server/packing.py Normal file
@ -0,0 +1,34 @@
|
||||
import base64
|
||||
from _operator import itemgetter
|
||||
from itertools import starmap
|
||||
from typing import Iterable
|
||||
|
||||
from funcy import compose
|
||||
|
||||
from pyinfra.utils.func import starlift, lift
|
||||
|
||||
|
||||
def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable):
|
||||
yield from starmap(pack, zip(data, metadata))
|
||||
|
||||
|
||||
def unpack_fn_pack(fn):
|
||||
return compose(starlift(pack), fn, lift(unpack))
|
||||
|
||||
|
||||
def pack(data: bytes, metadata: dict):
|
||||
package = {"data": bytes_to_string(data), "metadata": metadata}
|
||||
return package
|
||||
|
||||
|
||||
def unpack(package):
|
||||
data, metadata = itemgetter("data", "metadata")(package)
|
||||
return string_to_bytes(data), metadata
|
||||
|
||||
|
||||
def bytes_to_string(data: bytes) -> str:
|
||||
return base64.b64encode(data).decode()
|
||||
|
||||
|
||||
def string_to_bytes(data: str) -> bytes:
|
||||
return base64.b64decode(data.encode())
|
||||
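A usage sketch for the packing helpers: pack turns raw bytes into a JSON-serialisable base64 package and unpack reverses it (payload and metadata are made up):

from pyinfra.server.packing import pack, unpack

package = pack(b"\x00\x01binary", {"fileId": "f-456"})
print(package)          # {'data': 'AAFiaW5hcnk=', 'metadata': {'fileId': 'f-456'}}
print(unpack(package))  # (b'\x00\x01binary', {'fileId': 'f-456'})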
0 pyinfra/server/receiver/__init__.py Normal file
8 pyinfra/server/receiver/receiver.py Normal file
@ -0,0 +1,8 @@
|
||||
import abc
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
class Receiver(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def __call__(self, package: Iterable):
|
||||
pass
|
||||
0 pyinfra/server/receiver/receivers/__init__.py Normal file
11 pyinfra/server/receiver/receivers/identity.py Normal file
@ -0,0 +1,11 @@
|
||||
from typing import Iterable
|
||||
|
||||
from pyinfra.server.receiver.receiver import Receiver
|
||||
from funcy import notnone
|
||||
|
||||
|
||||
class QueuedStreamFunctionReceiver(Receiver):
|
||||
|
||||
def __call__(self, responses: Iterable):
|
||||
for response in filter(notnone, responses):
|
||||
yield response
|
||||
16  pyinfra/server/receiver/receivers/rest.py  Normal file
@@ -0,0 +1,16 @@
from typing import Iterable

import requests
from funcy import chunks, flatten

from pyinfra.server.receiver.receiver import Receiver


class RestReceiver(Receiver):
    def __init__(self, chunk_size=3):
        self.chunk_size = chunk_size

    def __call__(self, responses: Iterable[requests.Response]):
        for response in flatten(chunks(self.chunk_size, responses)):
            response.raise_for_status()
            yield response.json()
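An illustrative usage sketch (not from the diff): the receiver consumes HTTP responses lazily, chunk by chunk, failing fast on the first error status and yielding decoded JSON bodies.

receiver = RestReceiver(chunk_size=2)
for body in receiver(responses):  # `responses` is assumed to be an iterable of requests.Response objects
    handle(body)                  # hypothetical downstream handler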
100  pyinfra/server/server.py  Normal file
@@ -0,0 +1,100 @@
from functools import singledispatch
from typing import Dict, Callable, Union

from flask import Flask, jsonify, request
from prometheus_client import generate_latest

from pyinfra.config import CONFIG
from pyinfra.server.buffering.stream import FlatStreamBuffer
from pyinfra.server.monitoring import OperationDispatcherMonitoringDecorator
from pyinfra.server.operation_dispatcher import OperationDispatcher
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction


@singledispatch
def set_up_processing_server(arg: Union[dict, Callable], buffer_size=1):
    """Produces a processing server given a streamable function or a mapping from operations to streamable functions.

    Streamable functions are constructed by calling pyinfra.server.utils.make_streamable_and_wrap_in_packing_logic on a
    function taking a tuple of data and metadata and also returning a tuple or yielding tuples of data and metadata.
    If the function doesn't produce data, data should be an empty byte string.
    If the function doesn't produce metadata, metadata should be an empty dictionary.

    Args:
        arg: streamable function or mapping of operations: str to streamable functions
        buffer_size: If your function operates on batches, this parameter controls how many items are aggregated
            before your function is applied.

    TODO: buffer_size has to be controllable on a per-function basis.

    Returns:
        Processing server: flask app
    """
    pass


@set_up_processing_server.register
def _(operation2stream_fn: dict, buffer_size=1):
    return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)


@set_up_processing_server.register
def _(stream_fn: object, buffer_size=1):
    operation2stream_fn = {None: stream_fn}
    return __stream_fn_to_processing_server(operation2stream_fn, buffer_size)


def __stream_fn_to_processing_server(operation2stream_fn: dict, buffer_size):
    operation2stream_fn = {
        op: QueuedStreamFunction(FlatStreamBuffer(fn, buffer_size)) for op, fn in operation2stream_fn.items()
    }
    return __set_up_processing_server(operation2stream_fn)


def __set_up_processing_server(operation2function: Dict[str, QueuedStreamFunction]):
    app = Flask(__name__)

    dispatcher = OperationDispatcherMonitoringDecorator(
        OperationDispatcher(operation2function),
        naming_policy=naming_policy,
    )

    def ok():
        resp = jsonify("OK")
        resp.status_code = 200
        return resp

    @app.route("/ready", methods=["GET"])
    def ready():
        return ok()

    @app.route("/health", methods=["GET"])
    def healthy():
        return ok()

    @app.route("/prometheus", methods=["GET"])
    def prometheus():
        return generate_latest(registry=dispatcher.registry)

    @app.route("/<operation>", methods=["POST", "PATCH"])
    def submit(operation):
        return dispatcher.submit(operation, request)

    @app.route("/", methods=["POST", "PATCH"])
    def submit_default():
        return dispatcher.submit("", request)

    @app.route("/<operation>", methods=["GET"])
    def pickup(operation):
        return dispatcher.pickup(operation)

    return app


def naming_policy(op_name: str):
    pop_suffix = OperationDispatcher.pickup_suffix
    prefix = f"redactmanager_{CONFIG.service.name}"

    op_display_name = op_name.replace(f"_{pop_suffix}", "") if op_name != pop_suffix else "default"
    complete_display_name = f"{prefix}_{op_display_name}"

    return complete_display_name
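A minimal sketch, not taken from the diff, of how a plain function might be served through this module; the echo function and port are assumptions, and only calls shown elsewhere in this diff are used:

from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic


def echo(data: bytes, metadata: dict):
    # returns one (data, metadata) pair, as required by the docstring above
    return data, metadata


stream_fn = make_streamable_and_wrap_in_packing_logic(echo, batched=False)
app = set_up_processing_server(stream_fn, buffer_size=1)
# app.run(port=5000)  # packages are POSTed to "/" or "/<operation>"; results are picked up via GET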
0  pyinfra/server/stream/__init__.py  Normal file

21  pyinfra/server/stream/queued_stream_function.py  Normal file
@@ -0,0 +1,21 @@
from funcy import first

from pyinfra.server.buffering.queue import stream_queue, Queue


class QueuedStreamFunction:
    def __init__(self, stream_function):
        """Combines a stream function with a queue.

        Args:
            stream_function: Needs to operate on iterables.
        """
        self.queue = Queue()
        self.stream_function = stream_function

    def push(self, item):
        self.queue.append(item)

    def pop(self):
        items = stream_queue(self.queue)
        return first(self.stream_function(items))
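An illustrative sketch only; Queue and stream_queue come from pyinfra.server.buffering.queue, which is not part of this excerpt, so the exact draining behaviour is assumed:

qsf = QueuedStreamFunction(lambda items: (i * 2 for i in items))
qsf.push(1)
qsf.push(2)
result = qsf.pop()  # presumably 2: pop() streams the queued items through the function and returns the first result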
51  pyinfra/server/stream/rest.py  Normal file
@@ -0,0 +1,51 @@
import logging

from flask import jsonify
from funcy import drop

from pyinfra.server.nothing import Nothing
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction

logger = logging.getLogger(__name__)


class LazyRestProcessor:
    def __init__(self, queued_stream_function: QueuedStreamFunction, submit_suffix="submit", pickup_suffix="pickup"):
        self.submit_suffix = submit_suffix
        self.pickup_suffix = pickup_suffix
        self.queued_stream_function = queued_stream_function

    def push(self, request):
        self.queued_stream_function.push(request.json)
        return jsonify(replace_suffix(request.base_url, self.submit_suffix, self.pickup_suffix))

    def pop(self):
        result = self.queued_stream_function.pop() or Nothing

        if not valid(result):
            logger.error(f"Received invalid result: {result}")
            result = Nothing

        if result is Nothing:
            logger.info("Analysis completed successfully.")
            resp = jsonify("No more items left")
            resp.status_code = 204

        else:
            logger.debug("Partial analysis completed.")
            resp = jsonify(result)
            resp.status_code = 206

        return resp


def valid(result):
    return isinstance(result, dict) or result is Nothing


def replace_suffix(strn, suf, repl):
    return remove_last_n(strn, len(suf)) + repl


def remove_last_n(strn, n):
    return "".join(reversed(list(drop(n, reversed(strn)))))
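A short illustration (hostname is hypothetical) of how push() rewrites the submit URL into the matching pickup URL using the helpers above:

replace_suffix("http://host/ner/submit", "submit", "pickup")
# -> "http://host/ner/pickup"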
16  pyinfra/server/utils.py  Normal file
@@ -0,0 +1,16 @@
from funcy import compose, identity

from pyinfra.server.normalization import normalize
from pyinfra.server.packing import unpack_fn_pack
from pyinfra.utils.func import starlift


def make_streamable_and_wrap_in_packing_logic(fn, batched):
    fn = make_streamable(fn, batched)
    fn = unpack_fn_pack(fn)
    return fn


def make_streamable(fn, batched):
    # FIXME: something broken with batched == True
    return compose(normalize, (identity if batched else starlift)(fn))
@@ -30,5 +30,5 @@ class StorageAdapter(ABC):
         raise NotImplementedError

     @abstractmethod
-    def get_all_object_names(self, bucket_name):
+    def get_all_object_names(self, bucket_name, prefix=None):
         raise NotImplementedError
@ -1,5 +1,4 @@
|
||||
import logging
|
||||
from itertools import repeat
|
||||
from operator import attrgetter
|
||||
|
||||
from azure.storage.blob import ContainerClient, BlobServiceClient
|
||||
@ -20,15 +19,15 @@ class AzureStorageAdapter(StorageAdapter):
|
||||
container_client = self.__client.get_container_client(bucket_name)
|
||||
return container_client.exists()
|
||||
|
||||
def make_bucket(self, bucket_name):
|
||||
container_client = self.__client.get_container_client(bucket_name)
|
||||
container_client if container_client.exists() else self.__client.create_container(bucket_name)
|
||||
|
||||
def __provide_container_client(self, bucket_name) -> ContainerClient:
|
||||
self.make_bucket(bucket_name)
|
||||
container_client = self.__client.get_container_client(bucket_name)
|
||||
return container_client
|
||||
|
||||
def make_bucket(self, bucket_name):
|
||||
container_client = self.__client.get_container_client(bucket_name)
|
||||
container_client if container_client.exists() else self.__client.create_container(bucket_name)
|
||||
|
||||
def put_object(self, bucket_name, object_name, data):
|
||||
logger.debug(f"Uploading '{object_name}'...")
|
||||
container_client = self.__provide_container_client(bucket_name)
|
||||
@ -59,7 +58,7 @@ class AzureStorageAdapter(StorageAdapter):
|
||||
blobs = container_client.list_blobs()
|
||||
container_client.delete_blobs(*blobs)
|
||||
|
||||
def get_all_object_names(self, bucket_name):
|
||||
def get_all_object_names(self, bucket_name, prefix=None):
|
||||
container_client = self.__provide_container_client(bucket_name)
|
||||
blobs = container_client.list_blobs()
|
||||
return zip(repeat(bucket_name), map(attrgetter("name"), blobs))
|
||||
blobs = container_client.list_blobs(name_starts_with=prefix)
|
||||
return map(attrgetter("name"), blobs)
|
||||
|
||||
@@ -1,6 +1,6 @@
 import io
-from itertools import repeat
 import logging
+from itertools import repeat
 from operator import attrgetter

 from minio import Minio
@@ -53,6 +53,6 @@ class S3StorageAdapter(StorageAdapter):
         for obj in objects:
             self.__client.remove_object(bucket_name, obj.object_name)

-    def get_all_object_names(self, bucket_name):
-        objs = self.__client.list_objects(bucket_name, recursive=True)
-        return zip(repeat(bucket_name), map(attrgetter("object_name"), objs))
+    def get_all_object_names(self, bucket_name, prefix=None):
+        objs = self.__client.list_objects(bucket_name, recursive=True, prefix=prefix)
+        return map(attrgetter("object_name"), objs)
@@ -1,10 +1,10 @@
 import logging

-from retry import retry

 from pyinfra.config import CONFIG
 from pyinfra.exceptions import DataLoadingFailure
 from pyinfra.storage.adapters.adapter import StorageAdapter
+from pyinfra.utils.retry import retry

 logger = logging.getLogger(__name__)
 logger.setLevel(CONFIG.service.logging_level)
@@ -26,7 +26,7 @@ class Storage:
     def get_object(self, bucket_name, object_name):
         return self.__get_object(bucket_name, object_name)

-    @retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3))
+    @retry(DataLoadingFailure)
     def __get_object(self, bucket_name, object_name):
         try:
             return self.__adapter.get_object(bucket_name, object_name)
@@ -40,5 +40,5 @@ class Storage:
     def clear_bucket(self, bucket_name):
         return self.__adapter.clear_bucket(bucket_name)

-    def get_all_object_names(self, bucket_name):
-        return self.__adapter.get_all_object_names(bucket_name)
+    def get_all_object_names(self, bucket_name, prefix=None):
+        return self.__adapter.get_all_object_names(bucket_name, prefix=prefix)
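A small usage sketch for the new prefix parameter; the bucket name, prefix and storage instance are illustrative, not from the diff:

# Restrict the listing to one dossier's objects instead of filtering client-side.
names = storage.get_all_object_names("my-bucket", prefix="dossier_id/")
for name in names:
    print(name)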
14  pyinfra/utils/encoding.py  Normal file
@@ -0,0 +1,14 @@
import gzip
import json


def pack_analysis_payload(analysis_payload):
    return compress(json.dumps(analysis_payload).encode())


def compress(data: bytes):
    return gzip.compress(data)


def decompress(data: bytes):
    return gzip.decompress(data)
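An illustrative round trip (payload contents are hypothetical): analysis payloads are JSON-encoded and gzip-compressed before upload, and decompress() undoes the gzip step.

payload = {"metadata": {"fileId": "f1"}, "data": ""}
blob = pack_analysis_payload(payload)
assert json.loads(decompress(blob)) == payload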
47  pyinfra/utils/func.py  Normal file
@@ -0,0 +1,47 @@
from itertools import starmap, tee

from funcy import curry, compose, filter


def lift(fn):
    return curry(map)(fn)


def llift(fn):
    return compose(list, lift(fn))


def starlift(fn):
    return curry(starmap)(fn)


def lstarlift(fn):
    return compose(list, starlift(fn))


def parallel(*fs):
    return lambda *args: (f(a) for f, a in zip(fs, args))


def star(f):
    return lambda x: f(*x)


def duplicate_stream_and_apply(f1, f2):
    return compose(star(parallel(f1, f2)), tee)


def foreach(fn, iterable):
    for itm in iterable:
        fn(itm)


def flift(pred):
    return curry(filter)(pred)


def parallel_map(f1, f2):
    """Applies functions to a stream in parallel and yields a stream of tuples:
    parallel_map :: a -> b, a -> c -> [a] -> [(b, c)]
    """
    return compose(star(zip), duplicate_stream_and_apply(f1, f2))
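A brief sketch (not from the diff) of how these combinators compose; note that parallel_map, as implemented above, expects two stream functions, so per-item functions are lifted first:

double_all = lift(lambda x: x * 2)
list(double_all([1, 2, 3]))                        # -> [2, 4, 6]

pairs = parallel_map(lift(str), lift(lambda x: x + 1))
list(pairs([1, 2]))                                # -> [("1", 2), ("2", 3)]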
15  pyinfra/utils/retry.py  Normal file
@@ -0,0 +1,15 @@
from pyinfra.config import CONFIG
from retry import retry as _retry


def retry(exception):

    def decorator(fn):

        @_retry(exception, tries=CONFIG.retry.tries, delay=CONFIG.retry.delay, jitter=tuple(CONFIG.retry.jitter))
        def inner(*args, **kwargs):
            return fn(*args, **kwargs)

        return inner

    return decorator
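A minimal usage sketch: the wrapper reads tries, delay and jitter from CONFIG.retry, so call sites only name the exception type they want to retry on (flaky_call is a hypothetical function).

@retry(ConnectionError)
def flaky_call():
    ...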
@ -1,91 +0,0 @@
|
||||
import abc
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
from operator import itemgetter
|
||||
from typing import Callable
|
||||
|
||||
from pyinfra.config import CONFIG, parse_disjunction_string
|
||||
from pyinfra.exceptions import DataLoadingFailure
|
||||
from pyinfra.storage.storage import Storage
|
||||
|
||||
|
||||
def get_object_name(body):
|
||||
dossier_id, file_id, target_file_extension = itemgetter("dossierId", "fileId", "targetFileExtension")(body)
|
||||
object_name = f"{dossier_id}/{file_id}.{target_file_extension}"
|
||||
return object_name
|
||||
|
||||
|
||||
def get_response_object_name(body):
|
||||
dossier_id, file_id, response_file_extension = itemgetter("dossierId", "fileId", "responseFileExtension")(body)
|
||||
object_name = f"{dossier_id}/{file_id}.{response_file_extension}"
|
||||
return object_name
|
||||
|
||||
|
||||
def get_object_descriptor(body):
|
||||
return {"bucket_name": parse_disjunction_string(CONFIG.storage.bucket), "object_name": get_object_name(body)}
|
||||
|
||||
|
||||
def get_response_object_descriptor(body):
|
||||
return {
|
||||
"bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
|
||||
"object_name": get_response_object_name(body),
|
||||
}
|
||||
|
||||
|
||||
class ResponseStrategy(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def handle_response(self, body):
|
||||
pass
|
||||
|
||||
def __call__(self, body):
|
||||
return self.handle_response(body)
|
||||
|
||||
|
||||
class StorageStrategy(ResponseStrategy):
|
||||
def __init__(self, storage):
|
||||
self.storage = storage
|
||||
|
||||
def handle_response(self, body):
|
||||
self.storage.put_object(**get_response_object_descriptor(body), data=gzip.compress(json.dumps(body).encode()))
|
||||
body.pop("data")
|
||||
return body
|
||||
|
||||
|
||||
class ForwardingStrategy(ResponseStrategy):
|
||||
def handle_response(self, body):
|
||||
return body
|
||||
|
||||
|
||||
class QueueVisitor:
|
||||
def __init__(self, storage: Storage, callback: Callable, response_strategy):
|
||||
self.storage = storage
|
||||
self.callback = callback
|
||||
self.response_strategy = response_strategy
|
||||
|
||||
def load_data(self, body):
|
||||
def download():
|
||||
logging.debug(f"Downloading {object_descriptor}...")
|
||||
data = self.storage.get_object(**object_descriptor)
|
||||
logging.debug(f"Downloaded {object_descriptor}.")
|
||||
return data
|
||||
|
||||
object_descriptor = get_object_descriptor(body)
|
||||
|
||||
try:
|
||||
return gzip.decompress(download())
|
||||
except Exception as err:
|
||||
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
|
||||
raise DataLoadingFailure from err
|
||||
|
||||
def process_data(self, data, body):
|
||||
return self.callback({**body, "data": data})
|
||||
|
||||
def load_and_process(self, body):
|
||||
data = self.process_data(self.load_data(body), body)
|
||||
result_body = {**body, "data": data}
|
||||
return result_body
|
||||
|
||||
def __call__(self, body):
|
||||
result_body = self.load_and_process(body)
|
||||
return self.response_strategy(result_body)
|
||||
1  pyinfra/visitor/__init__.py  Normal file
@@ -0,0 +1 @@
from .visitor import QueueVisitor
58  pyinfra/visitor/downloader.py  Normal file
@@ -0,0 +1,58 @@
import logging
from functools import partial

from funcy import compose

from pyinfra.file_descriptor_manager import FileDescriptorManager
from pyinfra.storage.storage import Storage
from pyinfra.utils.encoding import decompress
from pyinfra.utils.func import flift

logger = logging.getLogger(__name__)


class Downloader:
    def __init__(self, storage: Storage, bucket_name, file_descriptor_manager: FileDescriptorManager):
        self.storage = storage
        self.bucket_name = bucket_name
        self.file_descriptor_manager = file_descriptor_manager

    def __call__(self, queue_item_body):
        return self.download(queue_item_body)

    def download(self, queue_item_body):
        names_of_relevant_objects = self.get_names_of_objects_by_pattern(queue_item_body)
        objects = self.download_and_decompress_object(names_of_relevant_objects)

        return objects

    def get_names_of_objects_by_pattern(self, queue_item_body):
        logger.debug(f"Filtering objects in bucket {self.bucket_name} by pattern...")

        names_of_relevant_objects = compose(
            list,
            self.get_pattern_filter(queue_item_body),
            self.get_names_of_all_associated_objects,
        )(queue_item_body)

        logger.debug(f"Found {len(names_of_relevant_objects)} objects matching filter.")

        return names_of_relevant_objects

    def download_and_decompress_object(self, object_names):
        download = partial(self.storage.get_object, self.bucket_name)
        return map(compose(decompress, download), object_names)

    def get_names_of_all_associated_objects(self, queue_item_body):
        prefix = self.file_descriptor_manager.get_path_prefix(queue_item_body)
        # TODO: performance tests for the following situations:
        # 1) dossier with very many files
        # 2) prefix matches very many files, independent of dossier cardinality
        yield from self.storage.get_all_object_names(self.bucket_name, prefix=prefix)

    def get_pattern_filter(self, queue_item_body):
        file_pattern = self.file_descriptor_manager.build_input_matcher(queue_item_body)

        logger.debug(f"Filtering pattern: {file_pattern if len(file_pattern) <= 120 else (file_pattern[:120]+'...')}")
        matches_pattern = flift(file_pattern)
        return matches_pattern
0  pyinfra/visitor/response_formatter/__init__.py  Normal file

14  pyinfra/visitor/response_formatter/formatter.py  Normal file
@@ -0,0 +1,14 @@
import abc

from funcy import identity

from pyinfra.utils.func import lift


class ResponseFormatter(abc.ABC):
    def __call__(self, message):
        return (identity if isinstance(message, dict) else lift)(self.format)(message)

    @abc.abstractmethod
    def format(self, message):
        pass
13  pyinfra/visitor/response_formatter/formatters/default.py  Normal file
@@ -0,0 +1,13 @@
from funcy import first, omit

from pyinfra.visitor.response_formatter.formatter import ResponseFormatter


class DefaultResponseFormatter(ResponseFormatter):
    """
    TODO: Extend via using enums throughout the codebase instead of strings.
    See the enum-formatter in image-prediction service for reference.
    """

    def format(self, message):
        return {**omit(message, ["response_files"]), "responseFile": first(message["response_files"])}

@@ -0,0 +1,6 @@
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter


class IdentityResponseFormatter(ResponseFormatter):
    def format(self, message):
        return message
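An illustrative sketch of the dispatch in ResponseFormatter.__call__: a single dict message is formatted directly, while an iterable of messages is mapped over lazily.

formatter = IdentityResponseFormatter()
formatter({"fileId": "f1"})                              # -> {"fileId": "f1"}
list(formatter([{"fileId": "f1"}, {"fileId": "f2"}]))    # -> both messages, formatted one by one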
0  pyinfra/visitor/strategies/__init__.py  Normal file

0  pyinfra/visitor/strategies/blob_parsing/__init__.py  Normal file

14  pyinfra/visitor/strategies/blob_parsing/blob_parsing.py  Normal file
@@ -0,0 +1,14 @@
import abc


class BlobParsingStrategy(abc.ABC):
    @abc.abstractmethod
    def parse(self, data: bytes):
        pass

    @abc.abstractmethod
    def parse_and_wrap(self, data: bytes):
        pass

    def __call__(self, data: bytes):
        return self.parse_and_wrap(data)
20  pyinfra/visitor/strategies/blob_parsing/dynamic.py  Normal file
@@ -0,0 +1,20 @@
from typing import Union

from pyinfra.parser.parser_composer import EitherParserComposer
from pyinfra.parser.parsers.identity import IdentityBlobParser
from pyinfra.parser.parsers.json import JsonBlobParser
from pyinfra.parser.parsers.string import StringBlobParser
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy


# TODO: Each analysis service should specify a custom parsing strategy for the type of data it expects to be found
# on the storage. This class is only a temporary trial-and-error -> fallback type of solution.
class DynamicParsingStrategy(BlobParsingStrategy):
    def __init__(self):
        self.parser = EitherParserComposer(JsonBlobParser(), StringBlobParser(), IdentityBlobParser())

    def parse(self, data: bytes) -> Union[bytes, dict]:
        return self.parser(data)

    def parse_and_wrap(self, data):
        return self.parse(data)
0
pyinfra/visitor/strategies/response/__init__.py
Normal file
0
pyinfra/visitor/strategies/response/__init__.py
Normal file
92
pyinfra/visitor/strategies/response/aggregation.py
Normal file
92
pyinfra/visitor/strategies/response/aggregation.py
Normal file
@ -0,0 +1,92 @@
|
||||
import abc
|
||||
from collections import deque
|
||||
from typing import Callable
|
||||
|
||||
from funcy import omit, filter, first, lpluck, identity
|
||||
from more_itertools import peekable
|
||||
|
||||
from pyinfra.file_descriptor_manager import FileDescriptorManager
|
||||
from pyinfra.server.nothing import Nothing, is_not_nothing
|
||||
from pyinfra.utils.encoding import pack_analysis_payload
|
||||
from pyinfra.visitor.strategies.response.response import ResponseStrategy
|
||||
|
||||
|
||||
class UploadFormatter(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def format(self, items):
|
||||
raise NotImplementedError
|
||||
|
||||
def __call__(self, items):
|
||||
return self.format(items)
|
||||
|
||||
|
||||
class ProjectingUploadFormatter(UploadFormatter):
|
||||
def format(self, items):
|
||||
|
||||
head = first(items)
|
||||
if head["data"]:
|
||||
assert len(items) == 1
|
||||
return head
|
||||
else:
|
||||
items = lpluck("metadata", items)
|
||||
return items
|
||||
|
||||
|
||||
class AggregationStorageStrategy(ResponseStrategy):
|
||||
def __init__(
|
||||
self,
|
||||
storage,
|
||||
file_descriptor_manager: FileDescriptorManager,
|
||||
merger: Callable = list,
|
||||
upload_formatter: UploadFormatter = identity,
|
||||
):
|
||||
self.storage = storage
|
||||
self.file_descriptor_manager = file_descriptor_manager
|
||||
self.merger = merger
|
||||
self.upload_formatter = upload_formatter
|
||||
|
||||
self.buffer = deque()
|
||||
self.response_files = deque()
|
||||
|
||||
def handle_response(self, analysis_response, final=False):
|
||||
def upload_or_aggregate(analysis_payload):
|
||||
request_metadata = omit(analysis_response, ["analysis_payloads"])
|
||||
return self.upload_or_aggregate(analysis_payload, request_metadata, last=not analysis_payloads.peek(False))
|
||||
|
||||
analysis_payloads = peekable(analysis_response["analysis_payloads"])
|
||||
yield from filter(is_not_nothing, map(upload_or_aggregate, analysis_payloads))
|
||||
|
||||
def upload_or_aggregate(self, analysis_payload, request_metadata, last=False):
|
||||
"""analysis_payload : {data: ..., metadata: ...}"""
|
||||
storage_upload_info = self.file_descriptor_manager.build_storage_upload_info(analysis_payload, request_metadata)
|
||||
object_descriptor = self.file_descriptor_manager.get_output_object_descriptor(storage_upload_info)
|
||||
|
||||
self.add_analysis_payload_to_buffer(analysis_payload)
|
||||
|
||||
if analysis_payload["data"] or last:
|
||||
self.upload_aggregated_items(object_descriptor)
|
||||
self.response_files.append(object_descriptor["object_name"])
|
||||
|
||||
return self.build_response_message(storage_upload_info) if last else Nothing
|
||||
|
||||
def add_analysis_payload_to_buffer(self, analysis_payload):
|
||||
self.buffer.append({**analysis_payload, "metadata": omit(analysis_payload["metadata"], ["id"])})
|
||||
|
||||
def upload_aggregated_items(self, object_descriptor):
|
||||
items_to_upload = self.upload_formatter(self.merge_queue_items())
|
||||
self.upload_item(items_to_upload, object_descriptor)
|
||||
|
||||
def build_response_message(self, storage_upload_info):
|
||||
|
||||
response_files = [*self.response_files]
|
||||
self.response_files.clear()
|
||||
|
||||
return {**storage_upload_info, "response_files": response_files}
|
||||
|
||||
def upload_item(self, analysis_payload, object_descriptor):
|
||||
self.storage.put_object(**object_descriptor, data=pack_analysis_payload(analysis_payload))
|
||||
|
||||
def merge_queue_items(self):
|
||||
merged_buffer_content = self.merger(self.buffer)
|
||||
self.buffer.clear()
|
||||
return merged_buffer_content
|
||||
6  pyinfra/visitor/strategies/response/forwarding.py  Normal file
@@ -0,0 +1,6 @@
from pyinfra.visitor.strategies.response.response import ResponseStrategy


class ForwardingStrategy(ResponseStrategy):
    def handle_response(self, analysis_response):
        return analysis_response

10  pyinfra/visitor/strategies/response/response.py  Normal file
@@ -0,0 +1,10 @@
import abc


class ResponseStrategy(abc.ABC):
    @abc.abstractmethod
    def handle_response(self, analysis_response: dict):
        pass

    def __call__(self, analysis_response: dict):
        return self.handle_response(analysis_response)
33  pyinfra/visitor/strategies/response/storage.py  Normal file
@@ -0,0 +1,33 @@
import json
from operator import itemgetter

from pyinfra.config import parse_disjunction_string, CONFIG
from pyinfra.utils.encoding import compress
from pyinfra.visitor.strategies.response.response import ResponseStrategy


class StorageStrategy(ResponseStrategy):
    def __init__(self, storage, response_file_extension="out"):
        self.storage = storage
        self.response_file_extension = response_file_extension

    def handle_response(self, body: dict):
        response_object_descriptor = self.get_response_object_descriptor(body)
        self.storage.put_object(**response_object_descriptor, data=compress(json.dumps(body).encode()))
        body.pop("analysis_payloads")
        body["response_files"] = [response_object_descriptor["object_name"]]
        return body

    def get_response_object_descriptor(self, body):
        return {
            "bucket_name": parse_disjunction_string(CONFIG.storage.bucket),
            "object_name": self.get_response_object_name(body),
        }

    def get_response_object_name(self, body):
        dossier_id, file_id = itemgetter("dossierId", "fileId")(body)

        object_name = f"{dossier_id}/{file_id}/.{self.response_file_extension}"

        return object_name
51  pyinfra/visitor/utils.py  Normal file
@@ -0,0 +1,51 @@
import logging
from typing import Dict

from pyinfra.exceptions import InvalidStorageItemFormat
from pyinfra.server.packing import string_to_bytes

logger = logging.getLogger()


def build_file_path(storage_upload_info, folder):
    return f"{storage_upload_info['fileId']}" + (f"/{folder}" if folder else "")


def standardize(storage_item) -> Dict:
    """Storage items can be a blob or a blob with metadata. Standardizes to the latter.

    Cases:
        1) backend upload: data as bytes
        2) Some Python service's upload: data as bytes of a json string "{'data': <str>, 'metadata': <dict>}",
           where the value of key 'data' was encoded with bytes_to_string(...)

    Returns:
        {"data": bytes, "metadata": dict}
    """

    def is_blob_without_metadata(storage_item):
        return isinstance(storage_item, bytes)

    def is_blob_with_metadata(storage_item: Dict):
        return isinstance(storage_item, dict)

    if is_blob_without_metadata(storage_item):
        return wrap(storage_item)

    elif is_blob_with_metadata(storage_item):
        validate(storage_item)
        return storage_item

    else:  # Fallback / used for testing with simple data
        logger.warning("Encountered storage data in unexpected format.")
        assert isinstance(storage_item, str)
        return wrap(string_to_bytes(storage_item))


def wrap(data):
    return {"data": data, "metadata": {}}


def validate(storage_item):
    if not ("data" in storage_item and "metadata" in storage_item):
        raise InvalidStorageItemFormat(f"Expected a mapping with keys 'data' and 'metadata', got {storage_item}.")
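An illustrative run through the three shapes standardize() accepts (the literal values are made up; "cmF3" is the base64 encoding of "raw"):

standardize(b"raw blob")                               # -> {"data": b"raw blob", "metadata": {}}
standardize({"data": "cmF3", "metadata": {"k": "v"}})  # already standard, passed through after validation
standardize("cmF3")                                    # fallback case -> {"data": b"raw", "metadata": {}}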
84  pyinfra/visitor/visitor.py  Normal file
@@ -0,0 +1,84 @@
from typing import Callable

from funcy import lflatten, compose, itervalues, lfilter

from pyinfra.utils.func import lift
from pyinfra.visitor.response_formatter.formatter import ResponseFormatter
from pyinfra.visitor.response_formatter.formatters.identity import IdentityResponseFormatter
from pyinfra.visitor.strategies.blob_parsing.blob_parsing import BlobParsingStrategy
from pyinfra.visitor.strategies.blob_parsing.dynamic import DynamicParsingStrategy
from pyinfra.visitor.strategies.response.response import ResponseStrategy
from pyinfra.visitor.utils import standardize


class QueueVisitor:
    def __init__(
        self,
        callback: Callable,
        data_loader: Callable,
        response_strategy: ResponseStrategy,
        parsing_strategy: BlobParsingStrategy = None,
        response_formatter: ResponseFormatter = None,
    ):
        """Processes queue messages with a given callback.

        Args:
            callback: callback to apply
            data_loader: loads data specified in message and passes to callback
            parsing_strategy: behaviour for interpreting loaded items
            response_strategy: behaviour for response production

        TODO: merge all dependencies into a single pipeline like: getter -> parser -> processor -> formatter -> putter

        Returns:
            depends on response strategy
        """
        self.callback = callback
        self.data_loader = data_loader
        self.response_strategy = response_strategy
        self.parsing_strategy = parsing_strategy or DynamicParsingStrategy()
        self.response_formatter = response_formatter or IdentityResponseFormatter()

    def __call__(self, queue_item_body):
        analysis_response = compose(
            self.response_formatter,
            self.response_strategy,
            self.load_items_from_storage_and_process_with_callback,
        )(queue_item_body)

        return analysis_response

    def load_items_from_storage_and_process_with_callback(self, queue_item_body):
        """Bundles the result from processing a storage item with the body of the corresponding queue item."""

        callback_results = compose(
            self.remove_empty_results,
            lflatten,
            lift(self.get_item_processor(queue_item_body)),
            self.load_data,
        )(queue_item_body)

        return {"analysis_payloads": callback_results, **queue_item_body}

    def load_data(self, queue_item_body):
        data = compose(
            lift(standardize),
            lift(self.parsing_strategy),
            self.data_loader,
        )(queue_item_body)

        return data

    def get_item_processor(self, queue_item_body):
        def process_storage_item(storage_item):
            analysis_input = {**storage_item, **queue_item_body}
            return self.process_storage_item(analysis_input)

        return process_storage_item

    @staticmethod
    def remove_empty_results(results):
        return lfilter(compose(any, itervalues), results)

    def process_storage_item(self, data_metadata_pack):
        return self.callback(data_metadata_pack)
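A minimal wiring sketch, assuming a storage instance, a file descriptor manager and an analysis callback exist; only classes introduced in this diff are used, and the names below are illustrative:

visitor = QueueVisitor(
    callback=my_analysis_callback,                       # hypothetical analysis callable
    data_loader=Downloader(storage, "my-bucket", fdm),   # hypothetical storage and descriptor manager
    response_strategy=ForwardingStrategy(),
)
response = visitor(queue_item_body)  # {"analysis_payloads": [...], **queue_item_body}, after formatting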
@@ -13,3 +13,9 @@ tqdm==4.62.3
 pytest~=7.0.1
 funcy==1.17
 fpdf==1.7.2
+PyMuPDF==1.19.6
+more-itertools==8.12.0
+numpy==1.22.3
+Pillow==9.0.1
+prometheus-client==0.13.1
+frozendict==2.3.2
@ -1,5 +1,4 @@
|
||||
import argparse
|
||||
import gzip
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
@ -7,6 +6,7 @@ from tqdm import tqdm
|
||||
|
||||
from pyinfra.config import CONFIG, parse_disjunction_string
|
||||
from pyinfra.storage.storages import get_s3_storage
|
||||
from pyinfra.utils.encoding import compress
|
||||
|
||||
|
||||
def parse_args():
|
||||
@ -31,7 +31,7 @@ def combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, extension)
|
||||
|
||||
|
||||
def upload_compressed_response(storage, bucket_name, dossier_id, file_id, result) -> None:
|
||||
data = gzip.compress(result.encode())
|
||||
data = compress(result.encode())
|
||||
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, file_id, CONFIG.service.response.extension)
|
||||
storage.put_object(bucket_name, path_gz, data)
|
||||
|
||||
@ -44,7 +44,7 @@ def add_file_compressed(storage, bucket_name, dossier_id, path) -> None:
|
||||
path_gz = combine_dossier_id_and_file_id_and_extension(dossier_id, Path(path).stem, suffix_gz)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = gzip.compress(f.read())
|
||||
data = compress(f.read())
|
||||
storage.put_object(bucket_name, path_gz, data)
|
||||
|
||||
|
||||
|
||||
@ -9,8 +9,13 @@ from pyinfra.storage.storages import get_s3_storage
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--bucket_name", "-b", required=True)
|
||||
parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image", "dl_error"], required=True)
|
||||
parser.add_argument("--bucket_name", "-b")
|
||||
parser.add_argument(
|
||||
"--analysis_container",
|
||||
"-a",
|
||||
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error", "table_parsing"],
|
||||
required=True,
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
@ -43,20 +48,40 @@ def make_connection() -> pika.BlockingConnection:
|
||||
return connection
|
||||
|
||||
|
||||
def build_message_bodies(analyse_container_type, bucket_name):
|
||||
def build_message_bodies(analysis_container, bucket_name):
|
||||
def update_message(message_dict):
|
||||
if analyse_container_type == "detr" or analyse_container_type == "image":
|
||||
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
|
||||
if analyse_container_type == "dl_error":
|
||||
if analysis_container == "detr" or analysis_container == "image":
|
||||
message_dict.update({"pages": []})
|
||||
if analysis_container == "conversion":
|
||||
message_dict.update(
|
||||
{
|
||||
"targetFileExtension": "ORIGIN.pdf.gz",
|
||||
"responseFileExtension": "json.gz",
|
||||
"operation": "conversion",
|
||||
"pages": [1, 2, 3],
|
||||
}
|
||||
)
|
||||
if analysis_container == "table_parsing":
|
||||
message_dict.update(
|
||||
{
|
||||
"operation": "table_parsing",
|
||||
"pages": [1, 2, 3],
|
||||
}
|
||||
)
|
||||
if analysis_container == "extraction":
|
||||
message_dict.update(
|
||||
{"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
|
||||
)
|
||||
if analysis_container == "dl_error":
|
||||
message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
|
||||
if analyse_container_type == "ner":
|
||||
if analysis_container == "ner":
|
||||
message_dict.update(
|
||||
{"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
|
||||
)
|
||||
return message_dict
|
||||
|
||||
storage = get_s3_storage()
|
||||
for bucket_name, pdf_name in storage.get_all_object_names(bucket_name):
|
||||
for pdf_name in storage.get_all_object_names(bucket_name):
|
||||
if "pdf" not in pdf_name:
|
||||
continue
|
||||
file_id = pdf_name.split(".")[0]
|
||||
@ -72,7 +97,9 @@ def main(args):
|
||||
declare_queue(channel, CONFIG.rabbitmq.queues.input)
|
||||
declare_queue(channel, CONFIG.rabbitmq.queues.output)
|
||||
|
||||
for body in build_message_bodies(args.analysis_container, args.bucket_name):
|
||||
bucket_name = args.bucket_name or parse_disjunction_string(CONFIG.storage.bucket)
|
||||
|
||||
for body in build_message_bodies(args.analysis_container, bucket_name):
|
||||
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
|
||||
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
|
||||
|
||||
|
||||
37
scripts/show_compressed_json.py
Normal file
37
scripts/show_compressed_json.py
Normal file
@ -0,0 +1,37 @@
|
||||
import argparse
|
||||
import gzip
|
||||
import json
|
||||
|
||||
from funcy import lmap
|
||||
|
||||
from pyinfra.server.packing import string_to_bytes
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("compressed_json_path", help="Path to compressed JSON file")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def interpret(parsed):
|
||||
try:
|
||||
return {**parsed, "data": str(string_to_bytes(parsed["data"]))}
|
||||
except KeyError:
|
||||
return parsed
|
||||
|
||||
|
||||
def main(fp):
|
||||
with open(fp, "rb") as f:
|
||||
compressed_json_path = f.read()
|
||||
|
||||
json_str = gzip.decompress(compressed_json_path)
|
||||
parsed = json.loads(json_str)
|
||||
parsed = [parsed] if isinstance(parsed, dict) else parsed
|
||||
parsed = lmap(interpret, parsed)
|
||||
|
||||
print(json.dumps(parsed, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = parse_args()
|
||||
main(args.compressed_json_path)
|
||||
57
src/serve.py
57
src/serve.py
@ -1,42 +1,25 @@
|
||||
import logging
|
||||
from multiprocessing import Process
|
||||
|
||||
import requests
|
||||
from retry import retry
|
||||
from pyinfra.utils.retry import retry
|
||||
|
||||
from pyinfra.config import CONFIG
|
||||
from pyinfra.exceptions import AnalysisFailure, ConsumerError
|
||||
from pyinfra.component_factory import ComponentFactory
|
||||
from pyinfra.exceptions import ConsumerError
|
||||
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
|
||||
from pyinfra.queue.consumer import Consumer
|
||||
from pyinfra.queue.queue_manager.pika_queue_manager import PikaQueueManager
|
||||
from pyinfra.storage.storages import get_storage
|
||||
from pyinfra.utils.banner import show_banner
|
||||
from pyinfra.visitor import QueueVisitor, StorageStrategy
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def make_callback(analysis_endpoint):
|
||||
def callback(message):
|
||||
def perform_operation(operation):
|
||||
endpoint = f"{analysis_endpoint}/{operation}"
|
||||
try:
|
||||
logging.debug(f"Requesting analysis from {endpoint}...")
|
||||
analysis_response = requests.post(endpoint, data=message["data"])
|
||||
analysis_response.raise_for_status()
|
||||
analysis_response = analysis_response.json()
|
||||
logging.debug(f"Received response.")
|
||||
return analysis_response
|
||||
except Exception as err:
|
||||
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
|
||||
raise AnalysisFailure() from err
|
||||
|
||||
operations = message.get("operations", ["/"])
|
||||
results = map(perform_operation, operations)
|
||||
result = dict(zip(operations, results))
|
||||
if list(result.keys()) == ["/"]:
|
||||
result = list(result.values())[0]
|
||||
return result
|
||||
|
||||
return callback
|
||||
@retry(ConsumerError)
|
||||
def consume():
|
||||
try:
|
||||
consumer = ComponentFactory(CONFIG).get_consumer()
|
||||
consumer.basic_consume_and_publish()
|
||||
except Exception as err:
|
||||
logger.exception(err)
|
||||
raise ConsumerError from err
|
||||
|
||||
|
||||
def main():
|
||||
@ -46,20 +29,6 @@ def main():
|
||||
logging.info("Starting webserver...")
|
||||
webserver.start()
|
||||
|
||||
callback = make_callback(CONFIG.rabbitmq.callback.analysis_endpoint)
|
||||
storage = get_storage(CONFIG.storage.backend)
|
||||
response_strategy = StorageStrategy(storage)
|
||||
visitor = QueueVisitor(storage, callback, response_strategy)
|
||||
|
||||
@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
|
||||
def consume():
|
||||
try: # RED-4049 queue manager needs to be in try scope to eventually throw Exception after connection loss.
|
||||
queue_manager = PikaQueueManager(CONFIG.rabbitmq.queues.input, CONFIG.rabbitmq.queues.output)
|
||||
consumer = Consumer(visitor, queue_manager)
|
||||
consumer.basic_consume_and_publish()
|
||||
except Exception as err:
|
||||
raise ConsumerError from err
|
||||
|
||||
try:
|
||||
consume()
|
||||
except KeyboardInterrupt:
|
||||
|
||||
@ -1,3 +1,55 @@
|
||||
service:
|
||||
response_formatter: identity
|
||||
operations:
|
||||
upper:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: up_in.gz
|
||||
multi: False
|
||||
output:
|
||||
subdir: ""
|
||||
extension: up_out.gz
|
||||
extract:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: extr_in.gz
|
||||
multi: False
|
||||
output:
|
||||
subdir: "extractions"
|
||||
extension: gz
|
||||
rotate:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: rot_in.gz
|
||||
multi: False
|
||||
output:
|
||||
subdir: ""
|
||||
extension: rot_out.gz
|
||||
classify:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: cls_in.gz
|
||||
multi: True
|
||||
output:
|
||||
subdir: ""
|
||||
extension: cls_out.gz
|
||||
stream_pages:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: pgs_in.gz
|
||||
multi: False
|
||||
output:
|
||||
subdir: "pages"
|
||||
extension: pgs_out.gz
|
||||
default:
|
||||
input:
|
||||
subdir: ""
|
||||
extension: IN.gz
|
||||
multi: False
|
||||
output:
|
||||
subdir: ""
|
||||
extension: OUT.gz
|
||||
|
||||
storage:
|
||||
minio:
|
||||
endpoint: "http://127.0.0.1:9000"
|
||||
@ -21,5 +73,7 @@ webserver:
|
||||
port: $SERVER_PORT|5000 # webserver port
|
||||
mode: $SERVER_MODE|production # webserver mode: {development, production}
|
||||
|
||||
mock_analysis_endpoint: "http://127.0.0.1:5000"
|
||||
|
||||
mock_analysis_endpoint: "http://127.0.0.1:5000"
|
||||
use_docker_fixture: 1
|
||||
logging: 0
|
||||
@ -1,11 +1,20 @@
|
||||
import json
|
||||
|
||||
from pyinfra.config import CONFIG as MAIN_CONFIG
|
||||
|
||||
MAIN_CONFIG["retry"]["delay"] = 0.1
|
||||
MAIN_CONFIG["retry"]["jitter"] = (0.1, 0.2)
|
||||
|
||||
from pyinfra.default_objects import get_component_factory
|
||||
from test.config import CONFIG as TEST_CONFIG
|
||||
|
||||
import logging
|
||||
import time
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pika
|
||||
import pytest
|
||||
import testcontainers.compose
|
||||
from testcontainers.compose import DockerCompose
|
||||
|
||||
from pyinfra.exceptions import UnknownClient
|
||||
from pyinfra.locations import TEST_DIR, COMPOSE_PATH
|
||||
@ -16,14 +25,36 @@ from pyinfra.storage.adapters.s3 import S3StorageAdapter
|
||||
from pyinfra.storage.clients.azure import get_azure_client
|
||||
from pyinfra.storage.clients.s3 import get_s3_client
|
||||
from pyinfra.storage.storage import Storage
|
||||
from test.config import CONFIG
|
||||
from pyinfra.visitor import QueueVisitor
|
||||
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
|
||||
from pyinfra.visitor.strategies.response.storage import StorageStrategy
|
||||
from test.queue.queue_manager_mock import QueueManagerMock
|
||||
from test.storage.adapter_mock import StorageAdapterMock
|
||||
from test.storage.client_mock import StorageClientMock
|
||||
from pyinfra.visitor import StorageStrategy, ForwardingStrategy, QueueVisitor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
logging.basicConfig()
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
# TODO: refactor all fixtures into cleanly separated modules
|
||||
pytest_plugins = [
|
||||
"test.fixtures.consumer",
|
||||
"test.fixtures.input",
|
||||
"test.fixtures.pdf",
|
||||
"test.fixtures.server",
|
||||
"test.integration_tests.serve_test",
|
||||
]
|
||||
|
||||
|
||||
logging.getLogger("PIL.PngImagePlugin").setLevel(level=logging.CRITICAL + 1)
|
||||
logging.getLogger("waitress").setLevel(level=logging.CRITICAL + 1)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mute_logger():
|
||||
if not TEST_CONFIG.logging:
|
||||
logger.setLevel(logging.CRITICAL + 1)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@ -58,10 +89,14 @@ def mock_make_load_data():
|
||||
return load_data
|
||||
|
||||
|
||||
@pytest.fixture(params=["minio", "aws"], scope="session")
|
||||
def storage(client_name, bucket_name, request):
|
||||
# def pytest_make_parametrize_id(config, val, argname):
|
||||
# return f"\n\t{argname}={val}\n"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def storage(client_name, bucket_name, s3_backend, docker_compose):
|
||||
logger.debug("Setup for storage")
|
||||
storage = Storage(get_adapter(client_name, request.param))
|
||||
storage = Storage(get_adapter(client_name, s3_backend))
|
||||
storage.make_bucket(bucket_name)
|
||||
storage.clear_bucket(bucket_name)
|
||||
yield storage
|
||||
@ -69,15 +104,23 @@ def storage(client_name, bucket_name, request):
|
||||
storage.clear_bucket(bucket_name)
|
||||
|
||||
|
||||
@pytest.fixture(params=["minio", "aws"])
|
||||
def s3_backend(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def docker_compose(sleep_seconds=30):
|
||||
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
|
||||
compose = testcontainers.compose.DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
|
||||
compose.start()
|
||||
logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
|
||||
time.sleep(sleep_seconds)
|
||||
yield compose
|
||||
compose.stop()
|
||||
if TEST_CONFIG.use_docker_fixture:
|
||||
logger.info(f"Starting docker containers with {COMPOSE_PATH}/docker-compose.yml...")
|
||||
compose = DockerCompose(COMPOSE_PATH, compose_file_name="docker-compose.yml")
|
||||
compose.start()
|
||||
logger.info(f"Sleeping for {sleep_seconds} seconds to wait for containers to finish startup... ")
|
||||
time.sleep(sleep_seconds)
|
||||
yield compose
|
||||
compose.stop()
|
||||
else:
|
||||
yield None
|
||||
|
||||
|
||||
def get_pika_connection_params():
|
||||
@ -86,7 +129,7 @@ def get_pika_connection_params():
|
||||
|
||||
|
||||
def get_s3_params(s3_backend):
|
||||
params = CONFIG.storage[s3_backend]
|
||||
params = TEST_CONFIG.storage[s3_backend]
|
||||
|
||||
return params
|
||||
|
||||
@ -95,7 +138,7 @@ def get_adapter(client_name, s3_backend):
|
||||
if client_name == "mock":
|
||||
return StorageAdapterMock(StorageClientMock())
|
||||
if client_name == "azure":
|
||||
return AzureStorageAdapter(get_azure_client(CONFIG.storage.azure.connection_string))
|
||||
return AzureStorageAdapter(get_azure_client(TEST_CONFIG.storage.azure.connection_string))
|
||||
if client_name == "s3":
|
||||
return S3StorageAdapter(get_s3_client(get_s3_params(s3_backend)))
|
||||
else:
|
||||
@ -110,7 +153,7 @@ def get_queue_manager(queue_manager_name) -> QueueManager:
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def queue_manager(queue_manager_name):
|
||||
def queue_manager(queue_manager_name, docker_compose):
|
||||
def close_connections():
|
||||
if queue_manager_name == "pika":
|
||||
try:
|
||||
@ -134,7 +177,7 @@ def queue_manager(queue_manager_name):
|
||||
@pytest.fixture(scope="session")
|
||||
def callback():
|
||||
def inner(request):
|
||||
return request["data"].decode() * 2
|
||||
return [request["data"].decode() * 2]
|
||||
|
||||
return inner
|
||||
|
||||
@ -156,5 +199,22 @@ def response_strategy(response_strategy_name, storage):
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def visitor(storage, analysis_callback, response_strategy):
|
||||
return QueueVisitor(storage, analysis_callback, response_strategy)
|
||||
def visitor(storage, analysis_callback, response_strategy, component_factory):
|
||||
|
||||
return QueueVisitor(
|
||||
callback=analysis_callback,
|
||||
data_loader=component_factory.get_downloader(storage),
|
||||
response_strategy=response_strategy,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def file_descriptor_manager(component_factory):
|
||||
return component_factory.get_file_descriptor_manager()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def component_factory():
|
||||
MAIN_CONFIG["service"]["operations"] = TEST_CONFIG.service.operations
|
||||
|
||||
return get_component_factory(MAIN_CONFIG)
|
||||
12
test/exploration_tests/pickup_endpoint_test.py
Normal file
12
test/exploration_tests/pickup_endpoint_test.py
Normal file
@ -0,0 +1,12 @@
|
||||
# import pytest
|
||||
# from funcy import lmap
|
||||
#
|
||||
# from pyinfra.server.rest import process_lazily
|
||||
# from pyinfra.server.utils import unpack
|
||||
#
|
||||
#
|
||||
# @pytest.mark.parametrize("batched", [True, False])
|
||||
# @pytest.mark.parametrize("item_type", ["string", "image", "pdf"])
|
||||
# def test_pickup_endpoint(url, input_data_items, metadata, operation, targets, server_process):
|
||||
# output = lmap(unpack, process_lazily(f"{url}/submit", input_data_items, metadata))
|
||||
# assert output == targets
|
||||
32
test/exploration_tests/repeated_first_chunk.py
Normal file
32
test/exploration_tests/repeated_first_chunk.py
Normal file
@ -0,0 +1,32 @@
|
||||
from functools import partial
|
||||
|
||||
from funcy import chunks, first, compose
|
||||
|
||||
|
||||
def test_repeated_first_chunk_consumption():
|
||||
def f(chunk):
|
||||
return sum(chunk)
|
||||
|
||||
def g():
|
||||
return f(first(chunks(3, items)))
|
||||
|
||||
items = iter(range(10))
|
||||
|
||||
assert g() == 3
|
||||
assert g() == 12
|
||||
assert g() == 21
|
||||
assert g() == 9
|
||||
|
||||
|
||||
def test_repeated_first_chunk_consumption_passing():
|
||||
def f(chunk):
|
||||
return sum(chunk)
|
||||
|
||||
g = compose(f, first, partial(chunks, 3))
|
||||
|
||||
items = iter(range(10))
|
||||
|
||||
assert g(items) == 3
|
||||
assert g(items) == 12
|
||||
assert g(items) == 21
|
||||
assert g(items) == 9
|
||||
0
test/fixtures/__init__.py
vendored
Normal file
0
test/fixtures/__init__.py
vendored
Normal file
37
test/fixtures/consumer.py
vendored
Normal file
37
test/fixtures/consumer.py
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
from operator import itemgetter
|
||||
from typing import Iterable
|
||||
|
||||
import pytest
|
||||
|
||||
from pyinfra.queue.consumer import Consumer
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def consumer(queue_manager, callback):
|
||||
return Consumer(callback, queue_manager)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def access_callback():
|
||||
return itemgetter("fileId")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def items():
|
||||
numbers = [f"{i}".encode() for i in range(3)]
|
||||
return pair_data_with_queue_message(numbers)
|
||||
|
||||
|
||||
def pair_data_with_queue_message(data: Iterable[bytes]):
|
||||
def inner():
|
||||
for i, d in enumerate(data):
|
||||
body = {
|
||||
"dossierId": "dossier_id",
|
||||
"fileId": f"file_id_{i}",
|
||||
"targetFileExtension": "in.gz",
|
||||
"responseFileExtension": "out.gz",
|
||||
"pages": [0, 2, 3],
|
||||
}
|
||||
yield d, body
|
||||
|
||||
return list(inner())
|
||||
165
test/fixtures/input.py
vendored
Normal file
165
test/fixtures/input.py
vendored
Normal file
@ -0,0 +1,165 @@
|
||||
from functools import partial
|
||||
from itertools import starmap, repeat
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from PIL import Image
|
||||
from funcy import lmap, compose, flatten, lflatten, omit, second, first, lzip, merge
|
||||
|
||||
from pyinfra.server.normalization import normalize_item
|
||||
from pyinfra.server.nothing import Nothing
|
||||
from pyinfra.server.packing import pack, unpack
|
||||
from pyinfra.utils.func import star, lift, lstarlift
|
||||
from test.utils.image import image_to_bytes
|
||||
from test.utils.pdf import pdf_stream
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def input_data_items(unencoded_input_data, input_data_encoder):
|
||||
return input_data_encoder(unencoded_input_data)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unencoded_input_data(item_type, unencoded_strings, unencoded_images, unencoded_pdfs):
|
||||
if item_type == "string":
|
||||
return unencoded_strings
|
||||
elif item_type == "image":
|
||||
return unencoded_images
|
||||
elif item_type == "pdf":
|
||||
return unencoded_pdfs
|
||||
else:
|
||||
raise ValueError(f"Unknown item type {item_type}")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def input_data_encoder(item_type):
|
||||
if item_type == "string":
|
||||
return strings_to_bytes
|
||||
elif item_type == "image":
|
||||
return images_to_bytes
|
||||
elif item_type == "pdf":
|
||||
return pdfs_to_bytes
|
||||
else:
|
||||
raise ValueError(f"Unknown item type {item_type}")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unencoded_pdfs(n_items, unencoded_pdf):
|
||||
return [unencoded_pdf] * n_items
|
||||
|
||||
|
||||
def pdfs_to_bytes(unencoded_pdfs):
|
||||
return [pdf_stream(pdf) for pdf in unencoded_pdfs]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def target_data_items(input_data_items, core_operation, metadata):
|
||||
|
||||
if core_operation is Nothing:
|
||||
return Nothing
|
||||
|
||||
op = compose(normalize_item, core_operation)
|
||||
expected = lflatten(starmap(op, zip(input_data_items, metadata)))
|
||||
return expected
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unencoded_strings(n_items):
|
||||
return [f"content{i}" for i in range(n_items)]
|
||||
|
||||
|
||||
def strings_to_bytes(strings):
|
||||
return [bytes(s, encoding="utf8") for s in strings]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def targets(data_message_pairs, input_data_items, operation, metadata, server_side_test, queue_message_metadata):
|
||||
"""TODO: this has become super wonky"""
|
||||
metadata = [{**m1, **m2} for m1, m2 in zip(lmap(second, data_message_pairs), metadata)]
|
||||
|
||||
if operation is Nothing:
|
||||
return Nothing
|
||||
|
||||
op = compose(lift(star(pack)), normalize_item, operation)
|
||||
|
||||
try:
|
||||
response_data, response_metadata = zip(*map(unpack, flatten(starmap(op, zip(input_data_items, metadata)))))
|
||||
|
||||
queue_message_keys = ["id"] * (not server_side_test) + [*first(queue_message_metadata).keys()]
|
||||
response_metadata = lmap(partial(omit, keys=queue_message_keys), response_metadata)
|
||||
expected = lzip(response_data, response_metadata)
|
||||
|
||||
except ValueError:
|
||||
expected = []
|
||||
|
||||
return expected
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def endpoint(url, operation_name):
|
||||
return f"{url}/{operation_name}"
|
||||
|
||||
|
||||
@pytest.fixture(params=["rest", "basic"])
|
||||
def client_pipeline_type(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=[1, 0, 5])
|
||||
def n_items(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=[0, 100])
|
||||
def n_pages(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=[1, 5])
|
||||
def buffer_size(request):
|
||||
return request.param
|
||||
|
||||
|
||||
def array_to_image(array) -> Image.Image:
|
||||
return Image.fromarray(np.uint8(array * 255), mode="RGB")
|
||||
|
||||
|
||||
def input_batch(n_items):
|
||||
return np.random.random_sample(size=(n_items, 3, 30, 30))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unencoded_images(n_items):
|
||||
return lmap(array_to_image, input_batch(n_items))
|
||||
|
||||
|
||||
def images_to_bytes(images):
|
||||
return lmap(image_to_bytes, images)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metadata(n_items, many_to_n):
|
||||
"""storage metadata
|
||||
TODO: rename
|
||||
"""
|
||||
return list(repeat({"key": "value"}, times=n_items))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def queue_message_metadata(n_items, operation_name):
|
||||
def metadata(i):
|
||||
return merge(
|
||||
{
|
||||
"dossierId": "dossier_id",
|
||||
"fileId": f"file_id_{i}",
|
||||
},
|
||||
({"operation": operation_name} if operation_name else {}),
|
||||
({"pages": [0, 2, 3]} if n_items > 1 else {}),
|
||||
)
|
||||
|
||||
return lmap(metadata, range(n_items))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def packages(input_data_items, metadata):
|
||||
return lstarlift(pack)(zip(input_data_items, metadata))
|
||||
11  test/fixtures/pdf.py  vendored  Normal file
@@ -0,0 +1,11 @@
import fpdf
import pytest


@pytest.fixture
def unencoded_pdf(n_pages):
    pdf = fpdf.FPDF(unit="pt")
    for _ in range(n_pages):
        pdf.add_page()

    return pdf
Some files were not shown because too many files have changed in this diff.