removed eager endpoint (/process)

This commit is contained in:
Matthias Bisping 2022-05-05 17:21:47 +02:00
parent 4a3ac150cf
commit 68c24c863f
5 changed files with 55 additions and 27 deletions

View File

@ -32,18 +32,21 @@ class Processor(abc.ABC):
self.execution_queue = chain([])
self.processor_fn = processor_fn
def run(self, package, final):
"""Eagerly execute processor function and return result immediately."""
return self.processor_fn(package, final=final)
# def run(self, package, final):
# """Eagerly execute processor function and return result immediately."""
# return self.processor_fn(package, final=final)
def submit(self, package, final):
"""Submit computation request to execution queue; computation is performed on demand."""
self.execution_queue = chain(self.execution_queue, [lambda: self.processor_fn(package, final=final)])
"""Submit computation request to execution queue; computation is performed on demand.
Use for processor functions that are 1 -> n where n can take up a lot of memory, since
the """
self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final))
def compute_next(self):
"""Processes the next request."""
try:
return next(self.execution_queue)()
return next(self.execution_queue)
except StopIteration:
return Nothing
@ -58,10 +61,10 @@ def set_up_processing_server(process_fn):
resp.status_code = 200
return resp
@app.route("/process", methods=["POST", "PATCH"])
def process():
response_payload = processor.run(request.json, final=request.method == "POST")
return jsonify(response_payload)
# @app.route("/process", methods=["POST", "PATCH"])
# def process():
# response_payload = processor.run(request.json, final=request.method == "POST")
# return jsonify(response_payload)
@app.route("/submit", methods=["POST", "PATCH"])
def submit():
@ -71,6 +74,7 @@ def set_up_processing_server(process_fn):
@app.route("/pickup", methods=["GET"])
def pickup():
result = processor.compute_next()
# print(result)
if result is Nothing:
resp = jsonify("No more items left")
resp.status_code = 204

View File

@ -6,7 +6,7 @@ import requests
from funcy import repeatedly, identity, ilen, compose
from pyinfra.exceptions import UnexpectedItemType
from pyinfra.utils.func import parallel_map, lift, lstarlift, star
from pyinfra.utils.func import parallel_map, lift, lstarlift, star, starlift
from test.utils.server import bytes_to_string, string_to_bytes

View File

@ -1,12 +1,12 @@
import pytest
from funcy import lmap
from pyinfra.server.rest import process_eagerly
from pyinfra.server.utils import unpack
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
def test_sending_partial_request(url, input_data_items, metadata, target_data_items, server_process):
output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata))
assert output == target_data_items
# import pytest
# from funcy import lmap
#
# from pyinfra.server.rest import process_eagerly
# from pyinfra.server.utils import unpack
#
#
# @pytest.mark.parametrize("batched", [True, False])
# @pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
# def test_sending_partial_request(url, input_data_items, metadata, target_data_items, server_process):
# output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata))
# assert output == target_data_items

View File

@ -40,7 +40,7 @@ def target_data_items(input_data_items, item_type, operation, metadata):
@pytest.fixture
def endpoint(url, analysis_type):
return {
"eager": f"{url}/process",
# "eager": f"{url}/process",
"lazy": f"{url}/submit",
}[analysis_type]
@ -55,7 +55,12 @@ def n_pages(request):
return request.param
@pytest.fixture(params=["eager", "lazy"])
@pytest.fixture(
params=[
# "eager",
"lazy",
]
)
def analysis_type(request):
return request.param

View File

@ -22,8 +22,27 @@ def test_mock_pipeline():
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
@pytest.mark.parametrize("batched", [True, False])
@pytest.mark.parametrize("item_type", ["pdf", "string", "image"])
@pytest.mark.parametrize(
"batched",
[
True,
False,
],
)
@pytest.mark.parametrize(
"item_type",
[
"pdf",
# "string",
# "image",
],
)
@pytest.mark.parametrize(
"n_pages",
[
100
],
)
def test_pipeline(pipeline, input_data_items, metadata, target_data_items):
output = lmap(unpack, pipeline(input_data_items, metadata))
assert output == target_data_items