Write next generation JavaScript with Babel 7

You write next generation JavaScript (ES6 or ES2018!) and use Babel to convert it to ES5. Even better, with the new @babel/preset-env module, Babel can intelligently convert your next generation ECMAScript code to compatible syntax based on the browsers you need to support, so you don't have to target specific browser versions by hand anymore!

ref:
https://babeljs.io/
https://babeljs.io/docs/en/babel-preset-env
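For example, a minimal sketch of pinning target environments in babel.config.js (the browserslist query here is only an example):

// babel.config.js
module.exports = {
    presets: [
        ["@babel/preset-env", {
            // preset-env only applies the transforms
            // these target environments actually need
            "targets": "> 0.25%, not dead"
        }]
    ]
};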

There is a real-world project with proper configurations. The following article is based on this project.
https://github.com/vinta/pangu.js

Babel

$ npm install \
@babel/core \
@babel/cli \
@babel/preset-env \
--save-dev

ref:
https://babeljs.io/setup

// babel.config.js
module.exports = function(api) {
    api.cache(false);
    return {
        presets: [
            "@babel/preset-env"
        ],
        comments: false
    };
};

ref:
https://babeljs.io/docs/en/configuration

It is also recommended to put common commands in the scripts section of the package.json file.

// package.json
{
    ...
    "scripts": {
        "clear:shared": "rm -rf ./dist/shared/",
        "clear:browser": "rm -rf ./dist/browser/",
        "clear:node": "rm -rf ./dist/node/",
        "clear": "npm run clear:shared && npm run clear:browser && npm run clear:node",
        "build:shared": "npm run clear:shared && babel src/shared/ -d dist/shared/",
        "build:browser": "npm run clear:browser && webpack",
        "build:node": "npm run clear:node && babel src/node/ -d dist/node/",
        "build": "npm run build:shared && npm run build:browser && npm run build:node",
    },
    ...
}
$ npm run build:node

ref:
https://babeljs.io/docs/en/babel-cli/

Webpack

$ npm install \
webpack \
webpack-cli \
babel-loader \
terser-webpack-plugin \
--save-dev
// webpack.config.js
var _ = require('underscore');
var fs = require('fs');
var path = require('path');
var TerserPlugin = require('terser-webpack-plugin');
var webpack = require('webpack');

var packageInfo = require('./package.json');

var entryPath = './src/browser/pangu.js';

module.exports = {
  target: 'web',
  // mode: 'development',
  mode: 'production',
  entry: {
    'pangu': entryPath,
    'pangu.min': entryPath
  },
  output: {
    path: path.resolve(__dirname, 'dist/browser/'),
    filename: '[name].js',
    library: 'pangu',
    libraryTarget: 'umd',
    umdNamedDefine: true
  },
  module: {
    rules: [
      {
        test: /\.js$/,
        exclude: /node_modules|node/,
        use: {
          loader: 'babel-loader',
          options: {
            babelrc: false,
            presets: [
              [
                "@babel/preset-env",
                {
                  "modules": "umd"
                }
              ]
            ]
          }
        }
      }
    ]
  },
  devtool: false,
  optimization: {
    minimizer: [
      new TerserPlugin({
        include: /\.min\.js$/
      })
    ],
  },
}

ref:
https://webpack.js.org/configuration/

@babel/preset-env transpiles your ES modules to CommonJS by default, which means the transpiled files must be loaded with require or import. To make this compatible with your Chrome extension, you need to transpile the files as UMD modules.

ref:
https://stackoverflow.com/questions/52929562/babel-7-uncaught-referenceerror-after-transpiling-a-module

$ npm run build:browser

Karma

$ npm install \
@babel/register \
karma-babel-preprocessor \
karma-chrome-launcher \
karma-coverage \
karma-mocha \
karma-mocha-reporter \
puppeteer \
chai \
--save-dev
// karma.conf.js
module.exports = function(config) {
  config.set({
    frameworks: [
      'mocha'
    ],
    browsers: [
      'ChromeHeadless'
    ],
    files: [
      'node_modules/chai/chai.js',
      'dist/browser/pangu.js',
      'test/browser/*.js',
    ],
    preprocessors: {
        'dist/browser/pangu.js': ['coverage'],
    },
    reporters: [
      'mocha',
      'coverage'
    ],
    singleRun: true,
    coverageReporter: {
      type: 'lcov',
      subdir: '.'
    },
  });
};

ref:
https://karma-runner.github.io/3.0/config/configuration-file.html
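Note that the test:browser script is not shown in the package.json above; it is assumed to be something like the following (karma start picks up karma.conf.js by default):

// package.json
{
    ...
    "scripts": {
        ...
        "test:browser": "karma start"
    },
    ...
}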

$ npm run test:browser

MongoDB Change Stream: react to real-time data changes

What is Change Stream?

Change Stream is a Change Data Capture (CDC) feature provided by MongoDB since v3.6. In layman's terms, it's a high-level API that allows you to subscribe to real-time notifications whenever there is a change in your MongoDB collections, databases, or the entire cluster, in an event-driven fashion.

Change Stream uses information stored in the oplog (operations log) to produce the change event. The oplog.rs is a special capped collection that keeps a rolling record of all insert, update, and remove operations that come into your MongoDB so other members of the Replica Set can copy them. Since Change Stream is built on top of the oplog, it is only available for Replica Sets and Sharded clusters.
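You could peek at the latest oplog entry in the mongo shell (connect to a Replica Set member directly, since the local database is not accessible through mongos):

use local
db.getCollection('oplog.rs').find().sort({$natural: -1}).limit(1).pretty()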

The problem with most databases' replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API (Martin Kleppmann, 2017).

Change Stream comes to the rescue!

Change Stream in a Sharded cluster

MongoDB has a global logical clock that enables the server to order all changes across a Sharded cluster.

To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see if the shard has seen more recent changes. Sharded clusters with one or more shards that have little or no activity for the collection, or are "cold", can negatively affect the response time of the change stream as the mongos must still check with those cold shards to guarantee total ordering of changes.

What can Change Stream do?

There are some typical use cases of Change Stream:

  • Syncing fields between the source and denormalized collections to mitigate the data consistency issue.
  • Invalidating the cache (see the sketch after this list).
  • Updating the search index.
  • Replicating data to a data warehouse.
  • Hooking up Change Stream to a generic streaming processing pipeline, e.g., Kafka or Spark Streaming.
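For example, a minimal cache-invalidation sketch in Node.js (ioredis and the cache key scheme are assumptions; db is a connected database handle as in the example in the next section):

const Redis = require('ioredis');
const redis = new Redis();

const changeStream = db.collection('user').watch();

changeStream.on('change', (event) => {
    // evict the cached copy of whichever user document just changed
    const userId = event.documentKey._id.toString();
    redis.del(`cache:user:${userId}`).catch(console.error);
});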

How to open a Change Stream?

First of all, you must have a Replica Set or a Sharded cluster for your MongoDB deployment, and make sure you are using the WiredTiger storage engine. If you don't, you might be using MongoDB all wrong.

All code samples below are written in Node.js.

const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        appname: 'test',
        readPreference: ReadPreference.PRIMARY,
        useNewUrlParser: true,
    });
    const db = await mongoClient.db('test');
    const changeStream = db.collection('user').watch([], {'fullDocument': 'updateLookup'});

    changeStream.on('change', (event) => {
        console.log(event);
    });
})();

The example above enables 'fullDocument': 'updateLookup', which includes the entire document in each update event. As the name says, it does a lookup, which has an overhead, and the looked-up document might exceed the 16MB limit on BSON documents.

Also, the content of fullDocument may differ from the updateDescription if other majority-committed operations modified the document between the original update operation and the full document lookup. Be cautious when you use it.

References:

  • Change Events
    • Besides regular insert, update, and delete, there is also a replace event, which is triggered by an update operation that replaces the whole document.

How to aggregate Change Stream events?

One of the advantages of Change Stream is that you are able to leverage MongoDB's powerful aggregation framework - allowing you to filter and modify the output of Change Stream.

However, there is a tricky part in update events: the field names and contents in updateDescription.updatedFields vary depending on which operator updated an array field. Assume we have a tags field, a list of strings, in the user collection. You could try running the following code in the mongo shell:

  • $addToSet produces complete items of the array field
  • $push produces only the inserted item of the array field
  • $pull produces complete items of the array field
var userId = ObjectId();
db.getCollection('user').insert({
    "_id" : userId,
    "username" : "vinta",
    "tags" : ["tag1"]
});

db.getCollection('user').updateOne({_id: userId}, {
    '$addToSet': {'tags': 'tag2'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags": ["tag1", "tag2"]
//         }
//     }
//     ...
// }

db.getCollection('user').updateOne({_id: userId}, {
    '$push': {'tags': 'tag3'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags.2": "tag3"
//         }
//     }
//     ...
// }

db.getCollection('user').updateOne({_id: userId}, {
    '$pull': {'tags': 'tag1'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags": ["tag2", "tag3"]
//         }
//     }
//     ...
// }

Fortunately, to mitigate the tags and tags.2 problem, we could do some aggregation to $project and $match change events if we only want to listen to changes of the tags field:

const pipeline = [
    {'$project': {
        '_id': 1,
        'operationType': 1,
        'documentKey': 1,
        'changedDocument': {
            '$objectToArray': {
                '$mergeObjects': ['$updateDescription.updatedFields', '$fullDocument'],
            },
        },
        'removedFields': '$updateDescription.removedFields',
    }},
    {'$match': {
        '$or': [
            {'changedDocument.k': /^tags$/},
            {'changedDocument.k': /^tags\./},
            {'removedFields': {'$in': ['tags']}},
            {'operationType': 'delete'},
        ],
    }},
    {'$addFields': {
        'changedDocument': {'$arrayToObject': '$changedDocument'},
    }},
];
const changeStream = db.collection('user').watch(pipeline, {});

How to resume a Change Stream?

Another critical feature of Change Stream is Resumability. Since any service will inevitably get restarted or crashed, it is essential that we can resume from the point of time that Change Stream was interrupted.

There are two options in watch() we can use:

  • resumeAfter: A resume token from any change event.
  • startAtOperationTime: A starting timestamp for Change Stream.

resumeAfter

Before using a resumeAfter token, there is a MongoDB configuration you might need to deal with: featureCompatibilityVersion.

db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
db.adminCommand({setFeatureCompatibilityVersion: "4.0"});

A resumeAfter token is carried by every Change Stream event: the _id field, whose value looks like {'_data': '825C4607870000000129295A1004AF1EE5355B7344D6B25478700E75259D46645F696400645C42176528578222B13ADEAA0004'}. In other words, {'_data': 'a hex string'} is your resumeAfter token.

In practice, you should store each resumeAfter token somewhere, for instance in Redis, so that you can resume after a blackout or a restart. It is also a good idea to wrap the store function with debounce functionality.
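A minimal sketch, assuming ioredis and lodash are available; handleEvent() is a hypothetical application callback and changeStream comes from the earlier example:

const Redis = require('ioredis');
const _ = require('lodash');

const redis = new Redis();

// persist the latest resume token at most once per second
const saveResumeToken = _.debounce((token) => {
    redis.set('changestream:test.user:resume_token', JSON.stringify(token))
        .catch(console.error);
}, 1000);

changeStream.on('change', (event) => {
    handleEvent(event); // hypothetical application logic
    saveResumeToken(event._id); // event._id is the resumeAfter token
});

// on restart:
// const token = JSON.parse(await redis.get('changestream:test.user:resume_token'));
// const changeStream = db.collection('user').watch([], {resumeAfter: token});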

Another unusual (and not so reliable) way to get a resumeAfter token is composing one from the oplog.rs collection:

const _ = require('lodash');
const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        appname: 'test',
        replicaSet: 'rs0',
        readPreference: ReadPreference.PRIMARY,
        useNewUrlParser: true,
    });

    // cannot use 'local' database through mongos
    const localDb = await mongoClient.db('local');

    // querying oplog.rs might take seconds
    const doc = await localDb.collection('oplog.rs')
        .findOne(
            {'ns': 'test.user'}, // dbName.collectionName
            {'sort': {'$natural': -1}},
        );

    // https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change
    // https://github.com/mongodb/mongo/blob/master/src/mongo/db/storage/key_string.cpp
    // https://github.com/mongodb/mongo/blob/master/src/mongo/bson/bsontypes.h
    const resumeAfterData = [
        '82', // unknown
        doc.ts.toString(16), // timestamp
        '29', // unknown
        '29', // unknown
        '5A', // CType::BinData
        '10', // length (16)
        '04', // BinDataType of newUUID
        doc.ui.toString('hex'), // the collection uuid (see `db.getCollectionInfos({name: 'user'})`)
        '46', // CType::Object
        '64', // CType::OID (vary from the type of the collection primary key)
        '5F', // _ (vary from the field name of the collection primary key)
        '69', // i
        '64', // d
        '00', // null
        '64', // CType::OID (vary from the type of document primary key)
        _.get(doc, 'o2._id', _.get(doc, 'o._id')).toString('hex'), // ObjectID, update operations have `o2` field and others have `o` field
        '00', // null
        '04', // unknown
    ].join('').toUpperCase();

    const options = {
        'resumeAfter': {
            '_data': resumeAfterData,
        },
    };
    console.log(options);

    const db = await mongoClient.db('test');
    const changeStream = db.collection('user').watch([], options);

    changeStream.on('change', (event) => {
        console.log(event);
    });
})();

startAtOperationTime

startAtOperationTime is only available in MongoDB 4.0+. It simply represents a starting point in time for the Change Stream. Also, you must make sure that the specified starting point is within the time range of the oplog if it is in the past.

The tricky part is that this option only accepts a MongoDB Timestamp object. You could also retrieve the latest timestamp directly from db.adminCommand({replSetGetStatus: 1}).
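For instance, a shell sketch of grabbing the latest committed operation time (the exact field path may vary slightly between versions):

var status = db.adminCommand({replSetGetStatus: 1});
// a BSON Timestamp you can pass as startAtOperationTime
var ts = status.optimes.lastCommittedOpTime.ts;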

const { MongoClient, ReadPreference, Timestamp } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        appname: 'test',
        readPreference: ReadPreference.PRIMARY,
        useNewUrlParser: true,
    });

    const options = {
        'startAtOperationTime': Timestamp(1, Math.floor(Date.now() / 1000)),
    };
    console.log(options);

    const db = await mongoClient.db('test');
    const changeStream = db.collection('user').watch([], options);

    changeStream.on('change', (event) => {
        console.log(event);
    });
})();

MongoDB cookbook: Queries and Aggregations

Frequently accessed items are cached in memory, so that MongoDB can provide optimal response time.

MongoDB Shell in JavaScript

Administration

db.currentOp();

// slow queries
db.currentOp({
    "active": true,
    "secs_running": {"$gt" : 3},
    "ns": /^test\./
});

// queries not using any index
db.adminCommand({ 
    "currentOp": true,
    "op": "query", 
    "planSummary": "COLLSCAN"
});

// operations with high numYields
db.adminCommand({ 
    "currentOp": true, 
    "ns": /^test\./, 
    "numYields": {"$gte": 100} 
}) 

db.serverStatus().connections
{
    "current" : 269,
    "available" : 838591,
    "totalCreated" : 417342
}

ref:
https://docs.mongodb.com/manual/reference/method/db.currentOp/
https://hackernoon.com/mongodb-currentop-18fe2f9dbd68
http://www.mongoing.com/archives/6246

BSON Types

ref:
https://docs.mongodb.com/manual/reference/bson-types/

Check If A Document Exists

It is significantly faster to use find() + limit() because findOne() always reads and returns the document if it exists. find() just returns a cursor and only reads the data if you iterate through the cursor.

db.getCollection('message').find({_id: ObjectId("585836504b287b5022a3ae26"), delivered: false}, {_id: 1}).limit(1)

ref:
https://stackoverflow.com/questions/8389811/how-to-query-mongodb-to-test-if-an-item-exists
https://blog.serverdensity.com/checking-if-a-document-exists-mongodb-slow-findone-vs-find/

Find Documents

db.getCollection('user').find({username: 'nanababy520'})

db.getCollection('message').find({_id: ObjectId("5a6383b8d93d7a3fadf75af3")})

db.getCollection('message').find({_cls: 'Message'}).sort({posted_at: -1})

db.getCollection('message').find({sender: ObjectId("57aace67ac08e72acc3b265f"), pricing: {$ne: 0}})

db.getCollection('message').find({
    sender: ObjectId("5ac0f56038cfff013a123d85"),
    created_at: {
        $gte: ISODate('2018-04-21 12:00:00Z'),
        $lte: ISODate('2018-04-21 13:00:00Z')
    }
})
.sort({created_at: -1})

Find Documents With Regular Expression

db.getCollection('user').find({'username': /vicky/})

ref:
https://docs.mongodb.com/manual/reference/operator/query/regex/

Find Documents With An Array Field

  • $in: [...] means "intersection" or "any element in"
  • $all: [...] means "subset" or "contain"
  • $elemMatch: {...} means "any element match"
  • $not: {$elemMatch: {$nin: [...]}} means "subset" or "in"

The last one roughly means not any([False, False, False, False]), where each False indicates whether the corresponding item is not in [...].

ref:
https://stackoverflow.com/questions/12223465/mongodb-query-subset-of-an-array
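A quick illustration in the mongo shell (the tags are hypothetical):

db.getCollection('user').insert({username: 'u1', tags: ['a', 'b']});

// $all: the document's tags contain both 'a' and 'b'
db.getCollection('user').find({tags: {$all: ['a', 'b']}});

// subset: every tag is within ['a', 'b', 'c'];
// note this also matches documents whose tags field is empty or missing
db.getCollection('user').find({tags: {$not: {$elemMatch: {$nin: ['a', 'b', 'c']}}}});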

db.getCollection('message').find({includes: ObjectId("5a4bb448af9c462c610d0cc7")})

db.getCollection('user').find({gender: 'F', tags: 'promoted'})
db.getCollection('user').find({gender: 'F', 'tags.1': {$exists: true}})

ref:
https://docs.mongodb.com/manual/reference/operator/query/exists/#exists-true

Find Documents With An Array Field Of Embedded Documents

Usually, you could use $elemMatch.

{'the_array_field': {'$elemMatch': {
    'a_field_of_each_element': {'$lte': now},
    'another_field_of_each_element': 123
}}}
db.getCollection('message').find({
    unlocks: {
        $elemMatch: {
            _cls: 'PointsUnlock',
            user: ObjectId("57f662e727a79d07993faec5")
        }
    }
})

db.getCollection('feature.shop.product').find({
    purchases: {
        $elemMatch: {
            _cls: 'Purchase'
        }
    }
})

db.getCollection('feature.shop.product').find({
    '_id': 'prod_CWlSTXBEU4mhEu',
    'purchases': {'$not': {'$elemMatch': {
        '_cls': 'DirectPurchase',
        'user': ObjectId("58b61d9094ab56f912ba10a5")
    }}},
})

ref:
https://docs.mongodb.com/manual/reference/operator/query/elemMatch/

Find Documents With Existence Of Fields Or Values

  • .find({'field': {'$exists': true}}): the field exists
  • .find({'field': {'$exists': false}}): the field does not exist
  • .find({'field': {'$type': 10}}): the field exists with a null value
  • .find({'field': null}): the field exists with a null value or the field does not exist
  • .find({'field': {'$ne': null}}): the field exists and the value is not null
  • .find({'array_field': {'$in': [null, []]}}): the array field is null, missing, or an empty array
db.test.insert({'num': 1, 'check': 'value'})
db.test.insert({'num': 2, 'check': null})
db.test.insert({'num': 3})

db.test.find({});

db.test.find({'check': {'$exists': true}})
// return 1 and 2

db.test.find({'check': {'$exists': false}})
// return 3

db.test.find({'check': {'$type': 10}});
// return 2

db.test.find({'check': null})
// return 2 and 3

db.test.find({'check': {'$ne': null}});
// return 1

ref:
https://stackoverflow.com/questions/4057196/how-do-you-query-this-in-mongo-is-not-null
https://docs.mongodb.com/manual/tutorial/query-for-null-fields/

Find Documents Where An Array Field Does Not Contain A Certain Value

db.getCollection('user').update({_id: ObjectId("579994ac61ff217f96a585d9"), tags: {$ne: 'tag_to_add'}}, {$push: {tags: 'tag_to_add'}})

db.getCollection('user').update({_id: ObjectId("579994ac61ff217f96a585d9"), tags: {$nin: ['tag_to_add']}}, {$push: {tags: 'tag_to_add'}})

ref:
https://stackoverflow.com/questions/16221599/find-documents-with-arrays-not-containing-a-document-with-a-particular-field-val

Find Documents Where An Array Field Is Not Empty

db.getCollection('message').find({unlocks: {$exists: true, $not: {$size: 0}}})

ref:
https://stackoverflow.com/questions/14789684/find-mongodb-records-where-array-field-is-not-empty

Find Documents Where An Array Field's Size Is Greater Than 1

db.getCollection('user.inbox').find({
    'messages.0': {'$exists': true}
})

db.getCollection('message').find({
    '_cls': 'Message',
    'unlocks.10': {'$exists': true}
}).sort({'posted_at': -1})

db.getCollection('message').find({
    '_cls': 'Message.ChatMessage',
    'sender': ObjectId("582ee32a5b9c861c87dc297e"),
    'unlocks': {'$exists': true, '$not': {'$size': 0}}
})

ref:
https://stackoverflow.com/questions/7811163/query-for-documents-where-array-size-is-greater-than-1/15224544

Find Documents With Computed Values Using $expr

For instance, compare 2 fields from a single document in a find() query.

db.getCollection('user').find({
    $expr: {
        $eq: [{$size: '$follows'}, {$size: '$blocks'}]
    }
})

ref:
https://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-lookup-expr
https://dzone.com/articles/expressive-query-language-in-mongodb-36-2

Project A Subset Of An Array Field With $filter

A sample document:

{
    "_id" : "message_unlock_pricing",
    "seed" : 42,
    "distributions" : {
        "a" : 0.5,
        "b" : 0.5
    },
    "whitelist" : [ 
        {
            "_id" : ObjectId("57dd071dd20fc40c0cbed6b7"),
            "variation" : "a"
        }, 
        {
            "_id" : ObjectId("5b1173a1487fbe2b2e9bba04"),
            "variation" : "b"
        }, 
        {
            "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
            "variation" : "b"
        }
    ]
}
var now = new Date();

db.getCollection('feature.ab.experiment').aggregate([
    {'$project': {
        '_id': 1,
        'seed': 1,
        'distributions': 1,
        'whitelist': {
            '$filter': {
               'input': {'$ifNull': ["$whitelist", []]},
               'as': "user",
               'cond': {'$eq': ['$$user._id', ObjectId("5a66d5c2af9c462c617ce552")]}
            }
         }
    }},
    {'$unwind': {
        'path': '$whitelist',
        'preserveNullAndEmptyArrays': true
    }}
])

ref:
https://stackoverflow.com/questions/42607221/mongodb-aggregation-project-check-if-array-contains

Insert Documents

db.getCollection('feature.launch').insert({
    'url': '//example.com/launchs/5a06b88aaf9c462c6146ce12.jpg',
    'user': {
        'id': ObjectId("5a06b88aaf9c462c6146ce12"),
        'username': 'luke0804',
        'tags': ["gender:male"]
    }
})

db.getCollection('feature.launch').insert({
    'url': '//example.com/launchs/57c16f5bb811055b66d8ef46.jpg',
    'user': {
        'id': ObjectId("57c16f5bb811055b66d8ef46"),
        'username': 'riva',
        'tags': ["gender:female"]
    }
})

Update Within A For Loop

var oldTags = ['famous', 'newstar', 'featured', 'western', 'recommended', 'popular'];
oldTags.forEach(function(tag) {
    db.getCollection('user').updateMany({tags: tag}, {$addToSet: {tags: 'badge:' + tag}});
});

Update With Conditions Of Field Values

You could update the field to a specified value only if the specified value is less than ($min) or greater than ($max) the current value of the field. The $min and $max operators can compare values of different types.

For example, only set posted_at to the current timestamp if its current value is None or absent:

Post.objects.update_one(
    {
        '_id': bson.ObjectId(post_id),
        'media.0': {'$exists': True},
        'title': {'$ne': None},
        'location': {'$ne': None},
        'gender': {'$ne': None},
        'pricing': {'$ne': None},
    },
    {
        '$min': {'posted_at': utils.utcnow()},
    },
)

ref:
https://docs.mongodb.com/manual/reference/operator/update/min/
https://docs.mongodb.com/manual/reference/operator/update/max/

Update An Array Field

Array update operators:

  • $: Acts as a placeholder to update the first element in an array for documents that match the query condition.
  • $[]: Acts as a placeholder to update all elements in an array for documents that match the query condition.
  • $[<identifier>]: Acts as a placeholder to update elements in an array that match the arrayFilters condition.
  • $addToSet: Adds elements to an array only if they do not already exist in the set.
  • $push: Adds an item to an array.
  • $pop: Removes the first or last item of an array.
  • $pull: Removes all array elements that match a specified query.
  • $pullAll: Removes all matching values from an array.

ref:
https://docs.mongodb.com/manual/reference/operator/update-array/
http://docs.mongoengine.org/guide/querying.html#atomic-updates
http://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-array-filters.html

Add an element to an array field.

user_id = '582ee32a5b9c861c87dc297e'
tag = 'my_tag'

updated = User.objects \
    .filter(id=user_id, tags__ne=tag) \
    .update_one(push__tags=tag)

updated = User.objects \
    .filter(id=user_id) \
    .update_one(add_to_set__schedules={
        'tag': tag,
        'nbf': datetime.datetime(2018, 6, 4, 0, 0),
        'exp': datetime.datetime(2019, 5, 1, 0, 0),
    })

Insert an element into an array at a certain position.

slot = 2
Post.objects.filter(id=post_id, media__id__ne=media_id).update_one(__raw__={
    '$push': {
        'media': {
            '$each': [{'id': bson.ObjectId(media_id)}],
            '$position': slot,
        }
    }
})

ref:
https://docs.mongodb.com/manual/reference/operator/update/position/
http://docs.mongoengine.org/guide/querying.html#querying-lists

Remove elements from an array field. It is also worth noting that update(pull__abc=xyz) always returns 1.

user_id = '582ee32a5b9c861c87dc297e'
tag = 'my_tag'

updated = User.objects \
    .filter(id=user_id) \
    .update_one(pull__tags=tag)

updated = User.objects \
    .filter(id=user_id) \
    .update_one(pull__schedules={'tag': tag})

Remove multiple embedded documents in an array field.

import bson

user_id = '5a66d5c2af9c462c617ce552'
tags = ['valid_tag_1', 'future_tag']

updated_result = User._get_collection().update_one(
    {'_id': bson.ObjectId(user_id)},
    {'$pull': {'schedules': {'tag': {'$in': tags}}}},
)
print(updated_result.raw_result)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
https://stackoverflow.com/questions/28102691/pullall-while-removing-embedded-objects

db.getCollection('feature.feeds').updateMany(
    {
        'aliases': {'$exists': true},
        'exp': {'$gte': ISODate('2019-03-21T00:00:00.000+08:00')},
        'items': {'$elemMatch': {'username': 'engine'}},
    },
    {
        '$pull': {
            'items': {'username': 'engine'},
        }
    }
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/pull/

You could also use add_to_set to add an item to an array only if it is not already in the list, which always returns 1 if filter() matches any document. However, you are able to set full_result=True to get the detailed update result.

update_result = User.objects.filter(id=user_id).update_one(
    add_to_set__tags=tag,
    full_result=True,
)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
http://docs.mongoengine.org/guide/querying.html#atomic-updates

Update a multi-level nested array field. Yes, arrayFilters supports it.

ref:
https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/
https://stackoverflow.com/questions/23577123/updating-a-nested-array-with-mongodb

Update an embedded document in an array field.

MessagePackProduct.objects \
    .filter(id='prod_CR1u34BIpDbHeo', skus__id='sku_CR23rZOTLhYprP') \
    .update(__raw__={
        '$set': {'skus.$': {'id': 'sku_CR23rZOTLhYprP', 'test': 'test'}}
    })

ref:
https://stackoverflow.com/questions/9200399/replacing-embedded-document-in-array-in-mongodb
https://docs.mongodb.com/manual/reference/method/db.collection.update/#db.collection.update

Update specific embedded documents with arrayFilters in an array field.

User data:

{
    "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
    "username" : "gibuloto",
    "tags" : [
        "beta",
        "future_tag",
        "expired_tag"
    ],
    "schedules" : [
        {
            "tag" : "valid_tag",
            "nbf" : ISODate("2018-05-01T16:00:00.000Z"),
            "exp" : ISODate("2020-06-04T16:00:00.000Z")
        },
        {
            "tag" : "future_tag",
            "nbf" : ISODate("2020-01-28T16:00:00.000Z"),
            "exp" : ISODate("2020-12-14T16:00:00.000Z")
        },
        {
            "tag" : "expired_tag",
            "nbf" : ISODate("2016-02-12T16:00:00.000Z"),
            "exp" : ISODate("2016-04-21T16:00:00.000Z")
        }
    ],
}

It is worth noting that the <identifier> in arrayFilters must begin with a lowercase letter and can only contain alphanumeric characters.

import bson

user_id = '5a66d5c2af9c462c617ce552'
tags = ['from_past_to_future']

updated_result = User._get_collection().update_one(
    {'_id': bson.ObjectId(user_id)},
    {
        '$addToSet': {'tags': {'$each': tags}},
        '$unset': {'schedules.$[schedule].nbf': True},
    },
    array_filters=[{'schedule.tag': {'$in': tags}}],
)
print(updated_result.raw_result)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
https://docs.mongodb.com/master/reference/operator/update/positional-filtered/

Update An Array Field With arrayFilters

You should use arrayFilters as much as possible.

The syntax of arrayFilters would be:

db.collection.update(
   {<query selector>},
   {<update operator>: {'array.$[<identifier>].field': value}},
   {arrayFilters: [{<identifier>: <condition>}]}
)
Inbox._get_collection().update_many(
    {'messages.id': message_id},
    {'$set': {'messages.$[message].tags': tags}},
    array_filters=[
        {'message.id': message_id},
    ],
)

ref:
https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/

Insert an element into an array field at a certain position.

db.getCollection('feature.forums.post').update(
   { _id: ObjectId("5b3c6a9c8433b15569cae54e") },
   {
     $push: {
        media: {
           $each: [{
                "mimetype" : "image/jpeg",
                "url" : "https://example.com/posts/5adb795b47d057338abe8910.jpg",
                "presets" : {}
            }],
           $position: 1
        }
     }
   }
)

Or use $set with an explicit array index.

media_id = 'xxx'
media_slot = 0

Post.objects \
    .filter(id=post_id, **{f'media__{media_slot}__id__ne': media_id}) \
    .update_one(__raw__={'$set': {f'media.{media_slot}': {'id': media_id}}})

ref:
https://docs.mongodb.com/manual/reference/operator/update/position/

Set an array field to empty.

db.getCollection('message').update(
    {'tags': 'pack:joycelai-1'},
    {'$set': {'unlocks': []}},
    {'multi': true}
)

db.getCollection('feature.shop.product').update(
    {},
    {'$set': {'purchases': []}},
    {'multi': true}
)

ref:
https://docs.mongodb.com/manual/reference/method/db.collection.update/
https://docs.mongodb.com/manual/reference/operator/update/set/

Remove elements from an array field.

var userId = ObjectId("57985b784af4124063f090d3");

db.getCollection('user').update(
    {'follows.user': userId},
    {'$pull': {'follows': {'user': userId}}},
    {
        'multi': true,
    }
);

db.getCollection('message').update(
    {'_id': {'$in': [
        ObjectId('5aca1ffc4271ab1624787ec4'),
        ObjectId('5aca31ab93ef2936291c3dd4'),
        ObjectId('5aca33d9b5eaef04943c0d0b'),
        ObjectId('5aca34e7a48c543b07fb0a0f'),
        ObjectId('5aca272d93ef296edc1c3dee'),
        ObjectId('5aca342aa48c54306dfb0a21'),
        ObjectId('5aca20756bd01023a8cb02e9')
    ]}},
    {'$pull': {'tags': 'pack:prod_D75YlDMzcCiAw3'}},
    {'multi': true}
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/pull/

Update A Dictionary Field

Set a key/value in a dictionary field.

tutorial.data = {
    "price_per_message": 1200,
    "inbox": []
}

new_inbox = [
    {
        "id": "5af118c598eacb528e8fb8f9",
        "sender": "5a13239eaf9c462c611510fc"
    },
    {
        "id": "5af1117298eacb212a8fb8e9",
        "sender": "5a99554be9a21d5ff38b8ca5"
    }
]
tutorial.update(set__data__inbox=new_inbox)

ref:
https://stackoverflow.com/questions/21158028/updating-a-dictfield-in-mongoengine

Upsert: Update Or Create

You must use upsert=true with uniquely indexed fields. If you don't need the modified document, you should just use update_one(field1=123, field2=456, upsert=True).

Additionally, remember that modify() always reloads the whole object, even if the original one only loaded specific fields with only(). Try to avoid document.DB_QUERY_METHOD(); use User.objects.filter().only().modify() or User.objects.filter().update() when possible.

tag_schedule = TagSchedule.objects \
    .filter(user=user_id, tag='vip') \
    .modify(
        started_at=started_at,
        ended_at=ended_at,
        upsert=True
    )

user = User.objects \
    .filter(id=user.id, tutorials__buy_diamonds__version=None) \
    .modify(set__tutorials__buy_diamonds__version='v1')

updated = User.objects \
    .filter(user=user_id, tag=tag) \
    .update_one(
        push__followers=new_follower,
    )

ref:
https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-unique-indexes
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.modify
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.update_one

Rename A Field

Simply rename a field with $rename.

db.getCollection('user').updateMany(
    {
        'phone_no': {'$exists': true},
        'social.phone-number.uid': {'$exists': false},
    },
    {'$rename': {
        'phone_no': 'social.phone-number.uid',
    }}
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/rename/

Do some extra data converting and rename the field manually.

db.getCollection('user').aggregate([
    {'$match': {
        'twitter.id': {'$exists': true},
        'social.twitter.uid': {'$exists': false},
    }},
    {'$project': {
        'twitter_id': '$twitter.id',
        'twitter_id_str': {'$toString': '$twitter.id'},
    }},
]).forEach(function (document) {
    printjson({
        'id': document._id,
    });

    db.getCollection('user').updateOne(
      {
          'twitter.id': document.twitter_id,
          'social.twitter.uid': {'$exists': false},
      },
      {
          '$unset': {'twitter.id': true},
          '$set': {'social.twitter.uid': document.twitter_id_str}
      }
    )
})

Insert/Replace Large Numbers Of Documents

const operations = contracts.map((contract) => {
    // TODO: should create a new contract if there is any change of the contract?
    // use MongoDB transaction to change the new one and old one
    return {
        'replaceOne': {
            'filter': {'settlement_datetime': currentSettlementMonth.toDate(), 'user': contract.user},
            'replacement': contract,
            'upsert': true,
        },
    };
});

db.collection('user.contract').bulkWrite(
    operations,
    {ordered: true},
    (bulkError, result) => {
        if (bulkError) {
            return next(bulkError, null);
        }

        logger.info('Finished importing all contracts');
        return next(null, result);
    },
);

Update Large Numbers Of Documents

Use Bulk.find.arrayFilters() and Bulk.find.update() together.
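A shell sketch of combining them (the messages.$[message].read field is hypothetical):

var bulk = db.getCollection('outbox').initializeUnorderedBulkOp();
bulk.find({'messages.id': messageId})
    .arrayFilters([{'message.id': messageId}])
    .updateOne({'$set': {'messages.$[message].read': true}});
bulk.execute();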

In Python:

import datetime

expiration_time = datetime.datetime.utcnow() - datetime.timedelta(hours=48)

bulk = Outbox._get_collection().initialize_unordered_bulk_op()

for outbox in Outbox.objects.only('id').filter(messages__posted_at__lt=expiration_time):
    bulk.find({'_id': outbox.id}).update_one({
        '$pull': {'messages': {
            'posted_at': {'$lt': expiration_time},
        }},
    })

try:
    results = bulk.execute()
except pymongo.errors.InvalidOperation as err:
    if str(err) != 'No operations to execute':
        raise err

In JavaScript:

const operations = docs.map((doc) => {
    logger.debug(doc, 'Revenue');

    const operation = {
        'updateOne': {
            'filter': {
                '_id': doc._id,
            },
            'update': {
                '$set': {
                    'tags': doc.contract.tags,
                },
            },
        },
    };
    return operation;
});

db.collection('user.revenue').bulkWrite(
    operations,
    {ordered: false},
    (bulkError, bulkResult) => {
        if (bulkError) {
            return next(bulkError, null);
        }

        logger.info(bulkResult, 'Saved tags');
        return next(null, true);
    },
);

ref:
https://docs.mongodb.com/manual/reference/method/Bulk/
https://docs.mongodb.com/manual/reference/method/Bulk.find.arrayFilters/

Of course, you could also update the same document with multiple operations. However, it usually does not make sense to do so.

from pymongo import UpdateOne
import bson

def _operations():
    if title := payload.get('title'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'title': title}})

    if location := payload.get('location'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'location': location}})

    if pricing := payload.get('pricing'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'pricing': pricing}})

    if description := payload.get('description'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'description': description}})

    yield UpdateOne(
        {
            '_id': bson.ObjectId(post_id),
            'media.0': {'$exists': True},
            'title': {'$ne': None},
            'location': {'$ne': None},
            'pricing': {'$ne': None},
            'posted_at': {'$eq': None},
        },
        {'$set': {'posted_at': utils.utcnow()}},
    )

operations = list(_operations())
result = Post._get_collection().bulk_write(operations, ordered=True)
print(result.bulk_api_result)

ref:
https://api.mongodb.com/python/current/examples/bulk.html

Remove items from an array field of documents.

var userId = ObjectId("57a42a779f22bb6bcc434520");

db.getCollection('user').update(
    {'follows.user': userId},
    {'$pull': {'follows': {'user': userId}}},
    {'multi': true}
)

ref:
https://stackoverflow.com/questions/33594397/how-to-update-a-large-number-of-documents-in-mongodb-most-effeciently

Remove Large Numbers Of Documents

In the mongo shell:

var bulk = db.getCollection('feature.journal.v2').initializeUnorderedBulkOp()
bulk.find({}).remove()
bulk.execute()

// or

var bulk = db.getCollection('feature.journal.v2').initializeUnorderedBulkOp()
bulk.find({event: 'quest.rewarded'}).remove()
bulk.find({event: 'message.sent'}).remove()
bulk.execute()

ref:
https://docs.mongodb.com/manual/reference/method/Bulk.find.remove/#bulk-find-remove

MongoEngine In Python

ref:
http://docs.mongoengine.org/guide/index.html
http://docs.mongoengine.org/apireference.html

Define Collections

It seems every collection in MongoEngine must have an id field.

ref:
http://docs.mongoengine.org/guide/defining-documents.html

Define A Field With Default EmbeddedDocument

The behavior of setting an EmbeddedDocument class as default works differently with and without only().

class User(ab.models.ABTestingMixin, db.Document):
    class UserSettings(db.EmbeddedDocument):
        reply_price = db.IntField(min_value=0, default=500, required=True)
        preferences = db.ListField(db.StringField())

    email = db.EmailField(max_length=255)
    created_at = db.DateTimeField(default=utils.now)
    last_active = db.DateTimeField(default=utils.now)
    settings = db.EmbeddedDocumentField(UserSettings, default=UserSettings)

If the user does not have the settings field in the DB, here is the difference.

user = User.objects.get(username='gibuloto')
isinstance(user.settings, User.UserSettings) == True

user = User.objects.only('settings').get(username='gibuloto')
(user.settings is None) == True

user = User.objects.exclude('settings').get(username='gibuloto')
isinstance(user.settings, User.UserSettings) == True

Filter With Raw Queries

post = Post.objects \
    .no_dereference().only('posted_at') \
    .filter(__raw__={
        '_id': bson.ObjectId(post_id),
        'media.0': {'$exists': True},
        'title': {'$ne': None},
        'location': {'$ne': None},
        'gender': {'$ne': None},
        'pricing': {'$ne': None},
    }) \
    .modify(__raw__={'$min': {'posted_at': utils.utcnow()}}, new=True)

print(post.posted_at)

ref:
http://docs.mongoengine.org/guide/querying.html#raw-queries

Check If A Document Exists

Use .exists().

import datetime

now = datetime.datetime.now(datetime.timezone.utc)
if TagSchedule.objects.filter(user=user_id, tag=tag, started_at__gt=now).exists():
    return 'exists'

You have to use __raw__ if the field you want to query is a db.ListField(GenericEmbeddedDocumentField(XXX)) field.

if MessagePackProduct.objects.filter(id=message_pack_id, __raw__={'purchases.user': g.user.id}).exists():
    return 'exists'

Upsert: Get Or Create

buy_diamonds = BuyDiamonds.objects.filter(user_id=user.id).upsert_one()

ref:
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.upsert_one

Store Files On GridFS

# models.py
class User(db.Document):
    username = db.StringField()
    image = db.ImageField(collection_name='user.images')
# tasks.py
import io

import bson
import celery
import gridfs
import mongoengine
from PIL import Image

@celery.shared_task(bind=True, ignore_result=True)
def gridfs_save(task, user_id, format='JPEG', raw_data: bytes=None, **kwargs):
    image_id = None

    if raw_data is None:
        user = User.objects.only('image').get(id=user_id)
        if user.image.grid_id:
            image_id, raw_data = user.image.grid_id, user.image.read()

    if not raw_data:
        return

    gf = gridfs.GridFS(mongoengine.connection.get_db(), User.image.collection_name)

    with io.BytesIO(raw_data) as raw_image:
        with Image.open(raw_image) as image:
            image = image.convert('RGB')
            with io.BytesIO() as buffer:
                image.save(buffer, format=format, quality=80, **kwargs)
                buffer.seek(0)
                grid_id = gf.put(buffer, format=format, width=image.width, height=image.height, thumbnail_id=None)

    # NOTE: If function was passed with raw_data, only override if ID is the same as the read
    query = mongoengine.Q(id=user_id)
    if image_id:
        query = query & mongoengine.Q(image=image_id)

    user = User.objects.only('image').filter(query).modify(
        __raw__={'$set': {'image': grid_id}},
        new=False,
    )

    def cleanup():
        # Delete the old image
        if user and user.image:
            yield user.image.grid_id

        # The user image was already changed before the scheduled optimization took place
        # Drop the optimized image
        if user is None and image_id:
            yield image_id

    gridfs_delete.apply_async(kwargs=dict(
        collection=User.image.collection_name,
        grid_ids=list(cleanup()),
    ))

@celery.shared_task(bind=True, ignore_result=True)
def gridfs_delete(task, collection, grid_ids):
    gf = gridfs.GridFS(mongoengine.connection.get_db(), collection)
    for grid_id in grid_ids:
        gf.delete(bson.ObjectId(grid_id))

ref:
http://docs.mongoengine.org/guide/gridfs.html

Store Datetime

MongoDB stores datetimes in UTC.

ref:
https://docs.mongodb.com/manual/reference/method/Date/

2-phase Commit

The easiest way to think about 2-phase commit is idempotency, i.e., if you run an update many times, the result stays the same: initial -> pending -> applied -> done.
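For instance, a minimal sketch of an idempotent state transition (the collection and field names are hypothetical):

// only moves a transaction from "pending" to "applied";
// running it again matches nothing and changes nothing
db.getCollection('transactions').updateOne(
    {'_id': txnId, 'state': 'pending'},
    {'$set': {'state': 'applied'}}
);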

ref:
https://docs.mongodb.com/manual/tutorial/perform-two-phase-commits/

Aggregation Pipeline

  • $match: Filters documents.
  • $project: Modifies document fields.
  • $addFields: Adds or overrides document fields.
  • $group: Groups documents by fields.
  • $lookup: Joins another collection.
  • $replaceRoot: Promotes an embedded document field to the top level and replaces all other fields.
  • $unwind: Expands an array field into multiple documents, one per element, each keeping the original document's other fields.
  • $facet: Processes multiple pipelines within one stage and outputs to different fields (see the sketch below).

There are special system variables, for instance, $$ROOT, $$REMOVE, $$PRUNE, which you could use in some stages of the aggregation pipeline.

ref:
https://docs.mongodb.com/manual/reference/aggregation-variables/#system-variables
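Since $facet does not appear elsewhere in this post, here is a minimal shell sketch (the field names are hypothetical):

db.getCollection('user').aggregate([
    {'$facet': {
        // each facet runs its own sub-pipeline over the same input documents
        'byGender': [
            {'$group': {'_id': '$gender', 'count': {'$sum': 1}}}
        ],
        'newest': [
            {'$sort': {'created_at': -1}},
            {'$limit': 5},
            {'$project': {'username': 1}}
        ]
    }}
]);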

Return Date As Unix Timestamp

import datetime

def stages():
    yield {'$project': {
        'createdAt': {'$floor': {'$divide': [{'$subtract': ['$created_at', datetime.datetime.utcfromtimestamp(0)]}, 1000]}},
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://stackoverflow.com/questions/39274311/convert-iso-date-to-timestamp-in-mongo-query

Match Multiple Conditions Stored In An Array Field

db.getCollection('feature.promotions').insert({
    "name": "TEST",
    "nbf": ISODate("2018-05-31 16:00:00.000Z"),
    "exp": ISODate("2018-06-30 15:59:00.001Z"),
    "positions": {
        "discover": {
            "urls": [
                "https://example.com/events/2018/jun/event1/banner.html"
            ]
        }
    },
    "requirements" : [
        {
            // users who like women and their app version is greater than v2.21
            "preferences" : [
                "gender:female"
            ],
            "version_major_min": 2.0,
            "version_minor_min": 21.0
        },
        {
            // female CPs
            "tags" : [
                "stats",
                "gender:female"
            ]
        }
    ]
});
import werkzeug

user_agent = werkzeug.UserAgent('hello-world/2.25.1 (iPhone; iOS 11.4.1; Scale/2.00; com.example.app; zh-tw)')
user_preferences = ['gender:female', 'gender:male']
user_tags = ['beta', 'vip']
user_platforms = [user_agent.platform]

def stages():
    now = utils.utcnow()

    yield {'$match': {
        '$and': [
            {'nbf': {'$lte': now}},
            {'exp': {'$gt': now}},
            {'requirements': {'$elemMatch': {
                'preferences': {'$not': {'$elemMatch': {'$nin': user_preferences}}},
                'tags': {'$not': {'$elemMatch': {'$nin': user_tags}}},
                'platforms': {'$not': {'$elemMatch': {'$nin': user_platforms}}},
                '$or': [
                    {'$and': [
                        {'version_major_min': {'$lte': user_agent.version.major}},
                        {'version_minor_min': {'$lte': user_agent.version.minor}},
                    ]},
                    {'$and': [
                        {'version_major_min': {'$exists': False}},
                        {'version_minor_min': {'$exists': False}},
                    ]},
                ],
            }}},
        ],
    }}
    yield {'$project': {
        'name': True,
        'nbf': True,
        'exp': True,
        'positions': {'$objectToArray': '$positions'},
    }}
    yield {'$unwind': '$positions'}
    yield {'$sort': {
        'exp': 1,
    }}
    yield {'$project': {
        '_id': False,
        'name': True,
        'position': '$positions.k',
        'url': {'$arrayElemAt': ['$positions.v.urls', 0]},
        'startedAt': {'$floor': {'$divide': [{'$subtract': ['$nbf', constants.UNIX_EPOCH]}, 1000]}},
        'endedAt': {'$floor': {'$divide': [{'$subtract': ['$exp', constants.UNIX_EPOCH]}, 1000]}},
    }}
    yield {'$group': {
        '_id': '$position',
        'items': {'$push': '$$ROOT'},
    }}

try:
    docs = Promotion.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    docs = list(docs)

ref:
https://docs.mongodb.com/manual/reference/operator/query/in/
https://docs.mongodb.com/manual/reference/operator/query/nin/
https://docs.mongodb.com/manual/reference/operator/aggregation/setIsSubset/

Do Distinct With $group

def stages():
    yield {'$match': {
        'tags': 'some_tag',
    }}
    yield {'$unwind': '$unlocks'}
    yield {'$replaceRoot': {'newRoot': '$unlocks'}}
    yield {'$match': {
        '_cls': 'MessagePackUnlock',
    }}
    yield {'$group': {
        '_id': '$user',
        'timestamp': {'$first': '$timestamp'},
    }}

for unlock in MessagePackMessage.objects.aggregate(*stages()):
    tasks.offline_purchase_pack.apply(kwargs=dict(
        user_id=unlock['_id'],
        message_pack_id=message_pack.id,
        timestamp=unlock['timestamp'],
    ))

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Slice Items In Each $group

import random

def stages():
    yield {'$match': {'tags': {'$regex': '^badge:'}}}
    yield {'$unwind': {'path': '$tags', 'includeArrayIndex': 'index'}}
    yield {'$match': {'tags': {'$regex': '^badge:'}}}
    yield {'$project': {'_id': True, 'tag': '$tags', 'index': {'$mod': ['$index', random.random()]}}}
    yield {'$sort': {'index': 1}}
    yield {'$group': {'_id': '$tag', 'users': {'$addToSet': '$_id'}}}
    yield {'$project': {'_id': True, 'users': {'$slice': ['$users', 1000]}}}

docs = User.objects.aggregate(*stages())
for doc in docs:
    badge, user_ids = doc['_id'], doc['users']

Collect Items With $group And $addToSet

User data:

{
    "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
    "username" : "gibuloto",
    "tags" : [ 
        "beta"
    ],
    "schedules" : [ 
        {
            "tag" : "stats",
            "nbf" : ISODate("2018-02-01T16:00:00.000Z"),
            "exp" : ISODate("2018-08-12T16:00:00.000Z")
        }, 
        {
            "tag" : "vip",
            "nbf" : ISODate("2018-05-13T16:00:00.000Z"),
            "exp" : ISODate("2018-05-20T16:00:00.000Z")
        }
    ]
}
def stages():
    now = utils.utcnow()

    yield {'$match': {
        'schedules': {'$elemMatch': {
            'nbf': {'$lte': now},
            'exp': {'$gte': now}
        }}
    }}
    yield {'$unwind': '$schedules'}
    yield {'$match': {
        'schedules.nbf': {'$lte': now},
        'schedules.exp': {'$gte': now}
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'username': True,
        'tag': '$schedules.tag',
        'nbf': '$schedules.nbf',
        'exp': '$schedules.exp'
    }}
    yield {'$group': {
        '_id': '$id',
        'tags': {'$addToSet': '$tag'},
    }}

for user_tag_schedule in User.objects.aggregate(*stages()):
    print(user_tag_schedule)

# output:
# {'_id': ObjectId('579b9387b7af8e1fd1635da9'), 'tags': ['stats']}
# {'_id': ObjectId('5a66d5c2af9c462c617ce552'), 'tags': ['chat', 'vip']}

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Project A New Field Based On Whether Elements Exist In Another Array Field

Use $addFields with $cond.

def stages():
    user_preferences = g.user.settings.preferences or ['gender:female']
    yield {'$match': {
        'gender': {'$in': [prefix_gender.replace('gender:', '') for prefix_gender in user_preferences]}
    }}

    yield {'$addFields': {
        'isPinned': {'$cond': {
            'if': {'$in': [constants.tags.HIDDEN, '$badges']},
            'then': True,
            'else': False,
        }},
    }}
    yield {'$sort': {
        'isPinned': -1,
        'posted_at': -1,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': {'$ifNull': ['$comments', []]}},
        'badges': '$badges',
        'isPinned': '$isPinned',
    }}

try:
    results = Post.objects.aggregate(*stages()).next()
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://stackoverflow.com/questions/16512329/project-new-boolean-field-based-on-element-exists-in-an-array-of-a-subdocument
https://docs.mongodb.com/manual/reference/operator/aggregation/project/
https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
https://docs.mongodb.com/manual/reference/operator/aggregation/cond/

Project And Filter Out Elements Of An Array With $filter

Elements in details might have no value field.

def stages():
    yield {'$match': {
        '_id': bson.ObjectId(post_id),
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': '$comments'},
        'details': [
            {'key': 'gender', 'value': '$gender'},
            {'key': 'pricing', 'value': '$pricing'},
            {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
            {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
        ],
    }}
    yield {'$addFields': {
        'details': {
            '$filter': {
                'input': '$details',
                'as': 'detail',
                'cond': {'$ne': ['$$detail.value', None]},
            }
        }
    }}

try:
    post = next(Post.objects.aggregate(*stages()))
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/filter/#exp._S_filter
https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/

Project Specific Fields Of Elements Of An Array With $map

def stages():
    yield {'$match': {
        '_id': bson.ObjectId(post_id),
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': '$comments'},
        'details': [
            {'key': 'gender', 'value': '$gender'},
            {'key': 'pricing', 'value': '$pricing'},
            {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
            {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
        ],
        'media': {
            '$map': {
                'input': '$media',
                'as': 'transcoded_media',
                'in': {
                    'mimetype': '$$transcoded_media.mimetype',
                    'dash': '$$transcoded_media.presets.dash',
                    'hls': '$$transcoded_media.presets.hls',
                    'thumbnail': '$$transcoded_media.thumbnail',
                }
            }
        },
    }}
    yield {'$addFields': {
        'details': {
            '$filter': {
                'input': '$details',
                'as': 'detail',
                'cond': {'$ne': ['$$detail.value', None]},
            }
        }
    }}

try:
    post = next(Post.objects.aggregate(*stages()))
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://stackoverflow.com/questions/33831665/how-to-project-specific-fields-from-a-document-inside-an-array

Do Advanced $project With $let

If you find yourself wanting to do $project twice to tackle some fields, you should use $let.

def stages():
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'purchasedAt': {
            '$let': {
                'vars': {
                    'purchase': {
                        '$arrayElemAt': [
                            {
                                '$filter': {
                                    'input': '$purchases',
                                    'as': 'purchase',
                                    'cond': {
                                        '$and': [
                                            {'$eq': ['$$purchase.user', g.user.id]},
                                        ],
                                    },
                                },
                            },
                            0,
                        ],
                    },
                },
                'in': '$$purchase.timestamp',
            },
        },
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/let/

Deconstruct An Array Field With $unwind And Query Its Elements With $match

def stages():
    category_tag = 'category:user'
    currency = 'usd'
    platform = 'ios'

    yield {'$match': {
        'active': True,
        'tags': category_tag,
        'total': {'$gt': 0},
        'preview_message': {'$exists': True},
    }}
    yield {'$unwind': '$skus'}
    yield {'$match': {
        'skus.attributes.platform': platform,
        'skus.attributes.currency': currency,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'caption': True,
        'description': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'sku': '$skus',
        'created_at': True,
        'is_purchased': {'$in': [g.user.id, {'$ifNull': ['$purchases.user', []]}]},
    }}
    yield {'$sort': {'is_purchased': 1, 'created_at': -1}}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/match/
https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/
https://docs.mongodb.com/manual/reference/operator/aggregation/project/

Query The First Element In An Array Field With $arrayElemAt And $filter

def stages():
    category_tag = 'category:user'
    currency = 'usd'
    platform = 'ios'

    yield {'$match': {
        'active': True,
        'tags': category_tag,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'caption': True,
        'description': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'preview_message': True,
        'metadata': True,
        'created_at': True,
        'updated_at': True,
        'active': True,
        'sku': {
            '$ifNull': [
                {
                    '$arrayElemAt': [
                        {
                            '$filter': {
                                'input': '$skus',
                                'as': 'sku',
                                'cond': {
                                    '$and': [
                                        {'$eq': ['$$sku.currency', currency]},
                                        {'$eq': ['$$sku.attributes.platform', platform]},
                                    ]
                                }
                            },
                        },
                        0
                    ]
                },
                None
            ],
        },
        'tags': True,
        'total': True,
        'is_bought': {'$in': [g.user.id, {'$ifNull': ['$purchases.user', []]}]},
    }}
    yield {'$sort': {'is_bought': 1, 'created_at': -1}}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/master/reference/operator/aggregation/filter/
https://stackoverflow.com/questions/3985214/retrieve-only-the-queried-element-in-an-object-array-in-mongodb-collection

Join Another Collection Using $lookup

def stages():
    yield {'$match': {
        'tags': 'pack:prod_CR1u34BIpDbHeo',
    }}
    yield {'$lookup': {
        'from': 'user',
        'localField': 'sender',
        'foreignField': '_id',
        'as': 'sender_data',
    }}
    yield {'$unwind': '$sender_data'}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'sender': {
            'id': '$sender_data._id',
            'username': '$sender_data.username',
        },
        'caption': True,
        'posted_at': True,
    }}
    yield {'$sort': {'posted_at': -1}}

try:
    docs = Message.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/
https://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-lookup-expr

Join Another Collection With Multiple Conditions Using pipeline in $lookup

Inside a $lookup pipeline, let variables can only be accessed through the $expr operator.

var start = ISODate('2018-09-22T00:00:00.000+08:00');

db.getCollection('feature.shop.order').aggregate([
    {'$match': {
        'payment.timestamp': {'$gte': start},
        'status': {'$in': ['paid']},
    }},
    {'$lookup': {
        'from': 'user',
        'localField': 'customer',
        'foreignField': '_id',
        'as': 'customer_data',
    }},
    {'$unwind': '$customer_data'},
    {'$project': {
        'variation': '$customer_data.experiments.message_unlock_price.variation',
        'amount_normalized': {'$divide': ['$amount', 100.0]},
    }},
    {'$addFields': {
        'amount_usd': {'$multiply': ['$amount_normalized', 0.033]},
    }},
    {'$group': {
       '_id': '$variation',
       'purchase_amount': {'$sum': '$amount_usd'},
       'paid_user_count': {'$sum': 1},
    }},
    {'$lookup': {
        'from': 'user',
        'let': {
            'variation': '$_id',
        },
        'pipeline': [
            {'$match': {
                'last_active': {'$gte': start},
                'experiments': {'$exists': true},
            }},
            {'$match': {
                '$expr': {
                    '$and': [
                         {'$eq': ['$experiments.message_unlock_price.variation', '$$variation']},
                    ],
                },
            }},
            {'$group': {
               '_id': '$experiments.message_unlock_price.variation',
               'count': {'$sum': 1},
            }},
        ],
        'as': 'variation_data',
    }},
    {'$unwind': '$variation_data'},
    {'$project': {
        '_id': 1,
        'purchase_amount': 1,
        'paid_user_count': 1,
        'total_user_count': '$variation_data.count',
    }},
    {'$addFields': {
        'since': start,
        'arpu': {'$divide': ['$purchase_amount', '$total_user_count']},
        'arppu': {'$divide': ['$purchase_amount', '$paid_user_count']},
    }},
    {'$sort': {'_id': 1}},
]);

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#join-conditions-and-uncorrelated-sub-queries

or

def stages():
    yield {'$match': {'_id': bson.ObjectId(message_id)}}
    yield {'$limit': 1}
    yield {'$project': {
        '_cls': 1,
        'sender': 1,
        'unlocks': 1,
    }}
    yield {'$unwind': '$unlocks'}
    yield {'$match': {
        'unlocks.user': bson.ObjectId(user_id),
        'unlocks.amount': {'$gt': 0},
    }}
    yield {'$lookup': {
        'from': 'user',
        'let': {
            'sender': '$sender',
            'unlocker': '$unlocks.user',
        },
        'pipeline': [
            {'$match': {
                '$expr': {
                    '$or': [
                        {'$eq': ['$_id', '$$sender']},
                        {'$eq': ['$_id', '$$unlocker']}
                    ]
                }
            }}
        ],
        'as': 'users',
    }}
    yield {'$addFields': {
        'sender': {'$arrayElemAt': ['$users', 0]},
        'unlocker': {'$arrayElemAt': ['$users', 1]},
    }}
    yield {'$project': {
        '_id': 0,
        '_cls': 1,
        'id': '$_id',
        'sender': {
            'id': '$sender._id',
            'username': '$sender.username',
        },
        'unlocker': {
            'id': '$unlocker._id',
            'username': '$unlocker.username',
        },
        'amount': '$unlocks.amount',
    }}

try:
    context = Message.objects.aggregate(*stages()).next()
except StopIteration:
    pass

ref:
https://stackoverflow.com/questions/37086387/multiple-join-conditions-using-the-lookup-operator
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#specify-multiple-join-conditions-with-lookup

Count Documents In Another Collection With $lookup (JOIN)

def stages():
    category_tag = f'category:{category}'
    yield {'$match': {
        'active': True,
        'tags': category_tag,
    }}
    yield {'$addFields': {
        'message_pack_id_tag': {'$concat': ['pack:', '$_id']},
    }}
    yield {'$lookup': {
        'from': 'message',
        'localField': 'message_pack_id_tag',
        'foreignField': 'tags',
        'as': 'total',
    }}
    yield {'$addFields': {
        'total': {'$size': '$total'}
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'total': True,
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#equality-match

Use $lookup as findOne() Which Returns An Object

Use $lookup and $unwind.

import bson

def stages():
    yield {'$match': {'_id': bson.ObjectId(gift_id)}}
    yield {'$limit': 1}
    yield {'$lookup': {
        'from': 'user',
        'localField': 'sender',
        'foreignField': '_id',
        'as': 'sender',
    }}
    yield {'$unwind': '$sender'}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'sender': {
            'id': '$sender._id',
            'username': '$sender.username',
        },
        'product_id': '$product._id',
        'sent_at': '$sent_at',
        'amount': '$cost.amount',
    }}

try:
    _context = Gift.objects.aggregate(*stages()).next()
except StopIteration:
    pass

ref:
https://stackoverflow.com/questions/37691727/how-to-use-mongodbs-aggregate-lookup-as-findone

Collapse Documents In An Array

def stages():
    yield {'$match': {
        'tags': 'tutorial:buy-diamonds:v1',
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'caption.text': True,
        'sender': True,
        'media.type': '$media.mimetype',
    }}
    yield {'$facet': {
        'inbox': [
            {'$sort': {'created_at': -1}},
            {'$limit': 10}
        ],
    }}
    yield {'$project': {
        'inbox': True,
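        # $literal makes these constants pass through instead of being parsed as field-inclusion flags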
        'required_unlock_count': {'$literal': 5},
        'price_per_message': {'$literal': 1200},
    }}

try:
    result = Message.objects.aggregate(*stages()).next()
except StopIteration:
    result = {}

JSON output:

{
    "inbox": [
        {
            "caption": {
                "text": "fuck yeah"
            },
            "id": "5aaba1e9593950337a90dcb3",
            "media": {
                "type": "video/mp4"
            },
            "sender": "5a66d5c2af9c462c617ce552"
        },
        {
            "caption": {
                "text": "test"
            },
            "id": "5ad549276b2c362a4efe5e21",
            "media": {
                "type": "image/jpeg"
            },
            "sender": "5a66d5c2af9c462c617ce552"
        }
    ],
    "price_per_message": 1200,
    "required_unlock_count": 5
}

Do Pagination With $facet And $project

def stages():
    # normal query
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'created_at': True,
        'meta': {
            'revision': '$revision',
            'tags': '$tags',
        },
    }}
    yield {'$sort': {'created_at': -1}}

    # pagination
    page = 0
    limit = 10
    yield {'$facet': {
        'meta': [
            {'$count': 'total'},
        ],
        'objects': [
            {'$skip': page * limit},
            {'$limit': limit},
        ]
    }}
    # JSON output:
    # {
    #    "meta": [
    #       {"total": 2}
    #    ],
    #    "objects": [
    #       {
    #          "id": "prod_CR1u34BIpDbHeo",
    #          "name": "Product Name 2"
    #       },
    #       {
    #          "id": "prod_Fkhf9JFK3Rdgk9",
    #          "name": "Product Name 1"
    #       }
    #    ]
    # }
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$meta', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}
    # JSON output:
    # {
    #    "total": 2,
    #    "objects": [
    #       {
    #          "id": "prod_CR1u34BIpDbHeo",
    #          "name": "Product Name 2"
    #       },
    #       {
    #          "id": "prod_Fkhf9JFK3Rdgk9",
    #          "name": "Product Name 1"
    #       }
    #    ]
    # }

try:
    output = MessagePackProduct.objects.aggregate(*stages()).next()
except StopIteration:
    output = {}
else:
    print(output)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/facet/
https://docs.mongodb.com/manual/reference/operator/aggregation/project/

Perform $facet + $project => Unwrap with $unwind => Do $facet + $project Again

def stages():
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'created_at': True,
    }}
    yield {'$sort': {'created_at': -1}}

    # pagination
    page = 0
    limit = 10
    yield {'$facet': {
        'meta': [
            {'$count': 'total'},
        ],
        'objects': [
            {'$skip': page * limit},
            {'$limit': limit},
        ]
    }}
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$meta', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}

    # do $lookup after the pagination
    yield {'$unwind': '$objects'}
    yield {'$addFields': {
        'objects.message_pack_id_tag': {'$concat': ['pack:', '$objects.id']},
    }}
    yield {'$lookup': {
        'from': 'message',
        'localField': 'objects.message_pack_id_tag',
        'foreignField': 'tags',
        'as': 'objects.total',
    }}
    yield {'$addFields': {
        'objects.total': {'$size': '$objects.total'}
    }}

    # re-wrap into the pagination structure
    yield {'$facet': {
        'total_list': [
            {'$project': {
                'total': True,
            }},
        ],
        'objects': [
            {'$replaceRoot': {'newRoot': '$objects'}},
        ]
    }}
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$total_list', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}

try:
    output = MessagePackProduct.objects.aggregate(*stages()).next()
except StopIteration:
    output = {}
else:
    print(output)

Do $group First To Reduce The Number Of $lookup Calls

Grouping messages by sender before the $lookup means the user collection is joined once per distinct sender instead of once per message.

def stages():
    yield {'$match': {
        'tags': f'pack:{message_pack_id}',
    }}
    yield {'$group': {
        '_id': '$sender',
        'messages': {'$push': '$$ROOT'},
    }}
    yield {'$lookup': {
        'from': 'user',
        'localField': '_id',
        'foreignField': '_id',
        'as': 'sender_data',
    }}
    yield {'$unwind': '$messages'}
    yield {'$project': {
        '_id': False,
        'id': '$messages._id',
        'caption': {
            'text': '$messages.caption.text',
            'y': '$messages.caption.y',
        },
        'sender': {
            'id': {'$arrayElemAt': ['$sender_data._id', 0]},
            'username': {'$arrayElemAt': ['$sender_data.username', 0]},
        },
    }}

try:
    docs = Message.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Copy Collections To Another Database

var bulk = db.getSiblingDB('target_db')['target_collection'].initializeOrderedBulkOp();
db.getCollection('source_collection').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

var bulk = db.getSiblingDB('test')['company.revenue'].initializeOrderedBulkOp();
db.getCollection('company.revenue').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

var bulk = db.getSiblingDB('test')['user.contract'].initializeOrderedBulkOp();
db.getCollection('user.contract').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

var bulk = db.getSiblingDB('test')['user.revenue'].initializeOrderedBulkOp();
db.getCollection('user.revenue').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

ref:
https://stackoverflow.com/questions/11554762/how-to-copy-a-collection-from-one-database-to-another-in-mongodb
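
If you prefer to script this from Python, a minimal pymongo equivalent might look like this (the host, database, and collection names are hypothetical):

from pymongo import InsertOne, MongoClient

client = MongoClient('mongodb://127.0.0.1:27017')
source = client['source_db']['source_collection']
target = client['target_db']['target_collection']

# mirror the ordered bulk insert above
operations = [InsertOne(doc) for doc in source.find()]
if operations:
    target.bulk_write(operations, ordered=True)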

Sadly, cloneCollection() only copies a collection from a remote mongod instance; it cannot clone collections from one local database to another on the same server.

ref:
https://docs.mongodb.com/manual/reference/command/cloneCollection/

Useful Tools

Backup

$ mongodump -h 127.0.0.1:27017 --oplog -j=8 --gzip --archive=/data/mongodump.tar.gz

ref:
https://docs.mongodb.com/manual/reference/program/mongodump/

Restore

$ mongorestore --drop --gzip --archive=2018-08-12T03.tar.gz

An error like the following typically indicates data corruption, which is often caused by problems with the underlying storage device, file system, or network connection:

restoring indexes for collection test.message from metadata
Failed: test.message: error creating indexes for test.message: createIndex error: BSONElement: bad type -47
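
If you suspect corruption, one way to inspect a collection is the validate command; a pymongo sketch (assuming the test database from the error above):

from pymongo import MongoClient

client = MongoClient('mongodb://127.0.0.1:27017')

# raises pymongo.errors.CollectionInvalid if the collection is corrupt;
# otherwise returns the validation result as a dict
result = client['test'].validate_collection('message', full=True)
print(result)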

ref:
https://docs.mongodb.com/manual/reference/program/mongorestore/

Profiling

You can set the profiling level to 2 to record every query.

db.setProfilingLevel(2);

db.getCollection('system.profile').find({
    'ns': { 
        '$nin' : ['test.system.profile', 'test.system.indexes', 'test.system.js', 'test.system.users']
    }
}).limit(5).sort({'ts': -1}).pretty();
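
The same thing via pymongo might look like this (assuming a local mongod and the test database):

from pymongo import MongoClient

db = MongoClient('mongodb://127.0.0.1:27017')['test']
db.command('profile', 2)  # record every operation

for op in db['system.profile'].find().sort('ts', -1).limit(5):
    print(op)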

ref:
https://docs.mongodb.com/manual/tutorial/manage-the-database-profiler/
https://stackoverflow.com/questions/15204341/mongodb-logging-all-queries

$ pip install mongotail

# set the profiling level
$ mongotail 127.0.0.1:27017/test -l 2

# tail logs
$ mongotail 127.0.0.1:27017/test -f -m

ref:
https://github.com/mrsarm/mongotail

Monitoring

$ mongotop
$ mongostat

ref:
https://docs.mongodb.com/manual/reference/program/mongotop/
https://docs.mongodb.com/manual/reference/program/mongostat/

$ pip install mtools

$ mloginfo mongod.log

ref:
https://github.com/rueckstiess/mtools

Speed up Python and Node.js builds on Travis CI

Speed up Python and Node.js builds on Travis CI

Travis CI's caching archives all directories listed in the configuration and uploads them to Amazon S3. Cached contents are available to any build on the repository, including Pull Requests. For Python and Node.js projects, you could cache both site-packages and node_modules directories in every Travis CI build.

Here is an example of .travis.yml:

sudo: false

language: python

python:
  - "2.7"

node_js: 4

cache:
  directories:
    - $HOME/.cache/pip
    - $HOME/virtualenv/python2.7.9/lib/python2.7/site-packages
    - node_modules

before_install:
  - pip install -U pip

install:
  - pip install -r requirements.txt
  - pip install coverage --ignore-installed
  - npm install

script:
  - coverage run manage.py test

In my case, after applying these changes, the pip and npm installation time dropped from 180 seconds to about 5 seconds.

One thing should be mentioned here: since we didn't specify any bin folder in the configuration (and I don't think that's necessary), any executable installed by pip, such as coverage or django-admin.py, will not exist in subsequent builds. If you need those commands, you can force-reinstall the packages with pip install some_package --ignore-installed.

ref:
https://docs.travis-ci.com/user/caching/
https://stackoverflow.com/questions/19422229/how-to-cache-requirements-for-a-django-project-on-travis-ci
https://tzangms.com/how-to-speed-up-python-unit-test-on-travis-ci/

AWS Lambda Cookbook

AWS Lambda Cookbook

AWS Lambda is an event-driven service: you upload your code to it and run that code on demand, without maintaining your own servers.

ref:
http://aws.amazon.com/lambda/
http://docs.aws.amazon.com/lambda/latest/dg/limits.html

API Gateway is essentially URL routing, while Lambda functions are the handlers behind those routes (endpoints). If you invoke your Lambda functions via events or schedules, you don't need API Gateway at all.

AWS Lambda has two invocation types: RequestResponse, which is synchronous (e.g., when the function is bound to API Gateway, or when you trigger it from the Lambda Management Console), and Event, which is asynchronous.
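
Both types can be exercised with boto3; here is a minimal sketch, reusing the LambdaBaku_syncIssue function and payload that appear later in this post:

import json

import boto3

lambda_client = boto3.client('lambda')

# RequestResponse: synchronous, blocks until the function finishes
response = lambda_client.invoke(
    FunctionName='LambdaBaku_syncIssue',
    InvocationType='RequestResponse',
    Payload=json.dumps({'issue_number': 43}),
)
print(response['Payload'].read())

# Event: asynchronous, returns immediately with HTTP 202
lambda_client.invoke(
    FunctionName='LambdaBaku_syncIssue',
    InvocationType='Event',
    Payload=json.dumps({'issue_number': 43}),
)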

Runtimes

AWS Lambda supports the following runtime versions:

  • nodejs (Node v0.10)
  • nodejs4.3
  • java
  • python

ref:
http://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html
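
For the python runtime, the handler is just a plain function; a minimal sketch (the handler name is whatever you configure):

def handle(event, context):
    # event: the input (S3 object event, DynamoDB stream, API Gateway payload, etc.)
    # context: invocation metadata, e.g. context.get_remaining_time_in_millis()
    return 'DONE'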

Node.js

const aws = require('aws-sdk');

exports.handle = (event, context, callback) => {
  doYourShit();
  callback(null, 'DONE');
};

Every Lambda function receives three arguments: event, context, and callback.

event is the input from the outside world; it might come from an S3 object event, a DynamoDB stream, or a JSON payload POSTed through API Gateway.

context contains metadata about the current Lambda function invocation, e.g., context.getRemainingTimeInMillis().

The callback argument is only supported by the Node.js v4.3 runtime; on v0.10 you have to use context.succeed(), context.fail(), or context.done() instead. But who the hell still uses Node.js v0.10 anyway.

ref:
http://docs.aws.amazon.com/lambda/latest/dg/programming-model.html
http://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-handler.html
http://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
http://docs.aws.amazon.com/lambda/latest/dg/best-practices.html

Calling another Lambda function in a Lambda function.

Note that your Lambda function's role must have permission to invoke other Lambda functions.

const util = require('util');

const aws = require('aws-sdk');

const lambda = new aws.Lambda();

const params = {
  FunctionName: 'LambdaBaku_syncIssue',
  InvocationType: 'Event', // means asynchronous execution
  Payload: JSON.stringify({ issue_number: curatedIssue.number }),
};

lambda.invoke(params, (err, data) => {
  if (err) {
    console.log('FAIL', params);
    console.log(util.inspect(err));
  } else {
    console.log(data);
  }
});

ref:
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html
http://stackoverflow.com/questions/31714788/can-an-aws-lambda-function-call-another

The complete code is on GitHub:
https://github.com/CodeTengu/lambdabaku

Users and Roles

If you manage your Lambda functions with apex, make sure the AWS credential (user) you use has both the AWSLambdaFullAccess and AWSLambdaRole permissions.

Creating one role per project, e.g., lambdabaku_role, is enough; you can find the roles you have created in the IAM Management Console. The Basic execution role is basically sufficient, since you can modify the role's permissions / policies at any time. Which VPC a Lambda function belongs to is configured separately and has nothing to do with the role, so a function with the Basic execution role can still run inside a VPC.

If you want to access DynamoDB inside a Lambda function, remember to add the corresponding statements to the role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Stmt1428341300017",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:004615714446:table/CodeTengu_Preference",
                "arn:aws:dynamodb:ap-northeast-1:004615714446:table/CodeTengu_WeeklyIssue",
                "arn:aws:dynamodb:ap-northeast-1:004615714446:table/CodeTengu_WeeklyPost"
            ]
        }
    ]
}

Scheduled Events

ref:
http://docs.aws.amazon.com/lambda/latest/dg/with-scheduled-events.html

API Gateway

For a simple setup, choose Open with access key for Security, then create an access key under API Keys in the API Gateway console and assign an API stage to it.

When calling the API, just add the x-api-key: YOUR_API_KEY HTTP header.
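
A hypothetical call with Python's requests (the endpoint URL here is made up):

import requests

response = requests.get(
    'https://abcde12345.execute-api.ap-northeast-1.amazonaws.com/prod/issues',
    headers={'x-api-key': 'YOUR_API_KEY'},
)
print(response.status_code)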

ref:
http://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-api-keys.html

Related Projects

ref:
https://github.com/serverless/serverless
https://github.com/apex/apex
https://github.com/claudiajs/claudia
https://github.com/garnaat/kappa
https://github.com/Miserlou/Zappa
https://github.com/nficano/python-lambda

A brief analysis of serverless architecture and implementation (in Chinese):
http://abalone0204.github.io/2016/05/22/serverless-simple-crud/

Deploy Lambda Functions via apex

$ curl https://raw.githubusercontent.com/apex/apex/master/install.sh | sh

$ apex deploy
$ apex invoke syncPublishedIssues --logs
$ echo -n '{"issue_number": 43}' | apex invoke syncIssue --logs

ref:
https://github.com/apex/apex
http://apex.run/