Merge in RR/pyinfra from image-prediction-v2-support to 2.0.0
Squashed commit of the following:
commit 37c536324e847357e86dd9b72d1e07ad792ed90f
Merge: 77d1db8 01bfb1d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 13:53:56 2022 +0200
Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into image-prediction-v2-support
commit 77d1db8e8630de8822c124eb39f4cd817ed1d3e1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 13:07:41 2022 +0200
add operation assignment via config if operation is not defined by caller
commit 36c8ca48a8c6151f713c093a23de110901ba6b02
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 10:33:34 2022 +0200
refactor nothing part 2
commit f6cd0ef986802554dd544b9b7a24073d3b3f05b5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Mon Jul 11 10:28:49 2022 +0200
refactor nothing
commit 1e70d49531e89613c70903be49290b94ee014f65
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 17:42:12 2022 +0200
enable docker-compose fixture
commit 9fee32cecdd120cfac3e065fb8ad2b4f37b49226
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 17:40:35 2022 +0200
added 'multi' key to actual operation configurations
commit 4287f6d9878dd361489b8490eafd06f81df472ce
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 16:56:12 2022 +0200
removed debug prints
commit 23a533e8f99222c7e598fb0864f65e9aa3508a3b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 16:31:50 2022 +0200
completed correcting / cleaning upload and download logic with regard to operations and ids. next: remove debug code
commit 33246d1ff94989d2ea70242c7ae2e58afa4d35c1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jul 6 14:37:17 2022 +0200
corrected / cleaned upload and download logic with regard to operations and ids
commit 7f2b4e882022c6843cb2f80df202caa495c54ee9
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Tue Jul 5 18:41:07 2022 +0200
partially decomplected file descriptor manager from concrete and non-generic descriptor code
commit 40b892da17670dae3b8eba1700877c1dcf219852
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Tue Jul 5 09:53:46 2022 +0200
typo
commit ec4fa8e6f4551ff1f8d4f78c484b7a260f274898
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Tue Jul 5 09:52:41 2022 +0200
typo
commit 701b43403c328161fd96a73ce388a66035cca348
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 17:26:53 2022 +0200
made adjustments for image classification with pyinfra 2.x; added related fixmes
commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 13:05:26 2022 +0200
removed obsolete imports
commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 11:47:12 2022 +0200
enable docker-compose fixture
commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Mon Jul 4 11:46:53 2022 +0200
renaming
commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 30 12:47:57 2022 +0200
refactoring: added cached pipeline factory
commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 29 13:47:08 2022 +0200
renaming
commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 29 13:25:03 2022 +0200
further refactored server setup code: moving and decomplecting
commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 29 12:53:09 2022 +0200
refactored server setup code: factored out and decoupled operation registry and prometheus summary registry
... and 6 more commits
220 lines
5.5 KiB
Python
220 lines
5.5 KiB
Python
import io
|
|
import logging
|
|
import socket
|
|
from collections import Counter
|
|
from multiprocessing import Process
|
|
from operator import itemgetter
|
|
from typing import Generator
|
|
|
|
import fitz
|
|
import pytest
|
|
import requests
|
|
from PIL import Image
|
|
from funcy import retry, project, omit
|
|
from waitress import serve
|
|
|
|
from pyinfra.server.nothing import Nothing
|
|
from pyinfra.server.server import (
|
|
set_up_processing_server,
|
|
)
|
|
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
|
|
from pyinfra.utils.func import starlift
|
|
from test.utils.image import image_to_bytes
|
|
|
|
# Module-level logger; used e.g. to record which parameter combinations
# have no operation defined (see the core_operation fixture).
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@pytest.fixture
def host():
    """Bind address for the test server (all interfaces)."""
    address = "0.0.0.0"
    return address
|
|
|
|
|
|
def get_free_port(host):
    """Return a TCP port number that is currently free on ``host``.

    Binding to port 0 makes the OS pick an ephemeral free port; the probe
    socket is closed immediately afterwards so the server can bind it.

    NOTE(review): there is an inherent race — another process could grab the
    port between this call and the server's bind — acceptable for tests.
    """
    # Context manager ensures the probe socket is always closed (the original
    # left the socket open, leaking the file descriptor until GC).
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
|
|
|
|
|
|
@pytest.fixture
def port(host):
    """An OS-assigned free TCP port on ``host``."""
    free_port = get_free_port(host)
    return free_port
|
|
|
|
|
|
@pytest.fixture
def url(host, port):
    """Base HTTP URL under which the test server is reachable."""
    return "http://{}:{}".format(host, port)
|
|
|
|
|
|
@pytest.fixture
def server(server_stream_function, buffer_size, operation_name):
    """WSGI app exposing ``server_stream_function`` under ``operation_name``."""
    operations = {operation_name: server_stream_function}
    return set_up_processing_server(operations, buffer_size)
|
|
|
|
|
|
@pytest.fixture
def operation_name(core_operation):
    """Name under which the operation is registered: its function name."""
    return core_operation.__name__
|
|
|
|
|
|
@pytest.fixture
def server_stream_function(operation_conditionally_batched, batched):
    """The operation wrapped in streaming/packing logic, as served by the app."""
    wrapped = make_streamable_and_wrap_in_packing_logic(
        operation_conditionally_batched, batched
    )
    return wrapped
|
|
|
|
|
|
@pytest.fixture
def operation_conditionally_batched(operation, batched):
    """Star-lifted variant of ``operation`` when running in batched mode."""
    if batched:
        return starlift(operation)
    return operation
|
|
|
|
|
|
@pytest.fixture
def operation(core_operation, server_side_test):
    """Wrap ``core_operation`` so every result is yielded as a stream item.

    Unless ``server_side_test`` is set, an auto-incremented ``id`` (counted per
    ``(dossierId, fileId)`` pair) is added to result metadata that has none;
    ``pages`` and ``operation`` keys are stripped from the result metadata.
    """
    counts = Counter()

    def assign_id(meta):
        # Server-side tests leave the metadata untouched.
        if server_side_test:
            return meta
        key = itemgetter("dossierId", "fileId")(meta)
        # The counter advances for every item, even those already carrying an id.
        counts[key] += 1
        if "id" in meta:
            return meta
        return {**meta, "id": counts[key]}

    def wrapped(data, metadata):
        assert isinstance(metadata, dict)
        outcome = core_operation(data, metadata)
        if isinstance(outcome, Generator):
            # One-to-many operations already yield (data, metadata) pairs.
            for item, item_meta in outcome:
                yield item, assign_id(omit(item_meta, ["pages", "operation"]))
        else:
            # One-to-one operations return a single (data, metadata) pair.
            item, item_meta = outcome
            yield item, assign_id(omit(item_meta, ["pages", "operation"]))

    # The Nothing sentinel passes through unwrapped.
    if core_operation is Nothing:
        return Nothing

    return wrapped
|
|
|
|
|
|
@pytest.fixture(params=[False])
def server_side_test(request):
    """Parametrized flag; when truthy, the ``operation`` fixture skips
    client-side id auto-assignment."""
    flag = request.param
    return flag
|
|
|
|
|
|
@pytest.fixture
def core_operation(many_to_n, item_type, one_to_many, analysis_task):
    """Select the concrete operation under test for the parameter combination.

    The lookup is ``params2op[many_to_n][one_to_many][item_type][analysis_task]``;
    parameter combinations without a defined operation skip the test.
    """

    # NOTE(review): ``duplicate`` is currently not referenced by ``params2op``.
    def duplicate(string: bytes, metadata):
        for _ in range(2):
            yield upper(string, metadata), metadata

    def upper(string: bytes, metadata):
        # One-to-one string operation: upper-case the payload.
        r = string.decode().upper().encode(), metadata
        return r

    def extract(string: bytes, metadata):
        # One-to-many string operation: emit one item per character; a "pages"
        # entry in the metadata restricts which character indices are emitted.
        for i, c in project(
            dict(enumerate(string.decode())), metadata.get("pages", range(len(string.decode())))
        ).items():
            metadata["id"] = i
            yield c.encode(), metadata

    def rotate(im: bytes, metadata):
        # One-to-one image operation: rotate the image by 90 degrees.
        im = Image.open(io.BytesIO(im))
        return image_to_bytes(im.rotate(90)), metadata

    def classify(_: bytes, metadata):
        # Analysis task: no payload, only a classification result in metadata.
        return b"", {"classification": 1, **metadata}

    def stream_pages(pdf: bytes, metadata):
        # One-to-many pdf operation: emit a placeholder item per page.
        for i, page in enumerate(fitz.open(stream=pdf)):
            metadata["id"] = i
            yield f"page_{i}".encode(), metadata

    # Keyed by: many_to_n -> one_to_many -> item_type -> analysis_task.
    params2op = {
        True: {
            True: {},
            False: {
                "image": {True: classify},
            },
        },
        False: {
            True: {
                "string": {False: extract},
                "pdf": {False: stream_pages},
            },
            False: {
                "string": {False: upper},
                "image": {False: rotate},
            },
        },
    }

    try:
        return params2op[many_to_n][one_to_many][item_type][analysis_task]
    except KeyError:
        msg = (
            f"No operation defined for "
            f"[{many_to_n=}, {one_to_many=}, {item_type=}, {analysis_task=}]."
        )
        # Log BEFORE skipping: pytest.skip() raises, so statements after it
        # never run (the original logged and returned after the skip —
        # unreachable code). The message now also includes many_to_n, which
        # is part of the lookup but was missing from the diagnostic.
        logger.debug(msg)
        pytest.skip(msg)
|
|
|
|
|
|
@pytest.fixture(params=["pdf", "string", "image"])
def item_type(request):
    """Kind of payload the operation under test receives."""
    kind = request.param
    return kind
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
def one_to_many(request):
    """Whether the operation yields multiple results per input item."""
    flag = request.param
    return flag
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
def many_to_n(request):
    """First key of the operation lookup in ``core_operation``."""
    flag = request.param
    return flag
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
def analysis_task(request):
    """Whether the selected operation is an analysis task (e.g. classify)."""
    flag = request.param
    return flag
|
|
|
|
|
|
@pytest.fixture(params=[False, True])
def batched(request):
    """Whether the webserver's buffer processor function is applied to
    batches rather than to single items."""
    flag = request.param
    return flag
|
|
|
|
|
|
@pytest.fixture
def host_and_port(host, port):
    """Keyword arguments (``host``/``port``) for the server process."""
    return dict(host=host, port=port)
|
|
|
|
|
|
@retry(tries=5, timeout=1)
def server_ready(url):
    """Poll the server's readiness endpoint; retried up to 5 times on failure."""
    response = requests.get(f"{url}/ready")
    response.raise_for_status()
    ready = response.status_code == 200
    return ready
|
|
|
|
|
|
@pytest.fixture(autouse=False, scope="function")
def server_process(server, host_and_port, url):
    """Serve the WSGI ``server`` app in a child process for the test's duration.

    Waits for the readiness endpoint before yielding; the process is always
    killed/joined/closed afterwards.
    """
    # Named ``process`` to avoid shadowing the ``server`` fixture argument
    # (the original rebound ``server`` to the Process object).
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        if server_ready(url):
            yield
    finally:
        # try/finally guarantees cleanup even if server_ready() raises after
        # exhausting its retries — the original leaked the child process in
        # that case.
        process.kill()
        process.join()
        process.close()
|