pyinfra/test/queue/queue_manager_mock.py
2022-05-31 15:30:23 +02:00

56 lines
1.6 KiB
Python

from itertools import islice
from pyinfra.queue.queue_manager.queue_manager import QueueManager, QueueHandle
from test.queue.queue_mock import QueueMock
def monkey_patch_queue_handle(queue) -> QueueHandle:
queue_handle = QueueHandle()
queue_handle.empty = lambda: not queue
queue_handle.to_list = lambda: list(queue)
return queue_handle
class QueueManagerMock(QueueManager):
def __init__(self, input_queue, output_queue):
super().__init__(QueueMock(), QueueMock())
def publish_request(self, request):
self._input_queue.append(request)
def publish_response(self, message, callback):
response_messages = callback(message)
if isinstance(response_messages, dict):
response_messages = [response_messages]
for response_message in response_messages:
self._output_queue.append(response_message)
def pull_request(self):
return self._input_queue.popleft()
def consume(self, **kwargs):
while self._input_queue:
yield self.pull_request()
def consume_and_publish(self, callback, n=None):
for message in islice(self.consume(), n):
self.publish_response(message, callback)
def basic_consume_and_publish(self, callback):
raise NotImplementedError
def clear(self):
self._input_queue.clear()
self._output_queue.clear()
@property
def input_queue(self) -> QueueHandle:
return monkey_patch_queue_handle(self._input_queue)
@property
def output_queue(self) -> QueueHandle:
return monkey_patch_queue_handle(self._output_queue)