25 lines
660 B
Python
25 lines
660 B
Python
import json
|
|
from itertools import repeat, starmap
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
from test.utils.server import bytes_to_string, string_to_bytes
|
|
|
|
|
|
@pytest.mark.parametrize("item_type", ["string"])
def test_sending_partial_request(url, data_items, metadata):
    """Post every data item to ``{url}/process`` as a stream of JSON chunks.

    Each item is paired with the same ``metadata`` dictionary, serialized to
    JSON and encoded to bytes; the resulting iterator is handed to
    ``requests.post`` as the request body. The response is not inspected,
    so the test only fails if building or sending the request raises.

    ``url`` and ``data_items`` are fixtures defined outside this view;
    ``item_type`` is presumably consumed by one of them -- TODO confirm.
    """

    def serialize(item: bytes) -> bytes:
        """Return one item plus the shared metadata as encoded JSON bytes."""
        payload = {"data": bytes_to_string(item), "metadata": metadata}
        return json.dumps(payload).encode()

    # Keep the body a lazy iterator (not a list): requests treats an
    # iterator without a length as a streamed, chunk-encoded upload.
    body_chunks = map(serialize, data_items)
    endpoint = f"{url}/process"

    requests.post(endpoint, data=body_chunks, stream=True)
|
|
|
|
|
|
@pytest.fixture
def metadata():
    """Provide a fixed sample metadata dictionary with ``idx`` and ``path``."""
    sample = {
        "idx": [1, 100, 101],
        "path": "asd/asd",
    }
    return sample
|