aber logisch richtig...er
This commit is contained in:
parent
9ed1c48b56
commit
2a76639049
@ -7,9 +7,9 @@ rabbitmq:
|
||||
heartbeat: $RABBITMQ_HEARTBEAT|7200 # Controls AMQP heartbeat timeout in seconds
|
||||
|
||||
queues: # Names of queues for...
|
||||
input: input_queue # requests to service
|
||||
output: response_queue # responses by service
|
||||
dead_letter: dead_letter_queue # messages that failed to process
|
||||
input: image_request_queue # requests to service
|
||||
output: image_response_queue # responses by service
|
||||
dead_letter: image_dead_letter_queue # messages that failed to process
|
||||
|
||||
prefetch_count: 1
|
||||
|
||||
|
||||
@ -1,17 +1,10 @@
|
||||
import sys
|
||||
import pika
|
||||
import sys
|
||||
from retry import retry
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
|
||||
|
||||
def callback(channel, method, properties, body):
|
||||
print("Received %r" % body)
|
||||
response = body
|
||||
channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
|
||||
channel.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
def init_params():
|
||||
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
||||
parameters = pika.ConnectionParameters(
|
||||
@ -21,13 +14,3 @@ def init_params():
|
||||
credentials=credentials,
|
||||
)
|
||||
return parameters
|
||||
|
||||
|
||||
# @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
|
||||
def consume(parameters, queue):
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
channel.basic_consume(queue=queue, auto_ack=False, on_message_callback=callback)
|
||||
print(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
|
||||
channel.start_consuming()
|
||||
|
||||
@ -1,10 +1,35 @@
|
||||
from mini_queue.consumer import consume, init_params
|
||||
from mini_queue.consumer import init_params
|
||||
from mini_queue.utils.config import CONFIG
|
||||
import sys
|
||||
import pika
|
||||
from retry import retry
|
||||
|
||||
from mini_queue.utils.config import CONFIG
|
||||
|
||||
|
||||
def callback(channel, method, properties, body):
|
||||
print("Received %r" % body)
|
||||
response = body
|
||||
channel.basic_publish(exchange="", routing_key=CONFIG.rabbitmq.queues.output, body=response)
|
||||
channel.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
print("startet happy pikachu!")
|
||||
queue = CONFIG.rabbitmq.queues.input
|
||||
|
||||
parameters = init_params()
|
||||
consume(parameters, queue)
|
||||
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
|
||||
|
||||
while True:
|
||||
try:
|
||||
channel.basic_consume(queue=CONFIG.rabbitmq.queues.input, auto_ack=False, on_message_callback=callback)
|
||||
print(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
channel.start_consuming()
|
||||
except:
|
||||
pass
|
||||
|
||||
@ -8,7 +8,6 @@ from mini_queue.utils.config import CONFIG
|
||||
if __name__ == "__main__":
|
||||
|
||||
credentials = pika.PlainCredentials(CONFIG.rabbitmq.user, CONFIG.rabbitmq.password)
|
||||
queue = CONFIG.rabbitmq.queues.input
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=CONFIG.rabbitmq.host,
|
||||
port=CONFIG.rabbitmq.port,
|
||||
@ -18,9 +17,13 @@ if __name__ == "__main__":
|
||||
body = json.dumps({"fileId": "234", "dossierId": "3403"})
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
channel.queue_declare(queue=queue, durable=True)
|
||||
channel.basic_publish("", queue, body)
|
||||
print(f" [x] Sent {body} on {queue}")
|
||||
|
||||
channel.queue_declare(queue=CONFIG.rabbitmq.queues.input, durable=True)
|
||||
channel.queue_declare(queue=CONFIG.rabbitmq.queues.output, durable=True)
|
||||
|
||||
channel.basic_publish("", CONFIG.rabbitmq.queues.input, body)
|
||||
|
||||
print(f" [x] Put {body} on {CONFIG.rabbitmq.queues.input}")
|
||||
|
||||
for method_frame, _, body in channel.consume(queue=CONFIG.rabbitmq.queues.output):
|
||||
print(json.loads(body))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user