pyinfra/pyinfra/utils/buffer.py
Matthias Bisping a999ce2c3b refactoring
2022-04-28 21:36:42 +02:00

24 lines
713 B
Python

from collections import deque
from typing import Any
from funcy import lmap, repeatedly
def bufferize(fn, buffer_size=3, persist_fn=lambda x: x):
def buffered_fn(item: Any, final=False):
buffer.append(persist_fn(item))
response_payload = lmap(fn, repeatedly(buffer.popleft, n_items_to_pop(buffer, final)))
return response_payload
def buffer_full(current_buffer_size):
assert current_buffer_size <= buffer_size
return current_buffer_size == buffer_size
def n_items_to_pop(buffer, final):
current_buffer_size = len(buffer)
return (final or buffer_full(current_buffer_size)) * current_buffer_size
buffer = deque()
return buffered_fn