From 3b7605772e877e736b4bf39de0690368143d1324 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Fri, 13 May 2022 12:42:07 +0200 Subject: [PATCH] refactoring --- pyinfra/server/bufferizer/lazy_bufferizer.py | 12 ++++++------ pyinfra/server/server.py | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/bufferizer/lazy_bufferizer.py index 8bafd59..e8e3937 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/bufferizer/lazy_bufferizer.py @@ -2,11 +2,10 @@ import logging from collections import deque from itertools import chain, takewhile -from funcy import first, repeatedly, compose, flatten +from funcy import first, repeatedly, flatten from pyinfra.server.bufferizer.buffer import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing -from pyinfra.utils.func import lift logger = logging.getLogger(__name__) @@ -33,18 +32,19 @@ class FlatStreamBuffer: """Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable.""" def __init__(self, fn, buffer_size=3): - """Function `fn` Needs to be a mappable generator.""" - self.fn = lift(StreamBuffer(fn, buffer_size=buffer_size)) + """Function `fn` Needs to be mappable and return an iterable --- ideally a generator.""" + self.stream_buffer = StreamBuffer(fn, buffer_size=buffer_size) def __call__(self, items): items = chain(items, [Nothing]) - yield from compose(flatten, self.fn)(items) + yield from flatten(map(self.stream_buffer, items)) class StreamBuffer: - """Puts a function `fn` between an input and an output buffer. `fn` Needs to be a mappable generator.""" + """Puts a function `fn` between an input and an output buffer.""" def __init__(self, fn, buffer_size=3): + """Function `fn` Needs to be mappable and return an iterable --- ideally a generator.""" self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[]) self.result_stream = chain([]) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index ab8a37e..0bfb1ad 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -26,6 +26,7 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched): class LazyProcessor: + """Accepts computation requests (push) and lazily produces results (pop).""" def __init__(self, stream_buffer: FlatStreamBuffer): self.queue = Queue() self.stream_buffer = stream_buffer @@ -35,8 +36,7 @@ class LazyProcessor: def pop(self): items = stream_queue(self.queue) - result = first(self.stream_buffer(items)) or Nothing - return result + return first(self.stream_buffer(items)) class LazyRestProcessor: @@ -50,7 +50,7 @@ class LazyRestProcessor: return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) def pop(self): - result = self.lazy_processor.pop() + result = self.lazy_processor.pop() or Nothing if not valid(result): logger.error(f"Received invalid result: {result}")