diff --git a/pyinfra/server/processor/processor.py b/pyinfra/server/processor/processor.py
index d335a86..38f2b0c 100644
--- a/pyinfra/server/processor/processor.py
+++ b/pyinfra/server/processor/processor.py
@@ -1,7 +1,12 @@
+import logging
+from collections import deque
 from itertools import chain
-from typing import Union, Any
-from funcy import chunks, first
+from funcy import repeatedly, identity, flatten
+
+logger = logging.getLogger(__name__)
+
+from typing import Union, Any
 
 from pyinfra.server.dispatcher.dispatcher import Nothing
 
@@ -10,19 +15,44 @@ class OnDemandProcessor:
     def __init__(self, fn):
         """Function `fn` has to return an iterable and ideally is a generator."""
         self.execution_queue = chain([])
-        self.fn = fn
-        self.chunk = []
+        self.fn = hesitant_bufferize(fn)
 
     def submit(self, package, **kwargs) -> None:
         self.execution_queue = chain(self.execution_queue, [package])
 
     def compute_next(self) -> Union[Nothing, Any]:
-        if not self.chunk:
-            self.chunk = [*self.helper(self.execution_queue)]
-        if self.chunk:
-            return self.chunk.pop()
-        else:
-            return Nothing
+        return next(self.compute())
+
+    def compute(self):
+        yield from flatten(map(self.helper, chain(self.execution_queue, [Nothing])))
+        yield Nothing
 
     def helper(self, packages):
         return self.fn(packages)
+
+
+def hesitant_bufferize(fn, buffer_size=3, persist_fn=identity):
+    def buffered_fn(item):
+
+        if item is not Nothing:
+            buffer.append(persist_fn(item))
+
+        response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing)))
+
+        return response_payload
+
+    def buffer_full(current_buffer_size):
+        # TODO: this assert does not hold for receiver test, unclear why
+        # assert current_buffer_size <= buffer_size
+        if current_buffer_size > buffer_size:
+            logger.warning(f"Overfull buffer. size: {current_buffer_size}; intended capacity: {buffer_size}")
+
+        return current_buffer_size == buffer_size
+
+    def n_items_to_pop(buffer, final):
+        current_buffer_size = len(buffer)
+        return (final or buffer_full(current_buffer_size)) * current_buffer_size
+
+    buffer = deque()
+
+    return buffered_fn
diff --git a/pyinfra/server/receiver/receivers/rest.py b/pyinfra/server/receiver/receivers/rest.py
index d2ce17a..9f6d285 100644
--- a/pyinfra/server/receiver/receivers/rest.py
+++ b/pyinfra/server/receiver/receivers/rest.py
@@ -1,12 +1,16 @@
 from typing import Iterable
 
 import requests
+from funcy import chunks, flatten
 
 from pyinfra.server.receiver.receiver import Receiver
 
 
 class RestReceiver(Receiver):
+    def __init__(self, chunk_size=3):
+        self.chunk_size = chunk_size
+
     def __call__(self, responses: Iterable[requests.Response]):
-        for response in responses:
+        for response in flatten(chunks(self.chunk_size, responses)):
             response.raise_for_status()
             yield response.json()
diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py
index 6876f55..5ecfc0e 100644
--- a/pyinfra/server/server.py
+++ b/pyinfra/server/server.py
@@ -2,7 +2,7 @@ import logging
 from functools import partial
 
 from flask import Flask, jsonify, request
-from funcy import rcompose, compose, first, chunks
+from funcy import rcompose, compose, first, chunks, flatten
 
 from pyinfra.server.dispatcher.dispatcher import Nothing
 from pyinfra.server.processor.processor import OnDemandProcessor
@@ -28,7 +28,6 @@ class StreamProcessor:
 
 def make_streamable(operation, batched, batch_size):
     operation = operation if batched else starlift(operation)
-    operation = compose(operation, first, partial(chunks, batch_size))
     operation = StreamProcessor(operation)
     return operation