diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index afd6828..81fd8be 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -32,18 +32,21 @@ class Processor(abc.ABC): self.execution_queue = chain([]) self.processor_fn = processor_fn - def run(self, package, final): - """Eagerly execute processor function and return result immediately.""" - return self.processor_fn(package, final=final) + # def run(self, package, final): + # """Eagerly execute processor function and return result immediately.""" + # return self.processor_fn(package, final=final) def submit(self, package, final): - """Submit computation request to execution queue; computation is performed on demand.""" - self.execution_queue = chain(self.execution_queue, [lambda: self.processor_fn(package, final=final)]) + """Submit computation request to execution queue; computation is performed on demand. + + Use for processor functions that are 1 -> n where n can take up a lot of memory, since + the results are chained lazily into the execution queue and consumed one item at a time."""  + self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final)) def compute_next(self): """Processes the next request.""" try: - return next(self.execution_queue)() + return next(self.execution_queue) except StopIteration: return Nothing @@ -58,10 +61,10 @@ def set_up_processing_server(process_fn): resp.status_code = 200 return resp - @app.route("/process", methods=["POST", "PATCH"]) - def process(): - response_payload = processor.run(request.json, final=request.method == "POST") - return jsonify(response_payload) + # @app.route("/process", methods=["POST", "PATCH"]) + # def process(): + # response_payload = processor.run(request.json, final=request.method == "POST") + # return jsonify(response_payload) @app.route("/submit", methods=["POST", "PATCH"]) def submit(): @@ -71,6 +74,7 @@ def set_up_processing_server(process_fn): @app.route("/pickup", methods=["GET"]) def pickup(): result = processor.compute_next() + # print(result) if result is Nothing: 
resp = jsonify("No more items left") resp.status_code = 204 diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py index ec9a8a1..93f50b6 100644 --- a/pyinfra/server/utils.py +++ b/pyinfra/server/utils.py @@ -6,7 +6,7 @@ import requests from funcy import repeatedly, identity, ilen, compose from pyinfra.exceptions import UnexpectedItemType -from pyinfra.utils.func import parallel_map, lift, lstarlift, star +from pyinfra.utils.func import parallel_map, lift, lstarlift, star, starlift from test.utils.server import bytes_to_string, string_to_bytes diff --git a/test/exploration_tests/partial_response_test.py b/test/exploration_tests/partial_response_test.py index 0f5ae24..1de349e 100644 --- a/test/exploration_tests/partial_response_test.py +++ b/test/exploration_tests/partial_response_test.py @@ -1,12 +1,12 @@ -import pytest -from funcy import lmap - -from pyinfra.server.rest import process_eagerly -from pyinfra.server.utils import unpack - - -@pytest.mark.parametrize("batched", [True, False]) -@pytest.mark.parametrize("item_type", ["pdf", "string", "image"]) -def test_sending_partial_request(url, input_data_items, metadata, target_data_items, server_process): - output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata)) - assert output == target_data_items +# import pytest +# from funcy import lmap +# +# from pyinfra.server.rest import process_eagerly +# from pyinfra.server.utils import unpack +# +# +# @pytest.mark.parametrize("batched", [True, False]) +# @pytest.mark.parametrize("item_type", ["pdf", "string", "image"]) +# def test_sending_partial_request(url, input_data_items, metadata, target_data_items, server_process): +# output = lmap(unpack, process_eagerly(f"{url}/process", input_data_items, metadata)) +# assert output == target_data_items diff --git a/test/fixtures/input.py b/test/fixtures/input.py index 5f0c514..ec3100d 100644 --- a/test/fixtures/input.py +++ b/test/fixtures/input.py @@ -40,7 +40,7 @@ def 
target_data_items(input_data_items, item_type, operation, metadata): @pytest.fixture def endpoint(url, analysis_type): return { - "eager": f"{url}/process", + # "eager": f"{url}/process", "lazy": f"{url}/submit", }[analysis_type] @@ -55,7 +55,12 @@ def n_pages(request): return request.param -@pytest.fixture(params=["eager", "lazy"]) +@pytest.fixture( + params=[ + # "eager", + "lazy", + ] +) def analysis_type(request): return request.param diff --git a/test/unit_tests/pipeline_test.py b/test/unit_tests/pipeline_test.py index 5bbea45..9c3e6ad 100644 --- a/test/unit_tests/pipeline_test.py +++ b/test/unit_tests/pipeline_test.py @@ -22,8 +22,27 @@ def test_mock_pipeline(): assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data)) -@pytest.mark.parametrize("batched", [True, False]) -@pytest.mark.parametrize("item_type", ["pdf", "string", "image"]) +@pytest.mark.parametrize( + "batched", + [ + True, + False, + ], +) +@pytest.mark.parametrize( + "item_type", + [ + "pdf", + # "string", + # "image", + ], +) +@pytest.mark.parametrize( + "n_pages", + [ + 100 + ], +) def test_pipeline(pipeline, input_data_items, metadata, target_data_items): output = lmap(unpack, pipeline(input_data_items, metadata)) assert output == target_data_items