Parallel tasks in Python: concurrent.futures

Parallel tasks in Python: concurrent.futures

TL;DR: concurrent.futures is well suited to Embarrassingly Parallel tasks. You could write concurrent code with a simple for loop.


concurrent.futures is part of the standard library in Python 3.2+. If you're using an older version of Python, you need to install the futures package.

$ pip install futures


You should use the ProcessPoolExecutor for CPU intensive tasks and the ThreadPoolExecutor is suited for network operations or I/O. The ProcessPoolExecutor uses the multiprocessing module, which is not affected by GIL (Global Interpreter Lock) but also means that only picklable objects can be executed and returned.

In Python 3.5+, map() receives an optional argument: chunksize. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect.

from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = '{0}'.format(a)
    r = requests.get(url)
    result = r.json()['args']
    return result

start = time.time()

# if max_workers is None or not given, it will default to the number of processors, multiplied by 5
with ThreadPoolExecutor(max_workers=None) as executor:
    for result in, range(42)):
        print('response: {0}'.format(result))

print('time: {0}'.format(time.time() - start))

You might want to change the value of max_workers to 1 and observe the difference.


executor.submit() and as_completed()

executor.submit() returns a Future object. A Future is basically an object that encapsulates an asynchronous execution of a function that will finish (or raise an exception) in the future.

The main difference between map and as_completed is that map returns the results in the order in which you pass iterables. On the other hand, the first result from the as_completed function is from whichever future completed first. Besides, iterating a map() returns results of futures; iterating a as_completed(futures) returns futures themselves.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

start = time.time()

with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {}
    for i in range(42):
        url = '{0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    for future in as_completed(futures):
        url = futures[future]
            data = future.result()
        except Exception as exc:
            print('fetch {0}, get {1}'.format(url, data))

print('time: {0}'.format(time.time() - start))




Read and write files in Go

Reading file line by line

建議用 bufio.Scanner
但是 Scanner 有個缺點
就是一行太長(超過 64K)的時候會出現 bufio.Scanner: token too long 的錯誤
這時候還是得用 bufio.Reader

fin, err := os.Open(path)
if err != nil {
defer fin.Close()

scanner := bufio.NewScanner(fin)
for scanner.Scan() {
    line := scanner.Text()
    fmt.Fprintln(os.Stdin, line)

if err := scanner.Err(); err != nil {
    fmt.Fprintln(os.Stderr, err)

If you know the maximum length of the tokens you will be reading, copy the bufio.Scanner code into your project and change the const MaxScanTokenSize value.



Reading and writing file line by line

fmt.Fprintln(writer, line)bw.WriteString(line) 還要快

func FileSpacing(filename string, w io.Writer) (err error) {
    fr, err := os.Open(filename)
    if err != nil {
        return err
    defer fr.Close()

    br := bufio.NewReader(fr)
    bw := bufio.NewWriter(w)

    for {
        line, err := br.ReadString('\n')
        if err == nil {
            fmt.Fprint(bw, TextSpacing(line))
        } else {
            if err == io.EOF {
                fmt.Fprint(bw, TextSpacing(line))
            return err
    defer bw.Flush()

    return nil

Copy a file

fin, _ := os.Open("source.txt")
fout, _ := os.Create("destination.txt")

io.Copy(fout, fin)

defer fout.Close()
defer fin.Close()


Compute MD5 of a file

func md5Of(filename string) string {
    var result []byte

    file, err := os.Open(filename)
    defer file.Close()

    hash := md5.New()
    _, err = io.Copy(hash, file)

    checksum := hex.EncodeToString(hash.Sum(result))

    return checksum


Read and save file in Django / Python

File 和 ImageFile 接受 Python 的 file 或 StringIO 物件
而 ContentFile 接受 string


Django Form

image_file = request.FILES['file']

# 方法一, image_file)

# 方法二
profile.mugshot = image_file

File Upload with Form in Django


from django.core.files import File

with open('/home/vinta/image.png', 'rb') as f:
    profile.mugshot = File(f)

Django ContentFile

import os
import uuid

from django.core.files.base import ContentFile

import requests

url = ''
r = requests.get(url)
file_url, file_ext = os.path.splitext(r.url)
file_name = '%s%s' % (str(uuid.uuid4()).replace('-', ''), file_ext)'123.png', ContentFile(r.content), save=False)

# 如果 profile.mugshot 是 ImageField 欄位的話
# 可以用以下的方式來判斷它是不是合法的圖檔
except TypeError:
    raise RuntimeError('圖檔格式不正確')

Data URI, Base64

from binascii import a2b_base64

from django.core.files.base import ContentFile

data_uri = '....'
head, data = data_uri.split(',')
binary_data = a2b_base64(data)

# 方法一'whatever.jpg', ContentFile(binary_data), save=False)

# 不能用這種方式,因為少了 file name
profile.mugshot = ContentFile(binary_data)

# 方法二
f = open('image.png', 'wb')

# 方法三
from StringIO import StringIO
from PIL import Image
img =
print img.size


StringIO, PIL image

你就把 StringIO 想成是 open('/home/vinta/some_file.txt', 'rb') 的 file 物件

from StringIO import StringIO

from PIL import Image
import requests

r = requests.get('')
img =
print pil_image.size

StringIO, PIL image, Django

from StringIO import StringIO

from django.core.files.base import ContentFile

from PIL import Image

raw_img_io = StringIO(binary_data)
img =
img = img.resize((524, 328), Image.ANTIALIAS)
img_io = StringIO(), 'PNG', quality=100)'whatever.png', ContentFile(img_io.getvalue()), save=False)


Download file from URL, tempfile

import os
import tempfile
import requests
import xlrd

    file_path = report.file.path
    temp = None
except NotImplementedError:
    url = report.file.url
    r = requests.get(url, stream=True)
    file_url, file_ext = os.path.splitext(r.url)

    # delete=True 會在 temp.close() 之後自己刪掉
    temp = tempfile.NamedTemporaryFile(prefix='report_file_', suffix=file_ext, dir='/tmp', delete=False)
    file_path =

    with open(file_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:

wb = xlrd.open_workbook(file_path)


# 因為是 tempfile.NamedTemporaryFile(delete=False)
# 所以你要自己刪掉
except AttributeError:


Write to files in Python

因為寫入資料到 disk 很慢
比較好的方式是累積一些資料後再一次寫入(所謂的 buffer)
Python 的 open('/path/to/file/') 預設就有 buffer 了(第三個參數就是 buffering)
無論是 read, write 或 append

The optional buffering argument specifies the file’s desired buffer size: 0 means unbuffered, 1 means line buffered, any other positive value means use a buffer of (approximately) that size (in bytes). A negative buffering means to use the system default, which is usually line buffered for tty devices and fully buffered for other files. If omitted, the system default is used.

In [1]: import io
Out[2]: 8192
from import NoArgsCommand

from music.models import Song

class Command(NoArgsCommand):

    def handle_noargs(self, **options):
        start = time.time()

        total_lines = 0
        with open('dump_song_data.txt', 'w') as f:
            # header_line = 'user id, song id\n'
            # f.write(header_line)

            songs = Song.objects.filter(play_count__gte=10000)
            for song in songs.iterator():
                line = '%s\n' % (

                total_lines += 1

        end = time.time()

        print(end - start)