Setup Celery with your Flask project

Installation

$ pipenv install flask "celery[redis,gevent]"

ref:
http://docs.celeryproject.org/en/latest/index.html
https://github.com/celery/celery

Configuration

$ tree simple-api
simple-api
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── app.py
├── requirements.txt
└── simple_api
    ├── bar
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── foo
    │   ├── __init__.py
    │   ├── endpoints.py
    │   └── tasks.py
    ├── __init__.py
    ├── celeryconfig.py
    ├── config.py
    └── tasks.py

# simple_api/celeryconfig.py
import os

broker_url = os.environ.get('CELERY_BROKER_URL') or 'redis://127.0.0.1:6379/0'
result_backend = os.environ.get('CELERY_RESULT_BACKEND') or 'redis://127.0.0.1:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

task_eager_propagates = True
task_ignore_result = True

timezone = 'UTC'
enable_utc = True
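
Since the config reads CELERY_BROKER_URL and CELERY_RESULT_BACKEND from the environment, you can point the broker and result backend at another Redis instance without touching the code; for example, using a Redis host named "redis" (the hostname is an assumption about your setup):

$ export CELERY_BROKER_URL="redis://redis:6379/0"
$ export CELERY_RESULT_BACKEND="redis://redis:6379/1"
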
# simple_api/__init__.py
from celery import Celery
from flask import Flask

def make_celery(app):
    celery = Celery(app.import_name)
    celery.config_from_object('simple_api.celeryconfig')

    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # run every task inside the Flask application context
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

def create_app():
    app = Flask(__name__)
    app.config.from_object('simple_api.config')

    # import task modules so their shared_task functions get registered
    from . import tasks

    app.celery = make_celery(app)
    return app
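
create_app() loads simple_api/config.py, which isn't shown above; a minimal sketch of what it might contain (both settings here are assumptions, not part of the original project):

# simple_api/config.py
import os

# plain Flask settings; celery.conf.update(app.config) will also see these keys
DEBUG = os.environ.get('FLASK_DEBUG', '0') == '1'
SECRET_KEY = os.environ.get('SECRET_KEY', 'changeme')
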
# app.py
import simple_api

app = simple_api.create_app()
celery = app.celery

ref:
http://flask.pocoo.org/docs/1.0/patterns/celery/
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration
http://docs.celeryproject.org/en/latest/userguide/application.html
http://docs.celeryproject.org/en/latest/userguide/configuration.html

Tasks

# simple_api/tasks.py
import time

import celery

@celery.shared_task()
def sleep(message, seconds=1):
    # task_ignore_result = True in celeryconfig.py, so the return value isn't stored
    time.sleep(seconds)
    print(message)
    return seconds

ref:
http://docs.celeryproject.org/en/latest/userguide/tasks.html

# simple_api/whatever.py
from simple_api import tasks

tasks.sleep.delay('Hello World', seconds=5)

ref:
http://docs.celeryproject.org/en/latest/userguide/calling.html
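
In a real endpoint you would usually enqueue the task and return right away. simple_api/foo/endpoints.py appears in the tree above but its content isn't shown; the blueprint below is only a sketch of how it could look:

# simple_api/foo/endpoints.py (hypothetical content)
from flask import Blueprint, jsonify

from simple_api import tasks

foo = Blueprint('foo', __name__)

@foo.route('/sleep')
def trigger_sleep():
    # enqueue the task without waiting for it to finish
    result = tasks.sleep.delay('Hello from /sleep', seconds=3)
    return jsonify({'task_id': result.id})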

Usage

# run web
$ FLASK_APP=app.py FLASK_ENV=development flask run

# run workers with the gevent pool (100 greenlets), prefetching one task at a time
# and with gossip between workers disabled
$ celery -A app:celery worker -P gevent --without-gossip -c 100 --prefetch-multiplier 1 -Ofair -l info
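
Optionally, to verify that the worker has registered the shared tasks (assuming the worker above is already running):

$ celery -A app:celery inspect registered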

ref:
http://docs.celeryproject.org/en/latest/userguide/optimizing.html

Run a Celery task at a specific time

Schedule Tasks

You can run any Celery task at a specific time through the eta ("estimated time of arrival") parameter.

import datetime

import celery

# User is assumed to be a MongoEngine Document defined elsewhere in the project
@celery.shared_task(bind=True)
def add_tag(task, user_id, tag):
    # add the tag only if the user doesn't already have it
    User.objects.filter(id=user_id, tags__ne=tag).update(push__tags=tag)
    return True

user_id = '582ee32a5b9c861c87dc297e'
tag = 'new_tag'
started_at = datetime.datetime(2018, 3, 12, tzinfo=datetime.timezone.utc)
add_tag.apply_async((user_id, tag), eta=started_at)
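
apply_async() returns an AsyncResult; if the task might need to be cancelled before its eta, keep the id around (persisting it in a database is one option), since that's what the revoke example below needs:

result = add_tag.apply_async((user_id, tag), eta=started_at)
task_id = result.id  # store this id if you want to revoke the task later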

ref:
http://docs.celeryproject.org/en/master/userguide/calling.html#eta-and-countdown

Revoke Tasks

If you revoke a task before its eta, the worker will discard it instead of executing it when the eta arrives.

from celery.result import AsyncResult

# task_id is the id returned by apply_async() / delay()
AsyncResult(task_id).revoke()

ref:
http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.revoke

Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.

The list of revoked tasks is in-memory so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker.
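
For example (the state file path here is just an assumption; each worker needs its own writable file):

$ celery -A app:celery worker -P gevent --statedb=/var/run/celery/worker.state -l info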

ref:
http://docs.celeryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes