pyinfra/test/fixtures/server.py
2022-04-26 16:15:54 +02:00

83 lines
1.7 KiB
Python

import socket
from multiprocessing import Process
import pytest
import requests
from funcy import retry
from waitress import serve
from test.server import set_up_processing_server
@pytest.fixture
def host():
    """Provide the host address used by the test server fixtures."""
    bind_address = "0.0.0.0"
    return bind_address
def get_free_port(host):
    """Return a TCP port number that was free on ``host`` at call time.

    The socket is bound to port 0 so the operating system picks an unused
    port, and the chosen number is read back with ``getsockname``.

    Args:
        host: Address to bind the probing socket to.

    Returns:
        The port number selected by the operating system.

    Note:
        The probing socket is closed before returning, so another process
        could claim the port before the caller binds to it.
    """
    # The context manager closes the socket deterministically instead of
    # leaving it to garbage collection.
    with socket.socket() as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
@pytest.fixture
def port(host):
    """Provide a port number obtained from ``get_free_port`` for ``host``."""
    free_port = get_free_port(host)
    return free_port
@pytest.fixture
def url(host, port):
    """Provide the HTTP base URL built from ``host`` and ``port``."""
    netloc = f"{host}:{port}"
    return f"http://{netloc}"
@pytest.fixture
def server(processor_fn):
    """Provide the object returned by ``set_up_processing_server`` for ``processor_fn``."""
    processing_server = set_up_processing_server(processor_fn)
    return processing_server
@pytest.fixture
def processor_fn(item_type, data_items):
    """Provide the processor callable matching ``item_type``.

    Only the ``"string"`` item type has a processor; for any other value the
    fixture evaluates to ``None``.
    """
    if item_type != "string":
        # No processor is defined for other item types.
        return None
    return make_string_processor(data_items)
def make_string_processor(data_items):
    """Build a stub processor for string items.

    Args:
        data_items: Currently unused by the stub; kept so the factory keeps
            the signature its callers use.

    Returns:
        A callable taking ``payload`` that writes it to stdout and returns
        ``None``.
    """

    def processor_fn(payload):
        # TODO: parse the payload and return a real response payload; until
        # then the stub only echoes what it received.
        print(payload)

    return processor_fn
@pytest.fixture
def host_and_port(host, port):
    """Provide ``host`` and ``port`` bundled in a dict keyed by those names."""
    return dict(host=host, port=port)
@retry(tries=5, timeout=1)
def server_ready(url):
    """Check whether the server behind ``url`` answers its ``/ready`` endpoint.

    The ``retry`` decorator re-invokes this function when it raises, up to
    the configured number of tries.

    Args:
        url: Base URL of the server, without a trailing slash.

    Returns:
        ``True`` if the response status code is exactly 200, else ``False``.

    Raises:
        requests.RequestException: If the final attempt still fails to
            connect, times out, or returns an error status.
    """
    # A request timeout keeps an accepted-but-unanswered connection from
    # blocking the test run indefinitely; the resulting exception is retried.
    response = requests.get(f"{url}/ready", timeout=5)
    response.raise_for_status()
    return response.status_code == 200
@pytest.fixture(autouse=True, scope="function")
def server_process(server, host_and_port, url):
    """Run ``server`` in a child process for the duration of each test.

    The process is started with ``serve`` as its target, the fixture waits
    until ``server_ready`` reports success, and the process is always killed,
    joined and closed afterwards.

    Args:
        server: Object passed to ``serve`` as ``app``.
        host_and_port: Extra keyword arguments for ``serve``.
        url: Base URL polled for readiness.

    Raises:
        RuntimeError: If the readiness check completes without reporting
            the server as ready.
    """
    # Use a distinct local name so the ``server`` argument is not shadowed.
    process = Process(target=serve, kwargs={"app": server, **host_and_port})
    process.start()
    try:
        if not server_ready(url):
            raise RuntimeError(f"Server at {url} did not become ready")
        yield
    finally:
        # Clean up even when the readiness check raises, so no child process
        # is left running.
        process.kill()
        process.join()
        process.close()