Python decorators

Python decorators

Python 裡的所有東西都是 object
function 也是
所以你可以對 function 做任何跟 object 一樣的事
例如把一個 function 當成參數丟給另一個 function
當然也可以 decorate class

不帶參數的 decorator

第一層 def 接收 func
第二層 def 接收 func 的 *args, **kwargs

通常意義下的 decorator 是把 func(就是 something_1、something_2)丟給 decorator function
做一些額外的動作
然後回傳該 func 原本的 return
並不操作 func 本身

如果要操作 func 本身
例如幫 func 增加一個 attribute
請參考下下面的例子

def func_wrapper(func):
    def arg_wrapper(*args, **kwargs):
        print func.__name__ + ' was called'
        print args
        print kwargs

        return func(*args, **kwargs)

    return arg_wrapper

@func_wrapper
def something_1(name='[name]'):
    print 'something_1: ' + name

@func_wrapper
def something_2(name='[name]'):
    print 'something_2: ' + name

something_1('Tom')
something_2('Cathy')
something_1()
something_2()

ref:
http://www.dotblogs.com.tw/rickyteng/archive/2013/11/06/126852.aspx

帶參數的 decorator

需要傳參數給 decorator 時
要多 wrap 一次
其實就是在原本的 decorator function 的外面再多加一個 def 來接收參數

那個 model_class 就是傳給 decorator 的參數

# 跟不帶參數的 decorator 相比多了一層,用來接收 decorator 的參數
def can_access_item_required(model_class):
    def func_wrapper(func):
        def arg_wrapper(*args, **kwargs):
            request = args[0]
            item = model_class.objects.get(pk=kwargs['pk'])

            if not model_class.objects.can_access_item_by(item, request.user):
                raise PermissionDenied()

            return func(*args, **kwargs)

        return arg_wrapper

    return func_wrapper

@can_access_item_required(Label)
def label_update(request, pk):
    pass

# equals to

label_update = can_access_item_required(Artist)(label_update)

ref:
https://www.python.org/dev/peps/pep-0318/#current-syntax

decorator 修改 func 本身

不需要封裝什麼
單純地把 func 丟給另一個 function
然後再 return 那個 func 即可

in views.py

def intro_middleware_decorator(func):
    func.enable_intro_middleware = False
    return func

@intro_middleware_decorator
def intro_1(request):
    return render(request, 'dps/intro/1.html')

in middleware.py

class IntroMiddleware(object):
    """
    如果 user 沒有完成 intro 步驟,則轉址到 /intro/1/
    """
    def __init__(self):
        self.enable = True
    def process_view(self, request, view_func, view_args, view_kwargs):
        """
        要在 intro 系列的 view 加上 disable_intro_middleware_decorator
        避免無限迴圈
        """
        self.enable = getattr(view_func, 'enable_intro_middleware', True)
    def process_response(self, request, response):
        if self.enable:
            resolver = request.resolver_match
            namespace = getattr(resolver, 'namespace', None)
            # 避免 admin, login 之類的 view 也都被 redirect
            if namespace == 'dps':
                if request.user.is_authenticated():
                    profile = request.user.profile
                    if (profile.identity == ProfileIdentity.PHANTOM) and (not profile.is_ready):
                        return redirect('dps:intro-one')
                else:
                    return redirect('auth:login')
        return response

decorator in a Class

class SVAPIListView(object):

    @staticmethod
    def filter_last_modified_decorator(func):
        """
        用來讓 client 同步資料
        ?last_modified=2015-02-04T15:16:22&is_deleted=true
        可以拿到某個時間點之後,新增、修改、刪除的項目
        必須明確地指定 is_deleted=true 才會包含被刪除的項目
        """

        def _add_filters_for_syncing(self, *args, **kwargs):
            queryset = func(self, *args, **kwargs)

            last_modified = self.get_last_modified()
            if last_modified:
                queryset = queryset.filter(last_modified__gte=last_modified)

                is_deleted = self.get_is_deleted()
                target_user = self.get_target_user()
                if (self.request.user == target_user) and (is_deleted):
                    # 只有用戶本人才能拿到被刪除的 items
                    pass
                else:
                    queryset = queryset.filter(enable=True)
            else:
                queryset = queryset.filter(enable=True)

            return queryset

        return _add_filters_for_syncing

class UserAlbumList(SVAPIListView):
    serializer_class = api_serializers.AlbumListSerializer

    @UserIDAsUsernameMixin.filter_last_modified_decorator
    def get_queryset(self, profile_user):
        queryset = MusicAlbum.objects.filter(user=profile_user)
        queryset = queryset.select_related('user', 'user__profile')

        ordering = self.request.GET.get('ordering')
        if ordering == 'like_count':
            queryset = queryset.order_by('-like_count')
        else:
            queryset = queryset.order_by('-id')

        return queryset

    def get(self, request, user_id, *args, **kwargs):
        profile_user = self.get_target_user()
        queryset = self.get_queryset(profile_user=profile_user)
        data = self.get_serializer_data(queryset)

        return Response(data)

ref:
http://stackoverflow.com/questions/1263451/python-decorators-in-classes

多個 decorator 時的執行順序

如果有多個 decorator 裝飾同一個 function / class
執行的順序是由下往上的
會先執行 @decorator_1

@decorator_3
@decorator_2
@decorator_1
def function():
    pass

Class-based decorator

class RecorderDecorator(object):

    __slots__ = ['recorder', 'msg_template', 'subject', 'action']

    recorder = Recorder()

    def __init__(self, msg_template, subject=None, action=None):
        self.msg_template = msg_template
        self.subject = subject
        self.action = action

class model_recorder(RecorderDecorator):
    """
    用於 model 的 method
    接受一個 string template,會自動使用 method parameters 作為 template context
    會使用 method name 作為 action,model name 作為 subject
    會在裝飾的 method 執行完之後執行這個 decorator

    用法可以參考 Song.play()

    @recorder_decorators.model_recorder('user: {user.id}, ip: {from_ip}, full: {is_full}, embed: {is_embed}')
    def play(self, user, from_ip, is_full=False, is_embed=False, from_playlist=None):
        pass

    則 log message 會是 [song:play] user: 123, ip: 59.120.12.57, full: True, embed: False
    """

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_result = func(*args, **kwargs)
            func_kwargs = inspect.getcallargs(func, *args, **kwargs)

            model_instance = func_kwargs.get('self')

            if getattr(model_instance, 'id'):
                new_msg_template = ', '.join(('item: {self.id}', self.msg_template))
            else:
                new_msg_template = self.msg_template

            if not self.subject:
                model_name = type_utils.item_type(model_instance)
                new_subject = model_name
            else:
                new_subject = self.subject

            if not self.action:
                new_action = func.__name__
            else:
                new_action = self.action

            msg = new_msg_template.format(**func_kwargs)
            self.recorder.write(new_subject, new_action, msg)

            return func_result

        return wrapper

class Song(models.Model):

    @model_recorder('user: {user.id}, ip: {from_ip}, full: {is_full}, embed: {is_embed}')
    def play(self, user, from_ip, is_full=False, is_embed=False, from_playlist=None):
        pass

ref:
https://stackoverflow.com/questions/9416947/python-class-based-decorator-with-parameters-that-can-decorate-a-method-or-a-fun

Database Routers in Django

Database Routers in Django

把部分 models / tables 獨立到一台資料庫

Database Router

沒辦法在 Model class 裡指定這個 model 只能用某個 database
而是要用 database router
就是判斷 model._meta.app_label == 'xxx' 的時候指定使用某一個 database
database 是指定義在 settings.DATABASES 的那些

不過 django 不支援跨 database 的 model relation
你不能用 foreign key 或 m2m 指向另一個 database 裡的 model
但是其實你直接用 user_id, song_id 之類的 int 欄位來記錄就好了

in settings.py

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'default',
        'USER': 'whatever',
        'PASSWORD': 'whatever',
        'HOST': '',
        'PORT': '',
    },
    'warehouse': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'warehouse',
        'USER': 'whatever',
        'PASSWORD': 'whatever',
        'HOST': '',
        'PORT': '',
    },
}

DATABASE_ROUTERS = [
    'app.db_routers.SVDatabaseRouter',
]

in app/db_routers.py

from oauth2_provider.models import AccessToken as OauthAccessToken
from oauth2_provider.oauth2_backends import get_oauthlib_core
from rest_framework2.authentication import BaseAuthentication

class SVOauthAuthentication(BaseAuthentication):
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        oauthlib_core = get_oauthlib_core()
        valid, result = oauthlib_core.verify_request(request, scopes=[])

        if valid:
            return (result.user, result.access_token)
        else:
            access_token = request.GET.get('access_token', None)
            if access_token:
                try:
                    access_token_obj = OauthAccessToken.objects.get(token=access_token)
                except OauthAccessToken.DoesNotExist:
                    pass
                else:
                    return (access_token_obj.user, access_token_obj)

        return None

    def authenticate_header(self, request):
        return 'Bearer realm="{0}"'.format(self.www_authenticate_realm)

ref:
https://docs.djangoproject.com/en/dev/topics/db/multi-db/

Models

把所有需要放到另一台 db 的 models 都放在同一個 app 下
方便管理

in warehouse/models.py

class PlayRecord(SVWarehouseDBMixin, models.Model):
    song_id = models.IntegerField()
    user_id = models.IntegerField(null=True, blank=True)
    is_full = models.BooleanField(default=False)
    ip_address = models.IPAddressField()
    location = models.CharField(max_length=2)
    created_at = models.DateTimeField()

繼承 SVWarehouseDBMixin 這個 minxin 的 class 會被放到 warehouse 這台資料庫!
所以 migrate 的時候要注意,記得在 migration 檔案裡加上:

from south.db import dbs
warehouse_db = dbs['warehouse']

這樣 south 才會在 warehouse 這台資料庫上建立 table
不然就是你自己手動去 CREATE TABLE

Migration

如果你要把舊有的 app 的 models 搬到另一台資料庫
但是 models 不動(還是放在本來的 app 底下)
你可能會需要 reset 整個 migration 紀錄
從頭開始建立一個新的 migration
因為 schema 會錯亂
所以還是建議新開一個 app 來放那些要搬到另一台資料庫的 models
這樣 database router 和 migration 都會比較單純

in app/migrations/0001_initial.py

$ pip install south==1.0.2

# syncdb 默認只會作用到 default 資料庫,你要明確指定要用哪個 database 才行
$ ./manage.py syncdb --noinput
$ ./manage.py syncdb --noinput --database=warehouse

# migrate 卻可以作用到其他資料庫
# 因為 migrate 哪個資料庫是 migration file 裡的 `db` 參數在決定的
$ ./manage.py migrate

ref:
http://stackoverflow.com/questions/7029228/is-using-multiple-databases-and-south-together-possible

Unit Tests

Django only flushes the default database at the start of each test run. If your setup contains multiple databases, and you have a test that requires every database to be clean, you can use the multi_db attribute on the test suite to request a full flush.

在使用 --keepdb 的情況下,如果你的測試執行到一半就因為錯誤而中斷了,可能會發生資料庫裡有資料還沒被 flush 的問題,導致下次執行測試時失敗。不過如果你沒有用 --keepdb 的話,因為每次都會重建資料庫,所以不會有這個問題。

from django.test import TestCase

class YourBTestCase(TestCase):
    multi_db = True

    def setUp(self):
        do_shit()

ref:
https://docs.djangoproject.com/en/dev/topics/testing/tools/
https://docs.djangoproject.com/en/dev/topics/testing/advanced/#topics-testing-advanced-multidb

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

Query DSL 是 Elasticsearch 的查詢用 Domain-specific Language (DSL),實際上就是一堆 JSON。elasticsearch-dsl 是官方發佈的一套用來操作 Query DSL 的 Python package,可以當成是 Elasticsearch 的 ORM。

希望之後可以直接支援用 SQL 來查詢,不然 Query DSL 真的有夠難寫。

ref:
https://github.com/elastic/elasticsearch-dsl-py

Installation

$ pip install elasticsearch-dsl>=5.0.0,<6.0.0

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/index.html

Schema

in app/mappings.py

from elasticsearch_dsl import DocType, String, Boolean
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=['127.0.0.1', ])

class AlbumDoc(DocType):
    upc = String(index='not_analyzed')
    title = String(analyzer='ik', fields={'raw': String(index='not_analyzed')})
    artist = String(analyzer='ik')
    is_ready = Boolean()

    class Meta:
        index = 'dps'
        doc_type = 'album'

    @classmethod
    def sync(cls, album):
        album_doc = AlbumDoc(meta={'id': album.id})
        album_doc.upc = album.get_upcs(output_str=False)
        album_doc.title = album.name
        album_doc.artist = album.artist.name
        album_doc.is_ready = album.is_ready
        album_doc.save()

    def save(self, *args, **kwargs):
        return super(AlbumDoc, self).save(*args, **kwargs)

    def get_model_obj(self):
        from svapps.dps.models import Album
        return Album.objects.get(id=self.meta.id)

# to create mappings
AlbumDoc.init()

一定要執行一次 YourDocType.init(),這樣 Elasticsearch 才會根據你的 DocType 產生對應的 mapping。否則 Elasticsearch 就會在你第一次倒資料進去的時候根據你的資料的 data type 建立對應的 mapping,所以 analyzer 之類的設定就會是預設的 standard,你可以透過 _mapping API 來檢查。

需要全文搜尋的欄位要設為 analyzed(string 欄位默認都是 analyzed),不需要全文搜尋的欄位,也就是要求精確的欄位,例如:usernameemailzip code,就可以設成 not_analyzed,但是你就不能對 analyzed 的欄位使用 term 了,除非你對該欄位額外再建立一個 raw 欄位。

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/persistence.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html#CO59-2

Store Data

album_doc = AlbumDoc(meta={'id': 42})
album_doc.upc = ['887375000619', '887375502069']
album_doc.title = 'abc'
album_doc.artist = 'xyz'
album_doc.is_ready = True
album_doc.save()

# 可以如常地 query,不用管它是不是 list
search = AlbumDoc.search().filter('term', upc='887375000619')
response = search.execute()

因為 Elasticsearch 是 schemaless,所以即使你定義了 String 欄位,還是可以存一個 list 進去。

Search Data

  • must:必須符合所有條件
  • should:符合其中一個條件即可
search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query('match', title=u'沒有的啊')

search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query(
        Q('match', title='沒有的啊') & \
        Q('match', artist='那我懂你意思了') & \
        Q('match', album='沒有的, 啊!?')
    )

q = Q(
    'bool',
    must=[
        Q('match', title={'query': track_name, 'fuzziness': 'AUTO'}),
    ],
    should=[
        Q('match', album={'query': album_name, 'minimum_should_match': '60%'}),
        Q('match', artist={'query': artist_name, 'minimum_should_match': '80%'}),
    ],
    minimum_should_match=1
)
search = TrackDoc.search().filter('term', is_ready=True).query(q)

q = Q(
    'bool',
    should=[
        Q('term', isrc=q),
        Q('term', upc=q),
        Q('match', **{'title.raw': {'query': q}}),
        Q('multi_match', query=q, fields=['title', 'artist', 'album']),
    ],
)
search = Search(index='dps', doc_type=['track', 'album']).query(q)
search = search[:20]

# print the raw Query DSL
import uniout
from pprint import pprint
pprint(search.to_dict())

response = search.execute()

print(response.hits.total)
print(response[0].title)
print(response[0].artist)
print(response[0].album)
print(response[0].is_ready)

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html

Read and Write Files in Django and Python

Read and Write Files in Django and Python

File 和 ImageFile 接受 Python 的 file 或 StringIO 物件
而 ContentFile 接受 string

ref:
https://docs.djangoproject.com/en/dev/ref/files/file/#the-file-object

Django Form

image_file = request.FILES['file']

# 方法一
profile.mugshot.save(image_file.name, image_file)

# 方法二
profile.mugshot = image_file

profile.save()

open('/path/to/file.png')

from django.core.files import File

with open('/home/vinta/image.png', 'rb') as f:
    profile.mugshot = File(f)
    profile.save()

Django ContentFile

import os
import uuid

from django.core.files.base import ContentFile

import requests

url = 'http://vinta.ws/static/photo.jpg'
r = requests.get(url)
file_url, file_ext = os.path.splitext(r.url)
file_name = '%s%s' % (str(uuid.uuid4()).replace('-', ''), file_ext)

profile.mugshot.save('123.png', ContentFile(r.content), save=False)

# 如果 profile.mugshot 是 ImageField 欄位的話
# 可以用以下的方式來判斷它是不是合法的圖檔
try:
    profile.mugshot.width
except TypeError:
    raise RuntimeError('圖檔格式不正確')

profile.save()

Data URI, Base64

from binascii import a2b_base64

from django.core.files.base import ContentFile

data_uri = '....'
head, data = data_uri.split(',')
binary_data = a2b_base64(data)

# 方法一
profile.mugshot.save('whatever.jpg', ContentFile(binary_data), save=False)
profile.save()

# 不能用這種方式,因為少了 file name
profile.mugshot = ContentFile(binary_data)
profile.save()

# 方法二
f = open('image.png', 'wb')
f.write(binary_data)
f.close()

# 方法三
from StringIO import StringIO
from PIL import Image
img = Image.open(StringIO(binary_data))
print img.size

ref:
https://stackoverflow.com/questions/19395649/python-pil-create-and-save-image-from-data-uri

StringIO, PIL image

你就把 StringIO 想成是 open('/home/vinta/some_file.txt', 'rb') 的 file 物件

from StringIO import StringIO

from PIL import Image
import requests

r = requests.get('http://vinta.ws/static/photo.jpg')
img = Image.open(StringIO(r.content))
print pil_image.size

StringIO, PIL image, Django

from StringIO import StringIO

from django.core.files.base import ContentFile

from PIL import Image

img = Image.open(instance.file)
# or
raw_img_io = StringIO(binary_data)
img = Image.open(raw_img_io)
img = img.resize((524, 328), Image.ANTIALIAS)
img_io = StringIO()
img.save(img_io, 'PNG', quality=100)

profile.image.save('whatever.png', ContentFile(img_io.getvalue()), save=False)
profile.save()

ref:
https://stackoverflow.com/questions/3723220/how-do-you-convert-a-pil-image-to-a-django-file

Download file from URL, tempfile

import os
import tempfile
import requests
import xlrd

try:
    file_path = report.file.path
    temp = None
except NotImplementedError:
    url = report.file.url
    r = requests.get(url, stream=True)
    file_url, file_ext = os.path.splitext(r.url)

    # delete=True 會在 temp.close() 之後自己刪掉
    temp = tempfile.NamedTemporaryFile(prefix='report_file_', suffix=file_ext, dir='/tmp', delete=False)
    file_path = temp.name

    with open(file_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
                f.flush()

wb = xlrd.open_workbook(file_path)

# 因為是 tempfile.NamedTemporaryFile(delete=False)
# 所以你要自己刪掉
try:
    os.remove(temp.name)
except AttributeError:
    pass

ref:
https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests

HTTP Cache Headers in Django

HTTP Cache Headers in Django

Conditional View Processing

def latest_entry(request, blog_id):
    return Entry.objects.filter(blog=blog_id).latest("published").published

@condition(last_modified_func=latest_entry)
def front_page(request, blog_id):
    ...

ref:
https://docs.djangoproject.com/en/dev/topics/conditional-view-processing/

Cache Middlewares

如果有啟用 Django 的 cache middleware(就是 UpdateCacheMiddlewareFetchFromCacheMiddleware
每一個 request 都會被標上 Cache-Control: max-age=600
那個 600 是根據 CACHE_MIDDLEWARE_SECONDS 設定的值

只要設置了 max-age > 0
response header 中就會被自動加入 Cache-ControlExpiresLast-Modified 兩個欄位

ref:
https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-CACHE_MIDDLEWARE_SECONDS

Never Cache Decorator

from django.views.decorators.cache import never_cache

@never_cache
def myview(request):
    pass

如果你單純的就是不希望被 cache
就使用這種方式
@never_cache 只會設置 Cache-Control: max-age=0max-age=0 是馬上過期)

Cache Control Decorator

from django.views.decorators.cache import cache_control

class SongDetail(SVAPIDetailView):
    serializer_class = api_serializers.SongDetailSerializer

    @cache_control(no_store=True, no_cache=True, max_age=0)
    def get(self, request, song_id):
        do_something()

        return Response(data)

不知道為什麼,只設置 no-store 和 no-cache 的話
iOS 的 AFNetworking 還是會 cache
照道理說 no-store 的優先權應該是最高的
目前的解法是使用 Cache-Control: max-age=0

ref:
https://docs.djangoproject.com/en/dev/topics/cache/#controlling-cache-using-other-headers