pyinfra/test/fixtures/server.py
Matthias Bisping e58addf8c4 refactoring
2022-04-27 14:09:18 +02:00

101 lines
2.3 KiB
Python

import json
import socket
from collections import deque
from multiprocessing import Process
from operator import itemgetter
import flask
import pytest
import requests
from funcy import retry, compose, repeatedly, lmap
from waitress import serve
from test.server import set_up_processing_server
from test.utils.server import bytes_to_string, string_to_bytes
@pytest.fixture
def host():
return "0.0.0.0"
def get_free_port(host):
sock = socket.socket()
sock.bind((host, 0))
return sock.getsockname()[1]
@pytest.fixture
def port(host):
return get_free_port(host)
@pytest.fixture
def url(host, port):
return f"http://{host}:{port}"
@pytest.fixture
def server(processor_fn):
return set_up_processing_server(processor_fn)
@pytest.fixture
def processor_fn(item_type, data_items):
if item_type == "string":
return make_processor(operation=lambda x: x.decode().upper().encode())
def make_processor(operation, buffer_size=3):
def processor_fn(request: flask.Request):
data = get_data_from_request(request)
items = repeatedly(buffer.pop, len(buffer)) if consume_buffer_now(buffer, data) else []
result = lmap(compose(bytes_to_string, operation), items)
buffer.append(data)
response_payload = {"data": result}
return response_payload
def consume_buffer_now(buffer, data):
buffer_is_full = len(buffer) == buffer_size
last_request_has_been_sent = not data
return buffer_is_full or last_request_has_been_sent
get_data_from_request = compose(string_to_bytes, itemgetter("data"), json.loads, lambda p: p.data.decode())
buffer = deque()
return processor_fn
@pytest.fixture
def host_and_port(host, port):
return {"host": host, "port": port}
@retry(tries=5, timeout=1)
def server_ready(url):
response = requests.get(f"{url}/ready")
response.raise_for_status()
return response.status_code == 200
@pytest.fixture(autouse=True, scope="function")
def server_process(server, host_and_port, url):
def get_server_process():
return Process(target=serve, kwargs={"app": server, **host_and_port})
server = get_server_process()
server.start()
if server_ready(url):
yield
server.kill()
server.join()
server.close()