diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index af65545..c38d91f 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -82,7 +82,7 @@ class PikaQueueManager(QueueManager): self.channel.queue_declare(input_queue, arguments=args, auto_delete=False, durable=True) self.channel.queue_declare(output_queue, arguments=args, auto_delete=False, durable=True) - def republish(self, body, n_current_attempts, frame): + def republish(self, body: bytes, n_current_attempts, frame): self.channel.basic_publish( exchange="", routing_key=self._input_queue, @@ -108,10 +108,8 @@ class PikaQueueManager(QueueManager): n_attempts = get_n_previous_attempts(properties) + 1 try: - body = json.loads(body) - response = visitor(body) - response = json.dumps(response) - self.channel.basic_publish("", self._output_queue, response.encode()) + response = json.dumps(visitor(json.loads(body))).encode() + self.channel.basic_publish("", self._output_queue, response) self.channel.basic_ack(frame.delivery_tag) except ProcessingFailure: diff --git a/test/unit_tests/consumer_test.py b/test/unit_tests/consumer_test.py index bff6023..b64e69c 100644 --- a/test/unit_tests/consumer_test.py +++ b/test/unit_tests/consumer_test.py @@ -76,10 +76,10 @@ class TestConsumer: pass def callback(_): - raise ProcessingFailure() + raise ProcessingFailure def reject_patch(*args, **kwargs): - raise DebugError() + raise DebugError queue_manager.reject = reject_patch