refactoring

This commit is contained in:
Matthias Bisping 2022-05-13 12:42:07 +02:00
parent 1acf16dc91
commit 3b7605772e
2 changed files with 9 additions and 9 deletions

View File

@ -2,11 +2,10 @@ import logging
from collections import deque
from itertools import chain, takewhile
from funcy import first, repeatedly, compose, flatten
from funcy import first, repeatedly, flatten
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.func import lift
logger = logging.getLogger(__name__)
@ -33,18 +32,19 @@ class FlatStreamBuffer:
"""Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable."""
def __init__(self, fn, buffer_size=3):
"""Function `fn` Needs to be a mappable generator."""
self.fn = lift(StreamBuffer(fn, buffer_size=buffer_size))
"""Function `fn` Needs to be mappable and return an iterable --- ideally a generator."""
self.stream_buffer = StreamBuffer(fn, buffer_size=buffer_size)
def __call__(self, items):
items = chain(items, [Nothing])
yield from compose(flatten, self.fn)(items)
yield from flatten(map(self.stream_buffer, items))
class StreamBuffer:
"""Puts a function `fn` between an input and an output buffer. `fn` Needs to be a mappable generator."""
"""Puts a function `fn` between an input and an output buffer."""
def __init__(self, fn, buffer_size=3):
"""Function `fn` Needs to be mappable and return an iterable --- ideally a generator."""
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])

View File

@ -26,6 +26,7 @@ def make_streamable_and_wrap_in_packing_logic(fn, batched):
class LazyProcessor:
"""Accepts computation requests (push) and lazily produces results (pop)."""
def __init__(self, stream_buffer: FlatStreamBuffer):
self.queue = Queue()
self.stream_buffer = stream_buffer
@ -35,8 +36,7 @@ class LazyProcessor:
def pop(self):
items = stream_queue(self.queue)
result = first(self.stream_buffer(items)) or Nothing
return result
return first(self.stream_buffer(items))
class LazyRestProcessor:
@ -50,7 +50,7 @@ class LazyRestProcessor:
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pop(self):
result = self.lazy_processor.pop()
result = self.lazy_processor.pop() or Nothing
if not valid(result):
logger.error(f"Received invalid result: {result}")