import json
import socket
from multiprocessing import Process

import flask
import pytest
import requests
from funcy import retry
from waitress import serve

from test.server import set_up_processing_server
from test.utils.server import bytes_to_string, string_to_bytes


@pytest.fixture
def host():
    """Return the address used to bind the test server and to build its URL."""
    return "0.0.0.0"


def get_free_port(host):
    """Ask the OS for a currently unused port on ``host``.

    Binding to port 0 lets the OS pick the port. The probe socket is closed
    before returning, so the port is only likely (not guaranteed) to still be
    free when the server binds it afterwards.
    """
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


@pytest.fixture
def port(host):
    """Return a free port on ``host`` for the test server."""
    return get_free_port(host)


@pytest.fixture
def url(host, port):
    """Return the base HTTP URL of the test server."""
    return f"http://{host}:{port}"


@pytest.fixture
def server(processor_fn):
    """Return the application built by ``set_up_processing_server`` around ``processor_fn``."""
    return set_up_processing_server(processor_fn)


@pytest.fixture
def processor_fn(item_type, data_items):
    """Return the processor callable for ``item_type``.

    Only ``"string"`` is handled here; any other item type yields ``None``.
    ``data_items`` is requested as a fixture but not used in this body.
    """
    if item_type == "string":
        return make_string_processor()
    # Made explicit: the original fell through and returned None implicitly.
    return None


def make_string_processor():
    """Build a stateful processor that buffers items and returns them upper-cased.

    The returned function keeps a buffer shared across calls. Each call reads
    the ``"data"`` entry of the JSON request body. When that entry is empty, or
    the buffer already holds ``buffersize`` items, the buffered items are
    upper-cased, returned, and the buffer is emptied; otherwise an empty list is
    returned. A non-empty item is appended to the buffer afterwards, so it only
    shows up in a later flush.
    """
    buffer = []

    def processor_fn(payload: flask.Request, buffersize=3):
        body = json.loads(payload.data.decode())
        data = string_to_bytes(body["data"])

        if not data or len(buffer) == buffersize:
            # Flush: emit everything collected so far, upper-cased.
            result = [bytes_to_string(itm.decode().upper().encode()) for itm in buffer]
            buffer.clear()
        else:
            result = []

        # Append after the flush check so the current item lands in the next batch.
        if data:
            buffer.append(data)

        return {"data": result}

    return processor_fn


@pytest.fixture
def host_and_port(host, port):
    """Return ``host`` and ``port`` as keyword arguments for ``serve``."""
    return {"host": host, "port": port}


@retry(tries=5, timeout=1)
def server_ready(url):
    """Return whether ``GET {url}/ready`` answers with status 200.

    Connection failures and HTTP error statuses raise, which triggers the
    ``retry`` decorator (5 tries, ``timeout=1`` between attempts); the last
    exception propagates if every attempt fails.
    """
    response = requests.get(f"{url}/ready")
    response.raise_for_status()
    return response.status_code == 200


@pytest.fixture(autouse=True, scope="function")
def server_process(server, host_and_port, url):
    """Run the server in a child process for the duration of each test.

    The process is started, the fixture waits until ``/ready`` responds, and
    the test then runs. Teardown lives in ``finally`` so the child process is
    killed, joined and closed even when the readiness check raises; otherwise
    a failed start-up would leak a process that still occupies the port.
    """
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        # If the check returns False nothing is yielded and pytest reports the
        # fixture as broken, matching the previous behaviour.
        if server_ready(url):
            yield
    finally:
        process.kill()
        process.join()
        process.close()