Python with Excel: xlrd, xlsxwriter, and xlutils

Python with Excel: xlrd, xlsxwriter, and xlutils

Libraries

xlsxwriter 的文件寫得比較好

ref:
http://www.python-excel.org/
https://xlsxwriter.readthedocs.org/en/latest/
http://openpyxl.readthedocs.org/en/latest/

Usage

row 是橫排
column 是直排

Default format

import xlsxwriter

workbook = xlsxwriter.Workbook('label_copy.xlsx')

# default cell format
workbook.formats[0].set_font_size(12)
workbook.formats[0].set_text_wrap() # 要加上這個才能正常顯示多行
workbook.formats[0].set_align('vcenter')

ref:
https://xlsxwriter.readthedocs.org/en/latest/format.html

Multiple lines

lines_format = workbook.add_format({
    'align': 'left',
    'font_size': 12,
    'text_wrap': True,
    'valign': 'vcenter',
})

# 或是用 """多行"""
content = 'first line\nsecond line'
worksheet.write(0, 0, content, lines_format)

重點是要加上 text_wrap

ref:
http://stackoverflow.com/questions/15370432/writing-multi-line-strings-into-cells-using-openpyxl

Write to existing excel files

from xlutils.copy import copy as xlutils_copy
import xlrd

rb = xlrd.open_workbook('your_file.xls', formatting_info=True)
wb = xlutils_copy(rb)
ws = wb.get_sheet(0)
ws.write(0, 0, 'Hello World')

ref:
https://stackoverflow.com/questions/2725852/writing-to-existing-workbook-using-xlwt

Examples

ref:
https://xlsxwriter.readthedocs.org/en/latest/examples.html

ipdb: The interactive Python debugger with IPython

ipdb: The interactive Python debugger with IPython

ipdb is an interactive Python Debugger with IPython integration, which features tab completion and syntax highlighting, etc. In layman's terms, ipdb is a better pdb.

ref:
https://github.com/gotcha/ipdb

Usage

$ pip install -U ipdb

ref:
https://pypi.python.org/pypi/ipdb

Add a breakpoint to any place you want to inspect, then run your code.

import ipdb; ipdb.set_trace()

If you use Sublime Text 3, try Python Breakpoints.
https://github.com/obormot/PythonBreakpoints

Useful Commands

Oldest frame is the frame in the stack where your program started; it is the oldest in time; the Newest frame, the other end of the stack, is where Python is executing code and is the current frame of execution.

# help: Print the list of all commands
h

# help: Print help about the certain command
h break

# print: Print the value of the expression
p some_obj
pp some_obj

# Print detailed information about the object
pinfo some_obj
pinfo2 some_obj

# args: Print arguments with their values of the current function
a

# list: List 11 lines of source code around the current line
l

# list: List 11 lines of source code around line 123
l 123

# longlist: List all source code for the current function or frame
ll

# jump: Jump to line 123, skip the execution of anything between
j 123

# args: List all arguments of the current function
a

# step: Execute code line by line, it may jump to another frame when a function call is encountered
s

# next: Execute code line by line, it doesn't enter functions called from the statement being executed
n

# return: Continue execution until the current function returns.
r

# continue: Continue execution, only stop when a breakpoint is encountered
c

# break: List all breakpoints
b

# break: Set a breakpoint at line 123
b 123

# break: Set a breakpoint at line 123 of file.py
b path/to/file.py:123

# break: Set a breakpoint on some_func that will be triggered if some_arg == 0
b some_func, some_arg == 0

# clear: Clear all breakpoints
clear

# where: Print a stack trace
w

# up: Move the current frame one level up in the stack trace
u

# down: Move the current frame one level down in the stack trace
d

# quit: Quit debugging
q

# use ! to run Python code that may conflict with pdb's built-in commands
!r = 123
!r = 123; c = 455

ref:
https://docs.python.org/2/library/pdb.html#debugger-commands
https://docs.python.org/3/library/pdb.html#debugger-commands
https://pymotw.com/2/pdb/
https://pymotw.com/3/pdb/
https://medium.com/instamojo-matters/become-a-pdb-power-user-e3fc4e2774b2

post_mortem

Debugging a failure after a program terminates is called post-mortem debugging.

>>> do_shit(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pdb_post_mortem.py", line 13, in go
    for i in range(self.num_loops):
AttributeError: 'MyObj' object has no attribute 'num_loops'
>>> import ipdb; ipdb.pm()
>>> w

trace

Tracing a program as it runs. In this case, it will enter ipdb when sys.path changes.

import sys

# this function will execute on every line!!!
def trace_sys_path(frame, event, arg):
    if sys.path[0].endswith('/lib'):
        ipdb.set_trace()
    return trace_sys_path

sys.settrace(trace_sys_path)

ref:
https://youtu.be/5XvAVgcbmdY?t=22m51s

Use IPython magic functions in ipdb

Because that ipdb is not a full IPython shell: actually, it provides the same Python Debugger interface as pdb, ipdb lacks many features of IPython, for instance, magic functions. You could use following code to enter a real IPython environment for debugging.

from IPython import embed; embed()

Instead of import ipdb; ipdb.set_trace().

ref:
http://stackoverflow.com/questions/16184487/use-ipython-magic-functions-in-ipdb-shell
https://github.com/gotcha/ipdb/issues/33

Python decorators

Python decorators

Python 裡的所有東西都是 object
function 也是
所以你可以對 function 做任何跟 object 一樣的事
例如把一個 function 當成參數丟給另一個 function
當然也可以 decorate class

不帶參數的 decorator

第一層 def 接收 func
第二層 def 接收 func 的 *args, **kwargs

通常意義下的 decorator 是把 func(就是 something_1、something_2)丟給 decorator function
做一些額外的動作
然後回傳該 func 原本的 return
並不操作 func 本身

如果要操作 func 本身
例如幫 func 增加一個 attribute
請參考下下面的例子

def func_wrapper(func):
    def arg_wrapper(*args, **kwargs):
        print func.__name__ + ' was called'
        print args
        print kwargs

        return func(*args, **kwargs)

    return arg_wrapper

@func_wrapper
def something_1(name='[name]'):
    print 'something_1: ' + name

@func_wrapper
def something_2(name='[name]'):
    print 'something_2: ' + name

something_1('Tom')
something_2('Cathy')
something_1()
something_2()

ref:
http://www.dotblogs.com.tw/rickyteng/archive/2013/11/06/126852.aspx

帶參數的 decorator

需要傳參數給 decorator 時
要多 wrap 一次
其實就是在原本的 decorator function 的外面再多加一個 def 來接收參數

那個 model_class 就是傳給 decorator 的參數

# 跟不帶參數的 decorator 相比多了一層,用來接收 decorator 的參數
def can_access_item_required(model_class):
    def func_wrapper(func):
        def arg_wrapper(*args, **kwargs):
            request = args[0]
            item = model_class.objects.get(pk=kwargs['pk'])

            if not model_class.objects.can_access_item_by(item, request.user):
                raise PermissionDenied()

            return func(*args, **kwargs)

        return arg_wrapper

    return func_wrapper

@can_access_item_required(Label)
def label_update(request, pk):
    pass

# equals to

label_update = can_access_item_required(Artist)(label_update)

ref:
https://www.python.org/dev/peps/pep-0318/#current-syntax

decorator 修改 func 本身

不需要封裝什麼
單純地把 func 丟給另一個 function
然後再 return 那個 func 即可

in views.py

def intro_middleware_decorator(func):
    func.enable_intro_middleware = False
    return func

@intro_middleware_decorator
def intro_1(request):
    return render(request, 'dps/intro/1.html')

in middleware.py

class IntroMiddleware(object):
    """
    如果 user 沒有完成 intro 步驟,則轉址到 /intro/1/
    """
    def __init__(self):
        self.enable = True
    def process_view(self, request, view_func, view_args, view_kwargs):
        """
        要在 intro 系列的 view 加上 disable_intro_middleware_decorator
        避免無限迴圈
        """
        self.enable = getattr(view_func, 'enable_intro_middleware', True)
    def process_response(self, request, response):
        if self.enable:
            resolver = request.resolver_match
            namespace = getattr(resolver, 'namespace', None)
            # 避免 admin, login 之類的 view 也都被 redirect
            if namespace == 'dps':
                if request.user.is_authenticated():
                    profile = request.user.profile
                    if (profile.identity == ProfileIdentity.PHANTOM) and (not profile.is_ready):
                        return redirect('dps:intro-one')
                else:
                    return redirect('auth:login')
        return response

decorator in a Class

class SVAPIListView(object):

    @staticmethod
    def filter_last_modified_decorator(func):
        """
        用來讓 client 同步資料
        ?last_modified=2015-02-04T15:16:22&is_deleted=true
        可以拿到某個時間點之後,新增、修改、刪除的項目
        必須明確地指定 is_deleted=true 才會包含被刪除的項目
        """

        def _add_filters_for_syncing(self, *args, **kwargs):
            queryset = func(self, *args, **kwargs)

            last_modified = self.get_last_modified()
            if last_modified:
                queryset = queryset.filter(last_modified__gte=last_modified)

                is_deleted = self.get_is_deleted()
                target_user = self.get_target_user()
                if (self.request.user == target_user) and (is_deleted):
                    # 只有用戶本人才能拿到被刪除的 items
                    pass
                else:
                    queryset = queryset.filter(enable=True)
            else:
                queryset = queryset.filter(enable=True)

            return queryset

        return _add_filters_for_syncing

class UserAlbumList(SVAPIListView):
    serializer_class = api_serializers.AlbumListSerializer

    @UserIDAsUsernameMixin.filter_last_modified_decorator
    def get_queryset(self, profile_user):
        queryset = MusicAlbum.objects.filter(user=profile_user)
        queryset = queryset.select_related('user', 'user__profile')

        ordering = self.request.GET.get('ordering')
        if ordering == 'like_count':
            queryset = queryset.order_by('-like_count')
        else:
            queryset = queryset.order_by('-id')

        return queryset

    def get(self, request, user_id, *args, **kwargs):
        profile_user = self.get_target_user()
        queryset = self.get_queryset(profile_user=profile_user)
        data = self.get_serializer_data(queryset)

        return Response(data)

ref:
http://stackoverflow.com/questions/1263451/python-decorators-in-classes

多個 decorator 時的執行順序

如果有多個 decorator 裝飾同一個 function / class
執行的順序是由下往上的
會先執行 @decorator_1

@decorator_3
@decorator_2
@decorator_1
def function():
    pass

Class-based decorator

class RecorderDecorator(object):

    __slots__ = ['recorder', 'msg_template', 'subject', 'action']

    recorder = Recorder()

    def __init__(self, msg_template, subject=None, action=None):
        self.msg_template = msg_template
        self.subject = subject
        self.action = action

class model_recorder(RecorderDecorator):
    """
    用於 model 的 method
    接受一個 string template,會自動使用 method parameters 作為 template context
    會使用 method name 作為 action,model name 作為 subject
    會在裝飾的 method 執行完之後執行這個 decorator

    用法可以參考 Song.play()

    @recorder_decorators.model_recorder('user: {user.id}, ip: {from_ip}, full: {is_full}, embed: {is_embed}')
    def play(self, user, from_ip, is_full=False, is_embed=False, from_playlist=None):
        pass

    則 log message 會是 [song:play] user: 123, ip: 59.120.12.57, full: True, embed: False
    """

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_result = func(*args, **kwargs)
            func_kwargs = inspect.getcallargs(func, *args, **kwargs)

            model_instance = func_kwargs.get('self')

            if getattr(model_instance, 'id'):
                new_msg_template = ', '.join(('item: {self.id}', self.msg_template))
            else:
                new_msg_template = self.msg_template

            if not self.subject:
                model_name = type_utils.item_type(model_instance)
                new_subject = model_name
            else:
                new_subject = self.subject

            if not self.action:
                new_action = func.__name__
            else:
                new_action = self.action

            msg = new_msg_template.format(**func_kwargs)
            self.recorder.write(new_subject, new_action, msg)

            return func_result

        return wrapper

class Song(models.Model):

    @model_recorder('user: {user.id}, ip: {from_ip}, full: {is_full}, embed: {is_embed}')
    def play(self, user, from_ip, is_full=False, is_embed=False, from_playlist=None):
        pass

ref:
https://stackoverflow.com/questions/9416947/python-class-based-decorator-with-parameters-that-can-decorate-a-method-or-a-fun

Database Routers in Django

Database Routers in Django

把部分 models / tables 獨立到一台資料庫

Database Router

沒辦法在 Model class 裡指定這個 model 只能用某個 database
而是要用 database router
就是判斷 model._meta.app_label == 'xxx' 的時候指定使用某一個 database
database 是指定義在 settings.DATABASES 的那些

不過 django 不支援跨 database 的 model relation
你不能用 foreign key 或 m2m 指向另一個 database 裡的 model
但是其實你直接用 user_id, song_id 之類的 int 欄位來記錄就好了

in settings.py

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'default',
        'USER': 'whatever',
        'PASSWORD': 'whatever',
        'HOST': '',
        'PORT': '',
    },
    'warehouse': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'warehouse',
        'USER': 'whatever',
        'PASSWORD': 'whatever',
        'HOST': '',
        'PORT': '',
    },
}

DATABASE_ROUTERS = [
    'app.db_routers.SVDatabaseRouter',
]

in app/db_routers.py

from oauth2_provider.models import AccessToken as OauthAccessToken
from oauth2_provider.oauth2_backends import get_oauthlib_core
from rest_framework2.authentication import BaseAuthentication

class SVOauthAuthentication(BaseAuthentication):
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        oauthlib_core = get_oauthlib_core()
        valid, result = oauthlib_core.verify_request(request, scopes=[])

        if valid:
            return (result.user, result.access_token)
        else:
            access_token = request.GET.get('access_token', None)
            if access_token:
                try:
                    access_token_obj = OauthAccessToken.objects.get(token=access_token)
                except OauthAccessToken.DoesNotExist:
                    pass
                else:
                    return (access_token_obj.user, access_token_obj)

        return None

    def authenticate_header(self, request):
        return 'Bearer realm="{0}"'.format(self.www_authenticate_realm)

ref:
https://docs.djangoproject.com/en/dev/topics/db/multi-db/

Models

把所有需要放到另一台 db 的 models 都放在同一個 app 下
方便管理

in warehouse/models.py

class PlayRecord(SVWarehouseDBMixin, models.Model):
    song_id = models.IntegerField()
    user_id = models.IntegerField(null=True, blank=True)
    is_full = models.BooleanField(default=False)
    ip_address = models.IPAddressField()
    location = models.CharField(max_length=2)
    created_at = models.DateTimeField()

繼承 SVWarehouseDBMixin 這個 minxin 的 class 會被放到 warehouse 這台資料庫!
所以 migrate 的時候要注意,記得在 migration 檔案裡加上:

from south.db import dbs
warehouse_db = dbs['warehouse']

這樣 south 才會在 warehouse 這台資料庫上建立 table
不然就是你自己手動去 CREATE TABLE

Migration

如果你要把舊有的 app 的 models 搬到另一台資料庫
但是 models 不動(還是放在本來的 app 底下)
你可能會需要 reset 整個 migration 紀錄
從頭開始建立一個新的 migration
因為 schema 會錯亂
所以還是建議新開一個 app 來放那些要搬到另一台資料庫的 models
這樣 database router 和 migration 都會比較單純

in app/migrations/0001_initial.py

$ pip install south==1.0.2

# syncdb 默認只會作用到 default 資料庫,你要明確指定要用哪個 database 才行
$ ./manage.py syncdb --noinput
$ ./manage.py syncdb --noinput --database=warehouse

# migrate 卻可以作用到其他資料庫
# 因為 migrate 哪個資料庫是 migration file 裡的 `db` 參數在決定的
$ ./manage.py migrate

ref:
http://stackoverflow.com/questions/7029228/is-using-multiple-databases-and-south-together-possible

Unit Tests

Django only flushes the default database at the start of each test run. If your setup contains multiple databases, and you have a test that requires every database to be clean, you can use the multi_db attribute on the test suite to request a full flush.

在使用 --keepdb 的情況下,如果你的測試執行到一半就因為錯誤而中斷了,可能會發生資料庫裡有資料還沒被 flush 的問題,導致下次執行測試時失敗。不過如果你沒有用 --keepdb 的話,因為每次都會重建資料庫,所以不會有這個問題。

from django.test import TestCase

class YourBTestCase(TestCase):
    multi_db = True

    def setUp(self):
        do_shit()

ref:
https://docs.djangoproject.com/en/dev/topics/testing/tools/
https://docs.djangoproject.com/en/dev/topics/testing/advanced/#topics-testing-advanced-multidb

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

elasticsearch-dsl-py: The Official Elasticsearch ORM in Python

Query DSL 是 Elasticsearch 的查詢用 Domain-specific Language (DSL),實際上就是一堆 JSON。elasticsearch-dsl 是官方發佈的一套用來操作 Query DSL 的 Python package,可以當成是 Elasticsearch 的 ORM。

希望之後可以直接支援用 SQL 來查詢,不然 Query DSL 真的有夠難寫。

ref:
https://github.com/elastic/elasticsearch-dsl-py

Installation

$ pip install elasticsearch-dsl>=5.0.0,<6.0.0

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/index.html

Schema

in app/mappings.py

from elasticsearch_dsl import DocType, String, Boolean
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=['127.0.0.1', ])

class AlbumDoc(DocType):
    upc = String(index='not_analyzed')
    title = String(analyzer='ik', fields={'raw': String(index='not_analyzed')})
    artist = String(analyzer='ik')
    is_ready = Boolean()

    class Meta:
        index = 'dps'
        doc_type = 'album'

    @classmethod
    def sync(cls, album):
        album_doc = AlbumDoc(meta={'id': album.id})
        album_doc.upc = album.get_upcs(output_str=False)
        album_doc.title = album.name
        album_doc.artist = album.artist.name
        album_doc.is_ready = album.is_ready
        album_doc.save()

    def save(self, *args, **kwargs):
        return super(AlbumDoc, self).save(*args, **kwargs)

    def get_model_obj(self):
        from svapps.dps.models import Album
        return Album.objects.get(id=self.meta.id)

# to create mappings
AlbumDoc.init()

一定要執行一次 YourDocType.init(),這樣 Elasticsearch 才會根據你的 DocType 產生對應的 mapping。否則 Elasticsearch 就會在你第一次倒資料進去的時候根據你的資料的 data type 建立對應的 mapping,所以 analyzer 之類的設定就會是預設的 standard,你可以透過 _mapping API 來檢查。

需要全文搜尋的欄位要設為 analyzed(string 欄位默認都是 analyzed),不需要全文搜尋的欄位,也就是要求精確的欄位,例如:usernameemailzip code,就可以設成 not_analyzed,但是你就不能對 analyzed 的欄位使用 term 了,除非你對該欄位額外再建立一個 raw 欄位。

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/persistence.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html#CO59-2

Store Data

album_doc = AlbumDoc(meta={'id': 42})
album_doc.upc = ['887375000619', '887375502069']
album_doc.title = 'abc'
album_doc.artist = 'xyz'
album_doc.is_ready = True
album_doc.save()

# 可以如常地 query,不用管它是不是 list
search = AlbumDoc.search().filter('term', upc='887375000619')
response = search.execute()

因為 Elasticsearch 是 schemaless,所以即使你定義了 String 欄位,還是可以存一個 list 進去。

Search Data

  • must:必須符合所有條件
  • should:符合其中一個條件即可
search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query('match', title=u'沒有的啊')

search = TrackDoc.search() \
    .filter('term', is_ready=True) \
    .query(
        Q('match', title='沒有的啊') & \
        Q('match', artist='那我懂你意思了') & \
        Q('match', album='沒有的, 啊!?')
    )

q = Q(
    'bool',
    must=[
        Q('match', title={'query': track_name, 'fuzziness': 'AUTO'}),
    ],
    should=[
        Q('match', album={'query': album_name, 'minimum_should_match': '60%'}),
        Q('match', artist={'query': artist_name, 'minimum_should_match': '80%'}),
    ],
    minimum_should_match=1
)
search = TrackDoc.search().filter('term', is_ready=True).query(q)

q = Q(
    'bool',
    should=[
        Q('term', isrc=q),
        Q('term', upc=q),
        Q('match', **{'title.raw': {'query': q}}),
        Q('multi_match', query=q, fields=['title', 'artist', 'album']),
    ],
)
search = Search(index='dps', doc_type=['track', 'album']).query(q)
search = search[:20]

# print the raw Query DSL
import uniout
from pprint import pprint
pprint(search.to_dict())

response = search.execute()

print(response.hits.total)
print(response[0].title)
print(response[0].artist)
print(response[0].album)
print(response[0].is_ready)

ref:
https://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html