2021年11月16日 星期二

[Python] Threading (3):Race Condition與Thread-Safe


上章學會了 Python Thread 的基本寫法

這章要講的是當你的程式用到 multi-thread ,有個一定要小心的問題,那就是:Race Condition

 

什麼是 Race Condition

Race Condition 發生在多個 thread 存取同個變數時,因為 thread 的執行順序是由作業系統排程決定,每次執行都會有不同的順序,而這不同的執行順序導致不預期的結果,就是 Race Condition


舉例來說有兩個thread A 與 B 都會讀取變數x,然後加 1 ,更新變數 x (一開始 x = 0)

我們希望的順序是這樣:

thread A: 讀 x (x=0)

thread A: x = x + 1

thread A: 更新 x,x 變成 1

thread B: 讀 x (x=1)

thread B: x = x + 1

thread B: 更新 x,x 變成 2

x 最後是 2


但有可能在作業系統的排程下,運行的順序是這樣:

thread A: 讀 x (x=0)

thread B: 讀 x (x=0)

thread A: x = x + 1

thread A: 更新 x,x 變成 1

thread B: x = x + 1

thread B: 更新 x,x 變成 1

x 最後是 1,不是我們預期的結果,thread A 的計算被 thread B 給蓋掉了


基本上,程式在多thread有讀取、更新同一變數的情況,就會有Race Condition的問題要處理


Race Condition 的解法

以上面的例子來說,其實我們預期是每個 thread 讀 x 然後加 1 更新完後再有其他 thread 讀 x 做更新的動作

換句話說,讀 x 加 1 再更新 x,這個動作我們只允許一個 thread 執行到這段,不能有多個 thread 同時跑到這段程式碼


而要做到擋住其他 thread 跑到某段程式碼,就需要使用到 Lock

下面是模擬 x 最後存成 1 的狀況



import threading
from time import sleep

x = 0

def count():
    global x
    t = x + 1
    sleep(1)
    x = t

t_a = threading.Thread(target=count)
t_b = threading.Thread(target=count)

t_a.start()
t_b.start()

t_a.join()
t_b.join()

print(x)


我們試著用 Lock 讓我們得到預期的 2 結果



import threading
from time import sleep

x = 0
lock = threading.Lock()

def count():
    lock.acquire()  # 拿取lock資源,沒拿到會卡在這
    global x
    t = x + 1
    sleep(1)
    x = t
    lock.release()  # 釋放lock資源


t_a = threading.Thread(target=count)
t_b = threading.Thread(target=count)

t_a.start()
t_b.start()

t_a.join()
t_b.join()

print(x)


其實只加了三行程式碼

當中要熟悉的是這兩個 Lock 物件方法:

- acquire(blocking=True, timeout=-1):當blocking=True時,thread會block住直到lock被釋放取得lock後,就會返回True

當blocking=False時thread不會block住,沒拿到lock就返回False

只有在blocking=True,timeout才會看,-1代表會等直到拿到lock

- release():將lock釋放


我們使用 threading.Lock() 建立一個 Lock 資源

在讀取 x 前 call lock.acquire() 要求 thread 要取得該 lock 才可以繼續往下執行,在更新完 x 做 lock.release() 釋放 lock

所以在執行到 "讀 x 加 1 再更新 x" 這段程式碼的 thread 一定要有 lock,而這 lock 只有一個,因此我們能確保同時只有一個 thread 跑到這段程式碼


另外上面的程式碼我們其實可以寫得更簡略,Lock 有實作 with (context manager),在count函式那我們可以這樣寫就好



def count():
    with lock:
        global x
        t = x + 1
        sleep(1)
        x = t


什麼是 thread safe

我們會講某個函式是否 thread safe,指的是這個函式執行是否會有 Race Condition 問題

當你使用第三方函式庫時且是在多thread的狀況下調用方法,就要特別注意是否有寫說是 thread safe,若非 thread safe 要自己處理 Race Condition 問題


補充:Semaphore

Doc:https://docs.python.org/zh-tw/3/library/threading.html#semaphore-objects

上面的 Lock 物件是只會建立一個資源,某個 thread 拿了就要等該 lock 被釋放後其他 thread 才可以取得

Semaphore 則可以指定資源數量,可以有多個資源供 thread 取得,直到 0 為止才會被阻塞要等待資源釋放,例如 threading.Semaphore(2) 就能產生好比是兩個 lock,就算第一個 thread 拿了其中一個,還有一個可以供下個 thread 使用,第三個 thread 才有可能遇到 Semaphore 沒了要等待釋放的狀況

Semaphore 多應用在有限容量的狀況,不能讓太多thread同時執行時


補充:RLock 重入鎖

Doc:https://docs.python.org/zh-tw/3/library/threading.html#rlock-objects

RLock 有些特性跟一般的 Lock 不太一樣

RLock 重入鎖必須由獲取它的 thread 釋放

一旦 thread 獲得了 RLock ,同一個 thread 再次獲取它將不阻塞, thread 必須在每次獲取它時釋放一次


補充:Condition

Doc:https://docs.python.org/zh-tw/3/library/threading.html#condition-objects


threading 模組內還有提供我們一些其他物件方法,做到比較複雜的 thread 操控,像是 Condition 物件

Condition 物件具有 wait() 與 notify() 方法,分別有以下功能:

- wait(timeout=None):會釋放Condition底層鎖,並阻塞住直到別的線程調用notify(),要注意只有在線程持有鎖的時候才可以調用這個方法,否則會有RuntimeError

- notify(n=1):喚醒最多n個wait中的線程,要注意只有在線程持有鎖的時候才可以調用這個方法,否則會有RuntimeError

透過 wait 與 notify 我們能做到讓 thread 在沒工作時可以釋放 lock,當有工作時在使用 notify 喚起 thread 工作

下面的例子是我們建立一個 producer 負責生成任務到 Queue 裡,另外有一個 consumer 則負責從 queue 裡撈資料出來執行,當 consumer 遇到 queue 沒有東西時可以做 wait 等待 producer 塞資料後再被喚醒做事



import threading
from queue import Queue
from time import sleep

con = threading.Condition()
queue = Queue()

def producer():
    for i in range(5):
        sleep(1)
        if con.acquire():
            queue.put(i)
            con.notify()
            con.release()
    print("producer end")


def consumer():
    while True:
        if con.acquire():
            if queue.empty():
                con.wait()
            x = queue.get()
            print(f"get {x}")
            con.release()
            if x == 4:
                break
    print("consumer end")
            
       
t1 = threading.Thread(target = producer)
t2 = threading.Thread(target = consumer)

t1.start()
t2.start()

t1.join()
t2.join()


結尾

threading 模組裡還有提供其他物件方法讓我們能控制多thread時的執行,像是 Event、Timer...等,上面只寫了幾個我覺得比較常用的,其實也不用急著要完全熟悉整個 threading 模組,總之先大概知道概念有這種問題與解法,等真的遇到有要解決的問題再來細研究也是來得及了,先看一堆沒再用,像我大概過一個月就忘了XD。



上一章:[Python] Multithreading (2):Hello Thread 建立第一個 Multithread 程式

下一章:[Python] Multihtreading (4):threading.local() 介紹



沒有留言:

張貼留言