Python decorators

Python decorators

Python 裡的所有東西都是 object
function 也是
所以你可以對 function 做任何跟 object 一樣的事
例如把一個 function 當成參數丟給另一個 function
當然也可以 decorate class

不帶參數的 decorator

第一層 def 接收 func
第二層 def 接收 func 的 *args, **kwargs

通常意義下的 decorator 是把 func(就是 something_1、something_2)丟給 decorator function
做一些額外的動作
然後回傳該 func 原本的 return
並不操作 func 本身

如果要操作 func 本身
例如幫 func 增加一個 attribute
請參考下下面的例子

def func_wrapper(func):
    def arg_wrapper(*args, **kwargs):
        print func.__name__ + ' was called'
        print args
        print kwargs

        return func(*args, **kwargs)

    return arg_wrapper

@func_wrapper
def something_1(name='[name]'):
    print 'something_1: ' + name

@func_wrapper
def something_2(name='[name]'):
    print 'something_2: ' + name

something_1('Tom')
something_2('Cathy')
something_1()
something_2()

ref:
http://www.dotblogs.com.tw/rickyteng/archive/2013/11/06/126852.aspx

帶參數的 decorator

需要傳參數給 decorator 時
要多 wrap 一次
其實就是在原本的 decorator function 的外面再多加一個 def 來接收參數

那個 model_class 就是傳給 decorator 的參數

# 跟不帶參數的 decorator 相比多了一層,用來接收 decorator 的參數
def can_access_item_required(model_class):
    def func_wrapper(func):
        def arg_wrapper(*args, **kwargs):
            request = args[0]
            item = model_class.objects.get(pk=kwargs['pk'])

            if not model_class.objects.can_access_item_by(item, request.user):
                raise PermissionDenied()

            return func(*args, **kwargs)

        return arg_wrapper

    return func_wrapper

@can_access_item_required(Label)
def label_update(request, pk):
    pass

# equals to

label_update = can_access_item_required(Artist)(label_update)

ref:
https://www.python.org/dev/peps/pep-0318/#current-syntax

decorator 修改 func 本身

不需要封裝什麼
單純地把 func 丟給另一個 function
然後再 return 那個 func 即可

in views.py

def intro_middleware_decorator(func):
    func.enable_intro_middleware = False
    return func

@intro_middleware_decorator
def intro_1(request):
    return render(request, 'dps/intro/1.html')

in middleware.py

class IntroMiddleware(object):
    """
    如果 user 沒有完成 intro 步驟,則轉址到 /intro/1/
    """
    def __init__(self):
        self.enable = True
    def process_view(self, request, view_func, view_args, view_kwargs):
        """
        要在 intro 系列的 view 加上 disable_intro_middleware_decorator
        避免無限迴圈
        """
        self.enable = getattr(view_func, 'enable_intro_middleware', True)
    def process_response(self, request, response):
        if self.enable:
            resolver = request.resolver_match
            namespace = getattr(resolver, 'namespace', None)
            # 避免 admin, login 之類的 view 也都被 redirect
            if namespace == 'dps':
                if request.user.is_authenticated():
                    profile = request.user.profile
                    if (profile.identity == ProfileIdentity.PHANTOM) and (not profile.is_ready):
                        return redirect('dps:intro-one')
                else:
                    return redirect('auth:login')
        return response

decorator in a Class

class SVAPIListView(object):

    @staticmethod
    def filter_last_modified_decorator(func):
        """
        用來讓 client 同步資料
        ?last_modified=2015-02-04T15:16:22&is_deleted=true
        可以拿到某個時間點之後,新增、修改、刪除的項目
        必須明確地指定 is_deleted=true 才會包含被刪除的項目
        """

        def _add_filters_for_syncing(self, *args, **kwargs):
            queryset = func(self, *args, **kwargs)

            last_modified = self.get_last_modified()
            if last_modified:
                queryset = queryset.filter(last_modified__gte=last_modified)

                is_deleted = self.get_is_deleted()
                target_user = self.get_target_user()
                if (self.request.user == target_user) and (is_deleted):
                    # 只有用戶本人才能拿到被刪除的 items
                    pass
                else:
                    queryset = queryset.filter(enable=True)
            else:
                queryset = queryset.filter(enable=True)

            return queryset

        return _add_filters_for_syncing

class UserAlbumList(SVAPIListView):
    serializer_class = api_serializers.AlbumListSerializer

    @UserIDAsUsernameMixin.filter_last_modified_decorator
    def get_queryset(self, profile_user):
        queryset = MusicAlbum.objects.filter(user=profile_user)
        queryset = queryset.select_related('user', 'user__profile')

        ordering = self.request.GET.get('ordering')
        if ordering == 'like_count':
            queryset = queryset.order_by('-like_count')
        else:
            queryset = queryset.order_by('-id')

        return queryset

    def get(self, request, user_id, *args, **kwargs):
        profile_user = self.get_target_user()
        queryset = self.get_queryset(profile_user=profile_user)
        data = self.get_serializer_data(queryset)

        return Response(data)

ref:
http://stackoverflow.com/questions/1263451/python-decorators-in-classes

多個 decorator 時的執行順序

如果有多個 decorator 裝飾同一個 function / class
執行的順序是由下往上的
會先執行 @decorator_1

@decorator_3
@decorator_2
@decorator_1
def function():
    pass

Class-based decorator

class RecorderDecorator(object):

    __slots__ = ['recorder', 'msg_template', 'subject', 'action']

    recorder = Recorder()

    def __init__(self, msg_template, subject=None, action=None):
        self.msg_template = msg_template
        self.subject = subject
        self.action = action

class model_recorder(RecorderDecorator):
    """
    用於 model 的 method
    接受一個 string template,會自動使用 method parameters 作為 template context
    會使用 method name 作為 action,model name 作為 subject
    會在裝飾的 method 執行完之後執行這個 decorator

    用法可以參考 Song.play()

    @recorder_decorators.model_recorder('user: {user.id}, ip: {from_ip}, full: {is_full}, embed: {is_embed}')
    def play(self, user, from_ip, is_full=False, is_embed=False, from_playlist=None):
        pass

    則 log message 會是 [song:play] user: 123, ip: 59.120.12.57, full: True, embed: False
    """

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_result = func(*args, **kwargs)
            func_kwargs = inspect.getcallargs(func, *args, **kwargs)

            model_instance = func_kwargs.get('self')

            if getattr(model_instance, 'id'):
                new_msg_template = ', '.join(('item: {self.id}', self.msg_template))
            else:
                new_msg_template = self.msg_template

            if not self.subject:
                model_name = type_utils.item_type(model_instance)
                new_subject = model_name
            else:
                new_subject = self.subject

            if not self.action:
                new_action = func.__name__
            else:
                new_action = self.action

            msg = new_msg_template.format(**func_kwargs)
            self.recorder.write(new_subject, new_action, msg)

            return func_result

        return wrapper

class Song(models.Model):

    @model_recorder('user: {user.id}, ip: {from_ip}, full: {is_full}, embed: {is_embed}')
    def play(self, user, from_ip, is_full=False, is_embed=False, from_playlist=None):
        pass

ref:
https://stackoverflow.com/questions/9416947/python-class-based-decorator-with-parameters-that-can-decorate-a-method-or-a-fun

Database Routers in Django

Database Routers in Django

把部分 models / tables 獨立到一台資料庫

Database Router

沒辦法在 Model class 裡指定這個 model 只能用某個 database
而是要用 database router
就是判斷 model._meta.app_label == 'xxx' 的時候指定使用某一個 database
database 是指定義在 settings.DATABASES 的那些

不過 django 不支援跨 database 的 model relation
你不能用 foreign key 或 m2m 指向另一個 database 裡的 model
但是其實你直接用 user_id, song_id 之類的 int 欄位來記錄就好了

in settings.py

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'default',
        'USER': 'whatever',
        'PASSWORD': 'whatever',
        'HOST': '',
        'PORT': '',
    },
    'warehouse': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'warehouse',
        'USER': 'whatever',
        'PASSWORD': 'whatever',
        'HOST': '',
        'PORT': '',
    },
}

DATABASE_ROUTERS = [
    'app.db_routers.SVDatabaseRouter',
]

in app/db_routers.py

from oauth2_provider.models import AccessToken as OauthAccessToken
from oauth2_provider.oauth2_backends import get_oauthlib_core
from rest_framework2.authentication import BaseAuthentication

class SVOauthAuthentication(BaseAuthentication):
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        oauthlib_core = get_oauthlib_core()
        valid, result = oauthlib_core.verify_request(request, scopes=[])

        if valid:
            return (result.user, result.access_token)
        else:
            access_token = request.GET.get('access_token', None)
            if access_token:
                try:
                    access_token_obj = OauthAccessToken.objects.get(token=access_token)
                except OauthAccessToken.DoesNotExist:
                    pass
                else:
                    return (access_token_obj.user, access_token_obj)

        return None

    def authenticate_header(self, request):
        return 'Bearer realm="{0}"'.format(self.www_authenticate_realm)

ref:
https://docs.djangoproject.com/en/dev/topics/db/multi-db/

Models

把所有需要放到另一台 db 的 models 都放在同一個 app 下
方便管理

in warehouse/models.py

class PlayRecord(SVWarehouseDBMixin, models.Model):
    song_id = models.IntegerField()
    user_id = models.IntegerField(null=True, blank=True)
    is_full = models.BooleanField(default=False)
    ip_address = models.IPAddressField()
    location = models.CharField(max_length=2)
    created_at = models.DateTimeField()

繼承 SVWarehouseDBMixin 這個 minxin 的 class 會被放到 warehouse 這台資料庫!
所以 migrate 的時候要注意,記得在 migration 檔案裡加上:

from south.db import dbs
warehouse_db = dbs['warehouse']

這樣 south 才會在 warehouse 這台資料庫上建立 table
不然就是你自己手動去 CREATE TABLE

Migration

如果你要把舊有的 app 的 models 搬到另一台資料庫
但是 models 不動(還是放在本來的 app 底下)
你可能會需要 reset 整個 migration 紀錄
從頭開始建立一個新的 migration
因為 schema 會錯亂
所以還是建議新開一個 app 來放那些要搬到另一台資料庫的 models
這樣 database router 和 migration 都會比較單純

in app/migrations/0001_initial.py

$ pip install south==1.0.2

# syncdb 默認只會作用到 default 資料庫,你要明確指定要用哪個 database 才行
$ ./manage.py syncdb --noinput
$ ./manage.py syncdb --noinput --database=warehouse

# migrate 卻可以作用到其他資料庫
# 因為 migrate 哪個資料庫是 migration file 裡的 `db` 參數在決定的
$ ./manage.py migrate

ref:
http://stackoverflow.com/questions/7029228/is-using-multiple-databases-and-south-together-possible

Unit Tests

Django only flushes the default database at the start of each test run. If your setup contains multiple databases, and you have a test that requires every database to be clean, you can use the multi_db attribute on the test suite to request a full flush.

在使用 --keepdb 的情況下,如果你的測試執行到一半就因為錯誤而中斷了,可能會發生資料庫裡有資料還沒被 flush 的問題,導致下次執行測試時失敗。不過如果你沒有用 --keepdb 的話,因為每次都會重建資料庫,所以不會有這個問題。

from django.test import TestCase

class YourBTestCase(TestCase):
    multi_db = True

    def setUp(self):
        do_shit()

ref:
https://docs.djangoproject.com/en/dev/topics/testing/tools/
https://docs.djangoproject.com/en/dev/topics/testing/advanced/#topics-testing-advanced-multidb

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

Query DSL 是 Elasticsearch 的查詢用 Domain-specific Language (DSL),實際上就是一堆 JSON。elasticsearch-dsl 是官方發佈的一套用來操作 Query DSL 的 Python package,可以當成是 Elasticsearch 的 ORM。

希望之後可以直接支援用 SQL 來查詢,不然 Query DSL 真的有夠難寫。

ref:
https://github.com/elastic/elasticsearch-dsl-py

Installation

$ pip install elasticsearch-dsl>=5.0.0,<6.0.0

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/index.html

Schema

in app/mappings.py

from elasticsearch_dsl import DocType, String, Boolean
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=['127.0.0.1', ])

class AlbumDoc(DocType):
    upc = String(index='not_analyzed')
    title = String(analyzer='ik', fields={'raw': String(index='not_analyzed')})
    artist = String(analyzer='ik')
    is_ready = Boolean()

    class Meta:
        index = 'dps'
        doc_type = 'album'

    @classmethod
    def sync(cls, album):
        album_doc = AlbumDoc(meta={'id': album.id})
        album_doc.upc = album.get_upcs(output_str=False)
        album_doc.title = album.name
        album_doc.artist = album.artist.name
        album_doc.is_ready = album.is_ready
        album_doc.save()

    def save(self, *args, **kwargs):
        return super(AlbumDoc, self).save(*args, **kwargs)

    def get_model_obj(self):
        from svapps.dps.models import Album
        return Album.objects.get(id=self.meta.id)

# to create mappings
AlbumDoc.init()

一定要執行一次 YourDocType.init(),這樣 Elasticsearch 才會根據你的 DocType 產生對應的 mapping。否則 Elasticsearch 就會在你第一次倒資料進去的時候根據你的資料的 data type 建立對應的 mapping,所以 analyzer 之類的設定就會是預設的 standard,你可以透過 _mapping API 來檢查。

需要全文搜尋的欄位要設為 analyzed(string 欄位默認都是 analyzed),不需要全文搜尋的欄位,也就是要求精確的欄位,例如:usernameemailzip code,就可以設成 not_analyzed,但是你就不能對 analyzed 的欄位使用 term 了,除非你對該欄位額外再建立一個 raw 欄位。

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/persistence.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html#CO59-2

Store Data

album_doc = AlbumDoc(meta={'id': 42})
album_doc.upc = ['887375000619', '887375502069']
album_doc.title = 'abc'
album_doc.artist = 'xyz'
album_doc.is_ready = True
album_doc.save()

# 可以如常地 query,不用管它是不是 list
search = AlbumDoc.search().filter('term', upc='887375000619')
response = search.execute()

因為 Elasticsearch 是 schemaless,所以即使你定義了 String 欄位,還是可以存一個 list 進去。

Search Data

  • must:必須符合所有條件
  • should:符合其中一個條件即可
search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query('match', title=u'沒有的啊')

search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query(
        Q('match', title='沒有的啊') & \
        Q('match', artist='那我懂你意思了') & \
        Q('match', album='沒有的, 啊!?')
    )

q = Q(
    'bool',
    must=[
        Q('match', title={'query': track_name, 'fuzziness': 'AUTO'}),
    ],
    should=[
        Q('match', album={'query': album_name, 'minimum_should_match': '60%'}),
        Q('match', artist={'query': artist_name, 'minimum_should_match': '80%'}),
    ],
    minimum_should_match=1
)
search = TrackDoc.search().filter('term', is_ready=True).query(q)

q = Q(
    'bool',
    should=[
        Q('term', isrc=q),
        Q('term', upc=q),
        Q('match', **{'title.raw': {'query': q}}),
        Q('multi_match', query=q, fields=['title', 'artist', 'album']),
    ],
)
search = Search(index='dps', doc_type=['track', 'album']).query(q)
search = search[:20]

# print the raw Query DSL
import uniout
from pprint import pprint
pprint(search.to_dict())

response = search.execute()

print(response.hits.total)
print(response[0].title)
print(response[0].artist)
print(response[0].album)
print(response[0].is_ready)

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html

Elasticsearch: More than a Search Engine

Elasticsearch is a schemaless, document-oriented search engine, has a bunch of powerful quering APIs. It's also a great NoSQL database.

ref:
https://www.elastic.co/products/elasticsearch

Glossary

以下的定義以 Elasticsearch 5.6 為準,可能跟舊版的定義不同。在新版的 Elasticsearch 中,每個 index 只能有一個 mapping type,之前的版本則可以有多個。

  • cluster:一個 cluster 包含一個或多個 nodes,會自動選出一個 master node
  • node:一台跑著 Elasticsearch 的機器就是一個 node
  • index:類似關聯式資料庫裡的 table
  • mapping:類似關聯式資料庫裡的 table schema
  • field:類似關聯式資料庫裡的 column
  • document:類似關聯式資料庫裡的 row
  • text:任意的非結構化文字,text 會被 analyze 變成 term,然後才能被搜尋
  • term:實際上存在 Elasticsearch 裡的東西
  • analysis: 把 text 變成 term 的過程,例如 normalize、tokenize 和 stopword remove

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/glossary.html

Mapping

Mapping 可以顯式地指定,但是如果沒有指定,Elasticsearch 就會在第一次有資料進去的時候,自動根據資料建立對應的 mapping,所以一些欄位的屬性(例如 analyzer)可能不會符合你的預期,所以最好還是手動指定 mapping。

常用的 field data types:

  • text:表示 string 類型,用於 full-text search
  • keyword:表示 string 類型,用於 exact value 的 filter、sort 或 aggregate(就是舊版的 not_analyzed

在 Elasticsearch 中,同一個欄位可以被 index 成不同的 data types,例如 location 欄位,可以透過 fields 屬性,同時 index 成 textkeyword,用來全文搜索和 exact value 過濾。也可以分別指定不同的 analyzer。

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html

Analysis

一個 analyzer 由三個部分組成:

  • Character filters
  • Tokenizers
  • Token filters

你可以自己組合出你的 analyzer,以 elasticsearch-dsl-py 為例:

from elasticsearch_dsl import DocType, Date, Integer, Keyword, Text, Boolean
from elasticsearch_dsl import analyzer, tokenizer

text_analyzer = analyzer('text_analyzer',
    char_filter=["html_strip"],
    tokenizer="standard",
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)

cjk_analyzer = analyzer('text_analyzer',
    char_filter=["html_strip"],
    tokenizer=tokenizer('trigram', 'nGram', min_gram=2, max_gram=3),
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)

ref:
http://elasticsearch-dsl.readthedocs.io/en/latest/persistence.html

Testing Analyzer

測試某段文字在某個 analyzer 下的效果:

POST http://127.0.0.1:9200/_analyze
{
  "tokenizer": "standard",
  "filter": ["lowercase", "asciifolding"],
  "text": "Is this déja vu?"
}

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-analyzers.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-tokenizers.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-tokenfilters.html

Chinese Words Segmentation

ik 之類的分詞 plugin 的效果都不是很好,內建的 cjk 加上 NGram 可能會是比較好的選擇(可以用 multi-field index)。另外一個作法是,把資料餵進去 Elasticsearch 之前就先分好詞,可以用 Jieba,分詞完的文本以空格分隔,然後用 Elasticsearch 的 whitespace tokenizer。

中文搜尋經驗分享
https://blog.liang2.tw/2015Talk-Chinese-Search/

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-lang-analyzer.html#cjk-analyzer
https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-ngram-tokenizer.html
https://www.elastic.co/guide/en/elasticsearch/plugins/current/analysis-smartcn.html << 不推薦
https://github.com/medcl/elasticsearch-analysis-ik << 堪用

RESTful APIs

Show useful information for humans
http://127.0.0.1:9200/_cat
https://www.elastic.co/guide/en/elasticsearch/reference/current/cat.html

List all indices and aliases
http://127.0.0.1:9200/_aliases

List mappings under am index
http://127.0.0.1:9200/repo/_mapping

List documents under an index
http://127.0.0.1:9200/repo/_search

Query DSL

query 就是你要搜索的主體
filter 則是這個搜索的前置條件
https://www.elastic.co/guide/en/elasticsearch/guide/master/search-in-depth.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-queries.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-filters.html

要做 exact value 的 query
請用 term
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html

要做 full text 的 query
請用 match
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html

要一次 query 多個欄位
請用 multi_match
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-multi-match-query.html

要用 AND (must), OR (should), NOT (must_not) 的條件搜索
請用 bool
https://www.elastic.co/guide/en/elasticsearch/guide/master/bool-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html

要結合 filter 和 query
請用 filtered
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-filtered-query.html

More Like This query
除了可以輸入文字之外,還可以直接指定 document id 找出相似的結果
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html

對特定欄位加權
https://www.elastic.co/guide/en/elasticsearch/guide/current/query-time-boosting.html
https://www.elastic.co/guide/en/elasticsearch/guide/current/_boosting_query_clauses.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-boosting-query.html

Multi-index, Multi-type

除了可以搜尋單一 type,也可以跨 index、跨 type

  • /_search: Search all types in all indices
  • /gb/_search: Search all types in the gb index
  • /gb,us/_search: Search all types in the gb and us indices
  • /g*,u*/_search: Search all types in any indices beginning with g or beginning with u
  • /gb/user/_search: Search type user in the gb index
  • /gb,us/user,tweet/_search: Search types user and tweet in the gb and us indices
  • /_all/user,tweet/_search: Search types user and tweet in all indices

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-multi-index-type

Read and Write Files in Django and Python

Read and Write Files in Django and Python

File 和 ImageFile 接受 Python 的 file 或 StringIO 物件
而 ContentFile 接受 string

ref:
https://docs.djangoproject.com/en/dev/ref/files/file/#the-file-object

Django Form

image_file = request.FILES['file']

# 方法一
profile.mugshot.save(image_file.name, image_file)

# 方法二
profile.mugshot = image_file

profile.save()

open('/path/to/file.png')

from django.core.files import File

with open('/home/vinta/image.png', 'rb') as f:
    profile.mugshot = File(f)
    profile.save()

Django ContentFile

import os
import uuid

from django.core.files.base import ContentFile

import requests

url = 'http://vinta.ws/static/photo.jpg'
r = requests.get(url)
file_url, file_ext = os.path.splitext(r.url)
file_name = '%s%s' % (str(uuid.uuid4()).replace('-', ''), file_ext)

profile.mugshot.save('123.png', ContentFile(r.content), save=False)

# 如果 profile.mugshot 是 ImageField 欄位的話
# 可以用以下的方式來判斷它是不是合法的圖檔
try:
    profile.mugshot.width
except TypeError:
    raise RuntimeError('圖檔格式不正確')

profile.save()

Data URI, Base64

from binascii import a2b_base64

from django.core.files.base import ContentFile

data_uri = 'data:image/jpeg;base64,/9j/4AAQSkZJRg....'
head, data = data_uri.split(',')
binary_data = a2b_base64(data)

# 方法一
profile.mugshot.save('whatever.jpg', ContentFile(binary_data), save=False)
profile.save()

# 不能用這種方式,因為少了 file name
profile.mugshot = ContentFile(binary_data)
profile.save()

# 方法二
f = open('image.png', 'wb')
f.write(binary_data)
f.close()

# 方法三
from StringIO import StringIO
from PIL import Image
img = Image.open(StringIO(binary_data))
print img.size

ref:
https://stackoverflow.com/questions/19395649/python-pil-create-and-save-image-from-data-uri

StringIO, PIL image

你就把 StringIO 想成是 open('/home/vinta/some_file.txt', 'rb') 的 file 物件

from StringIO import StringIO

from PIL import Image
import requests

r = requests.get('http://vinta.ws/static/photo.jpg')
img = Image.open(StringIO(r.content))
print pil_image.size

StringIO, PIL image, Django

from StringIO import StringIO

from django.core.files.base import ContentFile

from PIL import Image

img = Image.open(instance.file)
# or
raw_img_io = StringIO(binary_data)
img = Image.open(raw_img_io)
img = img.resize((524, 328), Image.ANTIALIAS)
img_io = StringIO()
img.save(img_io, 'PNG', quality=100)

profile.image.save('whatever.png', ContentFile(img_io.getvalue()), save=False)
profile.save()

ref:
https://stackoverflow.com/questions/3723220/how-do-you-convert-a-pil-image-to-a-django-file

Download file from URL, tempfile

import os
import tempfile
import requests
import xlrd

try:
    file_path = report.file.path
    temp = None
except NotImplementedError:
    url = report.file.url
    r = requests.get(url, stream=True)
    file_url, file_ext = os.path.splitext(r.url)

    # delete=True 會在 temp.close() 之後自己刪掉
    temp = tempfile.NamedTemporaryFile(prefix='report_file_', suffix=file_ext, dir='/tmp', delete=False)
    file_path = temp.name

    with open(file_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
                f.flush()

wb = xlrd.open_workbook(file_path)

# 因為是 tempfile.NamedTemporaryFile(delete=False)
# 所以你要自己刪掉
try:
    os.remove(temp.name)
except AttributeError:
    pass

ref:
https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests