diff --git a/pyinfra/server/processor/processor.py b/pyinfra/server/processor/processor.py index 482846d..4321011 100644 --- a/pyinfra/server/processor/processor.py +++ b/pyinfra/server/processor/processor.py @@ -1,20 +1,19 @@ from itertools import chain -from typing import Union, Any +from typing import Union, Any, Iterable, Callable from pyinfra.server.dispatcher.dispatcher import Nothing class OnDemandProcessor: - def __init__(self, processor_fn): + def __init__(self, fn): + """Function has to return an iterable and ideally is a generator.""" self.execution_queue = chain([]) - self.processor_fn = processor_fn + self.fn = fn - def submit(self, package, final) -> None: - """Submit computation request to execution queue; computation is performed on demand.""" - self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final)) + def submit(self, package, **kwargs) -> None: + self.execution_queue = chain(self.execution_queue, self.fn(package, **kwargs)) def compute_next(self) -> Union[Nothing, Any]: - """Processes the next request.""" try: return next(self.execution_queue) except StopIteration: diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index b0b8df0..0111e04 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -23,8 +23,8 @@ def make_streamable(operation, buffer_size, batched): class RestOndDemandProcessor(OnDemandProcessor): - def __init__(self, processor_fn): - super(RestOndDemandProcessor, self).__init__(processor_fn=processor_fn) + def __init__(self, fn): + super(RestOndDemandProcessor, self).__init__(fn=fn) def submit(self, request, **kwargs) -> None: super(RestOndDemandProcessor, self).submit(request.json, final=request.method == "POST") @@ -33,8 +33,8 @@ class RestOndDemandProcessor(OnDemandProcessor): class RestStreamProcessor(RestOndDemandProcessor): """Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'.""" - def __init__(self, processor_fn): - super(RestStreamProcessor, self).__init__(processor_fn=processor_fn) + def __init__(self, fn): + super(RestStreamProcessor, self).__init__(fn=fn) def submit(self, request, **kwargs): super(RestStreamProcessor, self).submit(request) diff --git a/test/unit_tests/server/processor_test.py b/test/unit_tests/server/processor_test.py new file mode 100644 index 0000000..d19a0f2 --- /dev/null +++ b/test/unit_tests/server/processor_test.py @@ -0,0 +1,20 @@ +from itertools import takewhile + +from funcy import repeatedly + +from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.server.processor.processor import OnDemandProcessor + + +def test_processor(): + def func(x): + return [x ** 2] + + processor = OnDemandProcessor(func) + + for i in range(10): + processor.submit(i) + + output = list(takewhile(lambda r: r != Nothing, repeatedly(processor.compute_next))) + + assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]