Julius Unverfehrt a1bfec765c Pull request #43: Image prediction v2 support
Merge in RR/pyinfra from image-prediction-v2-support to 2.0.0

Squashed commit of the following:

commit 37c536324e847357e86dd9b72d1e07ad792ed90f
Merge: 77d1db8 01bfb1d
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:53:56 2022 +0200

    Merge branch '2.0.0' of ssh://git.iqser.com:2222/rr/pyinfra into image-prediction-v2-support

commit 77d1db8e8630de8822c124eb39f4cd817ed1d3e1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 13:07:41 2022 +0200

    add operation assignment via config if operation is not defined by caller

commit 36c8ca48a8c6151f713c093a23de110901ba6b02
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:33:34 2022 +0200

    refactor nothing part 2

commit f6cd0ef986802554dd544b9b7a24073d3b3f05b5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Mon Jul 11 10:28:49 2022 +0200

    refactor nothing

commit 1e70d49531e89613c70903be49290b94ee014f65
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:42:12 2022 +0200

    enable docker-compose fixture

commit 9fee32cecdd120cfac3e065fb8ad2b4f37b49226
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 17:40:35 2022 +0200

    added 'multi' key to actual operation configurations

commit 4287f6d9878dd361489b8490eafd06f81df472ce
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:56:12 2022 +0200

    removed debug prints

commit 23a533e8f99222c7e598fb0864f65e9aa3508a3b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 16:31:50 2022 +0200

    completed correcting / cleaning upload and download logic with regard to operations and ids. next: remove debug code

commit 33246d1ff94989d2ea70242c7ae2e58afa4d35c1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jul 6 14:37:17 2022 +0200

    corrected / cleaned upload and download logic with regard to operations and ids

commit 7f2b4e882022c6843cb2f80df202caa495c54ee9
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 18:41:07 2022 +0200

    partially decomplected file descriptor manager from concrete and non-generic descriptor code

commit 40b892da17670dae3b8eba1700877c1dcf219852
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:53:46 2022 +0200

    typo

commit ec4fa8e6f4551ff1f8d4f78c484b7a260f274898
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Jul 5 09:52:41 2022 +0200

    typo

commit 701b43403c328161fd96a73ce388a66035cca348
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 17:26:53 2022 +0200

    made adjustments for image classification with pyinfra 2.x; added related fixmes

commit 7a794bdcc987631cdc4d89b5620359464e2e018e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 13:05:26 2022 +0200

    removed obsolete imports

commit 3fc6a7ef5d0172dbce1c4292d245eced2f378b5a
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:47:12 2022 +0200

    enable docker-compose fixture

commit 36d8d3bc851b06d94cf12a73048a00a67ef79c42
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Mon Jul 4 11:46:53 2022 +0200

    renaming

commit 3bf00d11cd041dff325b66f13fcd00d3ce96b8b5
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Thu Jun 30 12:47:57 2022 +0200

    refactoring: added cached pipeline factory

commit 90e735852af2f86e35be845fabf28494de952edb
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:47:08 2022 +0200

    renaming

commit 93b3d4b202b41183ed8cabe193a4bfa03f520787
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 13:25:03 2022 +0200

    further refactored server setup code: moving and decomplecting

commit 8b2ed83c7ade5bd811cb045d56fbfb0353fa385e
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Wed Jun 29 12:53:09 2022 +0200

    refactored server setup code: factored out and decoupled operation registry and prometheus summary registry

... and 6 more commits
2022-07-11 14:17:59 +02:00

220 lines
5.5 KiB
Python

import io
import logging
import socket
from collections import Counter
from multiprocessing import Process
from operator import itemgetter
from typing import Generator
import fitz
import pytest
import requests
from PIL import Image
from funcy import retry, project, omit
from waitress import serve
from pyinfra.server.nothing import Nothing
from pyinfra.server.server import (
set_up_processing_server,
)
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
# Module-level logger, named after this test module.
logger = logging.getLogger(__name__)
@pytest.fixture
def host():
    """Address the test server binds to (all interfaces)."""
    bind_address = "0.0.0.0"
    return bind_address
def get_free_port(host):
    """Return a TCP port that is currently free on *host*.

    Binding to port 0 lets the OS pick an unused port; the assigned port
    is read back and the socket is closed (the original leaked it) so the
    server under test can bind the same port afterwards.

    :param host: interface/hostname to bind the probe socket on
    :return: an integer port number
    """
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
@pytest.fixture
def port(host):
    """An OS-assigned free TCP port on *host*."""
    free_port = get_free_port(host)
    return free_port
@pytest.fixture
def url(host, port):
    """Base HTTP URL of the test server."""
    base_url = f"http://{host}:{port}"
    return base_url
@pytest.fixture
def server(server_stream_function, buffer_size, operation_name):
    """Processing server (WSGI app) wired with a single named stream operation."""
    operations = {operation_name: server_stream_function}
    return set_up_processing_server(operations, buffer_size)
@pytest.fixture
def operation_name(core_operation):
    """Name under which the core operation is registered (its ``__name__``)."""
    return core_operation.__name__
@pytest.fixture
def server_stream_function(operation_conditionally_batched, batched):
    """The operation wrapped into the server's streaming/packing protocol."""
    wrapped = make_streamable_and_wrap_in_packing_logic(
        operation_conditionally_batched, batched
    )
    return wrapped
@pytest.fixture
def operation_conditionally_batched(operation, batched):
    """The operation, star-lifted when it must accept argument batches."""
    if batched:
        return starlift(operation)
    return operation
@pytest.fixture
def operation(core_operation, server_side_test):
    """Wrap the core operation with client-side metadata bookkeeping.

    Each result has the volatile "pages" and "operation" metadata keys
    stripped; unless this is a server-side test, an auto-incrementing
    "id" per (dossierId, fileId) pair is assigned when the operation did
    not set one itself. The ``Nothing`` sentinel passes through unwrapped.
    """
    # Counts results per (dossierId, fileId) so each gets a sequential id.
    auto_counter = Counter()
    def auto_count(metadata):
        if not server_side_test:
            idnt = itemgetter("dossierId", "fileId")(metadata)
            # NOTE(review): the counter advances even when "id" is already
            # present, so pre-set ids still consume sequence numbers —
            # confirm this is intended.
            auto_counter[idnt] += 1
            return {**metadata, "id": auto_counter[idnt]} if "id" not in metadata else metadata
        else:
            # Server-side tests keep metadata untouched.
            return metadata
    def op(data, metadata):
        assert isinstance(metadata, dict)
        result = core_operation(data, metadata)
        if isinstance(result, Generator):
            # Many results: post-process each yielded (data, metadata) pair.
            for data, metadata in result:
                yield data, auto_count(omit(metadata, ["pages", "operation"]))
        else:
            # Single result: normalize to the same generator shape.
            data, metadata = result
            yield data, auto_count(omit(metadata, ["pages", "operation"]))
    if core_operation is Nothing:
        # The sentinel must be returned as-is, not wrapped in op().
        return Nothing
    return op
@pytest.fixture(params=[False])
def server_side_test(request):
    """Whether assertions run server-side (currently always False)."""
    return request.param
@pytest.fixture
def core_operation(many_to_n, item_type, one_to_many, analysis_task):
    """Pick the concrete operation under test for the current parameter combo.

    The lookup is ``params2op[many_to_n][one_to_many][item_type][analysis_task]``;
    parameter combinations with no entry skip the test.

    Fix vs. original: ``pytest.skip`` raises, so the original's
    ``logger.debug(msg)`` and ``return Nothing`` after it were unreachable —
    the debug log now happens before skipping and the dead code is removed.
    """

    def duplicate(string: bytes, metadata):
        # NOTE(review): not referenced by params2op; it yields
        # ((data, metadata), metadata) — verify the nesting before wiring in.
        for _ in range(2):
            yield upper(string, metadata), metadata

    def upper(string: bytes, metadata):
        # One-to-one: upper-case the decoded payload.
        r = string.decode().upper().encode(), metadata
        return r

    def extract(string: bytes, metadata):
        # One-to-many: emit one character per requested page index
        # (all indices when metadata has no "pages" key).
        for i, c in project(
            dict(enumerate(string.decode())), metadata.get("pages", range(len(string.decode())))
        ).items():
            metadata["id"] = i
            yield c.encode(), metadata

    def rotate(im: bytes, metadata):
        # One-to-one: rotate the image by 90 degrees.
        im = Image.open(io.BytesIO(im))
        return image_to_bytes(im.rotate(90)), metadata

    def classify(_: bytes, metadata):
        # Analysis task: return empty payload plus a classification label.
        return b"", {"classification": 1, **metadata}

    def stream_pages(pdf: bytes, metadata):
        # One-to-many: one synthetic payload per PDF page.
        for i, page in enumerate(fitz.open(stream=pdf)):
            # Real pixmap rendering intentionally disabled:
            # yield page.get_pixmap().tobytes("png"), metadata
            metadata["id"] = i
            yield f"page_{i}".encode(), metadata

    # Nesting order: many_to_n -> one_to_many -> item_type -> analysis_task.
    params2op = {
        True: {
            True: {},
            False: {
                "image": {True: classify},
            },
        },
        False: {
            True: {
                "string": {False: extract},
                "pdf": {False: stream_pages},
            },
            False: {
                "string": {False: upper},
                "image": {False: rotate},
            },
        },
    }
    try:
        return params2op[many_to_n][one_to_many][item_type][analysis_task]
    except KeyError:
        msg = f"No operation defined for [{one_to_many=}, {item_type=}, {analysis_task=}]."
        # Log first: pytest.skip raises Skipped, so nothing after it runs.
        logger.debug(msg)
        pytest.skip(msg)
@pytest.fixture(params=["pdf", "string", "image"])
def item_type(request):
    """Kind of payload fed to the operation: pdf, string, or image."""
    return request.param
@pytest.fixture(params=[True, False])
def one_to_many(request):
    """Whether a single input item may yield several result items."""
    return request.param
@pytest.fixture(params=[True, False])
def many_to_n(request):
    """Whether the operation consumes several input items at once."""
    return request.param
@pytest.fixture(params=[True, False])
def analysis_task(request):
    """Whether the operation is an analysis task (e.g. classification)."""
    return request.param
@pytest.fixture(params=[False, True])
def batched(request):
    """Controls whether the webserver's buffer processor function is applied to batches or to single items."""
    return request.param
@pytest.fixture
def host_and_port(host, port):
    """Keyword-argument mapping for waitress.serve: host and port."""
    return dict(host=host, port=port)
@retry(tries=5, timeout=1)
def server_ready(url):
    """Poll the server's /ready endpoint; True once it answers HTTP 200.

    funcy.retry re-invokes this up to 5 times (1s apart) while the request
    or raise_for_status raises, covering server startup time.
    """
    ready_endpoint = f"{url}/ready"
    response = requests.get(ready_endpoint)
    response.raise_for_status()
    return response.status_code == 200
@pytest.fixture(autouse=False, scope="function")
def server_process(server, host_and_port, url):
    """Run the WSGI app under waitress in a child process for one test.

    Starts the process, waits for the /ready endpoint, yields control to
    the test, then tears the process down.

    Fixes vs. original: the local Process no longer shadows the ``server``
    fixture argument, and teardown runs in a ``finally`` so the child
    process is killed even when readiness polling raises (the original
    leaked the process in that case).
    """
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        if server_ready(url):
            yield
    finally:
        process.kill()
        process.join()
        process.close()