2021年11月17日 星期三

RabbitMQ (3):Python實作RabbitMQ Producer端與Consumer端

 

Producer 端實作

使用 python pika client library 實作簡單的 Producer 端發送訊息到 MQ 裡

pika doc:https://pika.readthedocs.io/en/stable/


這裡是參照 RabbitMQ 官方教學的範例



import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))  # 首先,建立與RabbitMQ server間的連線

channel = connection.channel()   # 打開 channel 連線

channel.queue_declare(queue='hello')   # 建立Queue (queue 的名字叫 hello)

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')   # 發送訊息到 hello queue,訊息的內容是 "Hello World!" 字串

connection.close()   # 關閉連線,也確保network buffer flush


跑了上面的程式碼後 (別忘記要先跑 RabbitMQ server )

從RabbitMQ console上我們看到我們宣告的 hello queue

然後目前裡面有一個 message



1. Connection 和 Channel 有什麼差別?

Connection 是 TPC connection,指的是實際上的網路連線,而這 Connection 是 multiplexing 的,一個 Connection 連線上可以開有多個 Channel 做溝通與傳輸

一般我們會盡量重複利用同個 Connection 不要連太多連線到 RabbitMQ


2. Queue可以重複declare嗎?

這上面的範例程式碼是可以重複跑的

跑多次 channel.queue_declare(queue='hello') 是沒問題的

因為 queue_declare() 這函式是 idempotent (跑多次的結果都一樣)

所以其實要看你用的 library 的函式是否為 idempotent

但在 RabbitMQ 是不能生成同名的 Queue


3. routing_key是什麼?

要發送訊息給 RabbitMQ 的 queue 中前,一定要透過 exchange 元件

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

exchange 參數帶空字串會將訊息傳到 default exchange,而 routing_key 跟 exchange 要將訊息導向哪個 queue 有關

default exchange 能 routing_key 帶 queue 名,則 default exchange 就會知道要導到哪個 queue



Consumer 端實作

一樣是參照 RabbitMQ 官方教學的範例



import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')  # 在producer的程式我們已經declare queue,所以這行其實沒作用,但上面也有提到了,重複CALL沒關係,就確保hello queue一定有

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)   # 註冊 callback 到 queue,收到 message 會跑 callback function

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()    # never-ending loop

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)


首先建立連線

註冊 callback 函式到 RabbitMQ queue,之後進入 loop 等待與接收訊息,收到訊息會跑 callback 函式



上一章:RabbitMQ (2):使用Docker建立RabbitMQ測試環境

下一章:RabbitMQ (4):Message Acknowledgments 與 Durability



沒有留言:

張貼留言