import json
import socket
from collections import deque
from multiprocessing import Process
from operator import itemgetter

import flask
import pytest
import requests
from funcy import retry, compose, repeatedly, lmap
from waitress import serve

from test.server import set_up_processing_server
from test.utils.server import bytes_to_string, string_to_bytes


@pytest.fixture
def host():
    """Address the test server binds to (all interfaces)."""
    return "0.0.0.0"


def get_free_port(host):
    """Return a port number the OS reports as free on ``host``.

    Binding to port 0 lets the OS choose an unused port. The probing socket is
    closed before returning so the port can be bound again by the server.
    """
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


@pytest.fixture
def port(host):
    """A free port on ``host`` for the test server."""
    return get_free_port(host)


@pytest.fixture
def url(host, port):
    """Base URL of the test server."""
    return f"http://{host}:{port}"


@pytest.fixture
def server(processor_fn):
    """Server application built around ``processor_fn``."""
    return set_up_processing_server(processor_fn)


@pytest.fixture
def processor_fn(item_type, data_items):
    """Request processor matching ``item_type``.

    Only ``"string"`` is handled (payloads are upper-cased); for any other
    item type the fixture value is ``None``.
    """
    if item_type == "string":
        return make_processor(operation=lambda x: x.decode().upper().encode())


def make_processor(operation, buffer_size=3):
    """Build a buffering request processor.

    The returned callable extracts the ``"data"`` field from the JSON body of
    each request and stores it in a buffer that is shared across calls. When
    the buffer already holds ``buffer_size`` items, or the incoming data is
    empty, the buffered items are popped (most recent first), passed through
    ``operation`` and ``bytes_to_string``, and returned under the ``"data"``
    key; otherwise that list is empty.

    Args:
        operation: Callable applied to each buffered item when it is flushed.
        buffer_size: Number of buffered items that triggers a flush.

    Returns:
        A function taking a ``flask.Request`` and returning ``{"data": [...]}``.
    """
    buffer = deque()

    # Request body bytes -> str -> parsed JSON -> "data" field -> bytes.
    get_data_from_request = compose(
        string_to_bytes,
        itemgetter("data"),
        json.loads,
        lambda p: p.data.decode(),
    )

    def consume_buffer_now(buffer, data):
        """Tell whether the buffer must be flushed before storing ``data``."""
        buffer_is_full = len(buffer) == buffer_size
        last_request_has_been_sent = not data
        return buffer_is_full or last_request_has_been_sent

    def processor_fn(request: flask.Request):
        data = get_data_from_request(request)
        items = repeatedly(buffer.pop, len(buffer)) if consume_buffer_now(buffer, data) else []
        # lmap is eager: the buffer is drained here, before the new data is stored.
        result = lmap(compose(bytes_to_string, operation), items)
        buffer.append(data)
        response_payload = {"data": result}
        return response_payload

    return processor_fn


@pytest.fixture
def host_and_port(host, port):
    """Keyword arguments describing where the server listens."""
    return {"host": host, "port": port}


@retry(tries=5, timeout=1)
def server_ready(url):
    """Return whether ``GET {url}/ready`` answers with status 200.

    Connection failures and HTTP error statuses raise, which makes ``retry``
    attempt the call again (5 tries at most); the final failure propagates.
    """
    response = requests.get(f"{url}/ready")
    response.raise_for_status()
    return response.status_code == 200


@pytest.fixture(autouse=True, scope="function")
def server_process(server, host_and_port, url):
    """Run the server in a child process for the duration of each test.

    The child process is always terminated and released, including when the
    readiness check raises, so a failed start-up does not leave a server
    process behind.
    """
    # A distinct name keeps the ``server`` fixture value (the app) unshadowed.
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        if server_ready(url):
            yield
    finally:
        process.kill()
        process.join()
        process.close()