From 373c38113f0e607b8be6cf58e59f86f718118db0 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 5 May 2022 16:26:18 +0200 Subject: [PATCH] refactoring --- pyinfra/server/dispatcher/dispatcher.py | 2 +- pyinfra/server/server.py | 41 +++++++++++++------------ 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/pyinfra/server/dispatcher/dispatcher.py b/pyinfra/server/dispatcher/dispatcher.py index 8d60ce8..3f71804 100644 --- a/pyinfra/server/dispatcher/dispatcher.py +++ b/pyinfra/server/dispatcher/dispatcher.py @@ -9,7 +9,7 @@ class Nothing: def has_next(peekable_iter): - return peekable_iter.peek(Nothing) != Nothing + return peekable_iter.peek(Nothing) is not Nothing class Dispatcher: diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index f9feb94..d88c2dd 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -8,6 +8,7 @@ from typing import Iterable from flask import Flask, jsonify, request from funcy import compose +from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack from pyinfra.utils.buffer import bufferize from pyinfra.utils.func import starlift, llift @@ -23,22 +24,28 @@ def make_processor_fn(operation, buffer_size, batched): wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack) operation = wrapper(operation) - return bufferize(operation, buffer_size=buffer_size, persist_fn=attrgetter("json")) + return bufferize(operation, buffer_size=buffer_size) class Processor(abc.ABC): def __init__(self, processor_fn): - self.results = [] + self.execution_queue = [] self.processor_fn = processor_fn - def lazy(self, package, final): - self.results = chain(self.results, self.processor_fn(package, final=final)) - - def eager(self, package, final): + def run(self, package, final): + """Eagerly execute processor function and return result immediately.""" return self.processor_fn(package, final=final) - def yield_result(self): - return next(self.results) + def submit(self, package, final): + """Submit computation request to execution queue; computation is performed on demand.""" + self.execution_queue = chain(self.execution_queue, [lambda: self.processor_fn(package, final=final)]) + + def compute_next(self): + """Processes the next request.""" + try: + return next(self.execution_queue)() + except StopIteration: + return Nothing def set_up_processing_server(process_fn): @@ -53,28 +60,24 @@ def set_up_processing_server(process_fn): @app.route("/process", methods=["POST", "PATCH"]) def process(): - response_payload = processor.eager(request, final=request.method == "POST") + response_payload = processor.run(request.json, final=request.method == "POST") return jsonify(response_payload) @app.route("/submit", methods=["POST", "PATCH"]) def submit(): - processor.lazy(request, final=request.method == "POST") + processor.submit(request.json, final=request.method == "POST") return jsonify(f"{request.base_url.replace('/submit', '')}/pickup") @app.route("/pickup", methods=["GET"]) def pickup(): - - try: - resp = jsonify(processor.yield_result()) - resp.status_code = 206 - - except StopIteration: + result = processor.compute_next() + if result is Nothing: resp = jsonify("No more items left") resp.status_code = 204 - except: - logger.error(traceback.format_exc()) - raise + else: + resp = jsonify(result) + resp.status_code = 206 return resp