Pull request #30: Multiple consumers fix
Merge in RR/pyinfra from multiple_consumer_fix to master
Squashed commit of the following:
commit b2892d2f21c90e4ebc9a48718ab0a834bf65fc32
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Apr 21 17:38:11 2022 +0200
formatting
commit 57b3be627caf3c6a87ec248036b7258e9063709d
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Apr 21 17:36:05 2022 +0200
removed debug print
commit 41db74b9103b55fd6f6d79b27a654042f86c86ea
Author: Matthias Bisping <matthias.bisping@iqser.com>
Date: Thu Apr 21 17:35:05 2022 +0200
reintroduced usage for basic_consume. Normal consume was blocking other consumers, but basic_consume is not easily testable.
This commit is contained in:
parent
20c520aa17
commit
535be0413d
@ -11,6 +11,8 @@ omit =
|
||||
*/env/*
|
||||
*/build_venv/*
|
||||
*/build_env/*
|
||||
*/utils/banner.py
|
||||
*/utils/logger.py
|
||||
source =
|
||||
pyinfra
|
||||
src
|
||||
@ -44,6 +46,8 @@ omit =
|
||||
*/env/*
|
||||
*/build_venv/*
|
||||
*/build_env/*
|
||||
*/utils/banner.py
|
||||
*/utils/logger.py
|
||||
|
||||
ignore_errors = True
|
||||
|
||||
|
||||
@ -9,5 +9,8 @@ class Consumer:
|
||||
def consume_and_publish(self):
|
||||
self.queue_manager.consume_and_publish(self.callback)
|
||||
|
||||
def basic_consume_and_publish(self):
|
||||
self.queue_manager.basic_consume_and_publish(self.callback)
|
||||
|
||||
def consume(self, **kwargs):
|
||||
return self.queue_manager.consume(**kwargs)
|
||||
|
||||
@ -134,6 +134,17 @@ class PikaQueueManager(QueueManager):
|
||||
for message in self.consume():
|
||||
self.publish_response(message, visitor)
|
||||
|
||||
def basic_consume_and_publish(self, visitor):
|
||||
|
||||
logger.info(f"Basic consuming with callback {visitor.callback.__name__}")
|
||||
|
||||
def callback(channel, frame, properties, body):
|
||||
message = (frame, properties, body)
|
||||
return self.publish_response(message, visitor)
|
||||
|
||||
self.channel.basic_consume(self._input_queue, callback)
|
||||
self.channel.start_consuming()
|
||||
|
||||
def clear(self):
|
||||
try:
|
||||
self.channel.queue_purge(self._input_queue)
|
||||
|
||||
@ -3,10 +3,10 @@ import abc
|
||||
|
||||
class QueueHandle:
|
||||
def empty(self) -> bool:
|
||||
raise NotImplemented()
|
||||
raise NotImplementedError
|
||||
|
||||
def to_list(self) -> list:
|
||||
raise NotImplemented()
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class QueueManager(abc.ABC):
|
||||
@ -16,32 +16,36 @@ class QueueManager(abc.ABC):
|
||||
|
||||
@abc.abstractmethod
|
||||
def publish_request(self, request):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def publish_response(self, response, callback):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def pull_request(self):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def consume(self, **kwargs):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def clear(self):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def input_queue(self) -> QueueHandle:
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def output_queue(self) -> QueueHandle:
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def consume_and_publish(self, callback):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def basic_consume_and_publish(self, callback):
|
||||
raise NotImplementedError
|
||||
|
||||
@ -7,28 +7,28 @@ class StorageAdapter(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def make_bucket(self, bucket_name):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def has_bucket(self, bucket_name):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def put_object(self, bucket_name, object_name, data):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_object(self, bucket_name, object_name):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_all_objects(self, bucket_name):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def clear_bucket(self, bucket_name):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_all_object_names(self, bucket_name):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
@ -30,6 +30,9 @@ class QueueManagerMock(QueueManager):
|
||||
for message in self.consume():
|
||||
self.publish_response(message, callback)
|
||||
|
||||
def basic_consume_and_publish(self, callback):
|
||||
raise NotImplementedError
|
||||
|
||||
def clear(self):
|
||||
self._input_queue.clear()
|
||||
self._output_queue.clear()
|
||||
|
||||
@ -9,6 +9,7 @@ from pyinfra.storage.storages import get_s3_storage
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--bucket_name", "-b", required=True)
|
||||
parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True)
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
@ -42,7 +43,7 @@ def make_connection() -> pika.BlockingConnection:
|
||||
return connection
|
||||
|
||||
|
||||
def build_message_bodies(analyse_container_type):
|
||||
def build_message_bodies(analyse_container_type, bucket_name):
|
||||
def update_message(message_dict):
|
||||
if analyse_container_type == "detr" or analyse_container_type == "image":
|
||||
message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"})
|
||||
@ -53,7 +54,7 @@ def build_message_bodies(analyse_container_type):
|
||||
return message_dict
|
||||
|
||||
storage = get_s3_storage()
|
||||
for bucket_name, pdf_name in storage.get_all_object_names(parse_disjunction_string(CONFIG.storage.bucket)):
|
||||
for bucket_name, pdf_name in storage.get_all_object_names(bucket_name):
|
||||
if "pdf" not in pdf_name:
|
||||
continue
|
||||
file_id = pdf_name.split(".")[0]
|
||||
@ -69,7 +70,7 @@ def main(args):
|
||||
declare_queue(channel, CONFIG.rabbitmq.queues.input)
|
||||
declare_queue(channel, CONFIG.rabbitmq.queues.output)
|
||||
|
||||
for body in build_message_bodies(args.analysis_container):
|
||||
for body in build_message_bodies(args.analysis_container, args.bucket_name):
|
||||
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
|
||||
print(f"Put {body} on {CONFIG.rabbitmq.queues.input}")
|
||||
|
||||
|
||||
@ -57,7 +57,7 @@ def main():
|
||||
def consume():
|
||||
consumer = Consumer(visitor, queue_manager)
|
||||
try:
|
||||
consumer.consume_and_publish()
|
||||
consumer.basic_consume_and_publish()
|
||||
except Exception as err:
|
||||
raise ConsumerError() from err
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user