from pyinfra.queue.queue_manager.queue_manager import QueueManager, QueueHandle
from test.queue.queue_mock import QueueMock


def monkey_patch_queue_handle(queue) -> QueueHandle:
    """Return a ``QueueHandle`` whose ``empty``/``to_list`` are backed by *queue*.

    A fresh ``QueueHandle`` is created and its ``empty`` and ``to_list``
    attributes are overwritten with closures over *queue*, so the handle
    always reflects the queue's current contents.

    Args:
        queue: Any object supporting truth-testing and iteration.

    Returns:
        The patched ``QueueHandle`` instance.
    """
    handle = QueueHandle()

    def _is_empty():
        # Relies on the container's truthiness: falsy means nothing queued.
        return not queue

    def _as_list():
        # Snapshot copy; later changes to the queue do not affect the result.
        return list(queue)

    handle.empty = _is_empty
    handle.to_list = _as_list
    return handle


class QueueManagerMock(QueueManager):
    """In-memory ``QueueManager`` replacement built on ``QueueMock`` objects.

    NOTE(review): the methods read ``self._input_queue`` and
    ``self._output_queue``; these are presumably set by
    ``QueueManager.__init__`` from its two arguments -- confirm against the
    base class. ``QueueMock`` is assumed to offer ``append``, ``popleft`` and
    ``clear``.
    """

    def __init__(self, input_queue, output_queue):
        """Initialise the base manager with two fresh ``QueueMock`` instances.

        Args:
            input_queue: Not used; the mock always creates its own queue.
            output_queue: Not used; the mock always creates its own queue.
        """
        mock_input = QueueMock()
        mock_output = QueueMock()
        super().__init__(mock_input, mock_output)

    def publish_request(self, request):
        """Append *request* to the input queue."""
        requests = self._input_queue
        requests.append(request)

    def publish_response(self, message, callback):
        """Append ``callback(message)`` to the output queue."""
        responses = self._output_queue
        responses.append(callback(message))

    def pull_request(self):
        """Remove and return the leftmost item of the input queue."""
        requests = self._input_queue
        return requests.popleft()

    def consume(self, **kwargs):
        """Yield requests pulled one by one until the input queue is falsy.

        Args:
            **kwargs: Accepted but not used by this mock.
        """
        while True:
            if not self._input_queue:
                return
            yield self.pull_request()

    def consume_and_publish(self, callback):
        """Publish ``callback(message)`` for every message from ``consume()``."""
        pending = self.consume()
        for request in pending:
            self.publish_response(request, callback)

    def basic_consume_and_publish(self, callback):
        """Not supported by the mock; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def clear(self):
        """Clear the input queue, then the output queue."""
        self._input_queue.clear()
        self._output_queue.clear()

    @property
    def input_queue(self) -> QueueHandle:
        """A newly patched ``QueueHandle`` over the input queue."""
        source = self._input_queue
        return monkey_patch_queue_handle(source)

    @property
    def output_queue(self) -> QueueHandle:
        """A newly patched ``QueueHandle`` over the output queue."""
        sink = self._output_queue
        return monkey_patch_queue_handle(sink)