refactoring: further simplified queue consuming function; added one -> many test fixture param

This commit is contained in:
Matthias Bisping 2022-05-12 17:55:45 +02:00
parent 461c0fe6a6
commit 8b0c2d4e07
3 changed files with 21 additions and 16 deletions

View File

@ -2,7 +2,7 @@ import logging
from collections import deque from collections import deque
from itertools import chain, takewhile from itertools import chain, takewhile
from funcy import first, repeatedly from funcy import first, repeatedly, compose
from pyinfra.server.bufferizer.buffer import bufferize from pyinfra.server.bufferizer.buffer import bufferize
from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing from pyinfra.server.dispatcher.dispatcher import Nothing, is_not_nothing
@ -119,19 +119,15 @@ class StreamBuffer:
# return Nothing # return Nothing
def make_queue_consumer(buffer: StreamBuffer): def make_consumer(buffer: StreamBuffer):
def compute_next(queue): def compute(items):
return next(compute(queue)) yield from consume(items)
def compute(queue):
yield from consume_queue(queue)
yield from flush_buffer() yield from flush_buffer()
yield signal_termination() yield signal_termination()
def consume_queue(queue): def consume(items):
queue_items = stream_queue(queue) yield from chain.from_iterable(map(buffer, items))
yield from chain.from_iterable(map(buffer, queue_items))
def flush_buffer(): def flush_buffer():
yield from buffer(Nothing) yield from buffer(Nothing)
@ -139,7 +135,7 @@ def make_queue_consumer(buffer: StreamBuffer):
def signal_termination(): def signal_termination():
return Nothing return Nothing
return compute_next return compose(first, compute)
def stream_queue(queue): def stream_queue(queue):

View File

@ -3,7 +3,7 @@ import logging
from flask import Flask, jsonify, request from flask import Flask, jsonify, request
from funcy import compose, identity from funcy import compose, identity
from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, make_queue_consumer from pyinfra.server.bufferizer.lazy_bufferizer import Queue, StreamBuffer, stream_queue, make_consumer
from pyinfra.server.dispatcher.dispatcher import Nothing from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack, normalize, pack from pyinfra.server.utils import unpack, normalize, pack
from pyinfra.utils.func import starlift, lift from pyinfra.utils.func import starlift, lift
@ -44,14 +44,14 @@ class RestStreamProcessor:
self.queue = Queue() self.queue = Queue()
# self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn)) # self.processor = QueueBufferCoupler(self.queue, StreamBuffer(fn))
# self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue))) # self.output_stream = chain.from_iterable(map(StreamBuffer(fn), stream_queue(self.queue)))
self.fn = make_queue_consumer(StreamBuffer(fn)) self.fn = make_consumer(StreamBuffer(fn))
def submit(self, request): def submit(self, request):
self.queue.append(request.json) self.queue.append(request.json)
return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix)) return jsonify(request.base_url.replace(self.submit_suffix, self.pickup_suffix))
def pickup(self): def pickup(self):
result = self.fn(self.queue) result = self.fn(stream_queue(self.queue))
if not valid(result): if not valid(result):
logger.error(f"Received invalid result: {result}") logger.error(f"Received invalid result: {result}")

View File

@ -51,10 +51,14 @@ def operation_conditionally_batched(operation, batched):
@pytest.fixture @pytest.fixture
def operation(item_type, batched): def operation(item_type, batched, one_to_many):
def upper(string: bytes, metadata): def upper(string: bytes, metadata):
return string.decode().upper().encode(), metadata return string.decode().upper().encode(), metadata
def duplicate(string: bytes, metadata):
for _ in range(2):
yield upper(string, metadata)
def rotate(im: bytes, metadata): def rotate(im: bytes, metadata):
im = Image.open(io.BytesIO(im)) im = Image.open(io.BytesIO(im))
return image_to_bytes(im.rotate(90)), metadata return image_to_bytes(im.rotate(90)), metadata
@ -65,7 +69,7 @@ def operation(item_type, batched):
yield f"page_{i}".encode(), metadata yield f"page_{i}".encode(), metadata
try: try:
return {"string": upper, "image": rotate, "pdf": stream_pages}[item_type] return {"string": duplicate if one_to_many else upper, "image": rotate, "pdf": stream_pages}[item_type]
except KeyError: except KeyError:
raise ValueError(f"No operation specified for item type {item_type}") raise ValueError(f"No operation specified for item type {item_type}")
@ -75,6 +79,11 @@ def item_type(request):
return request.param return request.param
@pytest.fixture(params=[True, False])
def one_to_many(request):
return request.param
@pytest.fixture(params=[False, True]) @pytest.fixture(params=[False, True])
def batched(request): def batched(request):
"""Controls, whether the buffer processor function of the webserver is applied to batches or single items.""" """Controls, whether the buffer processor function of the webserver is applied to batches or single items."""