Parallel tasks in Python: concurrent.futures

Install

concurrent.futures is part of the standard library in Python 3.2+. If you're using an older version of Python, you need to install the futures package.

$ pip install futures

ref:
https://docs.python.org/3/library/concurrent.futures.html

executor.map()

Use ProcessPoolExecutor for CPU-intensive tasks and ThreadPoolExecutor for network operations or other I/O. ProcessPoolExecutor is built on the multiprocessing module, so each worker is a separate process with its own interpreter and is not limited by the GIL (Global Interpreter Lock). The trade-off is that only picklable objects can be submitted and returned.
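To make the pickling restriction concrete, here is a small sketch that checks whether an object survives `pickle.dumps()`. The helper `is_picklable` is hypothetical and not part of concurrent.futures. Plain data passes, while lambdas and locks fail, so they cannot be sent to (or returned from) a ProcessPoolExecutor worker. ThreadPoolExecutor has no such restriction because threads share memory.

```python
import pickle
import threading


def is_picklable(obj):
    """Hypothetical helper: True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


def square(x):
    # A module-level function is pickled by reference (module + name),
    # so worker processes can look it up and call it.
    return x * x


print(is_picklable(42))                # plain data pickles fine
print(is_picklable(square))            # module-level function: pickled by reference
print(is_picklable(lambda x: x * x))   # lambdas have no importable name
print(is_picklable(threading.Lock()))  # locks cannot be pickled at all
```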

In Python 3.5+, map() accepts an optional chunksize argument. With ProcessPoolExecutor, a large chunksize can significantly improve performance on very long iterables compared to the default of 1. With ThreadPoolExecutor, chunksize has no effect.
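A minimal sketch of a CPU-bound job on ProcessPoolExecutor with chunksize, so each worker receives a batch of inputs per round trip instead of one item at a time. The function `sum_of_squares`, the input sizes, and chunksize=100 are arbitrary illustrative choices, not tuned values. The `if __name__ == "__main__":` guard matters: on platforms that start workers by spawning a fresh interpreter (Windows, and macOS by default), each worker re-imports the main module and must not start a pool of its own.

```python
from concurrent.futures import ProcessPoolExecutor


def sum_of_squares(n):
    # Deliberately CPU-bound: pure Python arithmetic, no I/O
    return sum(i * i for i in range(n))


def run(numbers, chunksize):
    with ProcessPoolExecutor() as executor:
        # Results still come back in input order; chunksize only controls
        # how many items are shipped to a worker process at once.
        return list(executor.map(sum_of_squares, numbers, chunksize=chunksize))


if __name__ == "__main__":
    numbers = [1_000] * 2_000
    results = run(numbers, chunksize=100)
    print(len(results), results[0])
```

Timing `run()` with chunksize=1 versus a larger value on your own machine is the easiest way to see whether batching helps for a given workload.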

from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = 'http://httpbin.org/get?a={0}'.format(a)
    r = requests.get(url)
    return r.json()['args']

start = time.time()

# If max_workers is None or not given, it defaults to the number of processors * 5 (Python 3.5-3.7) or min(32, os.cpu_count() + 4) (Python 3.8+)
with ThreadPoolExecutor(max_workers=None) as executor:
    for result in executor.map(fetch, range(30)):
        print('response: {0}'.format(result))

print('Use requests + ThreadPoolExecutor cost: {}'.format(time.time() - start))

ref:
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
https://www.blog.pythonlibrary.org/2016/08/03/python-3-concurrency-the-concurrent-futures-module/
http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

executor.submit() and as_completed()

executor.submit() returns a Future object. A Future encapsulates the asynchronous execution of a callable: at some point in the future it holds either the callable's return value or the exception it raised.
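A short sketch of the Future methods the example below relies on; `divide` is a made-up task. `result()` blocks until the call finishes and re-raises any exception it raised, while `exception()` blocks the same way but returns that exception (or None) instead of raising it.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b):
    # Hypothetical task; raises ZeroDivisionError when b == 0
    return a / b


with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(divide, 6, 3)
    bad = executor.submit(divide, 1, 0)

    value = ok.result()      # blocks until done, then returns the value
    error = bad.exception()  # blocks until done, then returns the exception object

    try:
        bad.result()         # result() re-raises the worker's exception
    except ZeroDivisionError:
        reraised = True
    else:
        reraised = False

print(value, type(error).__name__, reraised, ok.done(), bad.done())
```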

The main difference between map and as_completed is ordering: map returns results in the order of the input iterable, whereas as_completed yields futures in the order they finish, so the first one you get is whichever completed first. Also, iterating over map() gives you the results themselves, while iterating over as_completed(futures) gives you the Future objects, on which you call result().
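A timing-based sketch of the ordering difference: the hypothetical `work` function sleeps for the given number of seconds and returns that number. With one worker per task, all calls run at once, so map() still yields in input order while as_completed() normally yields the shortest sleep first.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(delay):
    time.sleep(delay)
    return delay


def via_map(delays):
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        # map() yields results (not futures), in the same order as the input
        return list(executor.map(work, delays))


def via_as_completed(delays):
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        futures = [executor.submit(work, d) for d in delays]
        # as_completed() yields the Future objects themselves, in completion order
        return [future.result() for future in as_completed(futures)]


delays = [0.4, 0.1, 0.25]
print(via_map(delays))
print(via_as_completed(delays))
```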

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {}
    for i in range(42):
        url = 'https://httpbin.org/get?i={0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
        except Exception as exc:
            print(exc)
        else:
            print('fetch {0}, get {1}'.format(url, data))

ref:
https://docs.python.org/3/library/concurrent.futures.html#future-objects

goroutine, channel

channel

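For an unbuffered channel:
Receiving from the channel (value := <-ch) blocks the current goroutine until another goroutine sends a value (ch <- 1).
Sending to the channel (ch <- 1) also blocks the current goroutine until another goroutine receives the value (value := <-ch).
main() is itself a goroutine too.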

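With runtime.GOMAXPROCS(1) (which was also the default before Go 1.5),
only one goroutine runs at a time.
When a goroutine blocks (e.g. on I/O or time.Sleep()),
it yields the CPU to another goroutine (equivalent to calling runtime.Gosched()).
In other words, if a goroutine never blocks,
it never hands control over to other goroutines
and keeps running until it returns.
(Strictly, this holds on Go versions before 1.14, and even there only for code that makes no function calls, such as an empty for loop: since Go 1.2 the scheduler could preempt at function calls, and since Go 1.14 goroutines are preempted asynchronously.)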

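package main

import (
    "fmt"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        fmt.Println(s)
    }
}

// With GOMAXPROCS(1), only one CPU (one thread) runs Go code at a time.
// The main() goroutine is kept busy by this infinite for loop (an empty loop is not blocking),
// so on Go versions before 1.14 it never hands control to the say() goroutine
// and "something" is never printed. Since Go 1.14 the loop is preempted
// asynchronously, so say() does get to run. Either way, the program never exits.
func main() {
    runtime.GOMAXPROCS(1)
    go say("something")
    for {
    }
}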

ref:
http://eleme.io/blog/2014/goroutine-1/
http://eleme.io/blog/2014/goroutine-2/
http://eleme.io/blog/2014/goroutine-3/

buffered channel

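// By default a channel is unbuffered:
// a send blocks until another goroutine receives the value
ch0 := make(chan int)

// A channel with capacity 2 can be thought of as a queue:
// sends do not block until the queue is full, and receives block only while it is empty
ch2 := make(chan int, 2)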
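For comparison with the Python half of these notes, a rough analogue rather than an equivalent: `queue.Queue(maxsize=2)` behaves much like `make(chan int, 2)`, in that put() only blocks once two items are waiting and get() blocks while the queue is empty. Python has no direct counterpart to an unbuffered channel (`Queue(maxsize=0)` means unbounded, the opposite). The non-blocking put_nowait() is used here only to show where a Go send would block.

```python
import queue

ch2 = queue.Queue(maxsize=2)  # rough analogue of make(chan int, 2)

ch2.put(1)  # does not block: 1 of 2 slots used
ch2.put(2)  # does not block: the buffer is now full

try:
    ch2.put_nowait(3)  # a Go send here would block; put_nowait raises instead
    third_send_blocked = False
except queue.Full:
    third_send_blocked = True

first = ch2.get()  # FIFO, like a channel: the first value sent comes out first
print(third_send_blocked, first, ch2.qsize())
```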

runtime.GOMAXPROCS

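Before Go 1.5, Go used only one CPU to run goroutines by default
and did not scale with the number of CPUs you have.
You can change this with the GOMAXPROCS environment variable
or in code, as below.
Since Go 1.5 GOMAXPROCS already defaults to the number of CPUs, so this call is only needed on older versions.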

import "runtime"

runtime.GOMAXPROCS(runtime.NumCPU())

sync.WaitGroup

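A sync.WaitGroup lets one goroutine wait until a set of other goroutines have all finished, which makes it a good fit for running several different tasks in parallel and collecting their results.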

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    var aww string
    wg.Add(1)
    go func() {
        defer wg.Done()
        aww = fetch("http://www.reddit.com/r/aww.json")
    }()

    var funny string
    wg.Add(1)
    go func() {
        defer wg.Done()
        funny = fetch("http://www.reddit.com/r/funny.json")
    }()

    // Block until both goroutines have called wg.Done()
    wg.Wait()

    fmt.Println("aww:", aww)
    fmt.Println("funny:", funny)
}

func fetch(url string) string {
    res, err := http.Get(url)
    if err != nil {
        log.Fatal(err)
    }
    defer res.Body.Close()
    // io.ReadAll replaces the deprecated ioutil.ReadAll (Go 1.16+)
    body, err := io.ReadAll(res.Body)
    if err != nil {
        log.Fatal(err)
    }
    return string(body)
}
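On the Python side, `concurrent.futures.wait()` plays a role similar to `wg.Wait()`: it blocks until every given future is done. This sketch swaps the Reddit requests for a hypothetical `fake_fetch` that only sleeps, so it runs offline.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


def fake_fetch(url):
    # Hypothetical stand-in for an HTTP request
    time.sleep(0.1)
    return "body of " + url


with ThreadPoolExecutor(max_workers=2) as executor:
    aww = executor.submit(fake_fetch, "http://www.reddit.com/r/aww.json")
    funny = executor.submit(fake_fetch, "http://www.reddit.com/r/funny.json")

    # Like wg.Wait(): block until both tasks have finished
    done, not_done = wait([aww, funny], return_when=ALL_COMPLETED)

print("aww:", aww.result())
print("funny:", funny.result())
```

Leaving the `with` block also waits for pending futures (it calls `shutdown(wait=True)`), so an explicit `wait()` mainly matters when you want to keep the executor open and continue submitting work afterwards.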

ref:
http://nathanleclaire.com/blog/2014/02/21/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing-part-two-fixing-my-ooops/

fatal error: all goroutines are asleep - deadlock!

package main

func main() {
    ch := make(chan int)
    <-ch // nothing will ever send on ch, so this receive blocks forever
}

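Because no goroutine can ever send data to ch,
main()'s goroutine blocks forever,
waiting for data that, like Godot, never arrives.
Since every goroutine is now asleep, the runtime detects the deadlock and aborts with the fatal error above.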
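Python's runtime has no equivalent check: a thread blocked on an empty `queue.Queue` that nobody will ever feed simply hangs. A common defensive pattern, sketched here with a hypothetical helper `receive_or_none`, is to pass a timeout so the wait ends instead of hanging forever.

```python
import queue


def receive_or_none(q, timeout):
    # Hypothetical helper: like `<-ch`, but gives up after `timeout` seconds
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


ch = queue.Queue()
print(receive_or_none(ch, timeout=0.1))  # nobody ever puts, so this times out
ch.put(1)
print(receive_or_none(ch, timeout=0.1))
```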