pyinfra/test/queue/queue_manager_mock.py
Matthias Bisping c9bfc767a8 Pull request #32: restructuring: moved test out of module scope
Merge in RR/pyinfra from partial_responses to master

Squashed commit of the following:

commit afd67d87a6349c4b97453a12274c6ccf5e976339
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Apr 26 12:48:12 2022 +0200

    updated test container dockerfile for new location of tests directory

commit 37881da08ebedf0f2d0c6c2b267bdb47818a0da1
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date:   Tue Apr 26 12:45:12 2022 +0200

    restructuring: moved test out  of module scope
2022-04-26 13:12:54 +02:00

47 lines
1.4 KiB
Python

from pyinfra.queue.queue_manager.queue_manager import QueueManager, QueueHandle
from test.queue.queue_mock import QueueMock
def monkey_patch_queue_handle(queue) -> QueueHandle:
    """Return a fresh QueueHandle whose accessors inspect *queue*.

    The ``empty`` and ``to_list`` attributes of the new handle are replaced
    with closures over *queue*, so they always reflect its current contents
    at call time rather than a snapshot taken here.

    Args:
        queue: Any object supporting truthiness and iteration.

    Returns:
        A QueueHandle with ``empty()`` returning ``not queue`` and
        ``to_list()`` returning ``list(queue)``.
    """

    def is_empty():
        return not queue

    def as_list():
        return list(queue)

    handle = QueueHandle()
    handle.empty = is_empty
    handle.to_list = as_list
    return handle
class QueueManagerMock(QueueManager):
    """QueueManager variant backed by two QueueMock instances.

    The methods below rely on ``self._input_queue`` and
    ``self._output_queue`` supporting ``append``, ``popleft``, ``clear``,
    truthiness and iteration.
    NOTE(review): presumably the base-class constructor stores the two
    QueueMock objects under those attribute names -- confirm against
    QueueManager.
    """

    def __init__(self, input_queue, output_queue):
        """Initialise the base class with two new QueueMock objects.

        Args:
            input_queue: Accepted but not used by this mock.
            output_queue: Accepted but not used by this mock.

        NOTE(review): the parameters look like they exist only to mirror
        the real manager's constructor signature -- confirm.
        """
        requests = QueueMock()
        responses = QueueMock()
        super().__init__(requests, responses)

    def publish_request(self, request):
        """Append *request* to the input queue."""
        self._input_queue.append(request)

    def publish_response(self, message, callback):
        """Apply *callback* to *message* and append the result to the output queue."""
        response = callback(message)
        self._output_queue.append(response)

    def pull_request(self):
        """Remove and return the leftmost item of the input queue."""
        request = self._input_queue.popleft()
        return request

    def consume(self, **kwargs):
        """Yield requests via ``pull_request`` while the input queue is non-empty.

        Keyword arguments are accepted and ignored.
        """
        while True:
            if not self._input_queue:
                return
            yield self.pull_request()

    def consume_and_publish(self, callback):
        """Publish ``callback(request)`` for every request yielded by ``consume``."""
        for request in self.consume():
            self.publish_response(request, callback)

    def basic_consume_and_publish(self, callback):
        """Not supported by this mock; always raises NotImplementedError."""
        raise NotImplementedError

    def clear(self):
        """Empty the input queue, then the output queue."""
        for queue in (self._input_queue, self._output_queue):
            queue.clear()

    @property
    def input_queue(self) -> QueueHandle:
        """A newly built QueueHandle bound to the input queue."""
        handle = monkey_patch_queue_handle(self._input_queue)
        return handle

    @property
    def output_queue(self) -> QueueHandle:
        """A newly built QueueHandle bound to the output queue."""
        handle = monkey_patch_queue_handle(self._output_queue)
        return handle