processor test; refactoring

This commit is contained in:
Matthias Bisping 2022-05-07 00:23:22 +02:00
parent 132a1a1b50
commit 1e21913e37
3 changed files with 30 additions and 11 deletions

View File

@ -1,20 +1,19 @@
from itertools import chain
from typing import Union, Any
from typing import Union, Any, Iterable, Callable
from pyinfra.server.dispatcher.dispatcher import Nothing
class OnDemandProcessor:
def __init__(self, processor_fn):
def __init__(self, fn):
"""Function has to return an iterable and ideally is a generator."""
self.execution_queue = chain([])
self.processor_fn = processor_fn
self.fn = fn
def submit(self, package, final) -> None:
"""Submit computation request to execution queue; computation is performed on demand."""
self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final))
def submit(self, package, **kwargs) -> None:
self.execution_queue = chain(self.execution_queue, self.fn(package, **kwargs))
def compute_next(self) -> Union[Nothing, Any]:
"""Processes the next request."""
try:
return next(self.execution_queue)
except StopIteration:

View File

@ -23,8 +23,8 @@ def make_streamable(operation, buffer_size, batched):
class RestOndDemandProcessor(OnDemandProcessor):
def __init__(self, processor_fn):
super(RestOndDemandProcessor, self).__init__(processor_fn=processor_fn)
def __init__(self, fn):
super(RestOndDemandProcessor, self).__init__(fn=fn)
def submit(self, request, **kwargs) -> None:
super(RestOndDemandProcessor, self).submit(request.json, final=request.method == "POST")
@ -33,8 +33,8 @@ class RestOndDemandProcessor(OnDemandProcessor):
class RestStreamProcessor(RestOndDemandProcessor):
"""Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'."""
def __init__(self, processor_fn):
super(RestStreamProcessor, self).__init__(processor_fn=processor_fn)
def __init__(self, fn):
super(RestStreamProcessor, self).__init__(fn=fn)
def submit(self, request, **kwargs):
super(RestStreamProcessor, self).submit(request)

View File

@ -0,0 +1,20 @@
from itertools import takewhile
from funcy import repeatedly
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.processor import OnDemandProcessor
def test_processor():
def func(x):
return [x ** 2]
processor = OnDemandProcessor(func)
for i in range(10):
processor.submit(i)
output = list(takewhile(lambda r: r != Nothing, repeatedly(processor.compute_next)))
assert output == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]