pyinfra/test/fixtures/server.py
2022-04-27 13:41:48 +02:00

99 lines
2.0 KiB
Python

import json
import socket
from multiprocessing import Process
from operator import itemgetter
import flask
import pytest
import requests
from funcy import retry, compose
from waitress import serve
from test.server import set_up_processing_server
from test.utils.server import bytes_to_string, string_to_bytes
@pytest.fixture
def host():
    """Provide the address the test server binds to (all IPv4 interfaces)."""
    bind_address = "0.0.0.0"
    return bind_address
def get_free_port(host):
    """Return a TCP port number that was free on *host* at the time of the call.

    Binding to port 0 lets the operating system pick an unused port. The
    probe socket is closed before returning so that the caller can bind the
    port itself; this means another process could in principle claim the
    port in between.

    Args:
        host: Address (host name or IP) to bind the probe socket to.

    Returns:
        The port number the operating system assigned to the probe socket.
    """
    # The context manager closes the socket deterministically instead of
    # leaving it open until it happens to be garbage-collected.
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
@pytest.fixture
def port(host):
    """Provide a port number obtained from ``get_free_port`` for *host*."""
    free_port = get_free_port(host)
    return free_port
@pytest.fixture
def url(host, port):
    """Provide the base HTTP URL built from the *host* and *port* fixtures."""
    base_url = f"http://{host}:{port}"
    return base_url
@pytest.fixture
def server(processor_fn):
    """Provide the object returned by ``set_up_processing_server`` for *processor_fn*."""
    app = set_up_processing_server(processor_fn)
    return app
@pytest.fixture
def processor_fn(item_type, data_items):
    """Provide the processor callable matching *item_type*.

    Only the ``"string"`` item type is handled here; for any other value the
    fixture evaluates to ``None``. *data_items* is requested but not used in
    the body.
    """
    if item_type != "string":
        return None
    return make_string_processor()
def make_string_processor():
    """Build a stateful request processor that upper-cases buffered strings.

    The returned callable keeps a private list of pending items between
    calls. Each call extracts the ``"data"`` field from the JSON request
    body. When that field is empty, or the pending list already holds
    ``buffer_size`` items, every pending item is upper-cased, returned and
    the list is emptied; otherwise an empty result is returned. A non-empty
    incoming item is queued after the flush decision has been made.

    Returns:
        A function ``processor_fn(request, buffer_size=3)`` returning a dict
        of the form ``{"data": [...]}``.
    """
    # State shared across calls of the returned closure.
    pending = []

    # Applied right to left: raw body -> text -> JSON object -> "data" -> bytes.
    extract_payload = compose(
        string_to_bytes,
        itemgetter("data"),
        json.loads,
        lambda req: req.data.decode(),
    )

    def processor_fn(request: flask.Request, buffer_size=3):
        data = extract_payload(request)

        result = []
        must_flush = not data or len(pending) == buffer_size
        if must_flush:
            result = [bytes_to_string(item.decode().upper().encode()) for item in pending]
            pending.clear()

        # The incoming item is queued only after the flush, so it is never
        # part of the batch returned by this same call.
        if data:
            pending.append(data)

        return {"data": result}

    return processor_fn
@pytest.fixture
def host_and_port(host, port):
    """Bundle the *host* and *port* fixtures into a keyword-argument mapping."""
    return dict(host=host, port=port)
@retry(tries=5, timeout=1)
def server_ready(url):
    """Probe the server's ``/ready`` endpoint and report whether it answered 200.

    The ``retry`` decorator re-runs the probe when it raises (connection
    refused, HTTP error status, request timeout), so a server that is still
    starting up gets several attempts before the error propagates.

    Args:
        url: Base URL of the server, without a trailing slash.

    Returns:
        ``True`` if the response status code is exactly 200, else ``False``.

    Raises:
        requests.RequestException: If every attempt failed.
    """
    # Without an explicit timeout a single stalled connection would block
    # forever and the retry logic would never get a chance to run.
    response = requests.get(f"{url}/ready", timeout=5)
    response.raise_for_status()
    return response.status_code == 200
@pytest.fixture(autouse=True, scope="function")
def server_process(server, host_and_port, url):
    """Run *server* in a child process for the duration of each test.

    The application is served via ``serve`` in a separate ``Process`` using
    the keyword arguments from *host_and_port*. The test body only runs once
    ``server_ready(url)`` reports success; afterwards the process is killed,
    joined and closed.

    Raises:
        RuntimeError: If the readiness probe completes without reporting
            success.
    """
    # Use a distinct local name so the ``server`` fixture argument is not shadowed.
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        if not server_ready(url):
            raise RuntimeError(f"Server at {url} did not become ready")
        yield
    finally:
        # Always reap the child, even when the readiness probe raised,
        # so no orphaned server process outlives the test.
        process.kill()
        process.join()
        process.close()