refactoring: introduced queue-buffer-coupler; introduced recursion depth issie -- fixing next

This commit is contained in:
Matthias Bisping 2022-05-11 12:26:56 +02:00
parent 1eb4dbc657
commit 7bd35dce67
3 changed files with 94 additions and 33 deletions

View File

@ -14,6 +14,9 @@ def bufferize(fn, buffer_size=3, persist_fn=identity, null_value=None):
if item is not Nothing:
buffer.append(persist_fn(item))
# print("ITEM", item)
# print(buffer)
response_payload = fn(repeatedly(buffer.popleft, n_items_to_pop(buffer, item is Nothing)))
return response_payload or null_value

View File

@ -1,12 +1,11 @@
import logging
from collections import deque
from itertools import chain, takewhile
from typing import Union, Any
from itertools import chain
from funcy import flatten, compose, repeatedly
from funcy import first
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
from pyinfra.utils.func import lift
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing
logger = logging.getLogger(__name__)
@ -21,37 +20,98 @@ class Queue:
def popleft(self):
return self.__queue.popleft() if self.__queue else Nothing
def __bool__(self):
return bool(self.__queue)
class QueueStreamer:
def __init__(self, queue: Queue):
self.queue = queue
def __call__(self):
return self.stream_queue()
# class QueueStreamer:
# def __init__(self, queue: Queue):
# self.queue = queue
#
# def __call__(self):
# return self.stream_queue()
#
# def stream_queue(self):
# yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft))
# yield Nothing
def stream_queue(self):
yield from takewhile(is_not_nothing, repeatedly(self.queue.popleft))
# class OutputBuffer:
# def __init__(self, fn, queue_streamer):
# self.fn = fn
# self.queue_streamer = queue_streamer
# self.result_stream = chain([])
#
# def __call__(self):
# return self.compute_next()
#
# def compute_next(self) -> Union[Nothing, Any]:
# try:
# return next(self.result_stream)
# except StopIteration:
# self.result_stream = chain(self.result_stream, self.compute())
# return self.compute_next() # Produces stream of `Nothing` if execution queue is empty
# except TypeError as err:
# raise TypeError("Function failed with type error. Is it mappable?") from err
#
# def compute(self):
# yield from compose(flatten, lift(self.fn))(self.queue_streamer())
# yield Nothing
# class OutputBuffer:
# def __init__(self, fn):
# self.fn = fn
# self.result_queue = Queue()
#
# def compute(self, *args, **kwargs):
# for r in self.fn(*args, **kwargs):
# self.result_queue.append(r)
#
# def get_next(self):
# print(self.result_queue)
# return self.result_queue.popleft()
#
# def __bool__(self):
# return bool(self.result_queue)
class InputOutputBuffer:
def __init__(self, fn, buffer_size=3):
self.fn = bufferize(fn, buffer_size=buffer_size, null_value=[])
self.result_stream = chain([])
def compute_next(self, item):
try:
self.result_stream = chain(self.result_stream, self.compute(item))
except TypeError as err:
raise TypeError("Function failed with type error. Is it mappable?") from err
def compute(self, item):
yield from self.fn(item)
yield Nothing
def get_next(self):
return first(chain(self.result_stream, [Nothing]))
class OutputBuffer:
def __init__(self, fn, queue_streamer):
self.fn = fn
self.queue_streamer = queue_streamer
self.result_stream = chain([])
class QueueBufferCoupler:
def __init__(self, queue: Queue, buffer: InputOutputBuffer):
self.queue = queue
self.buffer = buffer
def __call__(self):
return self.compute_next()
def compute_next(self) -> Union[Nothing, Any]:
try:
return next(self.result_stream)
except StopIteration:
self.result_stream = chain(self.result_stream, self.compute())
return self.compute_next() # Produces stream of `Nothing` if execution queue is empty
except TypeError as err:
raise TypeError("Function failed with type error. Is it mappable?") from err
def compute_next(self):
item = self.buffer.get_next()
def compute(self):
yield from compose(flatten, lift(self.fn))(self.queue_streamer())
yield Nothing
if item is Nothing:
item = self.queue.popleft()
print(">>", item)
self.buffer.compute_next(item)
return self.compute_next()
else:
return item

View File

@ -4,7 +4,7 @@ from flask import Flask, jsonify, request
from funcy import rcompose
from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.bufferizer.lazy_bufferizer import QueueStreamer, Queue, OutputBuffer
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, InputOutputBuffer, QueueBufferCoupler
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift
@ -49,16 +49,14 @@ class RestStreamProcessor:
self.submit_suffix = submit_suffix
self.pickup_suffix = pickup_suffix
self.queue = Queue()
self.queue_processor = OutputBuffer(
bufferize(fn, buffer_size=3, null_value=[]), queue_streamer=QueueStreamer(self.queue)
)
self.processor = QueueBufferCoupler(self.queue, InputOutputBuffer(fn))
def submit(self, request):
self.queue.append(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pickup(self):
result = self.queue_processor()
result = self.processor()
if not valid(result):
logger.error(f"Received invalid result: {result}")