consumer test adjustment WIP

This commit is contained in:
Matthias Bisping 2022-06-07 15:11:38 +02:00
parent 91701929e5
commit 0dee98b23d

View File

@ -3,8 +3,10 @@ import logging
from operator import itemgetter
import pytest
from funcy import lmap, lmapcat
from pyinfra.exceptions import ProcessingFailure
from pyinfra.utils.encoding import pack_for_upload
from pyinfra.visitor import get_object_descriptor, ForwardingStrategy
logger = logging.getLogger()
@ -17,83 +19,94 @@ class TestConsumer:
consumer.consume()
assert queue_manager.output_queue.empty()
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
self, consumer, queue_manager, callback
# @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
# def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
# self, consumer, queue_manager, callback
# ):
# def produce_items():
# return map(str, range(3))
#
# def mock_visitor(callback):
# def inner(data):
# print("data", data)
# return callback({"data": data.encode()})
#
# return inner
#
# callback = mock_visitor(callback)
#
# print(11111111111)
# queue_manager.clear()
# print(22222222222)
#
# for item in produce_items():
# print("item", item)
# queue_manager.publish_request(item)
#
# requests = consumer.consume(n=3)
# print(33333333333)
#
# print(requests)
# print(list(produce_items()))
# print(44444444444)
# for _, r in zip(produce_items(), requests):
# print(1)
# queue_manager.publish_response(r, callback)
# print(55555555555)
#
# assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
# @pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
# @pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
# @pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
# def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
# self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
# ):
#
# visitor.response_strategy = ForwardingStrategy()
#
# queue_manager.clear()
# storage.clear_bucket(bucket_name)
#
# for data, message in items:
# storage.put_object(**get_object_descriptor(message), data=pack_for_upload(data))
# queue_manager.publish_request(message)
#
# requests = consumer.consume(inactivity_timeout=5)
#
# for itm, req in zip(items, requests):
# logger.debug(f"Processing item {itm}")
# queue_manager.publish_response(req, visitor)
#
# assert lmapcat(itemgetter("data"), queue_manager.output_queue.to_list()) == ["00", "11", "22"]
@pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
def test_message_is_republished_when_callback_raises_processing_failure_exception(
self, consumer, queue_manager, bucket_name, items
):
def produce_items():
return map(str, range(3))
class DebugError(Exception):
pass
def mock_visitor(callback):
def inner(data):
return callback({"data": data.encode()})
def callback(_):
raise ProcessingFailure
return inner
def reject_patch(*args, **kwargs):
raise DebugError
callback = mock_visitor(callback)
queue_manager.reject = reject_patch
queue_manager.clear()
for item in produce_items():
queue_manager.publish_request(item)
for data, message in items:
queue_manager.publish_request(message)
requests = consumer.consume()
for _, r in zip(produce_items(), requests):
queue_manager.publish_response(r, callback)
logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
logger.addFilter(lambda record: False)
assert queue_manager.output_queue.to_list() == ["00", "11", "22"]
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
@pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder
):
visitor.response_strategy = ForwardingStrategy()
queue_manager.clear()
storage.clear_bucket(bucket_name)
for data, message in items:
print(data)
storage.put_object(**get_object_descriptor(message), data=gzip.compress(data))
queue_manager.publish_request(message)
requests = consumer.consume(inactivity_timeout=5)
for itm, req in zip(items, requests):
logger.debug(f"Processing item {itm}")
queue_manager.publish_response(req, visitor)
assert list(map(itemgetter("data"), queue_manager.output_queue.to_list())) == ["00", "11", "22"]
# @pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
# def test_message_is_republished_when_callback_raises_processing_failure_exception(
# self, consumer, queue_manager, bucket_name, items
# ):
# class DebugError(Exception):
# pass
#
# def callback(_):
# raise ProcessingFailure
#
# def reject_patch(*args, **kwargs):
# raise DebugError
#
# queue_manager.reject = reject_patch
#
# queue_manager.clear()
#
# for data, message in items:
# queue_manager.publish_request(message)
#
# requests = consumer.consume()
#
# logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
# logger.addFilter(lambda record: False)
#
# with pytest.raises(DebugError):
# while True:
# queue_manager.publish_response(next(requests), callback)
# TODO: for some reason this code now interferes with the commented out tests. all tests work on their own, but
# not together.
with pytest.raises(DebugError):
while True:
queue_manager.publish_response(next(requests), callback)