refactoring

This commit is contained in:
Matthias Bisping 2022-05-05 16:26:18 +02:00
parent b58a9d11c3
commit 373c38113f
2 changed files with 23 additions and 20 deletions

View File

@ -9,7 +9,7 @@ class Nothing:
def has_next(peekable_iter):
return peekable_iter.peek(Nothing) != Nothing
return peekable_iter.peek(Nothing) is not Nothing
class Dispatcher:

View File

@ -8,6 +8,7 @@ from typing import Iterable
from flask import Flask, jsonify, request
from funcy import compose
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import starlift, llift
@ -23,22 +24,28 @@ def make_processor_fn(operation, buffer_size, batched):
wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
operation = wrapper(operation)
return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json"))
return bufferize(operation, buffer_size=buffer_size)
class Processor(abc.ABC):
def __init__(self, processor_fn):
self.results = []
self.execution_queue = []
self.processor_fn = processor_fn
def lazy(self, package, final):
self.results = chain(self.results, self.processor_fn(package, final=final))
def eager(self, package, final):
def run(self, package, final):
"""Eagerly execute processor function and return result immediately."""
return self.processor_fn(package, final=final)
def yield_result(self):
return next(self.results)
def submit(self, package, final):
"""Submit computation request to execution queue; computation is performed on demand."""
self.execution_queue = chain(self.execution_queue, [lambda: self.processor_fn(package, final=final)])
def compute_next(self):
"""Processes the next request."""
try:
return next(self.execution_queue)()
except StopIteration:
return Nothing
def set_up_processing_server(process_fn):
@ -53,28 +60,24 @@ def set_up_processing_server(process_fn):
@app.route("/process", methods=["POST", "PATCH"])
def process():
response_payload = processor.eager(request, final=request.method == "POST")
response_payload = processor.run(request.json, final=request.method == "POST")
return jsonify(response_payload)
@app.route("/submit", methods=["POST", "PATCH"])
def submit():
processor.lazy(request, final=request.method == "POST")
processor.submit(request.json, final=request.method == "POST")
return jsonify(f"{request.base_url.replace('/submit', '')}/pickup")
@app.route("/pickup", methods=["GET"])
def pickup():
try:
resp = jsonify(processor.yield_result())
resp.status_code = 206
except StopIteration:
result = processor.compute_next()
if result is Nothing:
resp = jsonify("No more items left")
resp.status_code = 204
except:
logger.error(traceback.format_exc())
raise
else:
resp = jsonify(result)
resp.status_code = 206
return resp