CodeTengu Weekly Issue 135 @vinta - Kubernetes, Python, MongoDB

This post was also published in CodeTengu Weekly - Issue 135

The incomplete guide to Google Kubernetes Engine

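I wrote an article based on what I learned while tinkering with Kubernetes recently, and I hope it helps. It covers the basic concepts, creating a cluster, adding node pools, deploying a ConfigMap, Deployment with LivenessProbe/ReadinessProbe, Horizontal Pod Autoscaler, Pod Disruption Budget, StatefulSet, and DaemonSet, then explains the relationship between Service and Ingress and how to apply Node Affinity and Pod Affinity.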

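By the way, even if you only want to play around, I suggest creating a preemptible (similar to AWS Spot Instances) k8s cluster directly on Google Kubernetes Engine. It is very cheap, so there is no need to keep using minikube. Amazon now has its own managed Kubernetes as well; my current company uses GCP, but I still miss AWS.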

Fluent Python

I have been writing Python for a while, yet I still learn a lot every time I read this book. Sincerely recommended.

The book I used when I first learned Python was a different one, Learning Python. I looked it up, and wow, it is already in its fifth edition.

A deep dive into the PyMongo MongoDB driver

A Replica Set is usually the standard MongoDB setup (the next step being Sharding). This talk explains in detail how a Replica Set handles service discovery and how PyMongo communicates with a Replica Set.

Let's talk about usernames

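Like the Falsehoods series we have mentioned many times, this article patiently reminds everyone that something almost every system and website has, the username, is not as simple as you think. See for yourself.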

The author also introduces an important idea, the Tripartite Identity Pattern, which splits the so-called ID into three kinds:

  1. System-level identifier, suitable for use as a target of foreign keys in our database
  2. Login identifier, suitable for use in performing a credential check
  3. Public identity, suitable for displaying to other users

Do not try to make a single identifier serve every purpose.
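
The three identifiers can be kept as separate fields on one record. This is only a minimal sketch: the `Account` class, its field names, and the `normalize_login` rule are hypothetical and are not taken from the article.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Account:
    # Hypothetical record illustrating the Tripartite Identity Pattern.
    login: str          # 2. login identifier, used only for credential checks
    display_name: str   # 3. public identity, shown to other users
    # 1. system-level identifier: the target of foreign keys, never changes
    id: uuid.UUID = field(default_factory=uuid.uuid4)


def normalize_login(raw: str) -> str:
    # Assumed rule for illustration: compare logins case-insensitively.
    return raw.strip().casefold()


account = Account(login=normalize_login("  Vinta@Example.com "), display_name="Vinta")
original_id = account.id

# Changing the login or the public name leaves the system-level id untouched,
# so rows that reference account.id stay valid.
account.login = normalize_login("new-login@example.com")
account.display_name = "vinta (renamed)"
```

Because nothing else points at `login` or `display_name`, either can change without touching related rows.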

Web Architecture 101

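This article gives an accessible explanation of the components a modern web service typically has. But honestly, if you are a back-end engineer who is just starting out, how much time and effort would it take to get a grip on everything this article mentions? Not to mention the lower-level knowledge beneath it. Oh, and the article does not even touch on DevOps. As with Will Kubernetes Collapse Under the Weight of Its Complexity?, which I read earlier, I feel that the way things have developed is not very friendly to newcomers (or even to ordinary 1x engineers like us).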

Integrate with Google Cloud API in Python

Installation

$ pipenv install google-cloud

# or install only the specific components you need
$ pipenv install google-cloud-storage

ref:
https://google-cloud-python.readthedocs.io/en/latest/index.html

Google Cloud Storage

Note that initializing storage.Client() is a blocking call.

ref:
https://google-cloud-python.readthedocs.io/en/latest/storage/buckets.html
https://cloud.google.com/storage/docs/reference/libraries
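
Since creating the client blocks, one common approach is to create it once and reuse it. The sketch below shows that idea with a hypothetical `cached_factory` helper; a stand-in factory replaces `storage.Client` so the example runs without credentials.

```python
import functools


def cached_factory(factory):
    """Return a zero-argument callable that runs `factory` once and reuses the result."""
    return functools.lru_cache(maxsize=None)(factory)


# In a real project this might be (assumption, requires google-cloud-storage):
#   from google.cloud import storage
#   get_storage_client = cached_factory(storage.Client)

# Stand-in factory so the sketch runs without credentials.
calls = []


def make_fake_client():
    calls.append(1)  # record each expensive construction
    return object()


get_client = cached_factory(make_fake_client)
first = get_client()
second = get_client()  # served from the cache, the factory is not called again
```

Whether a shared client suits your application depends on how you handle threads and processes, so treat this as a starting point.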

Update A File's Metadata

from google.cloud import storage

storage_client = storage.Client()
source_bucket = storage_client.get_bucket('asia.public.swag.live')
source_file = source_bucket.get_blob('launchs/57c16f5bb811055b66d8ef46.jpg')
source_file.metadata = {
    'Link': '<https://api.v2.swag.live/users/57c16f5bb811055b66d8ef46>; rel="user"',
}
source_file.patch()

ref:
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/1185

Copy A File

from google.cloud import storage

def copy_file(source_bucket, source_name, destination_bucket, destination_name):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket)
    source_file = source_bucket.blob(source_name)
    destination_bucket = storage_client.get_bucket(destination_bucket)
    destination_file = source_bucket.copy_blob(source_file, destination_bucket, destination_name)
    return destination_file

file_ext_mapping = {
    'image/jpeg': 'jpg',
    'video/mp4': 'mp4',
}
file_ext = file_ext_mapping[original_message.media.mimetype]
source_name = f'messages/{original_message.id}.{file_ext}'
destination_name = f'messages/{new_message.id}.{file_ext}'

copy_file(
    source_bucket='asia.uploads.swag.live',
    source_name=source_name,
    destination_bucket='asia.uploads.swag.live',
    destination_name=destination_name,
)

ref:
https://cloud.google.com/storage/docs/json_api/v1/objects/copy
https://cloud.google.com/storage/docs/renaming-copying-moving-objects#storage-copy-object-python

Copy A Folder With Batch Operations

from google.cloud import storage

def copy_files(source_bucket_name, source_name_prefix, destination_bucket_name, fix_destination_name_func=None):
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket_name)
    destination_bucket = storage_client.get_bucket(destination_bucket_name)
    blobs = source_bucket.list_blobs(prefix=source_name_prefix)

    # DO NOT DO THIS:
    # blobs is a lazy HTTP iterator, so blobs.num_results is still 0 here
    # (nothing has been fetched yet) and this check would always raise
    # if not blobs.num_results:
    #     raise ValueError(f'No objects matched: gs://{source_bucket.name}/{source_name_prefix}')

    with storage_client.batch():
        for source_blob in blobs:
            destination_name = fix_destination_name_func(source_blob.name) if callable(fix_destination_name_func) else source_blob.name
            source_bucket.copy_blob(source_blob, destination_bucket, destination_name)
    return True

source_bucket_name = 'asia.uploads.swag.live'
destination_bucket_name = 'asia.contents.swag.live'
source_name_prefix = 'forum-posts/123'

copy_files(
    source_bucket_name=source_bucket_name,
    destination_bucket_name=destination_bucket_name,
    source_name_prefix=source_name_prefix,
    fix_destination_name_func=lambda source_name: source_name.replace(source_name_prefix, 'forum-posts'),
)

This is roughly equivalent to:

$ gsutil cp -r "gs://asia.uploads.swag.live/forum-posts/123/*" "gs://asia.contents.swag.live/"

ref:
https://cloud.google.com/storage/docs/listing-objects
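
Because the listing is lazy, one way to detect an empty prefix is to pull the first item and then put it back. `require_non_empty` below is a hypothetical helper, and a plain iterator stands in for `source_bucket.list_blobs(prefix=...)`.

```python
import itertools


def require_non_empty(iterable, message):
    """Return an iterator over `iterable`, raising ValueError if it yields nothing."""
    iterator = iter(iterable)
    try:
        first = next(iterator)
    except StopIteration:
        raise ValueError(message) from None
    # Put the consumed item back in front of the remaining ones.
    return itertools.chain([first], iterator)


# Stand-in for source_bucket.list_blobs(prefix=...): any lazy iterator works.
names = require_non_empty(
    iter(['forum-posts/123/a.jpg', 'forum-posts/123/b.jpg']),
    'No objects matched',
)
copied = list(names)
```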

batch() does not guarantee the order of execution, so do not mix different types of calls in the same batch. For instance, a batch should not contain both "copy a.txt" and "delete a.txt".

ref:
https://googlecloudplatform.github.io/google-cloud-python/latest/storage/batch.html
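
One way to respect that constraint is to group the pending operations by type and run each group in its own batch. The sketch below covers only the grouping step; `split_into_phases` and the `(kind, object_name)` tuples are hypothetical and not part of the client library.

```python
from collections import OrderedDict


def split_into_phases(operations):
    """Group (kind, object_name) pairs by kind, keeping first-seen kind order."""
    phases = OrderedDict()
    for kind, name in operations:
        phases.setdefault(kind, []).append(name)
    return list(phases.items())


operations = [
    ('copy', 'a.txt'),
    ('delete', 'a.txt'),
    ('copy', 'b.txt'),
    ('delete', 'b.txt'),
]
phases = split_into_phases(operations)
# Each phase would then run in its own `with storage_client.batch():` block,
# so every copy has finished before any delete is sent.
```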

Upload A File Directly To A Bucket

First generate a signed upload URL, then upload the file to that URL.

import base64
import calendar
import datetime

from oauth2client.client import GoogleCredentials
import yarl

credentials = GoogleCredentials.get_application_default()

def signurl(method, url, content_type=None, expires_at=None, md5sum=None, meta=None):
    method, is_resumable = method.upper(), False
    if method == 'RESUMABLE':
        method, is_resumable = 'POST', True
    path = yarl.URL(url).path
    # expires_at is UTC (e.g. from utcnow()), so convert it with timegm();
    # time.mktime() would treat it as local time and shift the expiry
    expires = str(calendar.timegm(expires_at.utctimetuple())) if expires_at else ''

    def signature():
        def _signature_parts():
            def _meta():
                for key, value in (meta or {}).items():
                    yield 'x-goog-meta-{key}:{value}'.format(key=key, value=value)
                if is_resumable:
                    yield 'x-goog-resumable:start'

            yield method
            yield md5sum or ''
            # we need to use `curl -H 'content-type:'` to upload if we sign an empty content-type
            yield content_type or 'application/octet-stream'
            yield expires
            yield from sorted(_meta())
            yield path

        _, signature = credentials.sign_blob('\n'.join(_signature_parts()))
        return base64.b64encode(signature).decode('utf-8')

    def params():
        yield 'GoogleAccessId', credentials.service_account_email
        if expires_at:
            yield 'Expires', expires
        yield 'Signature', signature()

    return str(yarl.URL(url).with_query(**dict(params())))

signurl(
    method='RESUMABLE',
    url='https://storage.googleapis.com/asia.uploads.swag.live/forum-posts/your-filename.ext',
    expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=24),
)

$ curl -v -X 'POST' \
-H 'content-type: application/octet-stream' \
-H 'x-goog-resumable:start' \
-d '' 'THE_SIGNED_UPLOAD_URL'

$ curl -v -X PUT \
--upload-file whatever.mp4 \
THE_URL_FROM_LOCATION_HEADER_OF_THE_ABOVE_RESPONSE

ref:
https://cloud.google.com/storage/docs/access-control/signed-urls#signing-resumable
https://cloud.google.com/storage/docs/xml-api/resumable-upload
https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload
https://cloud.google.com/storage/docs/uploading-objects
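
To see what actually gets signed, the string built by `_signature_parts()` can be reproduced without credentials. This is a sketch that follows the same field order as signurl() above; `string_to_sign`, the bucket name, and the timestamp are made up for illustration.

```python
def string_to_sign(method, path, expires, content_type='', md5sum='', extension_headers=None):
    """Build the newline-joined string that gets signed (same field order as signurl())."""
    headers = sorted(
        '{}:{}'.format(key.lower(), value)
        for key, value in (extension_headers or {}).items()
    )
    parts = [method, md5sum, content_type, str(expires)] + headers + [path]
    return '\n'.join(parts)


payload = string_to_sign(
    method='POST',
    path='/example-bucket/forum-posts/your-filename.ext',  # hypothetical bucket name
    expires=1525132800,                                     # arbitrary Unix timestamp
    content_type='application/octet-stream',
    extension_headers={'x-goog-resumable': 'start'},
)
print(payload)
```

The headers sent with the initial POST must match what was signed, which is why the curl command passes both content-type and x-goog-resumable.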

Setup Celery with your Flask project

Installation

$ pipenv install flask "celery[redis,gevent]"

ref:
http://docs.celeryproject.org/en/latest/index.html
https://github.com/celery/celery

Configuration

$ tree simple-api
simple-api
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── app.py
├── requirements.txt
└── simple_api
    ├── bar
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── foo
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── __init__.py
    └── tasks.py

# simple_api/celeryconfig.py
import os

broker_url = os.environ.get('CELERY_BROKER_URL') or 'redis://127.0.0.1:6379/0'
result_backend = os.environ.get('CELERY_RESULT_BACKEND') or 'redis://127.0.0.1:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

task_eager_propagates = True
task_ignore_result = True

timezone = 'UTC'
enable_utc = True

# simple_api/__init__.py
from celery import Celery
from flask import Flask

def make_celery(app):
    celery = Celery(app.import_name)
    celery.config_from_object('simple_api.celeryconfig')

    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

def create_app():
    app = Flask(__name__)
    app.config.from_object('simple_api.config')

    from . import tasks

    app.celery = make_celery(app)
    return app

# app.py
import simple_api

app = simple_api.create_app()
celery = app.celery

ref:
http://flask.pocoo.org/docs/1.0/patterns/celery/
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration
http://docs.celeryproject.org/en/latest/userguide/application.html
http://docs.celeryproject.org/en/latest/userguide/configuration.html
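
The ContextTask class in make_celery() works by overriding `__call__` so that every task body runs inside an application context. The toy sketch below shows only that mechanism: `BaseTask` and `fake_app_context` are stand-ins written for this example, not Celery or Flask APIs.

```python
import contextlib

events = []


@contextlib.contextmanager
def fake_app_context():
    # Stand-in for app.app_context(); it only records enter/exit.
    events.append('push context')
    try:
        yield
    finally:
        events.append('pop context')


class BaseTask:
    # Toy stand-in for celery.Task: calling the task runs its body.
    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class ContextTask(BaseTask):
    # Same shape as ContextTask in make_celery(): wrap run() in the context.
    def __call__(self, *args, **kwargs):
        with fake_app_context():
            return self.run(*args, **kwargs)


class Add(ContextTask):
    def run(self, x, y):
        events.append('run task')
        return x + y


result = Add()(2, 3)
```

The task body always executes between the push and the pop, which is what lets tasks use things like current_app or extensions bound to the app.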

Tasks

# simple_api/tasks.py
import celery

@celery.shared_task()
def sleep(message, seconds=1):
    import time
    time.sleep(seconds)
    print(message)
    return seconds

ref:
http://docs.celeryproject.org/en/latest/userguide/tasks.html

# simple_api/whatever.py
from simple_api import tasks

tasks.sleep.delay('Hello World', seconds=5)

ref:
http://docs.celeryproject.org/en/latest/userguide/calling.html

Usage

# run web
$ FLASK_APP=app.py FLASK_ENV=development flask run

# run workers
$ celery -A app:celery worker -P gevent --without-gossip -c 100 --prefetch-multiplier 1 -Ofair -l info

ref:
http://docs.celeryproject.org/en/latest/userguide/optimizing.html

Flask project structures

Once you adopt the Application Factory pattern, which is officially recommended, the only place you can access app (the Flask() object) directly is inside create_app(). Everywhere else, use flask.current_app, which is available while an application context is active.

With a factory function you can apply configuration dynamically, which is particularly important for unit tests and CI.

$ tree simple-api
simple-api
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── app.py
├── requirements.txt
└── simple_api
    ├── bar
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── foo
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── __init__.py
    ├── config.py
    └── tasks.py

# simple_api/config.py
import os

class Config(object):
    SECRET_KEY = 'secret-key'

class ProductionConfig(Config):
    pass

class DevelopmentConfig(Config):
    pass

# simple_api/__init__.py
from flask import Flask, request
from flask_caching import Cache
from flask_mongoengine import MongoEngine

cache = Cache()
db = MongoEngine()

def init_cache(app, cache):
    cache.init_app(app, config={
        'CACHE_TYPE': 'redis',
        'CACHE_REDIS_URL': app.config['CACHE_REDIS_URL'],
    })

def init_db(app, db):
    db.init_app(app)

def create_app(env='production'):
    configs = {
        'production': 'simple_api.config.ProductionConfig',
        'development': 'simple_api.config.DevelopmentConfig',
    }

    app = Flask(__name__)
    app.config.from_object(configs[env])

    init_cache(app, cache)
    init_db(app, db)

    from . import foo
    from . import bar
    app.register_blueprint(foo.bp)
    app.register_blueprint(bar.bp)

    @app.errorhandler(404)
    def page_not_found(exc):
        return f'Page not found: {request.path}', 404

    @app.route('/')
    def health():
        return 'OK'

    return app

# simple_api/foo/__init__.py
from flask import Blueprint

# named bp because create_app() registers foo.bp
bp = Blueprint('foo', __name__, url_prefix='/foo')

# modules are imported at the bottom to avoid errors due to circular dependencies
from . import endpoints, tasks

# app.py
import click

import simple_api

app = simple_api.create_app()

@app.cli.command()
@click.argument('name')
def hello(name):
    print(f'Hello {name}')

$ FLASK_APP=app.py FLASK_ENV=development flask run
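
The env argument of create_app() can also be resolved from the process environment, so tests and CI can switch configuration without code changes. This is a sketch under assumptions: `resolve_config` and the `APP_ENV` variable name are hypothetical, while the mapping mirrors the `configs` dict in create_app().

```python
import os

# Same mapping as in create_app(); the values are import paths Flask can load.
CONFIGS = {
    'production': 'simple_api.config.ProductionConfig',
    'development': 'simple_api.config.DevelopmentConfig',
}


def resolve_config(env=None, environ=os.environ):
    """Pick a config import path; fall back to APP_ENV (hypothetical), then 'production'."""
    name = env or environ.get('APP_ENV') or 'production'
    try:
        return CONFIGS[name]
    except KeyError:
        raise ValueError('Unknown environment: {!r}'.format(name)) from None


development_path = resolve_config('development')
default_path = resolve_config(environ={})
```

Inside create_app() this could be used as `app.config.from_object(resolve_config(env))`; an unknown name fails early with a clear error instead of a bare KeyError.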

If you don't want to use the Application Factory pattern, you can simply initialize app at module level and import it in your endpoints.py.

# simple_api/__init__.py
from flask import Flask

app = Flask(__name__)

# modules are imported at the bottom to avoid errors due to circular dependencies
from . import endpoints

# simple_api/endpoints.py
from . import app

@app.route('/')
def health():
    return 'OK'

folder structure
http://flask.pocoo.org/docs/1.0/tutorial/layout/
http://flask.pocoo.org/docs/1.0/tutorial/factory/
https://www.safaribooksonline.com/library/view/flask-web-development/9781491991725/ch07.html

application factory
http://flask.pocoo.org/docs/1.0/patterns/appfactories/
http://flask.pocoo.org/docs/1.0/patterns/packages/

blueprint
http://flask.pocoo.org/docs/1.0/tutorial/views/
http://flask.pocoo.org/docs/1.0/blueprints/

circular imports
http://flask.pocoo.org/docs/1.0/patterns/packages/#working-with-blueprints
https://www.safaribooksonline.com/library/view/flask-web-development/9781491991725/ch07.html#ch_large

Pipenv and Pipfile: The officially recommended Python packaging tool

You no longer need to use pip and virtualenv separately. Use pipenv instead.

ref:
https://docs.pipenv.org/

Install

$ pip install pipenv

ref:
https://github.com/pypa/pipenv

Usage

Caution: Pipenv is not compatible with Anaconda.

$ pyenv global 3.6.4

# initialize project virtualenv with a specific Python version
# automatically generate both Pipfile and Pipfile.lock from an existing requirements.txt
$ pipenv install --python python3

$ cd /path/to/project-contains-Pipfile
$ pipenv install

$ pipenv install pangu
$ pipenv install -r requirements.txt

# install packages to dev-packages
$ pipenv install --dev \
ipdb \
flake8 \
flake8-bandit \
flake8-bugbear \
flake8-comprehensions \
flake8-debugger \
flake8-print \
flake8-string-format \
pep8-naming \
pylint \
pylint-common \
pylint-celery \
pylint-django \
pylint-flask

# switch your shell environment to project virtualenv
$ pipenv shell
$ exit

# uninstall everything
$ pipenv uninstall --all

# remove project virtualenv
$ pipenv --rm

ref:
https://docs.pipenv.org/basics/
https://docs.pipenv.org/advanced/

Example Pipfile

[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[packages]
requests = ">=2.0.0"

[dev-packages]
flake8 = "*"
ipdb = "*"
ipython = "*"
pylint = "*"

[requires]
python_version = "3.6"