2021年11月17日 星期三

RabbitMQ (4):Message Acknowledgments與Durability

 

Message Acknowledgments

RabbitMQ 提供了 Message Acknoledgments (簡稱ack) 的功能,讓我們能確保消息有準確傳達,以及正確執行

運作上就是 consumer 在完成任務後要再通知打 ack 給 RabbitMQ,讓 RabbitMQ 知道這個消息已經執行完成,MQ 收到 ack 之後也就能真的刪除這個消息

在沒有收到 ack 的狀況,超過一定時間 timeout 後,RabbitMQ 會自動重送消息給 consumer

ack timeout 預設是 30 分鐘,也可以修改 rabbitmq.conf 設定檔的 consumer_timeout 值做調整

另外,ack 一定要跟收到消息的 channel 使用同個,不然會有錯誤


上一章的 Consumer 程式在註冊 callback 的地方其實有設定 auto_ack=True,pika就會在運行完 callback 函式後自動幫我們回傳 ack 給 MQ

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)


若沒有 auto_ack=True 時,我們要這樣回傳 ack

def callback(ch, method, properties, body):
    (" [x] Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)


delivery_tag 是 RabbitMQ 發送消息給 consumer 會帶的 tag,在一個 channel 裡是 unique 的值,ack 時帶這個值讓 MQ 知道是針對哪個消息的 ack


prefetch_count

Queue 和 Consumer 的工作模式,我之前其實一直以為是 Consumer 做完一個,之後才會又收到一個消息要處理

但其實不是這樣的!

若沒做什麼設定下,RabbitMQ 會一直將消息派發給註冊的 Consumer,Consumer 端 client library 會有快取用來存這些派發來的消息,在這狀況下,其實消息不是停在 RabbitMQ 的 queue,而是存在 client 端

然後這樣的消息派發,RabbitMQ 不會看當前這 Consumer 還在處理中的消息量 (還未ack的消息) ,都會發,這樣可能造成某 Consumer 已經負荷過多還是一直收到消息


如果希望 RabbitMQ 在發消息前能看一下當前 Consumer 處理消息的狀況再發送,可以設定 prefetch_count 值

channel.basic_qos(prefetch_count=1)


prefetch_count 設定成 1,則 RabbitMQ 會看這 Consumer 若有一個還未 ack 的消息在處理中,就不會再發消息給該 Consumer,如此能比較平均分配消息處理,不會有一堆消息卡在某過負荷的 Consumer 一直沒 ack

prefetch_count 設定比較小,能讓消息分配更平均,設定比較大 Consumer 一次收到比較多消息,Consumer 會比較少處於 idle 沒事做的狀態,也少多次傳輸的時間

通常在消息處理時間長時,prefetch_count 會設定比較小,處理時間短時,prefetch_count 設定可以比較大


Message Durability

Message Acknowledgments 在 consumer 掛掉時能確保有重送機制,但換作如果是 RabbitMQ 掛掉就沒救了

這種狀況,能設定 Queue 為 durable,確保在 RabbitMQ restart 後 queue 還能存活

channel.queue_declare(queue='task_queue', durable=True)


還要設定 message 是 persistent 的 (將delivery_mode 設定成 2),會將消息存進 disk 中,防止 RabbitMQ crash 時消息遺失

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))


但 message persistence 還是有可能消息遺失,如果需要更嚴謹的設計,需要使用 publisher confirms (https://www.rabbitmq.com/confirms.html#publisher-confirms)



上一章:RabbitMQ (3):Python 實作 RabbitMQ Producer 端與 Consumer 端

下一章:RabbitMQ (5):路由設定與Exchange種類


沒有留言:

張貼留言