diff --git a/.coveragerc b/.coveragerc index 41d11e8..e16d173 100644 --- a/.coveragerc +++ b/.coveragerc @@ -11,6 +11,8 @@ omit = */env/* */build_venv/* */build_env/* + */utils/banner.py + */utils/logger.py source = pyinfra src @@ -40,10 +42,12 @@ omit = */test/* */__init__.py */setup.py - */venv/* - */env/* - */build_venv/* - */build_env/* + */venv/* + */env/* + */build_venv/* + */build_env/* + */utils/banner.py + */utils/logger.py ignore_errors = True diff --git a/pyinfra/queue/consumer.py b/pyinfra/queue/consumer.py index c21cb26..1072178 100644 --- a/pyinfra/queue/consumer.py +++ b/pyinfra/queue/consumer.py @@ -9,5 +9,8 @@ class Consumer: def consume_and_publish(self): self.queue_manager.consume_and_publish(self.callback) + def basic_consume_and_publish(self): + self.queue_manager.basic_consume_and_publish(self.callback) + def consume(self, **kwargs): return self.queue_manager.consume(**kwargs) diff --git a/pyinfra/queue/queue_manager/pika_queue_manager.py b/pyinfra/queue/queue_manager/pika_queue_manager.py index 8bc70d4..5cde19f 100644 --- a/pyinfra/queue/queue_manager/pika_queue_manager.py +++ b/pyinfra/queue/queue_manager/pika_queue_manager.py @@ -134,6 +134,17 @@ class PikaQueueManager(QueueManager): for message in self.consume(): self.publish_response(message, visitor) + def basic_consume_and_publish(self, visitor): + + logger.info(f"Basic consuming with callback {visitor.callback.__name__}") + + def callback(channel, frame, properties, body): + message = (frame, properties, body) + return self.publish_response(message, visitor) + + self.channel.basic_consume(self._input_queue, callback) + self.channel.start_consuming() + def clear(self): try: self.channel.queue_purge(self._input_queue) diff --git a/pyinfra/queue/queue_manager/queue_manager.py b/pyinfra/queue/queue_manager/queue_manager.py index 44b6baf..d42bfc5 100644 --- a/pyinfra/queue/queue_manager/queue_manager.py +++ b/pyinfra/queue/queue_manager/queue_manager.py @@ -3,10 +3,10 @@ import abc class QueueHandle: def empty(self) -> bool: - raise NotImplemented() + raise NotImplementedError def to_list(self) -> list: - raise NotImplemented() + raise NotImplementedError class QueueManager(abc.ABC): @@ -16,32 +16,36 @@ class QueueManager(abc.ABC): @abc.abstractmethod def publish_request(self, request): - pass + raise NotImplementedError @abc.abstractmethod def publish_response(self, response, callback): - pass + raise NotImplementedError @abc.abstractmethod def pull_request(self): - pass + raise NotImplementedError @abc.abstractmethod def consume(self, **kwargs): - pass + raise NotImplementedError @abc.abstractmethod def clear(self): - pass + raise NotImplementedError @abc.abstractmethod def input_queue(self) -> QueueHandle: - pass + raise NotImplementedError @abc.abstractmethod def output_queue(self) -> QueueHandle: - pass + raise NotImplementedError @abc.abstractmethod def consume_and_publish(self, callback): - pass + raise NotImplementedError + + @abc.abstractmethod + def basic_consume_and_publish(self, callback): + raise NotImplementedError diff --git a/pyinfra/storage/adapters/adapter.py b/pyinfra/storage/adapters/adapter.py index 4c27026..57cff28 100644 --- a/pyinfra/storage/adapters/adapter.py +++ b/pyinfra/storage/adapters/adapter.py @@ -7,28 +7,28 @@ class StorageAdapter(ABC): @abstractmethod def make_bucket(self, bucket_name): - pass + raise NotImplementedError @abstractmethod def has_bucket(self, bucket_name): - pass + raise NotImplementedError @abstractmethod def put_object(self, bucket_name, object_name, data): - pass + raise NotImplementedError @abstractmethod def get_object(self, bucket_name, object_name): - pass + raise NotImplementedError @abstractmethod def get_all_objects(self, bucket_name): - pass + raise NotImplementedError @abstractmethod def clear_bucket(self, bucket_name): - pass + raise NotImplementedError @abstractmethod def get_all_object_names(self, bucket_name): - pass + raise NotImplementedError diff --git a/pyinfra/test/queue/queue_manager_mock.py b/pyinfra/test/queue/queue_manager_mock.py index 1928f12..58c40a3 100644 --- a/pyinfra/test/queue/queue_manager_mock.py +++ b/pyinfra/test/queue/queue_manager_mock.py @@ -30,6 +30,9 @@ class QueueManagerMock(QueueManager): for message in self.consume(): self.publish_response(message, callback) + def basic_consume_and_publish(self, callback): + raise NotImplementedError + def clear(self): self._input_queue.clear() self._output_queue.clear() diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 0b456d9..0a97e03 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -9,6 +9,7 @@ from pyinfra.storage.storages import get_s3_storage def parse_args(): parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", "-b", required=True) parser.add_argument("--analysis_container", "-a", choices=["detr", "ner", "image"], required=True) args = parser.parse_args() return args @@ -42,7 +43,7 @@ def make_connection() -> pika.BlockingConnection: return connection -def build_message_bodies(analyse_container_type): +def build_message_bodies(analyse_container_type, bucket_name): def update_message(message_dict): if analyse_container_type == "detr" or analyse_container_type == "image": message_dict.update({"targetFileExtension": "ORIGIN.pdf.gz", "responseFileExtension": "IMAGE_INFO.json.gz"}) @@ -53,7 +54,7 @@ def build_message_bodies(analyse_container_type): return message_dict storage = get_s3_storage() - for bucket_name, pdf_name in storage.get_all_object_names(parse_disjunction_string(CONFIG.storage.bucket)): + for bucket_name, pdf_name in storage.get_all_object_names(bucket_name): if "pdf" not in pdf_name: continue file_id = pdf_name.split(".")[0] @@ -69,7 +70,7 @@ def main(args): declare_queue(channel, CONFIG.rabbitmq.queues.input) declare_queue(channel, CONFIG.rabbitmq.queues.output) - for body in build_message_bodies(args.analysis_container): + for body in build_message_bodies(args.analysis_container, args.bucket_name): channel.basic_publish("", CONFIG.rabbitmq.queues.input, body) print(f"Put {body} on {CONFIG.rabbitmq.queues.input}") diff --git a/src/serve.py b/src/serve.py index 236d42f..f670e74 100644 --- a/src/serve.py +++ b/src/serve.py @@ -57,7 +57,7 @@ def main(): def consume(): consumer = Consumer(visitor, queue_manager) try: - consumer.consume_and_publish() + consumer.basic_consume_and_publish() except Exception as err: raise ConsumerError() from err