diff --git a/pyinfra/server/bufferizer/__init__.py b/pyinfra/server/buffering/__init__.py similarity index 100% rename from pyinfra/server/bufferizer/__init__.py rename to pyinfra/server/buffering/__init__.py diff --git a/pyinfra/server/bufferizer/buffer.py b/pyinfra/server/buffering/buffer.py similarity index 100% rename from pyinfra/server/bufferizer/buffer.py rename to pyinfra/server/buffering/buffer.py diff --git a/pyinfra/server/buffering/queue.py b/pyinfra/server/buffering/queue.py new file mode 100644 index 0000000..b7c66a4 --- /dev/null +++ b/pyinfra/server/buffering/queue.py @@ -0,0 +1,24 @@ +from collections import deque +from itertools import takewhile + +from funcy import repeatedly + +from pyinfra.server.dispatcher.dispatcher import is_not_nothing, Nothing + + +def stream_queue(queue): + yield from takewhile(is_not_nothing, repeatedly(queue.popleft)) + + +class Queue: + def __init__(self): + self.__queue = deque() + + def append(self, package) -> None: + self.__queue.append(package) + + def popleft(self): + return self.__queue.popleft() if self.__queue else Nothing + + def __bool__(self): + return bool(self.__queue) diff --git a/pyinfra/server/bufferizer/lazy_bufferizer.py b/pyinfra/server/buffering/stream.py similarity index 74% rename from pyinfra/server/bufferizer/lazy_bufferizer.py rename to pyinfra/server/buffering/stream.py index 631a2a3..bd63c5b 100644 --- a/pyinfra/server/bufferizer/lazy_bufferizer.py +++ b/pyinfra/server/buffering/stream.py @@ -1,33 +1,11 @@ -import logging -from collections import deque from itertools import chain, takewhile from typing import Iterable from funcy import first, repeatedly, flatten -from pyinfra.server.bufferizer.buffer import bufferize +from pyinfra.server.buffering.buffer import bufferize from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing -logger = logging.getLogger(__name__) - - -def stream_queue(queue): - yield from takewhile(is_not_nothing, repeatedly(queue.popleft)) - - -class Queue: - def __init__(self): - self.__queue = deque() - - def append(self, package) -> None: - self.__queue.append(package) - - def popleft(self): - return self.__queue.popleft() if self.__queue else Nothing - - def __bool__(self): - return bool(self.__queue) - class FlatStreamBuffer: """Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable.""" diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 98b83e4..05973ee 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -1,73 +1,7 @@ -import logging - from flask import Flask, jsonify, request -from funcy import compose, identity, first -from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, FlatStreamBuffer -from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.utils import unpack, normalize, pack -from pyinfra.utils.func import starlift, lift - -logger = logging.getLogger() - - -def make_streamable(fn, batched): - return compose(normalize, (identity if batched else starlift)(fn)) - - -def unpack_fn_pack(fn): - return compose(starlift(pack), fn, lift(unpack)) - - -def make_streamable_and_wrap_in_packing_logic(fn, batched): - fn = make_streamable(fn, batched) - fn = unpack_fn_pack(fn) - return fn - - -class QueuedStreamFunction: - def __init__(self, fn): - self.queue = Queue() - self.fn = fn - - def push(self, item): - self.queue.append(item) - - def pop(self): - items = stream_queue(self.queue) - return first(self.fn(items)) - - -class LazyRestProcessor: - def __init__(self, queued_stream_function: QueuedStreamFunction, submit_suffix="submit", pickup_suffix="pickup"): - self.submit_suffix = submit_suffix - self.pickup_suffix = pickup_suffix - self.queued_stream_function = queued_stream_function - - def push(self, request): - self.queued_stream_function.push(request.json) - return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) - - def pop(self): - result = self.queued_stream_function.pop() or Nothing - - if not valid(result): - logger.error(f"Received invalid result: {result}") - result = Nothing - - if result is Nothing: - resp = jsonify("No more items left") - resp.status_code = 204 - - else: - resp = jsonify(result) - resp.status_code = 206 - - return resp - - -def valid(result): - return isinstance(result, dict) or result is Nothing +from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction +from pyinfra.server.stream.rest import LazyRestProcessor def set_up_processing_server(queued_stream_function: QueuedStreamFunction): diff --git a/pyinfra/server/stream/__init__.py b/pyinfra/server/stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/server/stream/queued_stream_function.py b/pyinfra/server/stream/queued_stream_function.py new file mode 100644 index 0000000..81563b6 --- /dev/null +++ b/pyinfra/server/stream/queued_stream_function.py @@ -0,0 +1,16 @@ +from funcy import first + +from pyinfra.server.buffering.queue import stream_queue, Queue + + +class QueuedStreamFunction: + def __init__(self, fn): + self.queue = Queue() + self.fn = fn + + def push(self, item): + self.queue.append(item) + + def pop(self): + items = stream_queue(self.queue) + return first(self.fn(items)) diff --git a/pyinfra/server/stream/rest.py b/pyinfra/server/stream/rest.py new file mode 100644 index 0000000..3674913 --- /dev/null +++ b/pyinfra/server/stream/rest.py @@ -0,0 +1,40 @@ +import logging + +from flask import jsonify + +from pyinfra.server.dispatcher.dispatcher import Nothing +from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction + +logger = logging.getLogger() + + +class LazyRestProcessor: + def __init__(self, queued_stream_function: QueuedStreamFunction, submit_suffix="submit", pickup_suffix="pickup"): + self.submit_suffix = submit_suffix + self.pickup_suffix = pickup_suffix + self.queued_stream_function = queued_stream_function + + def push(self, request): + self.queued_stream_function.push(request.json) + return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) + + def pop(self): + result = self.queued_stream_function.pop() or Nothing + + if not valid(result): + logger.error(f"Received invalid result: {result}") + result = Nothing + + if result is Nothing: + resp = jsonify("No more items left") + resp.status_code = 204 + + else: + resp = jsonify(result) + resp.status_code = 206 + + return resp + + +def valid(result): + return isinstance(result, dict) or result is Nothing diff --git a/pyinfra/server/stream/utils.py b/pyinfra/server/stream/utils.py new file mode 100644 index 0000000..e69de29 diff --git a/pyinfra/server/utils.py b/pyinfra/server/utils.py index 6caefa7..e96d2b0 100644 --- a/pyinfra/server/utils.py +++ b/pyinfra/server/utils.py @@ -3,7 +3,7 @@ from itertools import takewhile, starmap, repeat, chain, tee from typing import Iterable, Callable, Dict, List, Union, Tuple import requests -from funcy import repeatedly, ilen, compose +from funcy import repeatedly, ilen, compose, identity from pyinfra.exceptions import UnexpectedItemType from pyinfra.utils.func import lift, star, starlift @@ -85,3 +85,17 @@ def inspect(msg="ins", embed=False): return x return inner + + +def make_streamable(fn, batched): + return compose(normalize, (identity if batched else starlift)(fn)) + + +def unpack_fn_pack(fn): + return compose(starlift(pack), fn, lift(unpack)) + + +def make_streamable_and_wrap_in_packing_logic(fn, batched): + fn = make_streamable(fn, batched) + fn = unpack_fn_pack(fn) + return fn diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 1e89047..efd2b47 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -10,13 +10,13 @@ from PIL import Image from funcy import retry, first, compose, identity from waitress import serve -from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer +from pyinfra.server.buffering.stream import FlatStreamBuffer from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.server import ( set_up_processing_server, - make_streamable_and_wrap_in_packing_logic, - QueuedStreamFunction, ) +from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction +from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic from pyinfra.utils.func import starlift from test.utils.image import image_to_bytes diff --git a/test/unit_tests/server/buffer_test.py b/test/unit_tests/server/buffer_test.py index 732967c..674e537 100644 --- a/test/unit_tests/server/buffer_test.py +++ b/test/unit_tests/server/buffer_test.py @@ -1,7 +1,7 @@ from funcy import compose, lmapcat, compact, flatten from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.bufferizer.buffer import bufferize +from pyinfra.server.buffering.buffer import bufferize def test_buffer(): diff --git a/test/unit_tests/server/stream_buffer_test.py b/test/unit_tests/server/stream_buffer_test.py index 662ce3c..3df908a 100644 --- a/test/unit_tests/server/stream_buffer_test.py +++ b/test/unit_tests/server/stream_buffer_test.py @@ -1,9 +1,9 @@ import pytest from funcy import repeatedly, takewhile, notnone, lmap, lmapcat, lflatten -from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer, StreamBuffer +from pyinfra.server.buffering.stream import FlatStreamBuffer, StreamBuffer from pyinfra.server.dispatcher.dispatcher import Nothing -from pyinfra.server.server import QueuedStreamFunction +from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction from pyinfra.utils.func import lift, foreach