pyinfra/tests/queue_test.py
Julius Unverfehrt 3c4739ad8b Pull request #63: RED-6366 refactor
Merge in RR/pyinfra from RED-6366-refactor to master

Squashed commit of the following:

commit 8807cda514b5cc24b1be208173283275d87dcb97
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 13:15:15 2023 +0100

    enable docker-compose autouse for automatic tests

commit c4579581d3e9a885ef387ee97f3f3a5cf4731193
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 12:35:49 2023 +0100

    black

commit ac2b754c5624ef37ce310fce7196c9ea11bbca03
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 12:30:23 2023 +0100

    refactor storage url parsing

    - move parsing and validation to config where the connection url is
    actually read in
    - improve readability of parsing fn

commit 371802cc10b6d946c4939ff6839571002a2cb9f4
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 10:48:00 2023 +0100

    refactor

commit e8c381c29deebf663e665920752c2965d7abce16
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 09:57:34 2023 +0100

    rename

commit c8628a509316a651960dfa806d5fe6aacb7a91c1
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 09:37:01 2023 +0100

    renaming and refactoring

commit 4974d4f56fd73bc55bd76aa7a9bbb16babee19f4
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Fri Mar 10 08:53:09 2023 +0100

    refactor payload processor

    - limit make_uploader and make_downloader cache
    - partially apply them when the class is initialized with storage and
    bucket to make the logic and behaviour more comprehensive
    - renaming functional pipeline steps to be more expressive

commit f8d51bfcad2b815c8293ab27dd66b256255c5414
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Mar 9 15:30:32 2023 +0100

    remove monitor and rename Payload

commit 412ddaa207a08aff1229d7acd5d95402ac8cd578
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Thu Mar 2 10:15:39 2023 +0100

    remove azure connection string and disable respective test for now for security reasons

commit 7922a2d9d325f3b9008ad4e3e56b241ba179f52c
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 1 13:30:58 2023 +0100

    make payload formatting function names more expressive

commit 7517e544b0f5a434579cc9bada3a37e7ac04059f
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Wed Mar 1 13:24:57 2023 +0100

    add some type hints

commit 095410d3009f2dcbd374680dd0f7b55de94c9e76
Author: Matthias Bisping <matthias.bisping@axbit.com>
Date:   Wed Mar 1 10:54:58 2023 +0100

    Refactoring

    - Renaming
    - Docstring adjustments

commit e992f0715fc2636eb13eb5ffc4de0bcc5d433fc8
Author: Matthias Bisping <matthias.bisping@axbit.com>
Date:   Wed Mar 1 09:43:26 2023 +0100

    Re-wording and typo fixes

commit 3c2d698f9bf980bc4b378a44dc20c2badc407b3e
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 14:59:59 2023 +0100

    enable auto startup for docker compose in tests

commit 55773b4fb0b624ca4745e5b8aeafa6f6a0ae6436
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 14:59:37 2023 +0100

    Extended tests for queue manager

commit 14f7f943f60b9bfb9fe77fa3cef99a1e7d094333
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 13:39:00 2023 +0100

    enable auto startup for docker compose in tests

commit 7caf354491c84c6e0b0e09ad4d41cb5dfbfdb225
Merge: 49d47ba d0277b8
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 13:32:52 2023 +0100

    Merge branch 'RED-6205-prometheus' of ssh://git.iqser.com:2222/rr/pyinfra into RED-6205-prometheus

commit 49d47baba8ccf11dee48a4c1cbddc3bbd12471e5
Author: Julius Unverfehrt <julius.unverfehrt@iqser.com>
Date:   Tue Feb 28 13:32:42 2023 +0100

    adjust Payload Processor signature

commit d0277b86bc54994b6032774bf0ec2d7b19d7f517
Merge: 5184a18 f6b35d6
Author: Christoph Schabert <christoph.schabert@iqser.com>
Date:   Tue Feb 28 11:07:16 2023 +0100

    Pull request #61: Change Sec Trigger to PR

    Merge in RR/pyinfra from cschabert/PlanSpecjava-1677578703647 to RED-6205-prometheus

    * commit 'f6b35d648c88ddbce1856445c3b887bce669265c':
      Change Sec Trigger to PR

commit f6b35d648c88ddbce1856445c3b887bce669265c
Author: Christoph Schabert <christoph.schabert@iqser.com>
Date:   Tue Feb 28 11:05:13 2023 +0100

    Change Sec Trigger to PR

... and 20 more commits
2023-03-13 15:11:25 +01:00

121 lines
4.5 KiB
Python

import json
import logging
import time
from multiprocessing import Process
import pika
import pika.exceptions
import pytest
from pyinfra.queue.development_queue_manager import DevelopmentQueueManager
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def development_queue_manager(queue_config):
queue_config.rabbitmq_heartbeat = 7200
development_queue_manager = DevelopmentQueueManager(queue_config)
yield development_queue_manager
logger.info("Tearing down development queue manager...")
try:
development_queue_manager.close_channel()
except pika.exceptions.ConnectionClosedByBroker:
pass
@pytest.fixture(scope="session")
def payload_processing_time(queue_config, offset=5):
# FIXME: this implicitly tests the heartbeat when running the end-to-end test. There should be another way to test
# this explicitly.
return queue_config.rabbitmq_heartbeat + offset
@pytest.fixture(scope="session")
def payload_processor(response_payload, payload_processing_time, payload_processor_type):
def process(payload):
time.sleep(payload_processing_time)
return response_payload
def process_with_failure(payload):
raise MemoryError
if payload_processor_type == "mock":
return process
elif payload_processor_type == "failing":
return process_with_failure
@pytest.fixture(scope="session", autouse=True)
def start_queue_consumer(queue_manager, payload_processor, sleep_seconds=5):
def consume_queue():
queue_manager.start_consuming(payload_processor)
p = Process(target=consume_queue)
p.start()
logger.info(f"Setting up consumer, waiting for {sleep_seconds}...")
time.sleep(sleep_seconds)
yield
logger.info("Tearing down consumer...")
p.terminate()
@pytest.fixture
def message_properties(message_headers):
if not message_headers:
return pika.BasicProperties(headers=None)
elif message_headers == "x-tenant-id":
return pika.BasicProperties(headers={"x-tenant-id": "redaction"})
else:
raise Exception(f"Invalid {message_headers=}.")
class TestQueueManager:
# FIXME: All tests here are wonky. This is due to the implementation of running the process-blocking queue_manager
# in a subprocess. It is then very hard to interact directly with the subprocess. If you have a better idea, please
# refactor; the tests here are insufficient to ensure the functionality of the queue manager!
@pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
def test_message_processing_does_not_block_heartbeat(
self, development_queue_manager, request_payload, response_payload, payload_processing_time
):
development_queue_manager.clear_queues()
development_queue_manager.publish_request(request_payload)
time.sleep(payload_processing_time + 10)
_, _, body = development_queue_manager.get_response()
result = json.loads(body)
assert result == response_payload
@pytest.mark.parametrize("message_headers", [None, "x-tenant-id"])
@pytest.mark.parametrize("payload_processor_type", ["mock"], scope="session")
def test_queue_manager_forwards_message_headers(
self,
development_queue_manager,
request_payload,
response_payload,
payload_processing_time,
message_properties,
):
development_queue_manager.clear_queues()
development_queue_manager.publish_request(request_payload, message_properties)
time.sleep(payload_processing_time + 10)
_, properties, _ = development_queue_manager.get_response()
assert properties.headers == message_properties.headers
# FIXME: It is not possible to test the behavior of the queue manager directly, since it is running in a separate
# process. You require logging to see if the exception is handled correctly. Hence, this test is only useful for
# development, but insufficient to guarantee the correct behavior.
@pytest.mark.parametrize("payload_processor_type", ["failing"], scope="session")
def test_failed_message_processing_is_handled(
self,
development_queue_manager,
request_payload,
response_payload,
payload_processing_time,
):
development_queue_manager.clear_queues()
development_queue_manager.publish_request(request_payload)
time.sleep(payload_processing_time + 10)
_, _, body = development_queue_manager.get_response()
assert not body