refactoring: simplified queue consuming function

This commit is contained in:
Matthias Bisping 2022-05-12 17:29:20 +02:00
parent 9b5fc4ff77
commit 461c0fe6a6
2 changed files with 4 additions and 12 deletions

View File

@ -120,8 +120,6 @@ class StreamBuffer:
def make_queue_consumer(buffer: StreamBuffer):
def call(queue):
return compute_next(queue)
def compute_next(queue):
return next(compute(queue))
@ -133,19 +131,15 @@ def make_queue_consumer(buffer: StreamBuffer):
def consume_queue(queue):
queue_items = stream_queue(queue)
yield from chain.from_iterable(map(add_to_buffer_and_consume, queue_items))
yield from chain.from_iterable(map(buffer, queue_items))
def flush_buffer():
yield from add_to_buffer_and_consume(Nothing)
yield from buffer(Nothing)
def signal_termination():
return Nothing
def add_to_buffer_and_consume(queue_item):
buffer.push(queue_item)
yield from takewhile(is_not_nothing, repeatedly(buffer.pop))
return call
return compute_next
def stream_queue(queue):

View File

@ -1,11 +1,9 @@
import logging
from itertools import chain
from flask import Flask, jsonify, request
from funcy import compose, identity
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, QueueBufferCoupler, stream_queue, \
make_queue_consumer
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, make_queue_consumer
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift