Parallel tasks in Python: concurrent.futures

Install

concurrent.futures has been part of the standard library since Python 3.2. On older versions (Python 2.x), you need to install the futures backport package.

$ pip install futures

ref:
https://docs.python.org/3/library/concurrent.futures.html

executor.map()

Use ProcessPoolExecutor for CPU-intensive tasks and ThreadPoolExecutor for network operations or other I/O-bound work. ProcessPoolExecutor is built on the multiprocessing module, so it is not limited by the GIL (Global Interpreter Lock), but this also means that only picklable objects can be executed and returned.

Since Python 3.5, map() accepts an optional chunksize argument. With ProcessPoolExecutor, the iterable is split into chunks of that size and each chunk is submitted to the pool as a single task, so for very long iterables a large chunksize can significantly improve performance compared to the default of 1. With ThreadPoolExecutor, chunksize has no effect.
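
As a rough sketch of the CPU-bound case, the snippet below pushes a hypothetical count_primes helper (not part of the original example) through ProcessPoolExecutor.map() with an explicit chunksize. The input sizes and chunksize=20 are arbitrary assumptions; measure with your own workload before settling on a value.

```python
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """Count primes below `limit` by trial division (deliberately CPU-bound)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main():
    limits = [5000 + i for i in range(100)]

    # The function and its arguments are pickled and sent to worker processes,
    # so count_primes has to live at module level (a lambda would not pickle).
    with ProcessPoolExecutor() as executor:
        # chunksize=20 hands each worker 20 inputs per task instead of 1
        results = list(executor.map(count_primes, limits, chunksize=20))

    print(results[:3])


# The guard matters on platforms that start workers by re-importing this module
if __name__ == '__main__':
    main()
```

Swapping ProcessPoolExecutor for ThreadPoolExecutor here would still run, but the threads would take turns holding the GIL, and chunksize would be ignored.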

from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = 'http://httpbin.org/get?a={0}'.format(a)
    r = requests.get(url)
    return r.json()['args']

start = time.time()

# If max_workers is None or not given, a default is chosen for you:
# the number of processors multiplied by 5 in Python 3.5 to 3.7,
# and min(32, os.cpu_count() + 4) since Python 3.8
with ThreadPoolExecutor(max_workers=None) as executor:
    for result in executor.map(fetch, range(30)):
        print('response: {0}'.format(result))

print('Use requests + ThreadPoolExecutor cost: {}'.format(time.time() - start))

ref:
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
https://www.blog.pythonlibrary.org/2016/08/03/python-3-concurrency-the-concurrent-futures-module/
http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

executor.submit() and as_completed()

executor.submit() returns a Future object. A Future is basically an object that encapsulates an asynchronous execution of a function that will finish (or raise an exception) in the future.
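
To make the Future object more concrete, here is a minimal sketch using a hypothetical divide function, one call that succeeds and one that raises. It only uses Future.result(), Future.exception() and Future.done() from the standard library.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b):
    return a / b


with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(divide, 6, 3)
    failed = executor.submit(divide, 1, 0)

    # result() blocks until the call finishes and returns its value
    ok_value = ok.result()

    # exception() also blocks, and returns the raised exception (or None)
    error = failed.exception()

print(ok_value)
print(type(error).__name__)
print(ok.done(), failed.done())
```

Calling failed.result() instead would re-raise the ZeroDivisionError in the calling thread, which is why result() is usually wrapped in try/except.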

The main difference between map() and as_completed() is ordering: map() returns results in the same order as the input iterables, while as_completed() yields whichever future finishes first. In addition, iterating over map() gives you the results directly, whereas iterating over as_completed(futures) gives you the futures themselves, so you have to call result() on each one.
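
The ordering difference can be seen without any network access. The sketch below uses a hypothetical sleep_for function and made-up delays so that the tasks finish in a different order than they were submitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_for(seconds):
    time.sleep(seconds)
    return seconds


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results (not futures) in the order of the inputs
    map_order = list(executor.map(sleep_for, delays))

    # as_completed() yields futures as they finish, so call result() on each
    futures = [executor.submit(sleep_for, d) for d in delays]
    completion_order = [f.result() for f in as_completed(futures)]

print(map_order)         # same order as delays
print(completion_order)  # shortest delay normally comes first
```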

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {}
    for i in range(42):
        url = 'https://httpbin.org/get?i={0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
        except Exception as exc:
            print(exc)
        else:
            print('fetch {0}, get {1}'.format(url, data))

ref:
https://docs.python.org/3/library/concurrent.futures.html#future-objects

Read and write files in Go

Reading file line by line

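If you want to read a file line by line, bufio.Scanner is the recommended tool.
Scanner has one drawback, though: when a line is too long (more than 64K), it fails with the error bufio.Scanner: token too long.
In that case you still have to fall back to bufio.Reader.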

fin, err := os.Open(path)
if err != nil {
    fmt.Fprintln(os.Stderr, err)
    return
}
defer fin.Close()

scanner := bufio.NewScanner(fin)
for scanner.Scan() {
    line := scanner.Text()
    fmt.Fprintln(os.Stdout, line)
}

if err := scanner.Err(); err != nil {
    fmt.Fprintln(os.Stderr, err)
}

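If you know the maximum length of the tokens you will be reading, raise the limit with scanner.Buffer() (available since Go 1.6) before the first call to Scan(). On older Go versions, the workaround was to copy the bufio.Scanner code into your project and change the MaxScanTokenSize constant.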

ref:
http://stackoverflow.com/questions/6141604/go-readline-string
http://stackoverflow.com/questions/1821811/how-to-read-write-from-to-file
http://stackoverflow.com/questions/8757389/reading-file-line-by-line-in-go
http://stackoverflow.com/questions/5884154/golang-read-text-file-into-string-array-and-write
https://github.com/polaris1119/The-Golang-Standard-Library-by-Example/blob/master/chapter01/01.4.md

bufio
https://golang.org/pkg/bufio/
https://golang.org/pkg/bufio/#Scanner

Reading and writing file line by line

fmt.Fprintln(writer, line) seems to be even faster than bw.WriteString(line).

func FileSpacing(filename string, w io.Writer) (err error) {
    fr, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer fr.Close()

    br := bufio.NewReader(fr)
    bw := bufio.NewWriter(w)
    // Flush buffered output on every return path, including errors
    defer bw.Flush()

    for {
        line, err := br.ReadString('\n')
        if err != nil && err != io.EOF {
            return err
        }
        // On io.EOF, line holds any trailing text without a final newline
        fmt.Fprint(bw, TextSpacing(line))
        if err == io.EOF {
            break
        }
    }

    return nil
}

Copy a file

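fin, err := os.Open("source.txt")
if err != nil {
    log.Fatal(err)
}
defer fin.Close()
fout, err := os.Create("destination.txt")
if err != nil {
    log.Fatal(err)
}
defer fout.Close()

if _, err := io.Copy(fout, fin); err != nil {
    log.Fatal(err)
}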

ref:
http://stackoverflow.com/questions/23272663/transfer-a-big-file-in-golang
http://golang.org/pkg/io/#Copy

Compute MD5 of a file

func md5Of(filename string) string {
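    file, err := os.Open(filename)
    checkError(err)
    defer file.Close()

    // Stream the file through the hash instead of loading it into memory
    hash := md5.New()
    _, err = io.Copy(hash, file)
    checkError(err)

    // Sum(nil) returns the digest without appending to an existing slice
    checksum := hex.EncodeToString(hash.Sum(nil))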

    return checksum
}

ref:
http://stackoverflow.com/questions/29505089/how-can-i-compare-two-files-in-golang

Read and save file in Django / Python

File and ImageFile accept a Python file object or a StringIO object,
while ContentFile accepts a string (str or bytes).

ref:
https://docs.djangoproject.com/en/dev/ref/files/file/#the-file-object

Django Form

image_file = request.FILES['file']

# Option 1
profile.mugshot.save(image_file.name, image_file)

# Option 2
profile.mugshot = image_file

profile.save()

ref:
File Upload with Form in Django

open('/path/to/file.png')

from django.core.files import File

with open('/home/vinta/image.png', 'rb') as f:
    profile.mugshot = File(f)
    profile.save()

Django ContentFile

import os
import uuid

from django.core.files.base import ContentFile

import requests

url = 'http://vinta.ws/static/photo.jpg'
r = requests.get(url)
file_url, file_ext = os.path.splitext(r.url)
file_name = '%s%s' % (str(uuid.uuid4()).replace('-', ''), file_ext)

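# Save under the generated file_name so downloads do not overwrite each other
profile.mugshot.save(file_name, ContentFile(r.content), save=False)

# If profile.mugshot is an ImageField,
# you can check whether it is a valid image file like this
try:
    profile.mugshot.width
except TypeError:
    raise RuntimeError('Invalid image file')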

profile.save()

Data URI, Base64

from binascii import a2b_base64

from django.core.files.base import ContentFile

data_uri = 'data:image/jpeg;base64,/9j/4AAQSkZJRg....'
head, data = data_uri.split(',')
binary_data = a2b_base64(data)

# Option 1
profile.mugshot.save('whatever.jpg', ContentFile(binary_data), save=False)
profile.save()

# This does not work because the ContentFile has no file name
profile.mugshot = ContentFile(binary_data)
profile.save()

# Option 2
with open('image.png', 'wb') as f:
    f.write(binary_data)

# Option 3
# binary_data is bytes, so wrap it in io.BytesIO (StringIO.StringIO on Python 2)
from io import BytesIO
from PIL import Image
img = Image.open(BytesIO(binary_data))
print(img.size)
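
For reference, a data URI has the form data:[<media type>][;base64],<data>. A minimal standard-library sketch that separates the header from the payload and decodes it could look like the following; parse_data_uri is a hypothetical helper, and the sample URI simply encodes the bytes b'hello' rather than a real image.

```python
import base64


def parse_data_uri(data_uri):
    """Return (media_type, raw_bytes) for a base64-encoded data URI."""
    # Split only on the first comma: everything after it is the payload
    head, data = data_uri.split(',', 1)
    if not head.startswith('data:') or not head.endswith(';base64'):
        raise ValueError('not a base64 data URI')

    media_type = head[len('data:'):-len(';base64')]
    return media_type, base64.b64decode(data)


media_type, binary_data = parse_data_uri('data:text/plain;base64,aGVsbG8=')
print(media_type)   # text/plain
print(binary_data)  # b'hello'
```

The media type is handy for picking a file extension before passing the bytes to ContentFile.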

ref:
http://stackoverflow.com/questions/19395649/python-pil-create-and-save-image-from-data-uri

StringIO, PIL image

Just think of StringIO (io.BytesIO for binary data in Python 3) as the file object you get from open('/home/vinta/some_file.txt', 'rb').
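
The comparison can be checked with the standard library alone. This sketch assumes Python 3, where in-memory binary data goes into io.BytesIO; the payload is made-up bytes, not a real image.

```python
import io
import os
import tempfile

payload = b'\x89PNG fake image bytes'

# Write the bytes to a real file, then read them back in binary mode
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)

with open(tmp.name, 'rb') as real_file:
    from_disk = real_file.read()
os.remove(tmp.name)

# The in-memory buffer supports the same read() and seek() calls
buffer = io.BytesIO(payload)
from_memory = buffer.read()
buffer.seek(0)
first_four = buffer.read(4)

print(from_disk == from_memory)
print(first_four)
```

Because both objects expose the same file interface, anything that accepts an open file, such as Image.open(), also accepts the in-memory buffer.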

# Python 3: r.content is bytes, so use io.BytesIO (StringIO.StringIO on Python 2)
from io import BytesIO

from PIL import Image
import requests

r = requests.get('http://vinta.ws/static/photo.jpg')
img = Image.open(BytesIO(r.content))
print(img.size)

StringIO, PIL image, Django

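# Python 3: use io.BytesIO for binary data (StringIO.StringIO on Python 2)
from io import BytesIO

from django.core.files.base import ContentFile

from PIL import Image

raw_img_io = BytesIO(binary_data)
img = Image.open(raw_img_io)
# Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the same filter
img = img.resize((524, 328), Image.LANCZOS)
img_io = BytesIO()
# PNG is lossless, so the JPEG-style quality option is not needed
img.save(img_io, 'PNG')

profile.image.save('whatever.png', ContentFile(img_io.getvalue()), save=False)
profile.save()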

ref:
http://stackoverflow.com/questions/3723220/how-do-you-convert-a-pil-image-to-a-django-file

Download file from URL, tempfile

import os
import tempfile
import requests
import xlrd

try:
    file_path = report.file.path
    temp = None
except NotImplementedError:
    url = report.file.url
    r = requests.get(url, stream=True)
    file_url, file_ext = os.path.splitext(r.url)

    # With delete=True, the file would be removed automatically after temp.close()
    temp = tempfile.NamedTemporaryFile(prefix='report_file_', suffix=file_ext, dir='/tmp', delete=False)
    file_path = temp.name

    with open(file_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
                f.flush()

wb = xlrd.open_workbook(file_path)

...

# tempfile.NamedTemporaryFile(delete=False) does not clean up after itself,
# so you have to remove the file yourself
if temp is not None:
    temp.close()
    os.remove(temp.name)
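
The effect of the delete flag can be observed in isolation. The following is a small sketch using only the standard library; the prefix, suffix and placeholder bytes are arbitrary stand-ins for a downloaded report.

```python
import os
import tempfile

# delete=True (the default): the file disappears once it is closed
with tempfile.NamedTemporaryFile(prefix='report_file_', suffix='.xls') as auto_temp:
    auto_path = auto_temp.name
    exists_while_open = os.path.exists(auto_path)
exists_after_close = os.path.exists(auto_path)

# delete=False: the file survives close(), so remove it explicitly
manual_temp = tempfile.NamedTemporaryFile(prefix='report_file_', suffix='.xls', delete=False)
try:
    manual_temp.write(b'placeholder bytes')
    manual_temp.close()
    survives_close = os.path.exists(manual_temp.name)
finally:
    os.remove(manual_temp.name)

print(exists_while_open, exists_after_close, survives_close)
```

delete=False is the safer choice when another library needs to reopen the file by path, as xlrd does above, at the cost of having to clean up yourself.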

ref:
http://stackoverflow.com/questions/16694907/how-to-download-large-file-in-python-with-requests-py
http://pymotw.com/2/tempfile/