refactoring

This commit is contained in:
Matthias Bisping 2022-05-06 23:39:29 +02:00
parent f428372511
commit f99d779c29
4 changed files with 37 additions and 35 deletions

View File

View File

@ -0,0 +1,21 @@
from itertools import chain
from typing import Union, Any
from pyinfra.server.dispatcher.dispatcher import Nothing
class OnDemandProcessor:
def __init__(self, processor_fn):
self.execution_queue = chain([])
self.processor_fn = processor_fn
def submit(self, package, final) -> None:
"""Submit computation request to execution queue; computation is performed on demand."""
self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final))
def compute_next(self) -> Union[Nothing, Any]:
"""Processes the next request."""
try:
return next(self.execution_queue)
except StopIteration:
return Nothing

View File

@ -1,12 +1,10 @@
import abc
import logging
from itertools import chain
from typing import Union, Any
from flask import Flask, jsonify, request
from funcy import compose, flatten
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.processor.processor import OnDemandProcessor
from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import lift
@ -14,7 +12,7 @@ from pyinfra.utils.func import lift
logger = logging.getLogger()
def make_processor_fn(operation, buffer_size, batched):
def make_streamable(operation, buffer_size, batched):
wrapper = unpack_batchop_pack if batched else compose(lift, unpack_op_pack)
operation = wrapper(operation)
@ -24,43 +22,26 @@ def make_processor_fn(operation, buffer_size, batched):
return operation
class OnDemandProcessor:
class RestOndDemandProcessor(OnDemandProcessor):
def __init__(self, processor_fn):
self.execution_queue = chain([])
self.processor_fn = processor_fn
def submit(self, package, final) -> None:
"""Submit computation request to execution queue; computation is performed on demand."""
self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final))
def compute_next(self) -> Union[Nothing, Any]:
"""Processes the next request."""
try:
return next(self.execution_queue)
except StopIteration:
return Nothing
class RestOndDemandProcessorAdapter(OnDemandProcessor):
def __init__(self, processor_fn):
super(RestOndDemandProcessorAdapter, self).__init__(processor_fn=processor_fn)
super(RestOndDemandProcessor, self).__init__(processor_fn=processor_fn)
def submit(self, request, **kwargs) -> None:
super(RestOndDemandProcessorAdapter, self).submit(request.json, final=request.method == "POST")
super(RestOndDemandProcessor, self).submit(request.json, final=request.method == "POST")
class RestStreamer:
class RestStreamProcessor(RestOndDemandProcessor):
"""Wraps an on-demand-processor. Combine with a webserver that provides the endpoints 'submit' and 'pickup'."""
def __init__(self, processor):
self.processor = processor
def __init__(self, processor_fn):
super(RestStreamProcessor, self).__init__(processor_fn=processor_fn)
def submit(self, request):
self.processor.submit(request)
def submit(self, request, **kwargs):
super(RestStreamProcessor, self).submit(request)
return jsonify(f"{request.base_url.replace('/submit', '')}/pickup")
def pickup(self):
result = self.processor.compute_next()
result = self.compute_next()
if not valid(result):
logger.error(f"Received invalid result: {result}")
@ -83,7 +64,7 @@ def valid(result):
def set_up_processing_server(process_fn):
app = Flask(__name__)
streamer = RestStreamer(RestOndDemandProcessorAdapter(process_fn))
stream = RestStreamProcessor(process_fn)
@app.route("/ready", methods=["GET"])
def ready():
@ -93,10 +74,10 @@ def set_up_processing_server(process_fn):
@app.route("/submit", methods=["POST", "PATCH"])
def submit():
return streamer.submit(request)
return stream.submit(request)
@app.route("/pickup", methods=["GET"])
def pickup():
return streamer.pickup()
return stream.pickup()
return app

View File

@ -9,7 +9,7 @@ from PIL import Image
from funcy import retry
from waitress import serve
from pyinfra.server.server import set_up_processing_server, make_processor_fn
from pyinfra.server.server import set_up_processing_server, make_streamable
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
@ -42,7 +42,7 @@ def server(processor_fn):
@pytest.fixture
def processor_fn(operation_conditionally_batched, buffer_size, batched):
return make_processor_fn(operation_conditionally_batched, buffer_size, batched)
return make_streamable(operation_conditionally_batched, buffer_size, batched)
@pytest.fixture