56 lines
1.6 KiB
Python
56 lines
1.6 KiB
Python
from itertools import islice
|
|
|
|
from pyinfra.queue.queue_manager.queue_manager import QueueManager, QueueHandle
|
|
from test.queue.queue_mock import QueueMock
|
|
|
|
|
|
def monkey_patch_queue_handle(queue) -> QueueHandle:
|
|
queue_handle = QueueHandle()
|
|
queue_handle.empty = lambda: not queue
|
|
queue_handle.to_list = lambda: list(queue)
|
|
return queue_handle
|
|
|
|
|
|
class QueueManagerMock(QueueManager):
|
|
def __init__(self, input_queue, output_queue):
|
|
super().__init__(QueueMock(), QueueMock())
|
|
|
|
def publish_request(self, request):
|
|
self._input_queue.append(request)
|
|
|
|
def publish_response(self, message, callback):
|
|
|
|
response_messages = callback(message)
|
|
|
|
if isinstance(response_messages, dict):
|
|
response_messages = [response_messages]
|
|
|
|
for response_message in response_messages:
|
|
self._output_queue.append(response_message)
|
|
|
|
def pull_request(self):
|
|
return self._input_queue.popleft()
|
|
|
|
def consume(self, **kwargs):
|
|
while self._input_queue:
|
|
yield self.pull_request()
|
|
|
|
def consume_and_publish(self, callback, n=None):
|
|
for message in islice(self.consume(), n):
|
|
self.publish_response(message, callback)
|
|
|
|
def basic_consume_and_publish(self, callback):
|
|
raise NotImplementedError
|
|
|
|
def clear(self):
|
|
self._input_queue.clear()
|
|
self._output_queue.clear()
|
|
|
|
@property
|
|
def input_queue(self) -> QueueHandle:
|
|
return monkey_patch_queue_handle(self._input_queue)
|
|
|
|
@property
|
|
def output_queue(self) -> QueueHandle:
|
|
return monkey_patch_queue_handle(self._output_queue)
|