refactoring: move
This commit is contained in:
parent
5d2b71d647
commit
96bf831b00
24
pyinfra/server/buffering/queue.py
Normal file
24
pyinfra/server/buffering/queue.py
Normal file
@ -0,0 +1,24 @@
|
||||
from collections import deque
|
||||
from itertools import takewhile
|
||||
|
||||
from funcy import repeatedly
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import is_not_nothing, Nothing
|
||||
|
||||
|
||||
def stream_queue(queue):
|
||||
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))
|
||||
|
||||
|
||||
class Queue:
|
||||
def __init__(self):
|
||||
self.__queue = deque()
|
||||
|
||||
def append(self, package) -> None:
|
||||
self.__queue.append(package)
|
||||
|
||||
def popleft(self):
|
||||
return self.__queue.popleft() if self.__queue else Nothing
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.__queue)
|
||||
@ -1,33 +1,11 @@
|
||||
import logging
|
||||
from collections import deque
|
||||
from itertools import chain, takewhile
|
||||
from typing import Iterable
|
||||
|
||||
from funcy import first, repeatedly, flatten
|
||||
|
||||
from pyinfra.server.bufferizer.buffer import bufferize
|
||||
from pyinfra.server.buffering.buffer import bufferize
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def stream_queue(queue):
|
||||
yield from takewhile(is_not_nothing, repeatedly(queue.popleft))
|
||||
|
||||
|
||||
class Queue:
|
||||
def __init__(self):
|
||||
self.__queue = deque()
|
||||
|
||||
def append(self, package) -> None:
|
||||
self.__queue.append(package)
|
||||
|
||||
def popleft(self):
|
||||
return self.__queue.popleft() if self.__queue else Nothing
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.__queue)
|
||||
|
||||
|
||||
class FlatStreamBuffer:
|
||||
"""Wraps a stream buffer and chains its output. Also flushes the stream buffer when applied to an iterable."""
|
||||
@ -1,73 +1,7 @@
|
||||
import logging
|
||||
|
||||
from flask import Flask, jsonify, request
|
||||
from funcy import compose, identity, first
|
||||
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, stream_queue, FlatStreamBuffer
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.utils import unpack, normalize, pack
|
||||
from pyinfra.utils.func import starlift, lift
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def make_streamable(fn, batched):
|
||||
return compose(normalize, (identity if batched else starlift)(fn))
|
||||
|
||||
|
||||
def unpack_fn_pack(fn):
|
||||
return compose(starlift(pack), fn, lift(unpack))
|
||||
|
||||
|
||||
def make_streamable_and_wrap_in_packing_logic(fn, batched):
|
||||
fn = make_streamable(fn, batched)
|
||||
fn = unpack_fn_pack(fn)
|
||||
return fn
|
||||
|
||||
|
||||
class QueuedStreamFunction:
|
||||
def __init__(self, fn):
|
||||
self.queue = Queue()
|
||||
self.fn = fn
|
||||
|
||||
def push(self, item):
|
||||
self.queue.append(item)
|
||||
|
||||
def pop(self):
|
||||
items = stream_queue(self.queue)
|
||||
return first(self.fn(items))
|
||||
|
||||
|
||||
class LazyRestProcessor:
|
||||
def __init__(self, queued_stream_function: QueuedStreamFunction, submit_suffix="submit", pickup_suffix="pickup"):
|
||||
self.submit_suffix = submit_suffix
|
||||
self.pickup_suffix = pickup_suffix
|
||||
self.queued_stream_function = queued_stream_function
|
||||
|
||||
def push(self, request):
|
||||
self.queued_stream_function.push(request.json)
|
||||
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
|
||||
|
||||
def pop(self):
|
||||
result = self.queued_stream_function.pop() or Nothing
|
||||
|
||||
if not valid(result):
|
||||
logger.error(f"Received invalid result: {result}")
|
||||
result = Nothing
|
||||
|
||||
if result is Nothing:
|
||||
resp = jsonify("No more items left")
|
||||
resp.status_code = 204
|
||||
|
||||
else:
|
||||
resp = jsonify(result)
|
||||
resp.status_code = 206
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
def valid(result):
|
||||
return isinstance(result, dict) or result is Nothing
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
from pyinfra.server.stream.rest import LazyRestProcessor
|
||||
|
||||
|
||||
def set_up_processing_server(queued_stream_function: QueuedStreamFunction):
|
||||
|
||||
0
pyinfra/server/stream/__init__.py
Normal file
0
pyinfra/server/stream/__init__.py
Normal file
16
pyinfra/server/stream/queued_stream_function.py
Normal file
16
pyinfra/server/stream/queued_stream_function.py
Normal file
@ -0,0 +1,16 @@
|
||||
from funcy import first
|
||||
|
||||
from pyinfra.server.buffering.queue import stream_queue, Queue
|
||||
|
||||
|
||||
class QueuedStreamFunction:
|
||||
def __init__(self, fn):
|
||||
self.queue = Queue()
|
||||
self.fn = fn
|
||||
|
||||
def push(self, item):
|
||||
self.queue.append(item)
|
||||
|
||||
def pop(self):
|
||||
items = stream_queue(self.queue)
|
||||
return first(self.fn(items))
|
||||
40
pyinfra/server/stream/rest.py
Normal file
40
pyinfra/server/stream/rest.py
Normal file
@ -0,0 +1,40 @@
|
||||
import logging
|
||||
|
||||
from flask import jsonify
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class LazyRestProcessor:
|
||||
def __init__(self, queued_stream_function: QueuedStreamFunction, submit_suffix="submit", pickup_suffix="pickup"):
|
||||
self.submit_suffix = submit_suffix
|
||||
self.pickup_suffix = pickup_suffix
|
||||
self.queued_stream_function = queued_stream_function
|
||||
|
||||
def push(self, request):
|
||||
self.queued_stream_function.push(request.json)
|
||||
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
|
||||
|
||||
def pop(self):
|
||||
result = self.queued_stream_function.pop() or Nothing
|
||||
|
||||
if not valid(result):
|
||||
logger.error(f"Received invalid result: {result}")
|
||||
result = Nothing
|
||||
|
||||
if result is Nothing:
|
||||
resp = jsonify("No more items left")
|
||||
resp.status_code = 204
|
||||
|
||||
else:
|
||||
resp = jsonify(result)
|
||||
resp.status_code = 206
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
def valid(result):
|
||||
return isinstance(result, dict) or result is Nothing
|
||||
0
pyinfra/server/stream/utils.py
Normal file
0
pyinfra/server/stream/utils.py
Normal file
@ -3,7 +3,7 @@ from itertools import takewhile, starmap, repeat, chain, tee
|
||||
from typing import Iterable, Callable, Dict, List, Union, Tuple
|
||||
|
||||
import requests
|
||||
from funcy import repeatedly, ilen, compose
|
||||
from funcy import repeatedly, ilen, compose, identity
|
||||
|
||||
from pyinfra.exceptions import UnexpectedItemType
|
||||
from pyinfra.utils.func import lift, star, starlift
|
||||
@ -85,3 +85,17 @@ def inspect(msg="ins", embed=False):
|
||||
return x
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
def make_streamable(fn, batched):
|
||||
return compose(normalize, (identity if batched else starlift)(fn))
|
||||
|
||||
|
||||
def unpack_fn_pack(fn):
|
||||
return compose(starlift(pack), fn, lift(unpack))
|
||||
|
||||
|
||||
def make_streamable_and_wrap_in_packing_logic(fn, batched):
|
||||
fn = make_streamable(fn, batched)
|
||||
fn = unpack_fn_pack(fn)
|
||||
return fn
|
||||
|
||||
6
test/fixtures/server.py
vendored
6
test/fixtures/server.py
vendored
@ -10,13 +10,13 @@ from PIL import Image
|
||||
from funcy import retry, first, compose, identity
|
||||
from waitress import serve
|
||||
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer
|
||||
from pyinfra.server.buffering.stream import FlatStreamBuffer
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.server import (
|
||||
set_up_processing_server,
|
||||
make_streamable_and_wrap_in_packing_logic,
|
||||
QueuedStreamFunction,
|
||||
)
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
from pyinfra.server.utils import make_streamable_and_wrap_in_packing_logic
|
||||
from pyinfra.utils.func import starlift
|
||||
from test.utils.image import image_to_bytes
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
from funcy import compose, lmapcat, compact, flatten
|
||||
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.bufferizer.buffer import bufferize
|
||||
from pyinfra.server.buffering.buffer import bufferize
|
||||
|
||||
|
||||
def test_buffer():
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import pytest
|
||||
from funcy import repeatedly, takewhile, notnone, lmap, lmapcat, lflatten
|
||||
|
||||
from pyinfra.server.bufferizer.lazy_bufferizer import FlatStreamBuffer, StreamBuffer
|
||||
from pyinfra.server.buffering.stream import FlatStreamBuffer, StreamBuffer
|
||||
from pyinfra.server.dispatcher.dispatcher import Nothing
|
||||
from pyinfra.server.server import QueuedStreamFunction
|
||||
from pyinfra.server.stream.queued_stream_function import QueuedStreamFunction
|
||||
from pyinfra.utils.func import lift, foreach
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user