Setup Celery with your Flask project

Setup Celery with your Flask project

Installation

$ pipenv install flask "celery[redis,gevent]"

ref:
http://docs.celeryproject.org/en/latest/index.html
https://github.com/celery/celery

Configuration

$ tree simple-api
simple-api
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── app.py
├── requirements.txt
└── simple_api
    ├── bar
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── foo
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── __init__.py
    └── tasks.py
# simple_api/celeryconfig.py
import os

broker_url = os.environ.get('CELERY_BROKER_URL') or 'redis://127.0.0.1:6379/0'
result_backend = os.environ.get('CELERY_RESULT_BACKEND') or 'redis://127.0.0.1:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

task_eager_propagates = True
task_ignore_result = True

timezone = 'UTC'
enable_utc = True
# simple_api/__init__.py
from celery import Celery
from flask import Flask

def make_celery(app):
    celery = Celery(app.import_name)
    celery.config_from_object('simple_api.celeryconfig')

    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

def create_app():
    app = Flask(__name__)
    app.config.from_object('simple_api.config')

    from . import tasks

    app.celery = make_celery(app)
    return app
# app.py
import simple_api

app = simple_api.create_app()
celery = app.celery

ref:
http://flask.pocoo.org/docs/1.0/patterns/celery/
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration
http://docs.celeryproject.org/en/latest/userguide/application.html
http://docs.celeryproject.org/en/latest/userguide/configuration.html

Tasks

# simple_api/tasks.py
import celery

@celery.shared_task()
def sleep(message, seconds=1):
    import time
    time.sleep(seconds)
    print(message)
    return seconds

ref:
http://docs.celeryproject.org/en/latest/userguide/tasks.html

# simple_api/whatever.py
from simple_api import tasks

tasks.sleep.delay('Hello World', seconds=5)

ref:
http://docs.celeryproject.org/en/latest/userguide/calling.html

Usage

# run web
$ FLASK_APP=app.py FLASK_ENV=development  flask run

# run workers
$ celery -A app:celery worker -P gevent --without-gossip -c 100 --prefetch-multiplier 1 -Ofair -l info

ref:
http://docs.celeryproject.org/en/latest/userguide/optimizing.html

Flask project structures

Flask project structures

Once you choose to follow Application Factory pattern which is officially recommended, the only place you could access app (the Flask() object) directly is inside create_app().

With the factory function, you are able to apply configurations dynamically which is particularly important for unit tests and CI.

$ tree simple-api
simple-api
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── app.py
├── requirements.txt
└── simple_api
    ├── bar
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── foo
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── __init__.py
    ├── config.py
    └── tasks.py
# simple_api/config.py
import os

class Config(object):
    SECRET_KEY = 'secret-key'

class ProductionConfig(Config):
    pass

class DevelopmentConfig(Config):
    pass
# simple_api/__init__.py
from flask import Flask, request
from flask_caching import Cache
from flask_mongoengine import MongoEngine

cache = Cache()
db = MongoEngine()

def init_cache(app, cache):
    cache.init_app(app, config={
        'CACHE_TYPE': 'redis',
        'CACHE_REDIS_URL': app.config['CACHE_REDIS_URL'],
    })

def init_db(app, db):
    db.init_app(app)

def create_app(env='production'):
    configs = {
        'production': 'simple_api.config.ProductionConfig',
        'development': 'simple_api.config.DevelopmentConfig',
    }

    app = Flask(__name__)
    app.config.from_object(configs[env])

    init_cache(app, cache)
    init_db(app, db)

    from . import foo
    from . import bar
    app.register_blueprint(foo.bp)
    app.register_blueprint(bar.bp)

    @app.errorhandler(404)
    def page_not_found(exc):
        return f'Page not found: {request.path}', 404

    @app.route('/')
    def health():
        return 'OK'

    return app
# simple_api/foo/__init__.py
from flask import Blueprint

foo_bp = Blueprint('foo', __name__, url_prefix='/foo')

# modules are imported at the bottom to avoid errors due to circular dependencies
from . import endpoints, tasks
# app.py
import simple_api

app = simple_api.create_app()

@app.cli.command()
@click.argument('name')
def hello(name):
    print(f'Hello {name}')
$ FLASK_APP=app.py FLASK_ENV=development flask run

If you don't want to use Application Factory pattern, you could just initialize app and import it in your endpoints.py.

# simple_api/__init__.py
from flask import Flask

app = Flask(__name__)

# modules are imported at the bottom to avoid errors due to circular dependencies
from . import endpoints
# simple_api/endpoints.py
from . import app

@app.route('/')
def health():
    return 'OK'

folder structure
http://flask.pocoo.org/docs/1.0/tutorial/layout/
http://flask.pocoo.org/docs/1.0/tutorial/factory/
https://www.safaribooksonline.com/library/view/flask-web-development/9781491991725/ch07.html

application factory
http://flask.pocoo.org/docs/1.0/patterns/appfactories/
http://flask.pocoo.org/docs/1.0/patterns/packages/

blueprint
http://flask.pocoo.org/docs/1.0/tutorial/views/
http://flask.pocoo.org/docs/1.0/blueprints/

circular imports
http://flask.pocoo.org/docs/1.0/patterns/packages/#working-with-blueprints
https://www.safaribooksonline.com/library/view/flask-web-development/9781491991725/ch07.html#ch_large

碼天狗週刊 第 130 期 @vinta - Programming, Code Review, Scrum, MongoDB, NoSQL, IPFS

碼天狗週刊 第 130 期 @vinta - Programming, Code Review, Scrum, MongoDB, NoSQL, IPFS

本文同步發表於 CodeTengu Weekly - Issue 130

Write code that's easy to delete, and easy to debug too.

Write code that is easy to delete, not easy to extend. 的最新續篇,同樣非常值得一讀。而且就像作者說的,programming is terrible

延伸閱讀:

Dimagi's Code Review Policies

在 code review 的時候應該明確地區分出 Merge blockerI personally prefer A over B, but no strong opinion. 這兩種反饋。

在 code review 中發現的問題,如果是會顯著地影響程式碼品質、效能或是可讀性,那的確是需要被修正;但是如果 code review 到後來,雙方一來一往都只是在修改一些無關痛癢的小問題時,reviewer 應該問問自己是不是陷入完美主義的陷阱裡了。在大多數的情況下,其實完全可以先 merge 並部署上線,確認測試完畢之後,再請該作者另外開一個新的 pull request 進行那些優先權沒有那麼高(但是你實在看不順眼)的修改。畢竟曠日費時的 code review 對整個團隊的效率和士氣都不會有什麼正面幫助,尤其當你有同事處在不同的時區或是 remote 工作時。

不過我指的不是 coding style 這種層面的修改,不應該浪費寶貴的 code review 時間在那些程式就可以檢查出來的問題,編輯器 linter 或是 pre-commit-hooks 都是好東西,再不然在 CI 做 auto-format 也行。

其他 curator 也都分享過不少關於 Code Review 的文章,例如:

Why Scrum is the Wrong Way to Build Software

標題都說得這麼明白了,大家自己感受一下,有什麼想說的話不要憋在心裡。

Specify arrayFilters for Array Update Operations in MongoDB

雖然之前一直在嫌棄 MongoDB,但是它的 aggregation 功能其實挺強大的(即便寫起來有點惱人)。除此之外,MongoDB 3.6 才引入的新特性 arrayFilters 也是把 MongoDB 對 array 的更新操作強化了不少,甚至還支援好幾層的 nested array。

IPFS: The (Very Slow) Distributed Permanent Web

前陣子終於有機會玩了一下 IPFS (InterPlanetary File System),也順手把使用方式和在過程中遇到的問題都記錄下來了,跟大家分享一下。不過說起來,這個技術在好幾年前就開始發展了,但是最近實際試用了一陣子發現還是非常不成熟啊,至少跟他們當初描繪的那個願景相比有很大一段距離。

外行員的海賊王專題

最近睡覺前都在看 YouTuber 講海賊王,忍不住跟大家分享一下,這個男人真的是用生命在看海賊王的。他對漫畫細節的考據和腦補的能力,完全會讓你懷疑你們看的究竟是不是同一部漫畫。

@vinta 分享!

每步都有伏笔!说说《战神》中那些值得注意的剧情和细节

如果你也有玩戰神,一定要看看這篇文章!當然,內含大量劇透~

是說前陣子才玩完戰神,現在接著玩 Detroit: Become Human,反而覺得有點索然無味啊......

@vinta 分享!