上章學會了 Python Thread 的基本寫法
這章要講的是當你的程式用到 multi-thread ,有個一定要小心的問題,那就是:Race Condition
什麼是 Race Condition
Race Condition 發生在多個 thread 存取同個變數時,因為 thread 的執行順序是由作業系統排程決定,每次執行都會有不同的順序,而這不同的執行順序導致不預期的結果,就是 Race Condition
舉例來說有兩個thread A 與 B 都會讀取變數x,然後加 1 ,更新變數 x (一開始 x = 0)
我們希望的順序是這樣:
thread A: 讀 x (x=0)
thread A: x = x + 1
thread A: 更新 x,x 變成 1
thread B: 讀 x (x=1)
thread B: x = x + 1
thread B: 更新 x,x 變成 2
x 最後是 2
但有可能在作業系統的排程下,運行的順序是這樣:
thread A: 讀 x (x=0)
thread B: 讀 x (x=0)
thread A: x = x + 1
thread A: 更新 x,x 變成 1
thread B: x = x + 1
thread B: 更新 x,x 變成 1
x 最後是 1,不是我們預期的結果,thread A 的計算被 thread B 給蓋掉了
基本上,程式在多thread有讀取、更新同一變數的情況,就會有Race Condition的問題要處理
Race Condition 的解法
以上面的例子來說,其實我們預期是每個 thread 讀 x 然後加 1 更新完後再有其他 thread 讀 x 做更新的動作
換句話說,讀 x 加 1 再更新 x,這個動作我們只允許一個 thread 執行到這段,不能有多個 thread 同時跑到這段程式碼
而要做到擋住其他 thread 跑到某段程式碼,就需要使用到 Lock
下面是模擬 x 最後存成 1 的狀況
import threading
from time import sleep
x = 0
def count():
global x
t = x + 1
sleep(1)
x = t
t_a = threading.Thread(target=count)
t_b = threading.Thread(target=count)
t_a.start()
t_b.start()
t_a.join()
t_b.join()
print(x)
我們試著用 Lock 讓我們得到預期的 2 結果
import threading
from time import sleep
x = 0
lock = threading.Lock()
def count():
lock.acquire() # 拿取lock資源,沒拿到會卡在這
global x
t = x + 1
sleep(1)
x = t
lock.release() # 釋放lock資源
t_a = threading.Thread(target=count)
t_b = threading.Thread(target=count)
t_a.start()
t_b.start()
t_a.join()
t_b.join()
print(x)
其實只加了三行程式碼
當中要熟悉的是這兩個 Lock 物件方法:
- acquire(blocking=True, timeout=-1):當blocking=True時,thread會block住直到lock被釋放取得lock後,就會返回True
當blocking=False時thread不會block住,沒拿到lock就返回False
只有在blocking=True,timeout才會看,-1代表會等直到拿到lock
- release():將lock釋放
def count():
with lock:
global x
t = x + 1
sleep(1)
x = t
什麼是 thread safe
我們會講某個函式是否 thread safe,指的是這個函式執行是否會有 Race Condition 問題
當你使用第三方函式庫時且是在多thread的狀況下調用方法,就要特別注意是否有寫說是 thread safe,若非 thread safe 要自己處理 Race Condition 問題
補充:Semaphore
Doc:https://docs.python.org/zh-tw/3/library/threading.html#semaphore-objects
上面的 Lock 物件是只會建立一個資源,某個 thread 拿了就要等該 lock 被釋放後其他 thread 才可以取得
Semaphore 則可以指定資源數量,可以有多個資源供 thread 取得,直到 0 為止才會被阻塞要等待資源釋放,例如 threading.Semaphore(2) 就能產生好比是兩個 lock,就算第一個 thread 拿了其中一個,還有一個可以供下個 thread 使用,第三個 thread 才有可能遇到 Semaphore 沒了要等待釋放的狀況
Semaphore 多應用在有限容量的狀況,不能讓太多thread同時執行時
補充:RLock 重入鎖
Doc:https://docs.python.org/zh-tw/3/library/threading.html#rlock-objects
RLock 有些特性跟一般的 Lock 不太一樣
RLock 重入鎖必須由獲取它的 thread 釋放
一旦 thread 獲得了 RLock ,同一個 thread 再次獲取它將不阻塞, thread 必須在每次獲取它時釋放一次
補充:Condition
Doc:https://docs.python.org/zh-tw/3/library/threading.html#condition-objects
threading 模組內還有提供我們一些其他物件方法,做到比較複雜的 thread 操控,像是 Condition 物件
Condition 物件具有 wait() 與 notify() 方法,分別有以下功能:
- wait(timeout=None):會釋放Condition底層鎖,並阻塞住直到別的線程調用notify(),要注意只有在線程持有鎖的時候才可以調用這個方法,否則會有RuntimeError
- notify(n=1):喚醒最多n個wait中的線程,要注意只有在線程持有鎖的時候才可以調用這個方法,否則會有RuntimeError
透過 wait 與 notify 我們能做到讓 thread 在沒工作時可以釋放 lock,當有工作時在使用 notify 喚起 thread 工作
下面的例子是我們建立一個 producer 負責生成任務到 Queue 裡,另外有一個 consumer 則負責從 queue 裡撈資料出來執行,當 consumer 遇到 queue 沒有東西時可以做 wait 等待 producer 塞資料後再被喚醒做事
import threading
from queue import Queue
from time import sleep
con = threading.Condition()
queue = Queue()
def producer():
for i in range(5):
sleep(1)
if con.acquire():
queue.put(i)
con.notify()
con.release()
print("producer end")
def consumer():
while True:
if con.acquire():
if queue.empty():
con.wait()
x = queue.get()
print(f"get {x}")
con.release()
if x == 4:
break
print("consumer end")
t1 = threading.Thread(target = producer)
t2 = threading.Thread(target = consumer)
t1.start()
t2.start()
t1.join()
t2.join()
結尾
threading 模組裡還有提供其他物件方法讓我們能控制多thread時的執行,像是 Event、Timer...等,上面只寫了幾個我覺得比較常用的,其實也不用急著要完全熟悉整個 threading 模組,總之先大概知道概念有這種問題與解法,等真的遇到有要解決的問題再來細研究也是來得及了,先看一堆沒再用,像我大概過一個月就忘了XD。
上一章:[Python] Multithreading (2):Hello Thread 建立第一個 Multithread 程式
下一章:[Python] Multihtreading (4):threading.local() 介紹
沒有留言:
張貼留言