Fixed buffering and result streaming: all items are now yielded individually and computed on demand

This commit is contained in:
Matthias Bisping 2022-05-06 12:14:09 +02:00
parent 1b04c46853
commit dca3eaaa54
6 changed files with 39 additions and 56 deletions

View File

@ -1,30 +1,26 @@
import abc
import logging
import traceback
from itertools import chain
from operator import attrgetter
from typing import Iterable
from flask import Flask, jsonify, request
from funcy import compose
from funcy import compose, flatten
from pyinfra.server.dispatcher.dispatcher import Nothing
from pyinfra.server.utils import unpack_batchop_pack, unpack_op_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import starlift, llift
from pyinfra.utils.func import lift
logger = logging.getLogger()
# Build the processor function for `operation`.
#
# Visible steps: wrap `operation` with `starlift` when `batched`; pick
# `unpack_batchop_pack` (batched) or a composition with `unpack_op_pack`
# (unbatched) as the wrapper and apply it; combine the result with
# `bufferize(..., buffer_size=buffer_size)` and with `flatten`.
#
# NOTE(review): indentation is missing and this span looks like the removed
# and added lines of a diff shown together (two `wrapper = ...` assignments,
# two `return` statements, `bufferize` applied in two places). Confirm which
# lines belong to the current version before relying on the order shown.
def make_processor_fn(operation, buffer_size, batched):
if batched:
operation = starlift(operation)
wrapper = unpack_batchop_pack if batched else compose(llift, unpack_op_pack)
wrapper = unpack_batchop_pack if batched else compose(lift, unpack_op_pack)
operation = wrapper(operation)
operation = bufferize(operation, buffer_size=buffer_size)
operation = compose(flatten, operation)
return bufferize(operation, buffer_size=buffer_size)
return operation
class Processor(abc.ABC):
@ -36,7 +32,7 @@ class Processor(abc.ABC):
"""Submit computation request to execution queue; computation is performed on demand.
Use for processor functions that are 1 -> n where n can take up a lot of memory, since
the """
the"""
self.execution_queue = chain(self.execution_queue, self.processor_fn(package, final=final))
def compute_next(self):
@ -65,7 +61,7 @@ def set_up_processing_server(process_fn):
@app.route("/pickup", methods=["GET"])
def pickup():
result = processor.compute_next()
# print(result)
if result is Nothing:
resp = jsonify("No more items left")
resp.status_code = 204

View File

@ -1,12 +1,12 @@
from _operator import itemgetter
from itertools import takewhile, starmap, repeat, chain
from itertools import takewhile, starmap, repeat, chain, tee
from typing import Iterable, Callable, Dict, List, Union, Tuple
import requests
from funcy import repeatedly, identity, ilen, compose
from funcy import repeatedly, ilen, compose
from pyinfra.exceptions import UnexpectedItemType
from pyinfra.utils.func import parallel_map, lift, lstarlift, star, starlift
from pyinfra.utils.func import lift, star, starlift
from test.utils.server import bytes_to_string, string_to_bytes
@ -30,10 +30,6 @@ def pack_data_and_metadata_for_rest_transfer(data: Iterable, metadata: Iterable)
yield from starmap(pack, zip(data, metadata))
def dispatch_http_method_left_and_forward_data_right(*args):
    """Call ``parallel_map(dispatch_methods, lift(identity))`` with ``args``.

    NOTE(review): going by the name, the first argument is presumably routed
    through ``dispatch_methods`` and the second forwarded unchanged; confirm
    against ``parallel_map``.
    """
    dispatch_and_forward = parallel_map(dispatch_methods, lift(identity))
    return dispatch_and_forward(*args)
def pack(data: bytes, metadata: dict):
    """Build the transfer package for a single item.

    The payload is passed through ``bytes_to_string`` and stored under the
    ``"data"`` key; ``metadata`` is stored unchanged under ``"metadata"``.
    """
    return dict(data=bytes_to_string(data), metadata=metadata)
@ -49,11 +45,11 @@ def dispatch_methods(data):
# Wrap a single-item `operation` into a package-level callable.
#
# funcy's `compose` applies its arguments right to left, so the returned
# callable runs `unpack`, then `star(operation)`, then `normalize`, and
# finally the lifted `pack` step.
#
# NOTE(review): two `return` lines are shown (`lstarlift(pack)` and
# `starlift(pack)`); they look like the removed and added variants of a
# diff. Confirm which one is current.
def unpack_op_pack(operation) -> Callable[[Dict], List[Dict]]:
return compose(lstarlift(pack), normalize, star(operation), unpack)
return compose(starlift(pack), normalize, star(operation), unpack)
# Wrap a batch `operation` into a callable over a list of packages.
#
# funcy's `compose` applies its arguments right to left, so the returned
# callable runs `lift(unpack)`, then `operation` itself (not starred), then
# `normalize`, and finally the lifted `pack` step.
#
# NOTE(review): two `return` lines are shown (`lstarlift(pack)` and
# `starlift(pack)`); they look like the removed and added variants of a
# diff. Confirm which one is current.
def unpack_batchop_pack(operation) -> Callable[[List[Dict]], List[Dict]]:
return compose(lstarlift(pack), normalize, operation, lift(unpack))
return compose(starlift(pack), normalize, operation, lift(unpack))
def normalize(itr: Iterable[Union[Tuple, Iterable]]) -> Iterable[Tuple]:
@ -72,10 +68,14 @@ def normalize_item(itm: Union[Tuple, Iterable]) -> Iterable:
def inspect(msg="ins", embed=False):
def inner(x):
if isinstance(x, Iterable) and not isinstance(x, dict):
x = list(x)
if isinstance(x, Iterable) and not isinstance(x, dict) and not isinstance(x, tuple):
x, y = tee(x)
y = list(y)
else:
y = x
print(msg, x)
l = len(y) if isinstance(y, list) else ""
print(msg, l, y)
if embed:
import IPython

View File

@ -68,6 +68,10 @@ def mock_make_load_data():
return load_data
# def pytest_make_parametrize_id(config, val, argname):
# return f"\n\t{argname}={val}\n"
@pytest.fixture(params=["minio", "aws"], scope="session")
def storage(client_name, bucket_name, request, docker_compose):
logger.debug("Setup for storage")

View File

@ -42,7 +42,7 @@ def endpoint(url):
return f"{url}/submit"
# Parametrized fixture that returns the current parameter value, used as the
# number of items by the dependent fixtures.
#
# NOTE(review): two fixture decorators with different `params` lists are
# shown; they look like the removed and added variants of a diff rather than
# an intentional double decoration. Confirm which list is current.
@pytest.fixture(params=[0, 2, 5])
@pytest.fixture(params=[1, 0, 5])
def n_items(request):
return request.param
@ -71,7 +71,7 @@ def images(n_items):
# Fixture returning a list of `n_items` metadata dicts.
#
# `repeat` yields the same dict object each time, so every entry of the list
# is one shared object rather than an independent copy.
#
# NOTE(review): two `return` lines are shown (a `{"dummy": True}` dict and an
# empty dict); they look like the removed and added variants of a diff.
# Confirm which one is current.
@pytest.fixture
def metadata(n_items):
return list(repeat({"dummy": True}, n_items))
return list(repeat({}, n_items))
@pytest.fixture

View File

@ -1,19 +1,16 @@
import io
import socket
from multiprocessing import Process
from operator import attrgetter
import fitz
import pytest
import requests
from PIL import Image
from funcy import retry, compose
from funcy import retry
from waitress import serve
from pyinfra.server.utils import unpack_op_pack, unpack_batchop_pack
from pyinfra.utils.buffer import bufferize
from pyinfra.utils.func import llift, starlift
from pyinfra.server.server import set_up_processing_server, make_processor_fn
from pyinfra.utils.func import starlift
from test.utils.image import image_to_bytes
@ -44,8 +41,13 @@ def server(processor_fn):
# Fixture that builds the processor function under test via
# `make_processor_fn`, passing through `buffer_size` and `batched`.
#
# NOTE(review): two `def processor_fn` variants are shown under a single
# decorator, one taking `operation` and one taking
# `operation_conditionally_batched`; they look like the removed and added
# variants of a diff. Confirm which signature is current.
@pytest.fixture
def processor_fn(operation, buffer_size, batched):
return make_processor_fn(operation, buffer_size, batched)
def processor_fn(operation_conditionally_batched, buffer_size, batched):
return make_processor_fn(operation_conditionally_batched, buffer_size, batched)
@pytest.fixture
def operation_conditionally_batched(operation, batched):
    """Return ``operation`` wrapped with ``starlift`` when ``batched`` is truthy.

    When ``batched`` is falsy, ``operation`` is returned unchanged.
    """
    if batched:
        return starlift(operation)
    return operation
@pytest.fixture
@ -68,7 +70,7 @@ def operation(item_type, batched):
raise ValueError(f"No operation specified for item type {item_type}")
# Parametrized fixture that returns the current parameter value, naming the
# kind of item the tests operate on.
#
# NOTE(review): two fixture decorators with different `params` lists are
# shown; they look like the removed and added variants of a diff rather than
# an intentional double decoration. Confirm which list is current.
@pytest.fixture(params=["string"])
@pytest.fixture(params=["pdf", "string", "image"])
def item_type(request):
return request.param

View File

@ -2,7 +2,6 @@ import pytest
from funcy import rcompose, lmap
from pyinfra.server.dispatcher.dispatchers.rest import RestDispatcher
from pyinfra.server.interpreter.interpreters.identity import IdentityInterpreter
from pyinfra.server.interpreter.interpreters.rest_callback import RestPickupStreamer
from pyinfra.server.packer.packers.rest import RestPacker
from pyinfra.server.pipeline import Pipeline
@ -22,27 +21,9 @@ def test_mock_pipeline():
assert list(pipeline(data)) == list(rcompose(f, g, h, u)(data))
# End-to-end check: run `pipeline` on the input items and metadata, unpack
# each result with `unpack`, and compare the list against
# `target_data_items`.
#
# NOTE(review): the body holds both a one-line form
# (`lmap(unpack, pipeline(...))`) and a two-step form of the same
# computation, so as written `pipeline` is called twice. Together with the
# parametrize decorators, this looks like the removed and added lines of a
# diff shown together. Confirm which lines are current.
@pytest.mark.parametrize(
"batched",
[
True,
False,
],
)
@pytest.mark.parametrize(
"item_type",
[
"pdf",
# "string",
# "image",
],
)
@pytest.mark.parametrize(
"n_pages",
[100],
)
def test_pipeline(pipeline, input_data_items, metadata, target_data_items):
output = lmap(unpack, pipeline(input_data_items, metadata))
output = pipeline(input_data_items, metadata)
output = lmap(unpack, output)
assert output == target_data_items
@ -58,4 +39,4 @@ def rest_pipeline(server_process, endpoint, rest_interpreter):
# Fixture providing the REST interpreter: `RestPickupStreamer()` chained with
# `RestReceiver()` via funcy's `rcompose`, which applies its arguments left to
# right.
#
# NOTE(review): the same `rcompose(...)` call appears once as a bare
# expression (result discarded) and once after `return`. These look like the
# removed and added variants of a diff, where the added line supplies the
# missing `return`. Confirm which line is current.
@pytest.fixture
def rest_interpreter():
rcompose(RestPickupStreamer(), RestReceiver())
return rcompose(RestPickupStreamer(), RestReceiver())