Tuesday, November 16, 2021

[Python] Threading (5): Introduction to concurrent.futures.ThreadPoolExecutor

The concurrent.futures module provides a high-level interface for asynchronously executing callables in Python.

https://docs.python.org/3/library/concurrent.futures.html


This post covers ThreadPoolExecutor. When a job can be split into many repeated pieces of work, you can use ThreadPoolExecutor to create multiple threads and run them concurrently.


The benefit of a thread pool is that threads are reused, which reduces the cost of spawning new threads.


The official example below creates a thread pool of five threads as the executor, fetches the pages at the links in URLS, and prints the results.



import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))



- ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

ThreadPoolExecutor constructor.

When max_workers=None, the default since Python 3.8 is min(32, os.cpu_count() + 4).

initializer is a callable that runs at the start of each worker thread.

initargs are the arguments passed to initializer and must be a tuple (see the sketch below).
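
A minimal sketch of initializer/initargs; the worker setup function and the greeting value here are made up for illustration and are not part of the official example:

import concurrent.futures
import threading

thread_local = threading.local()

def init_worker(greeting):
    # Runs once in each worker thread before it starts taking tasks
    thread_local.greeting = greeting

def task(n):
    return '%s from task %d on %s' % (
        thread_local.greeting, n, threading.current_thread().name)

with concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
        thread_name_prefix='demo',
        initializer=init_worker,
        initargs=('hello',)) as executor:  # initargs must be a tuple
    for line in executor.map(task, range(5)):
        print(line)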


- submit(fn, *args, **kwargs)

ThreadPoolExecutor method.

Returns a Future object, which wraps the call (a Future encapsulates the asynchronous execution of a callable).

Once submit() is called, the executor schedules a worker thread to run the function (a minimal sketch follows below).
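
A minimal sketch of submit(); the pow call is just a placeholder workload:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    # submit() schedules pow(2, 10) on a worker thread and returns a Future immediately
    future = executor.submit(pow, 2, 10)
    # result() blocks until the call finishes
    print(future.result())  # 1024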


- concurrent.futures.as_completed(fs, timeout=None)

Yields the given Future objects as they complete, returning finished or cancelled futures first.

If __next__() is called and no future completes within timeout seconds of the original as_completed() call, concurrent.futures.TimeoutError is raised (see the sketch below).
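
A minimal sketch of the timeout behaviour, using a made-up slow task:

import concurrent.futures
import time

def slow(seconds):
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(slow, s) for s in (0.1, 5)]
    try:
        # Futures are yielded as they finish; if the next one is not done
        # within 1 second of the as_completed() call, TimeoutError is raised
        for future in concurrent.futures.as_completed(futures, timeout=1):
            print('done:', future.result())
    except concurrent.futures.TimeoutError as exc:
        print('timed out:', exc)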


- result(timeout=None)

Future method.

Returns the value of the future; if the future has not completed yet, the call blocks until the value is available.

If it waits longer than timeout seconds, concurrent.futures.TimeoutError is raised.

If the future is cancelled before completing, CancelledError is raised (see the sketch below).
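
A minimal sketch of result() with timeout and CancelledError, again with made-up slow tasks; note that only a future that has not started running yet can be cancelled:

import concurrent.futures
import time

def slow(name):
    time.sleep(2)
    return name

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow, 'first')
    queued = executor.submit(slow, 'second')  # waits behind 'first' because there is only one worker
    queued.cancel()                           # succeeds since 'second' has not started running yet
    try:
        print(running.result(timeout=0.5))    # 'first' is not done within 0.5 s
    except concurrent.futures.TimeoutError:
        print('first result not ready yet')
    print(running.result())                   # blocks until 'first' finishes
    try:
        print(queued.result())
    except concurrent.futures.CancelledError:
        print('second task was cancelled')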


- map(func, *iterables, timeout=None, chunksize=1)

ThreadPoolExecutor method.

map() returns an iterator over the return values of func; results are yielded in the order the calls were made, so it has to wait for each result in turn.

This is unlike as_completed above, which yields Future objects and returns whichever future completes first. Note also that if a func call raises an exception, that exception is re-raised when its value is retrieved from the iterator, which is presumably why the failing URLs are commented out in the example below.



import concurrent.futures
import urllib.request
from itertools import repeat
URLS = [
        'http://www.cnn.com/',
        'http://www.foxnews.com/',
        #'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',]
        #'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # repeat(60) supplies the same timeout argument for every URL
    for i in executor.map(load_url, URLS, repeat(60)):
        print(len(i))
 
 


Previous post: [Python] Multithreading (4): Introduction to threading.local()


