pyinfra/test/fixtures/server.py
2022-04-27 18:45:14 +02:00

137 lines
3.1 KiB
Python

import io
import json
import socket
from collections import deque
from multiprocessing import Process
from operator import itemgetter
import fitz
import flask
import pytest
import requests
from PIL import Image
from funcy import retry, compose, repeatedly, lmap
from waitress import serve
from test.server import set_up_processing_server
from test.utils.image import image_to_bytes
from test.utils.server import bytes_to_string, string_to_bytes
@pytest.fixture
def host():
    """Provide the address the test server binds to.

    ``0.0.0.0`` is the IPv4 wildcard address, i.e. all local interfaces.
    """
    bind_address = "0.0.0.0"
    return bind_address
def get_free_port(host):
    """Ask the operating system for a currently unused TCP port on ``host``.

    Binding to port ``0`` lets the OS pick a free ephemeral port. The probe
    socket is closed before returning, so no file descriptor is leaked and
    the port is released again for the caller to bind.

    Note: another process could claim the port between this call and the
    caller's own bind; that race is inherent to this technique.

    Args:
        host: Address of the interface to bind the probe socket to.

    Returns:
        The port number selected by the operating system.
    """
    # The context manager guarantees the socket is closed even if bind fails.
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
@pytest.fixture
def port(host):
    """Provide a free port on ``host``, as picked by ``get_free_port``."""
    free_port = get_free_port(host)
    return free_port
@pytest.fixture
def url(host, port):
    """Provide the HTTP base URL built from the ``host`` and ``port`` fixtures."""
    base_url = f"http://{host}:{port}"
    return base_url
@pytest.fixture
def server(processor_fn):
    """Provide the server object that ``set_up_processing_server`` builds around ``processor_fn``."""
    app = set_up_processing_server(processor_fn)
    return app
@pytest.fixture
def processor_fn(operation, buffer_size):
    """Provide a buffering request processor for the parametrized operation.

    Delegates to ``make_processor`` with the ``operation`` and ``buffer_size``
    fixtures.
    """
    handler = make_processor(operation, buffer_size=buffer_size)
    return handler
@pytest.fixture
def operation(item_type):
    """Provide the per-item processing callable matching ``item_type``.

    Supported item types:
        * ``"string"`` - decode the bytes, upper-case them and encode again.
        * ``"image"``  - rotate the image by 90 degrees.
        * ``"pdf"``    - yield every page of the document rendered as PNG bytes.

    Raises:
        ValueError: If ``item_type`` is none of the supported values.
    """

    def rotate(im: bytes):
        # Load the image from its raw bytes, rotate it, and serialise it back.
        return image_to_bytes(Image.open(io.BytesIO(im)).rotate(90))

    def stream_pages(pdf: bytes):
        # Generator: one PNG-encoded pixmap per page of the document.
        document = fitz.open(stream=pdf)
        for page in document:
            pixmap = page.get_pixmap()
            yield pixmap.tobytes("png")

    if item_type == "string":
        return lambda s: s.decode().upper().encode()
    if item_type == "image":
        return rotate
    if item_type == "pdf":
        return stream_pages
    raise ValueError(f"No operation specified for item type {item_type}")
@pytest.fixture(params=["string"])
def item_type(request):
    """Provide the kind of item under test, one value per parametrized run."""
    selected_type = request.param
    return selected_type
@pytest.fixture(params=[1, 3, 7, 100])
def buffer_size(request):
    """Provide the processor buffer capacity, one value per parametrized run."""
    capacity = request.param
    return capacity
def make_processor(operation, buffer_size=3):
    """Build a stateful request handler that processes payloads in batches.

    The returned handler keeps a buffer that is shared across calls. Each call
    decodes the payload of the incoming request and then:

    1. If the buffer already holds ``buffer_size`` items, or the incoming
       payload is empty (treated as the "last request" marker), every buffered
       item is removed, passed through ``operation`` and converted with
       ``bytes_to_string``.
    2. The incoming payload is appended to the buffer. This happens on every
       call, including for an empty payload.

    The response therefore holds results for previously received payloads,
    never for the one that arrived with the current request.

    Args:
        operation: Callable applied to each buffered payload.
        buffer_size: Number of buffered payloads that triggers a flush.

    Returns:
        A callable taking a ``flask.Request`` and returning ``{"data": [...]}``.
    """
    pending = deque()

    def decode_payload(request):
        # The request body is JSON; its "data" entry carries the encoded payload.
        body = json.loads(request.data.decode())
        return string_to_bytes(body["data"])

    def should_flush(payload):
        queued = len(pending)
        assert queued <= buffer_size
        # Flush when the buffer is full or when an empty payload arrives.
        return queued >= buffer_size or not payload

    def processor_fn(request: flask.Request):
        payload = decode_payload(request)
        if should_flush(payload):
            # Lazily pop exactly the items that are buffered right now.
            drained = repeatedly(pending.popleft, len(pending))
        else:
            drained = []
        result = [bytes_to_string(operation(item)) for item in drained]
        # Queue the new payload only after the flush so it is not processed yet.
        pending.append(payload)
        return {"data": result}

    return processor_fn
@pytest.fixture
def host_and_port(host, port):
    """Provide ``host`` and ``port`` bundled as a keyword-argument mapping."""
    return dict(host=host, port=port)
@retry(tries=5, timeout=1)
def server_ready(url, request_timeout=5):
    """Check whether the server behind ``url`` answers its ``/ready`` endpoint.

    The ``retry`` decorator repeats the call (``tries=5``, ``timeout=1``) when
    it raises, which gives a freshly started server time to come up.

    Args:
        url: Base URL of the server, without a trailing slash.
        request_timeout: Seconds to wait for a single HTTP request. Without a
            limit, a server that accepts the connection but never responds
            would block the caller indefinitely; a timeout raises instead and
            so counts as a failed attempt that ``retry`` can repeat.

    Returns:
        ``True`` if the endpoint answered with status code 200, ``False`` for
        any other non-error status.

    Raises:
        requests.RequestException: If the request keeps failing (connection
            error, timeout or HTTP error status) on the final attempt.
    """
    response = requests.get(f"{url}/ready", timeout=request_timeout)
    response.raise_for_status()
    return response.status_code == 200
@pytest.fixture(autouse=False, scope="function")
def server_process(server, host_and_port, url):
    """Run the test server in a separate process for the duration of a test.

    The server is started with ``serve`` in a child process. The fixture yields
    once ``server_ready`` confirms the server is answering, and the process is
    killed, joined and closed afterwards.

    Teardown runs in a ``finally`` block, so the child process is also cleaned
    up when the readiness check raises, instead of being left running.

    Raises:
        RuntimeError: If the readiness check completes without confirming that
            the server is ready.
    """
    # A distinct name avoids shadowing the ``server`` application argument.
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        if not server_ready(url):
            raise RuntimeError(f"Test server at {url} did not report ready")
        yield
    finally:
        process.kill()
        process.join()
        process.close()