Wednesday, November 17, 2021

RabbitMQ (5): Routing and Exchange Types

RabbitMQ offers several routing methods, so message delivery can be adapted to different requirements.

The previous chapters outlined the path a message travels. Let's start with the first stop: the Exchange.

RabbitMQ provides four types of Exchange:


1. Direct

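The Producer sets a routing_key to send a message to one specific queue, or to several queues at once.

A Direct Exchange delivers the message to every queue whose binding routing_key exactly matches the routing_key string sent by the Producer.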

import pika
from pika.exchange_type import ExchangeType

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='hello_exchange', exchange_type=ExchangeType.direct)

channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')

channel.queue_bind(queue="hello", exchange="hello_exchange", routing_key="hello_key")

channel.queue_bind(queue="hello2", exchange="hello_exchange", routing_key="hello_key")


channel.basic_publish(exchange='hello_exchange', routing_key='hello_key', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()


Result:

hello_exchange has two bindings, one for each of the queues hello and hello2.

The hello and hello2 queues each receive one "Hello World!" message.
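
The exact-match rule can be tried out without a broker. The snippet below is only a rough sketch in plain Python: route_direct and the bindings list are hypothetical names made up for illustration, not part of pika or RabbitMQ, and the real matching happens inside the broker.

```python
def route_direct(bindings, routing_key):
    """Return the queues whose binding key equals routing_key exactly."""
    return [queue for queue, binding_key in bindings if binding_key == routing_key]


# Same bindings as the example above: two queues share one binding key.
bindings = [("hello", "hello_key"), ("hello2", "hello_key")]

print(route_direct(bindings, "hello_key"))   # ['hello', 'hello2']
print(route_direct(bindings, "Hello_key"))   # [] (the comparison is case-sensitive)
```

Because both queues are bound with the same key, a single publish is copied to each of them; a key that differs by even one character reaches neither.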




2. Fanout

A Fanout Exchange delivers the message to every queue bound to it and ignores the routing_key.

import pika
from pika.exchange_type import ExchangeType

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type=ExchangeType.fanout)

channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')

channel.queue_bind(exchange='fanout_exchange', queue="hello")
channel.queue_bind(exchange='fanout_exchange', queue="hello2")


# routing_key must still be passed or pika raises an error, but a fanout exchange ignores its value
channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()


Result: the hello and hello2 queues each receive one "Hello World!" message.
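
For comparison with the Direct case, here is a minimal sketch of the fanout rule in plain Python. route_fanout is a hypothetical helper written for illustration only; it is not a pika or RabbitMQ function.

```python
def route_fanout(bound_queues, routing_key):
    """Return every bound queue; routing_key is accepted but never inspected."""
    return list(bound_queues)


bound_queues = ["hello", "hello2"]

# The routing key makes no difference to where the message goes.
print(route_fanout(bound_queues, ""))            # ['hello', 'hello2']
print(route_fanout(bound_queues, "anything"))    # ['hello', 'hello2']
```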



3. Topic

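A Topic Exchange works much like a Direct Exchange, but the binding routing_key may contain wildcards: the key is split into words by dots, * matches exactly one word, and # matches zero or more words.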

import pika
from pika.exchange_type import ExchangeType

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type=ExchangeType.topic)

channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')

channel.queue_bind(exchange='topic_exchange', queue="hello", routing_key="log.*")
channel.queue_bind(exchange='topic_exchange', queue="hello2", routing_key="other.*")


# 'log.a' matches 'log.*' only, so the message goes to the hello queue alone
channel.basic_publish(exchange='topic_exchange', routing_key='log.a', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()


Result: only the hello queue receives the "Hello World!" message.
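
To get a feel for which keys a wildcard pattern accepts, the matching can be approximated in plain Python. This is a sketch under assumptions: topic_matches is a hypothetical helper, not a pika or RabbitMQ API, it assumes dot-separated words where * stands for exactly one word and # for zero or more, and it does not try to cover edge cases such as an empty routing key.

```python
def topic_matches(pattern, routing_key):
    """Check a routing key against a topic binding pattern.

    '*' stands for exactly one dot-separated word,
    '#' stands for zero or more words.
    """
    def match(p, k):
        if not p:
            return not k                      # pattern used up: key must be too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either absorbs nothing, or absorbs one word and stays active
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False                      # pattern still needs a word
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("log.*", "log.a"))      # True
print(topic_matches("other.*", "log.a"))    # False
print(topic_matches("log.*", "log.a.b"))    # False, '*' covers one word only
print(topic_matches("log.#", "log.a.b"))    # True
```

With the bindings from the example, only "log.*" accepts the key log.a, which is why the message lands in hello and not in hello2.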



4. Headers

A Headers Exchange looks at the message's header values to decide which queues receive it.


import pika
from pika.exchange_type import ExchangeType

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='header_exchange', exchange_type=ExchangeType.headers)

channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')

# x-match 'any': one matching header value is enough for delivery
channel.queue_bind(exchange='header_exchange', queue="hello",
                   arguments={'x-match': 'any', 'key1': 'one', 'key2': 'two'})
# x-match 'all': every listed header value must match for delivery
channel.queue_bind(exchange='header_exchange', queue="hello2",
                   arguments={'x-match': 'all', 'key1': 'one', 'key2': 'two'})


# Only key1 is present, so the message reaches the hello queue only
channel.basic_publish(exchange='header_exchange', routing_key='', body='Hello World!',
                      properties=pika.BasicProperties(headers={'key1': 'one'}))
print(" [x] Sent 'Hello World!'")
connection.close()
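
The difference between any and all can be sketched in plain Python. headers_match is a hypothetical helper for illustration, not part of pika or RabbitMQ; it assumes that binding arguments starting with "x-" are settings rather than headers to compare, and that a binding without x-match behaves like all.

```python
def headers_match(binding_args, message_headers):
    """Decide whether a message's headers satisfy a headers-exchange binding."""
    mode = binding_args.get("x-match", "all")
    # Arguments starting with 'x-' are treated as settings, not as headers to compare.
    required = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [k in message_headers and message_headers[k] == v
            for k, v in required.items()]
    return any(hits) if mode == "any" else all(hits)


hello_binding = {'x-match': 'any', 'key1': 'one', 'key2': 'two'}
hello2_binding = {'x-match': 'all', 'key1': 'one', 'key2': 'two'}
message_headers = {'key1': 'one'}

print(headers_match(hello_binding, message_headers))    # True
print(headers_match(hello2_binding, message_headers))   # False
```

Sending both headers, {'key1': 'one', 'key2': 'two'}, would satisfy the all binding as well, so the message would then reach hello2 too.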


Previous chapter: RabbitMQ (4): Message Acknowledgments and Durability

Next chapter: RabbitMQ (6): Building an RPC System with RabbitMQ


