From 461c0fe6a6181dad0524ce4e4753460074a3570b Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Thu, 12 May 2022 17:29:20 +0200 Subject: [PATCH] refactoring: simplified queue consuming function --- pyinfra/server/bufferizer/lazy_bufferizer.py | 12 +++--------- pyinfra/server/server.py | 4 +--- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 25636d0..090b285 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -120,8 +120,6 @@ class StreamBuffer: def make_queue_consumer(buffer: StreamBuffer): - def call(queue): - return compute_next(queue) def compute_next(queue): return next(compute(queue)) @@ -133,19 +131,15 @@ def make_queue_consumer(buffer: StreamBuffer): def consume_queue(queue): queue_items = stream_queue(queue) - yield from chain.from_iterable(map(add_to_buffer_and_consume, queue_items)) + yield from chain.from_iterable(map(buffer, queue_items)) def flush_buffer(): - yield from add_to_buffer_and_consume(Nothing) + yield from buffer(Nothing) def signal_termination(): return Nothing - def add_to_buffer_and_consume(queue_item): - buffer.push(queue_item) - yield from takewhile(is_not_nothing, repeatedly(buffer.pop)) - - return call + return compute_next def stream_queue(queue): diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index f561113..f24cdeb 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,11 +1,9 @@ import logging -from itertools import chain from flask import Flask, jsonify, request from funcy import compose, identity -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue, \ - make_queue_consumer +from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, make_queue_consumer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.utils import unpack, normalize, pack from pyinfra.utils.func import starlift, lift