refactoring

This commit is contained in:
Matthias Bisping 2022-04-27 13:38:55 +02:00
parent 2af648254e
commit 90af62ed2c
2 changed files with 18 additions and 10 deletions

View File

@ -1,6 +1,7 @@
import json
from itertools import chain
from operator import methodcaller, itemgetter
from typing import Iterable
import pytest
import requests
@ -13,15 +14,15 @@ def lift(fn):
return curry(map)(fn)
@pytest.mark.parametrize("item_type", ["string"])
def test_sending_partial_request(url, data_items, metadata):
def send_partial_request(url, input_data: Iterable[bytes], metadata):
def pack(data: bytes):
package = {"data": bytes_to_string(data), "metadata": metadata}
package = json.dumps(package).encode()
return package
def post(data):
return requests.post(f"{url}/process", data=data)
return requests.post(url, data=data)
pack_data_and_metadata_for_rest_transfer = lift(pack)
send_packages_to_analyzer_and_receive_responses = lift(post)
@ -29,15 +30,20 @@ def test_sending_partial_request(url, data_items, metadata):
flatten_buffered_payloads = chain.from_iterable
interpret_payloads = lift(string_to_bytes)
data = rcompose(
input_data_to_result_data = rcompose(
pack_data_and_metadata_for_rest_transfer,
send_packages_to_analyzer_and_receive_responses,
extract_data_from_responses,
flatten_buffered_payloads,
interpret_payloads,
)((*data_items, b""))
)
assert list(data) == [b"CONTENT"] * 7
return input_data_to_result_data((*input_data, b""))
@pytest.mark.parametrize("item_type", ["string"])
def test_sending_partial_request(url, data_items, metadata):
assert list(send_partial_request(f"{url}/process", data_items, metadata)) == [b"CONTENT"] * 7
@pytest.fixture

View File

@ -1,11 +1,12 @@
import json
import socket
from multiprocessing import Process
from operator import itemgetter
import flask
import pytest
import requests
from funcy import retry
from funcy import retry, compose
from waitress import serve
from test.server import set_up_processing_server
@ -45,10 +46,9 @@ def processor_fn(item_type, data_items):
def make_string_processor():
def processor_fn(payload: flask.Request, buffersize=3):
def processor_fn(request: flask.Request, buffersize=3):
payload = json.loads(payload.data.decode())
data = string_to_bytes(payload["data"])
data = get_data_from_request(request)
if not data or len(buffer) == buffersize:
@ -66,6 +66,8 @@ def make_string_processor():
response_payload = {"data": result}
return response_payload
get_data_from_request = compose(string_to_bytes, itemgetter("data"), json.loads, lambda p: p.data.decode())
buffer = []
return processor_fn