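RabbitMQ offers several routing methods, so messages can be delivered in different ways to suit different requirements.
The previous chapters outlined the path a message travels. Let's start with the first stop: the Exchange.
RabbitMQ provides four Exchange types: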
1. Direct
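The Producer sets a routing_key on each message to deliver it to one specific queue, or to several queues at once.
A Direct Exchange delivers a message to every queue whose binding routing_key is exactly the same string as the routing_key the Producer sent.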
import pika
from pika.exchange_type import ExchangeType
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='hello_exchange', exchange_type=ExchangeType.direct)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')
channel.queue_bind(queue="hello", exchange="hello_exchange", routing_key="hello_key")
channel.queue_bind(queue="hello2", exchange="hello_exchange", routing_key="hello_key")
channel.basic_publish(exchange='hello_exchange', routing_key='hello_key', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Result:
You can see that hello_exchange has two bindings, one to the hello queue and one to the hello2 queue.
The hello queue and the hello2 queue each receive one "Hello World!" message.
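To make the exact-match rule concrete without a running broker, here is a minimal in-memory sketch. `DirectExchangeModel` is a hypothetical name made up for this illustration; it is not part of pika or RabbitMQ, and it only imitates the routing decision, not delivery.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy model of direct-exchange routing (illustration only, not RabbitMQ code)."""

    def __init__(self):
        # binding routing_key -> set of queue names bound with that key
        self._bindings = defaultdict(set)

    def bind(self, queue, routing_key):
        self._bindings[routing_key].add(queue)

    def route(self, routing_key):
        # Exact string comparison; a key with no binding matches no queue.
        return sorted(self._bindings.get(routing_key, ()))


exchange = DirectExchangeModel()
exchange.bind("hello", "hello_key")
exchange.bind("hello2", "hello_key")

print(exchange.route("hello_key"))  # ['hello', 'hello2']
print(exchange.route("hello_Key"))  # [] because the case differs
```

Binding two queues with the same key is what lets one published message land in both of them.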
2. Fanout
A Fanout Exchange delivers each message to every queue bound to it and ignores the routing_key.
import pika
from pika.exchange_type import ExchangeType
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='fanout_exchange', exchange_type=ExchangeType.fanout)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')
channel.queue_bind(exchange='fanout_exchange', queue='hello')
channel.queue_bind(exchange='fanout_exchange', queue='hello2')
# routing_key is a required argument of pika's basic_publish, so pass an empty string;
# a fanout exchange does not look at its value
channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Result: both the hello queue and the hello2 queue receive one "Hello World!" message.
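The fanout rule is simple enough to fit in one function. The sketch below is a toy model under the assumption that the set of bound queues is already known; `fanout_route` is a hypothetical helper, not a pika or RabbitMQ call.

```python
def fanout_route(bound_queues, routing_key):
    """Toy model: return every bound queue, whatever the routing key is."""
    del routing_key  # deliberately unused, fanout never inspects it
    return sorted(bound_queues)


bound = {"hello", "hello2"}
print(fanout_route(bound, ""))                 # ['hello', 'hello2']
print(fanout_route(bound, "anything.at.all"))  # ['hello', 'hello2']
```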
3. Topic
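A Topic Exchange works much like a Direct Exchange, but the binding routing_key may contain wildcards. Routing keys are split into words by dots: * matches exactly one word, and # matches zero or more words.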
import pika
from pika.exchange_type import ExchangeType
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_exchange', exchange_type=ExchangeType.topic)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')
channel.queue_bind(exchange='topic_exchange', queue='hello', routing_key='log.*')
channel.queue_bind(exchange='topic_exchange', queue='hello2', routing_key='other.*')
# 'log.a' matches 'log.*' only, so the message reaches the hello queue only
channel.basic_publish(exchange='topic_exchange', routing_key='log.a', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Result: only the hello queue receives the "Hello World!" message.
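To see how a binding pattern is compared with a routing key, here is a simplified matcher. It assumes the usual topic rules (words separated by dots, `*` for exactly one word, `#` for zero or more words). `topic_matches` is a hypothetical helper written for this post and is not how the broker implements matching internally.

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a binding pattern."""

    def match(p, k):
        if not p:
            return not k  # pattern used up: match only if the key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or absorb one word and stay in play.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("log.*", "log.a"))    # True
print(topic_matches("other.*", "log.a"))  # False
print(topic_matches("log.*", "log.a.b"))  # False, '*' covers one word only
print(topic_matches("log.#", "log.a.b"))  # True
```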
4. Headers
A Headers Exchange looks at the message's header values, rather than the routing_key, to decide which queues receive the message.
import pika
from pika.exchange_type import ExchangeType
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='header_exchange', exchange_type=ExchangeType.headers)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello2')
# x-match 'any': a single matching header is enough for delivery
channel.queue_bind(exchange='header_exchange', queue='hello',
                   arguments={'x-match': 'any', 'key1': 'one', 'key2': 'two'})
# x-match 'all': every listed header must match for delivery
channel.queue_bind(exchange='header_exchange', queue='hello2',
                   arguments={'x-match': 'all', 'key1': 'one', 'key2': 'two'})
# Only key1 is set, so the message reaches the hello queue only
channel.basic_publish(exchange='header_exchange', routing_key='', body='Hello World!',
                      properties=pika.BasicProperties(headers={'key1': 'one'}))
print(" [x] Sent 'Hello World!'")
connection.close()
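The difference between any and all can be checked with a small model of a single binding. This is a sketch under two assumptions: binding arguments whose names start with x- are treated as settings rather than headers to compare, and a missing x-match falls back to all. `headers_match` is a hypothetical helper, not a pika or RabbitMQ function.

```python
def headers_match(binding_args, message_headers):
    """Toy model of headers-exchange matching for one binding."""
    mode = binding_args.get("x-match", "all")
    # Arguments starting with "x-" configure the binding and are not compared.
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [k in message_headers and message_headers[k] == v
            for k, v in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


any_binding = {"x-match": "any", "key1": "one", "key2": "two"}
all_binding = {"x-match": "all", "key1": "one", "key2": "two"}

print(headers_match(any_binding, {"key1": "one"}))  # True
print(headers_match(all_binding, {"key1": "one"}))  # False
print(headers_match(all_binding, {"key1": "one", "key2": "two"}))  # True
```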
Previous chapter: RabbitMQ (4): Message Acknowledgments and Durability
Next chapter: RabbitMQ (6): Building an RPC system with RabbitMQ