refactoring: flat stream buffer now takes over stream buffer flushing

This commit is contained in:
Matthias Bisping 2022-05-13 12:29:18 +02:00
parent c09e5df23e
commit 1acf16dc91
2 changed files with 3 additions and 5 deletions

View File

@ -30,15 +30,14 @@ class Queue:
class FlatStreamBuffer: class FlatStreamBuffer:
"""Produces a function, which applied to n inputs, produces m >= n outputs, when called m times. If m > n, then the """Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable."""
function needs to be called m - n times with `Nothing` to return the remaining values.
"""
def __init__(self, fn, buffer_size=3): def __init__(self, fn, buffer_size=3):
"""Function `fn` Needs to be a mappable generator.""" """Function `fn` Needs to be a mappable generator."""
self.fn = lift(StreamBuffer(fn, buffer_size=buffer_size)) self.fn = lift(StreamBuffer(fn, buffer_size=buffer_size))
def __call__(self, items): def __call__(self, items):
items = chain(items, [Nothing])
yield from compose(flatten, self.fn)(items) yield from compose(flatten, self.fn)(items)

View File

@ -1,5 +1,4 @@
import logging import logging
from itertools import chain
from flask import Flask, jsonify, request from flask import Flask, jsonify, request
from funcy import compose, identity, first from funcy import compose, identity, first
@ -35,7 +34,7 @@ class LazyProcessor:
self.queue.append(item) self.queue.append(item)
def pop(self): def pop(self):
items = chain(stream_queue(self.queue), [Nothing]) items = stream_queue(self.queue)
result = first(self.stream_buffer(items)) or Nothing result = first(self.stream_buffer(items)) or Nothing
return result return result