refactoring

This commit is contained in:
Matthias Bisping 2022-04-27 10:08:57 +02:00
parent e903c69a07
commit 9e8172427c
2 changed files with 26 additions and 15 deletions

View File

@ -1,29 +1,41 @@
import json
from itertools import repeat, starmap, chain
from itertools import chain
from operator import methodcaller, itemgetter
import pytest
import requests
from funcy import pluck
from funcy import curry, rcompose, compose
from test.utils.server import bytes_to_string, string_to_bytes
def lift(fn):
return curry(map)(fn)
@pytest.mark.parametrize("item_type", ["string"])
def test_sending_partial_request(url, data_items, metadata):
def pack(metadata: dict, data: bytes):
def pack(data: bytes):
package = {"data": bytes_to_string(data), "metadata": metadata}
package = json.dumps(package).encode()
return package
def post(package):
final = str(0 if package else 1)
return requests.post(f"{url}/process", data=package, headers={"final": final})
def post(data):
return requests.post(f"{url}/process", data=data)
packages = starmap(pack, zip(repeat(metadata), data_items))
pack_data_and_metadata_for_rest_transfer = lift(pack)
send_packages_to_analyzer_and_receive_responses = lift(post)
extract_data_from_responses = lift(compose(itemgetter("data"), methodcaller("json")))
flatten_buffered_payloads = chain.from_iterable
interpret_payloads = lift(string_to_bytes)
responses = map(post, chain(packages, [""]))
payloads = (r.json() for r in responses)
data = map(string_to_bytes, chain.from_iterable(pluck("data", payloads)))
data = rcompose(
pack_data_and_metadata_for_rest_transfer,
send_packages_to_analyzer_and_receive_responses,
extract_data_from_responses,
flatten_buffered_payloads,
interpret_payloads
)((*data_items, b""))
assert list(data) == [b"CONTENT"] * 7

View File

@ -47,9 +47,10 @@ def processor_fn(item_type, data_items):
def make_string_processor():
def processor_fn(payload: flask.Request, buffersize=3):
final = int(payload.headers["final"])
payload = json.loads(payload.data.decode())
data = string_to_bytes(payload["data"])
if final or len(buffer) == buffersize:
if not data or len(buffer) == buffersize:
result = [bytes_to_string(itm.decode().upper().encode()) for itm in buffer]
@ -58,10 +59,8 @@ def make_string_processor():
else:
result = []
if not final:
if data:
payload = json.loads(payload.data.decode())
data = string_to_bytes(payload["data"])
buffer.append(data)
response_payload = {"data": result}