Merge in RR/pyinfra from 2.0.0-input-output-file-pattern-for-download-strategy to 2.0.0
Squashed commit of the following:
commit c7ce79ebbeace6a8cb7925ed69eda2d7cd2a4783
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Fri Jun 24 12:35:29 2022 +0200
refactor
commit 80f04e544962760adb2dc60c9dd03ccca22167d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Fri Jun 24 11:06:10 2022 +0200
refactoring of component factory, callback and client-pipeline getter
commit 6c024e1a789e1d55f0739c6846e5c02e8b7c943d
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 20:04:10 2022 +0200
operations section in config cleaned up; added upload formatter
commit c85800aefc224967cea591c1ec4cf1aaa3ac8215
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 19:22:51 2022 +0200
refactoring; removed obsolete config entries and code
commit 4be125952d82dc868935c8c73ad87fd8f0bd1d6c
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 19:14:47 2022 +0200
removed obsolete code
commit ac69a5c8e3f1e2fd7e828a17eeab97984f4f9746
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 18:58:41 2022 +0200
refactoring: rm dl strat module
commit efd36d0fc4f8f36d267bfa9d35415811fe723ccc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 18:33:51 2022 +0200
refactoring: multi dl strat -> downloader, rm single dl strat
commit afffdeb993500a6abdb6fe85a549e3d6e97e9ee7
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 16:39:22 2022 +0200
operations section in config cleaned up
commit 671129af3e343490e0fb277a2b0329aa3027fd73
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Thu Jun 23 16:09:16 2022 +0200
rename prometheus metric name to include service name
commit 932a3e314b382315492aecab95b1f02f2916f8a6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 14:43:23 2022 +0200
cleaned up file descr mngr
commit 79350b4ce71fcd095ed6a5e1d3a598ea246fae53
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 12:26:15 2022 +0200
refactoring WIP: moving response stratgey logic into storage strategy (needs to be refactored as well, later) and file descr mngr. Here the moved code needs to be cleaned up.
commit 7e48c66f0c378b25a433a4034eefdc8a0957e775
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 12:00:48 2022 +0200
refactoring; removed operation / response folder from output path
commit 8e6cbdaf23c48f6eeb52512b7f382d5727e206d6
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 11:08:37 2022 +0200
refactoring; added operation -> file pattern mapping to file descr mngr (mainly for self-documentaton purposes)
commit 2c80d7cec0cc171e099e5b13aadd2ae0f9bf4f02
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 10:59:57 2022 +0200
refactoring: introduced input- and output-file specific methods to file descr mngr
commit ecced37150eaac3008cc1b01b235e5f7135e504b
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 10:43:26 2022 +0200
refactoring
commit 3828341e98861ff8d63035ee983309ad5064bb30
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Jun 23 10:42:46 2022 +0200
refactoring
commit 9a7c412523d467af40feb6924823ca89e28aadfe
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Jun 22 17:04:54 2022 +0200
add prometheus metric name for default operation
commit d207b2e274ba53b2a21a18c367bb130fb05ee1cd
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Jun 22 17:02:55 2022 +0200
Merge config
commit d3fdf36b12d8def18810454765e731599b833bfc
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Wed Jun 22 17:01:12 2022 +0200
added fixmes / todos
commit f49d0b9cb7764473ef9d127bc5d88525a4a16a23
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date: Wed Jun 22 16:28:25 2022 +0200
update script
... and 47 more commits
120 lines
4.3 KiB
Python
120 lines
4.3 KiB
Python
import logging
|
|
|
|
import pytest
|
|
from funcy import pluck, lflatten
|
|
|
|
from pyinfra.config import CONFIG
|
|
from pyinfra.component_factory import ComponentFactory
|
|
from pyinfra.exceptions import ProcessingFailure
|
|
from pyinfra.visitor.strategies.response.forwarding import ForwardingStrategy
|
|
from test.utils.storage import pack_for_upload
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
@pytest.mark.xfail(
|
|
reason="NOTE: Something is messed up in the test setups."
|
|
" These tests fail when run together with other tests. Do not know yet which ones and why."
|
|
)
|
|
class TestConsumer:
|
|
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
|
|
def test_consuming_empty_input_queue_does_not_put_anything_on_output_queue(self, consumer, queue_manager):
|
|
queue_manager.clear()
|
|
consumer.consume()
|
|
assert queue_manager.output_queue.empty()
|
|
|
|
@pytest.mark.parametrize(
|
|
"queue_manager_name",
|
|
[
|
|
"pika", # NOTE: pika must come first. Test fails IFF pika is in second position, for whatever reason.
|
|
"mock",
|
|
],
|
|
scope="session",
|
|
)
|
|
def test_consuming_nonempty_input_queue_puts_messages_on_output_queue_in_fifo_order(
|
|
self, consumer, queue_manager, callback, queue_manager_name
|
|
):
|
|
|
|
assert consumer.queue_manager is queue_manager
|
|
|
|
def produce_items():
|
|
return map(str, range(4))
|
|
|
|
def mock_visitor(callback):
|
|
def inner(data):
|
|
return callback({"data": data.encode()})
|
|
|
|
return inner
|
|
|
|
callback = mock_visitor(callback)
|
|
|
|
queue_manager.clear()
|
|
|
|
for item in produce_items():
|
|
queue_manager.publish_request(item)
|
|
|
|
requests = consumer.consume(n=4)
|
|
|
|
for r in requests:
|
|
queue_manager.publish_response(r, callback)
|
|
|
|
assert queue_manager.output_queue.to_list()[:3] == ["00", "11", "22"]
|
|
|
|
@pytest.mark.parametrize("queue_manager_name", ["mock", "pika"], scope="session")
|
|
@pytest.mark.parametrize("client_name", ["mock", "s3", "azure"], scope="session")
|
|
@pytest.mark.parametrize("response_strategy_name", ["forwarding", "storage"], scope="session")
|
|
def test_consuming_nonempty_input_queue_with_visitor_puts_messages_on_output_queue_in_fifo_order(
|
|
self, consumer, queue_manager, visitor, bucket_name, storage, items, input_data_encoder,
|
|
):
|
|
file_descriptor_manager = ComponentFactory(CONFIG).get_file_descriptor_manager()
|
|
|
|
visitor.response_strategy = ForwardingStrategy()
|
|
|
|
queue_manager.clear()
|
|
storage.clear_bucket(bucket_name)
|
|
|
|
for data, message in items:
|
|
storage.put_object(**file_descriptor_manager.get_input_object_descriptor(message), data=pack_for_upload(data))
|
|
queue_manager.publish_request(message)
|
|
|
|
requests = consumer.consume(inactivity_timeout=5)
|
|
|
|
for itm, req in zip(items, requests):
|
|
logger.debug(f"Processing item {itm}")
|
|
queue_manager.publish_response(req, visitor)
|
|
|
|
assert lflatten(pluck("analysis_payloads", queue_manager.output_queue.to_list())) == ["00", "11", "22"]
|
|
|
|
@pytest.mark.parametrize("queue_manager_name", ["pika"], scope="session")
|
|
def test_message_is_republished_when_callback_raises_processing_failure_exception(
|
|
self, consumer, queue_manager, bucket_name, items
|
|
):
|
|
class DebugError(Exception):
|
|
pass
|
|
|
|
def callback(_):
|
|
raise ProcessingFailure
|
|
|
|
def reject_patch(*args, **kwargs):
|
|
raise DebugError
|
|
|
|
queue_manager.reject = reject_patch
|
|
|
|
queue_manager.clear()
|
|
|
|
for data, message in items:
|
|
queue_manager.publish_request(message)
|
|
|
|
requests = consumer.consume()
|
|
|
|
logger = logging.getLogger("pyinfra.queue.queue_manager.pika_queue_manager")
|
|
logger.addFilter(lambda record: False)
|
|
|
|
# TODO:
|
|
# The note above says order of mock and pika matters. Note, that you can produce an error for debugging, when
|
|
# using `queue_manager.publish_response(next(requests), callback)` without the-with and while-block, where it
|
|
# becomes obvious, that the test with the note above then uses the data from THIS here test.
|
|
with pytest.raises(DebugError):
|
|
while True:
|
|
queue_manager.publish_response(next(requests), callback)
|