partial request by manual receiver buffering V1
This commit is contained in:
parent
656bc7cd63
commit
7419612c21
@ -1,8 +1,9 @@
|
||||
import json
|
||||
from itertools import repeat, starmap
|
||||
from itertools import repeat, starmap, chain
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from funcy import pluck
|
||||
|
||||
from test.utils.server import bytes_to_string, string_to_bytes
|
||||
|
||||
@ -14,9 +15,17 @@ def test_sending_partial_request(url, data_items, metadata):
|
||||
package = json.dumps(package).encode()
|
||||
return package
|
||||
|
||||
def post(package):
|
||||
final = str(0 if package else 1)
|
||||
return requests.post(f"{url}/process", data=package, headers={"final": final})
|
||||
|
||||
packages = starmap(pack, zip(repeat(metadata), data_items))
|
||||
|
||||
requests.post(f"{url}/process", data=packages, stream=True)
|
||||
responses = map(post, chain(packages, [""]))
|
||||
payloads = (json.loads(r.json()) for r in responses)
|
||||
data = map(string_to_bytes, chain.from_iterable(pluck("data", payloads)))
|
||||
|
||||
assert list(data) == [b"CONTENT"] * 7
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
4
test/fixtures/input.py
vendored
4
test/fixtures/input.py
vendored
@ -1,3 +1,5 @@
|
||||
from itertools import repeat
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@ -12,4 +14,4 @@ def data(data_type, pdf):
|
||||
@pytest.fixture
|
||||
def data_items(item_type):
|
||||
if item_type == "string":
|
||||
return iter([b"content"] * 4)
|
||||
return repeat(b"content", 7)
|
||||
|
||||
40
test/fixtures/server.py
vendored
40
test/fixtures/server.py
vendored
@ -1,13 +1,15 @@
|
||||
import json
|
||||
import socket
|
||||
from multiprocessing import Process
|
||||
|
||||
import flask
|
||||
import pytest
|
||||
import requests
|
||||
from funcy import retry
|
||||
from waitress import serve
|
||||
|
||||
from test.server import set_up_processing_server
|
||||
from test.utils.server import bytes_to_string
|
||||
from test.utils.server import bytes_to_string, string_to_bytes
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -39,17 +41,35 @@ def server(processor_fn):
|
||||
@pytest.fixture
|
||||
def processor_fn(item_type, data_items):
|
||||
if item_type == "string":
|
||||
return make_string_processor(data_items)
|
||||
return make_string_processor()
|
||||
|
||||
|
||||
def make_string_processor(data_items):
|
||||
def processor_fn(payload):
|
||||
print(payload.stream.read())
|
||||
# def parse(package)
|
||||
# payload = json.load(payload)
|
||||
# data = string_to_bytes(payload["data"])
|
||||
# # response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))}
|
||||
# return response_payload
|
||||
def make_string_processor():
|
||||
def processor_fn(payload: flask.Request, buffersize=3):
|
||||
|
||||
final = int(payload.headers["final"])
|
||||
|
||||
if final or len(buffer) == buffersize:
|
||||
|
||||
result = [bytes_to_string(itm.decode().upper().encode()) for itm in buffer]
|
||||
|
||||
buffer.clear()
|
||||
|
||||
else:
|
||||
result = []
|
||||
|
||||
if not final:
|
||||
|
||||
payload = json.loads(payload.data.decode())
|
||||
data = string_to_bytes(payload["data"])
|
||||
buffer.append(data)
|
||||
|
||||
response_payload = {"data": result}
|
||||
response = json.dumps(response_payload)
|
||||
|
||||
return response
|
||||
|
||||
buffer = []
|
||||
|
||||
return processor_fn
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user