Merge branch 'master' of ssh://git.iqser.com:2222/rr/pyinfra into fixing_consumer_tests

This commit is contained in:
Matthias Bisping 2022-06-07 15:44:39 +02:00
commit 8d209d63c7
12 changed files with 212 additions and 116 deletions

View File

@ -29,6 +29,7 @@ storage:
endpoint: $STORAGE_ENDPOINT|"http://127.0.0.1:9000"
access_key: $STORAGE_KEY|root
secret_key: $STORAGE_SECRET|password
region: $STORAGE_REGION|"eu-west-1"
azure:
connection_string: $STORAGE_AZURECONNECTIONSTRING|"DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"

View File

@ -5,7 +5,7 @@ from itertools import islice
import pika
from pyinfra.config import CONFIG
from pyinfra.exceptions import ProcessingFailure
from pyinfra.exceptions import ProcessingFailure, DataLoadingFailure
from pyinfra.queue.queue_manager.queue_manager import QueueHandle, QueueManager
from pyinfra.visitor import QueueVisitor
@ -97,7 +97,7 @@ class PikaQueueManager(QueueManager):
self.channel.basic_publish("", self._input_queue, json.dumps(request).encode())
def reject(self, body, frame):
logger.exception(f"Adding to dead letter queue: {body}")
logger.error(f"Adding to dead letter queue: {body}")
self.channel.basic_reject(delivery_tag=frame.delivery_tag, requeue=False)
def publish_response(self, message, visitor: QueueVisitor, max_attempts=3):
@ -120,8 +120,7 @@ class PikaQueueManager(QueueManager):
self.channel.basic_publish("", self._output_queue, response_message)
self.channel.basic_ack(frame.delivery_tag)
except ProcessingFailure:
except (ProcessingFailure, DataLoadingFailure):
logger.error(f"Message failed to process {n_attempts}/{max_attempts} times: {body}")

View File

@ -5,6 +5,7 @@ from operator import attrgetter
from minio import Minio
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.adapters.adapter import StorageAdapter
logger = logging.getLogger()
@ -34,6 +35,8 @@ class S3StorageAdapter(StorageAdapter):
try:
response = self.__client.get_object(bucket_name, object_name)
return response.data
except Exception as err:
raise DataLoadingFailure("Failed getting object from s3 client") from err
finally:
if response:
response.close()

View File

@ -32,4 +32,9 @@ def get_s3_client(params=None) -> Minio:
if not params:
params = CONFIG.storage.s3
return Minio(**parse_endpoint(params.endpoint), access_key=params.access_key, secret_key=params.secret_key)
return Minio(
**parse_endpoint(params.endpoint),
access_key=params.access_key,
secret_key=params.secret_key,
region=params.region,
)

View File

@ -1,5 +1,14 @@
import logging
from retry import retry
from pyinfra.config import CONFIG
from pyinfra.exceptions import DataLoadingFailure
from pyinfra.storage.adapters.adapter import StorageAdapter
logger = logging.getLogger(__name__)
logger.setLevel(CONFIG.service.logging_level)
class Storage:
def __init__(self, adapter: StorageAdapter):
@ -15,7 +24,15 @@ class Storage:
self.__adapter.put_object(bucket_name, object_name, data)
def get_object(self, bucket_name, object_name):
return self.__adapter.get_object(bucket_name, object_name)
return self.__get_object(bucket_name, object_name)
@retry(DataLoadingFailure, tries=3, delay=5, jitter=(1, 3))
def __get_object(self, bucket_name, object_name):
try:
return self.__adapter.get_object(bucket_name, object_name)
except Exception as err:
logging.error(err)
raise DataLoadingFailure from err
def get_all_objects(self, bucket_name):
return self.__adapter.get_all_objects(bucket_name)

View File

@ -218,7 +218,7 @@ class QueueVisitor:
data = self.storage.get_object(**object_descriptor)
except Exception as err:
logging.warning(f"Loading data from storage failed for {object_descriptor}.")
raise DataLoadingFailure() from err
raise DataLoadingFailure from err
return data

View File

@ -11,7 +11,10 @@ def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--bucket_name", "-b", required=True)
parser.add_argument(
"--analysis_container", "-a", choices=["detr", "ner", "image", "conversion", "extraction"], required=True
"--analysis_container",
"-a",
choices=["detr", "ner", "image", "conversion", "extraction", "dl_error"],
required=True,
)
args = parser.parse_args()
return args
@ -62,6 +65,8 @@ def build_message_bodies(analyse_container_type, bucket_name):
message_dict.update(
{"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "json.gz", "operation": "extraction"}
)
if analyse_container_type == "dl_error":
message_dict.update({"targetFileExtension": "no_such_file", "responseFileExtension": "IMAGE_INFO.json.gz"})
if analyse_container_type == "ner":
message_dict.update(
{"targetFileExtension": "TEXT.json.gz", "responseFileExtension": "NER_ENTITIES.json.gz"}
@ -89,7 +94,9 @@ def main(args):
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output, inactivity_timeout=1):
if not body:
break
print(f"Received {json.loads(body)}")
channel.basic_ack(method_frame.delivery_tag)
channel.close()

View File

@ -9,18 +9,17 @@ from pyinfra.exceptions import ConsumerError
from pyinfra.flask import run_probing_webserver, set_up_probing_webserver
from pyinfra.utils.banner import show_banner
logger = logging.getLogger()
@retry(ConsumerError, tries=3, delay=5, jitter=(1, 3))
def consume():
consumer = get_consumer()
try:
consumer = get_consumer()
consumer.basic_consume_and_publish()
except Exception as err:
logger.exception(err)
raise ConsumerError() from err
raise ConsumerError from err
def main():

View File

@ -3,11 +3,13 @@ storage:
endpoint: "http://127.0.0.1:9000"
access_key: root
secret_key: password
region: null
aws:
endpoint: https://s3.amazonaws.com
access_key: AKIA4QVP6D4LCDAGYGN2
secret_key: 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
region: $STORAGE_REGION|"eu-west-1"
azure:
connection_string: "DefaultEndpointsProtocol=https;AccountName=iqserdevelopment;AccountKey=4imAbV9PYXaztSOMpIyAClg88bAZCXuXMGJG0GA1eIBpdh2PlnFGoRBnKqLy2YZUSTmZ3wJfC7tzfHtuC6FEhQ==;EndpointSuffix=core.windows.net"

View File

@ -1,37 +1,79 @@
# import json
# from operator import itemgetter
#
# import pytest
# import requests
#
#
# def test_server_ready_check(url):
# response = requests.get(f"{url}/ready")
# response.raise_for_status()
# return response.status_code == 200
#
#
# @pytest.fixture
# def client(client_maker):
# return client_maker(processor_fn)
#
#
# def processor_fn(request):
# payload = json.loads(request.json)
# data = payload["data"].encode()
# metadata = payload["metadata"]
# response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))}
# return response_payload
#
#
# @pytest.mark.parametrize("data_type", ["pdf", "bytestring"])
# def test_sending_bytes_through_json(url, data):
# payload = {"data": data.decode("latin1"), "metadata": {"A": 1, "B": [2, 3]}}
#
# response = requests.post(f"{url}/process", json=json.dumps(payload))
#
# response_payload = response.json
# data_type, metadata_type = itemgetter("data_type", "metadata_type")(response_payload)
#
# assert data_type == "<class 'bytes'>"
# assert metadata_type == "<class 'dict'>"
import json
from operator import itemgetter
import pytest
from flask import Flask, request, jsonify
import fpdf
def set_up_processing_server():
app = Flask(__name__)
@app.route("/ready", methods=["GET"])
def ready():
resp = jsonify("OK")
resp.status_code = 200
return resp
@app.route("/process", methods=["POST"])
def process():
payload = json.loads(request.json)
data = payload["data"].encode()
metadata = payload["metadata"]
response_payload = {"metadata_type": str(type(metadata)), "data_type": str(type(data))}
return jsonify(response_payload)
return app
@pytest.fixture
def server():
server = set_up_processing_server()
server.config.update({"TESTING": True})
return server
@pytest.fixture
def client(server):
return server.test_client()
def test_server_ready_check(client):
response = client.get("/ready")
assert response.status_code == 200
assert response.json == "OK"
@pytest.mark.parametrize("data_type", ["pdf", "bytestring"])
def test_sending_bytes_through_json(client, data):
payload = {"data": data.decode("latin1"), "metadata": {"A": 1, "B": [2, 3]}}
response = client.post("/process", json=json.dumps(payload))
response_payload = response.json
data_type, metadata_type = itemgetter("data_type", "metadata_type")(response_payload)
assert data_type == "<class 'bytes'>"
assert metadata_type == "<class 'dict'>"
@pytest.fixture
def pdf():
pdf = fpdf.FPDF(unit="pt")
pdf.add_page()
return pdf_stream(pdf)
def pdf_stream(pdf: fpdf.fpdf.FPDF):
return pdf.output(dest="S").encode("latin1")
@pytest.fixture
def data(data_type, pdf):
if data_type == "pdf":
return pdf
elif data_type == "bytestring":
return "content".encode("latin1")

View File

@ -3,13 +3,38 @@ import logging
from operator import itemgetter
import pytest
from funcy import lmap, lmapcat
from pyinfra.exceptions import ProcessingFailure
from pyinfra.utils.encoding import pack_for_upload
from pyinfra.queue.consumer import Consumer
from pyinfra.visitor import get_object_descriptor, ForwardingStrategy
logger = logging.getLogger()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def consumer(queue_manager, callback):
return Consumer(callback, queue_manager)
@pytest.fixture(scope="session")
def access_callback():
return itemgetter("fileId")
@pytest.fixture()
def items():
def inner():
for i in range(3):
body = {
"dossierId": "folder",
"fileId": f"file{i}",
"targetFileExtension": "in.gz",
"responseFileExtension": "out.gz",
}
yield f"{i}".encode(), body
return list(inner())
class TestConsumer:
@ -19,66 +44,56 @@ class TestConsumer:
consumer.consume()
assert queue_manager.output_queue.empty()
# @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
# def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
# self, consumer, queue_manager, callback
# ):
# def produce_items():
# return map(str, range(3))
#
# def mock_visitor(callback):
# def inner(data):
# print("data", data)
# return callback({"data": data.encode()})
#
# return inner
#
# callback = mock_visitor(callback)
#
# print(11111111111)
# queue_manager.clear()
# print(22222222222)
#
# for item in produce_items():
# print("item", item)
# queue_manager.publish_request(item)
#
# requests = consumer.consume(n=3)
# print(33333333333)
#
# print(requests)
# print(list(produce_items()))
# print(44444444444)
# for _, r in zip(produce_items(), requests):
# print(1)
# queue_manager.publish_response(r, callback)
# print(55555555555)
#
# assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
self, consumer, queue_manager, callback
):
def produce_items():
return map(str, range(3))
# @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
# @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
# @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
# def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
# self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
# ):
#
# visitor.response_strategy = ForwardingStrategy()
#
# queue_manager.clear()
# storage.clear_bucket(bucket_name)
#
# for data, message in items:
# storage.put_object(**get_object_descriptor(message), data=pack_for_upload(data))
# queue_manager.publish_request(message)
#
# requests = consumer.consume(inactivity_timeout=5)
#
# for itm, req in zip(items, requests):
# logger.debug(f"Processing item {itm}")
# queue_manager.publish_response(req, visitor)
#
# assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"]
def mock_visitor(callback):
def inner(data):
return callback({"data": data.encode()})
return inner
callback = mock_visitor(callback)
queue_manager.clear()
for item in produce_items():
queue_manager.publish_request(item)
requests = consumer.consume()
for _, r in zip(produce_items(), requests):
queue_manager.publish_response(r, callback)
assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
@pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
self, consumer, queue_manager, visitor, bucket_name, storage, items
):
visitor.response_strategy = ForwardingStrategy()
queue_manager.clear()
storage.clear_bucket(bucket_name)
for data, message in items:
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
queue_manager.publish_request(message)
requests = consumer.consume(inactivity_timeout=5)
for itm, req in zip(items, requests):
logger.debug(f"Processing item {itm}")
queue_manager.publish_response(req, visitor)
assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"]
@pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
def test_message_is_republished_when_callback_raises_processing_failure_exception(
@ -88,10 +103,10 @@ class TestConsumer:
pass
def callback(_):
raise ProcessingFailure
raise ProcessingFailure()
def reject_patch(*args, **kwargs):
raise DebugError
raise DebugError()
queue_manager.reject = reject_patch
@ -105,8 +120,6 @@ class TestConsumer:
logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
logger.addFilter(lambda record: False)
# TODO: for some reason this code now interferes with the commented out tests. all tests work on their own, but
# not together.
with pytest.raises(DebugError):
while True:
queue_manager.publish_response(next(requests), callback)

View File

@ -2,7 +2,10 @@ import logging
import pytest
logger = logging.getLogger()
from pyinfra.exceptions import DataLoadingFailure
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.mark.parametrize("client_name", ["mock", "azure", "s3"], scope="session")
@ -42,3 +45,8 @@ class TestStorage:
storage.put_object(bucket_name, "file2", b"content 2")
full_names_received = storage.get_all_object_names(bucket_name)
assert {"file1", "file2"} == {*full_names_received}
def test_data_loading_failure_raised_if_object_not_present(self, storage, bucket_name):
storage.clear_bucket(bucket_name)
with pytest.raises(DataLoadingFailure):
storage.get_object(bucket_name, "folder/file")