From b58a9d11c3de980f27a6b8e8bfdbc1933899d0ab Mon Sep 17 00:00:00 2001
From: Matthias Bisping
Date: Thu, 5 May 2022 16:08:59 +0200
Subject: [PATCH] refactoring: added processor class

---
 pyinfra/server/server.py | 43 +++++++++++++++++++++++++++++++++++-----
 test/fixtures/server.py  | 11 ++--------
 2 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py
index 8eb27cc..f9feb94 100644
--- a/pyinfra/server/server.py
+++ b/pyinfra/server/server.py
@@ -1,15 +1,49 @@
+import abc
 import logging
 import traceback
 from itertools import chain
+from operator import attrgetter
+from typing import Iterable
 
 from flask import Flask, jsonify, request
+from funcy import compose
+
+from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack
+from pyinfra.utils.buffer import bufferize
+from pyinfra.utils.func import starlift, llift
 
 logger = logging.getLogger()
 
 
+def make_processor_fn(operation, buffer_size, batched):
+
+    if batched:
+        operation = starlift(operation)
+
+    wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
+    operation = wrapper(operation)
+
+    return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json"))
+
+
+class Processor(abc.ABC):
+    def __init__(self, processor_fn):
+        self.results = []
+        self.processor_fn = processor_fn
+
+    def lazy(self, package, final):
+        self.results = chain(self.results, self.processor_fn(package, final=final))
+
+    def eager(self, package, final):
+        return self.processor_fn(package, final=final)
+
+    def yield_result(self):
+        return next(self.results)
+
+
 def set_up_processing_server(process_fn):
     app = Flask(__name__)
-    response_payload_iter = []
+    processor = Processor(process_fn)
 
     @app.route("/ready", methods=["GET"])
     def ready():
@@ -19,20 +53,19 @@ def set_up_processing_server(process_fn):
 
     @app.route("/process", methods=["POST", "PATCH"])
     def process():
-        response_payload = process_fn(request, final=request.method == "POST")
+        response_payload = processor.eager(request, final=request.method == "POST")
 
         return jsonify(response_payload)
 
     @app.route("/submit", methods=["POST", "PATCH"])
     def submit():
-        nonlocal response_payload_iter
-        response_payload_iter = chain(response_payload_iter, process_fn(request, final=request.method == "POST"))
+        processor.lazy(request, final=request.method == "POST")
 
         return jsonify(f"{request.base_url.replace('/submit', '')}/pickup")
 
     @app.route("/pickup", methods=["GET"])
     def pickup():
         try:
-            resp = jsonify(next(response_payload_iter))
+            resp = jsonify(processor.yield_result())
 
             resp.status_code = 206
         except StopIteration:
diff --git a/test/fixtures/server.py b/test/fixtures/server.py
index 81255bb..548460a 100644
--- a/test/fixtures/server.py
+++ b/test/fixtures/server.py
@@ -13,7 +13,7 @@ from waitress import serve
 from pyinfra.server.utils import unpack_op_pack, unpack_batchop_pack
 from pyinfra.utils.buffer import bufferize
 from pyinfra.utils.func import llift, starlift
-from pyinfra.server.server import set_up_processing_server
+from pyinfra.server.server import set_up_processing_server, make_processor_fn
 
 
 from test.utils.image import image_to_bytes
@@ -45,14 +45,7 @@ def server(processor_fn):
 
 @pytest.fixture
 def processor_fn(operation, buffer_size, batched):
-
-    if batched:
-        operation = starlift(operation)
-
-    wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
-    operation = wrapper(operation)
-
-    return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json"))
+    return make_processor_fn(operation, buffer_size, batched)
 
 
 @pytest.fixture