refactoring: added processor class

This commit is contained in:
Matthias Bisping 2022-05-05 16:08:59 +02:00
parent bd5fe82e06
commit b58a9d11c3
2 changed files with 40 additions and 14 deletions

View File

@ -1,15 +1,49 @@
import abc
import logging
import traceback
from itertools import chain
from operator import attrgetter
from typing import Iterable
from flask import Flask, jsonify, request
from funcy import compose
from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import starlift, llift
logger = logging.getLogger()
def make_processor_fn(operation, buffer_size, batched):
if batched:
operation = starlift(operation)
wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
operation = wrapper(operation)
return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json"))
class Processor(abc.ABC):
def __init__(self, processor_fn):
self.results = []
self.processor_fn = processor_fn
def lazy(self, package, final):
self.results = chain(self.results, self.processor_fn(package, final=final))
def eager(self, package, final):
return self.processor_fn(package, final=final)
def yield_result(self):
return next(self.results)
def set_up_processing_server(process_fn):
app = Flask(__name__)
response_payload_iter = []
processor = Processor(process_fn)
@app.route("/ready", methods=["GET"])
def ready():
@ -19,20 +53,19 @@ def set_up_processing_server(process_fn):
@app.route("/process", methods=["POST", "PATCH"])
def process():
response_payload = process_fn(request, final=request.method == "POST")
response_payload = processor.eager(request, final=request.method == "POST")
return jsonify(response_payload)
@app.route("/submit", methods=["POST", "PATCH"])
def submit():
nonlocal response_payload_iter
response_payload_iter = chain(response_payload_iter, process_fn(request, final=request.method == "POST"))
processor.lazy(request, final=request.method == "POST")
return jsonify(f"{request.base_url.replace('/submit', '')}/pickup")
@app.route("/pickup", methods=["GET"])
def pickup():
try:
resp = jsonify(next(response_payload_iter))
resp = jsonify(processor.yield_result())
resp.status_code = 206
except StopIteration:

View File

@ -13,7 +13,7 @@ from waitress import serve
from pyinfra.server.utils import unpack_op_pack, unpack_batchop_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import llift, starlift
from pyinfra.server.server import set_up_processing_server
from pyinfra.server.server import set_up_processing_server, make_processor_fn
from test.utils.image import image_to_bytes
@ -45,14 +45,7 @@ def server(processor_fn):
@pytest.fixture
def processor_fn(operation, buffer_size, batched):
if batched:
operation = starlift(operation)
wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
operation = wrapper(operation)
return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json"))
return make_processor_fn(operation, buffer_size, batched)
@pytest.fixture