Apex and Terraform: The easiest way to manage AWS Lambda functions

AWS Lambda lets you run code without provisioning or managing servers, a model known as serverless computing or Function as a Service (FaaS).

Apex is a Go command-line tool to manage and deploy your serverless functions on AWS Lambda. Apex is also integrated with Terraform to provide cloud infrastructure management, for instance, configuring your AWS Lambda functions with Amazon API Gateway.

ref:
https://aws.amazon.com/lambda/
https://aws.amazon.com/api-gateway/
https://github.com/apex/apex

You can browse the projects created for this post on GitHub:
https://github.com/vinta/pangu.space
https://github.com/CodeTengu/LambdaBaku

Install

$ curl https://raw.githubusercontent.com/apex/apex/master/install.sh | sh

ref:
http://apex.run/#installation

Initialize

It is recommended to configure your AWS credentials with awscli.

$ pip install awscli
$ aws configure

ref:
https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html

To use Apex to manage Lambda functions, you have to make sure your AWS credentials have at least the following IAM permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "iam:CreateRole",
        "iam:CreatePolicy",
        "iam:AttachRolePolicy",
        "iam:PassRole",
        "lambda:GetFunction",
        "lambda:ListFunctions",
        "lambda:CreateFunction",
        "lambda:DeleteFunction",
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration",
        "lambda:UpdateFunctionConfiguration",
        "lambda:UpdateFunctionCode",
        "lambda:CreateAlias",
        "lambda:UpdateAlias",
        "lambda:GetAlias",
        "lambda:ListAliases",
        "lambda:ListVersionsByFunction",
        "logs:FilterLogEvents",
        "cloudwatch:GetMetricStatistics"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}

$ apex init

ref:
http://apex.run/#getting-started

After running apex init, Apex creates a Role and a Policy. You should be able to find them in the AWS IAM Management Console. If you want to access other AWS resources in your Lambda functions, for instance, S3 buckets, DynamoDB tables, or SNS topics, you must create a new Policy which grants the appropriate permissions and attach it to the Role that Apex created.

Here is an example Policy for operating on certain DynamoDB tables:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123456789",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_Preference",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_Preference/*",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyIssue",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyIssue/*",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyPost",
                "arn:aws:dynamodb:ap-northeast-1:123456789:table/CodeTengu_WeeklyPost/*"
            ]
        }
    ]
}

Write Lambda Functions

ref:
https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html
https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html

Node.js

The simplest handler:

exports.handle = (event, context, callback) => {
  // do your work here
  callback(null, 'DONE');
};

ref:
https://docs.aws.amazon.com/lambda/latest/dg/programming-model.html

Call another Lambda function in a Lambda function:

You must make sure your Lambda role has permission to invoke other Lambda functions.

const util = require('util');

const aws = require('aws-sdk');

const lambda = new aws.Lambda();

const params = {
  FunctionName: 'LambdaBaku_syncIssue',
  InvocationType: 'Event', // means asynchronous execution
  Payload: JSON.stringify({ issue_number: curatedIssue.number }),
};

lambda.invoke(params, (err, data) => {
  if (err) {
    console.log('FAIL', params);
    console.log(util.inspect(err));
  } else {
    console.log(data);
  }
});

ref:
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html
https://stackoverflow.com/questions/31714788/can-an-aws-lambda-function-call-another

Go

Write a Lambda function triggered by Amazon API Gateway:

package main

import (
    "encoding/json"
    "errors"
    "log"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/vinta/pangu"
)

var (
    // ErrTextNotProvided is thrown when text is not provided in HTTP query string
    ErrTextNotProvided = errors.New("No text was provided in HTTP query string")
)

// Handler is the AWS Lambda function handler
func Handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
    log.Printf("request id: %s\n", request.RequestContext.RequestID)

    text, ok := request.QueryStringParameters["t"]
    if !ok {
        errMap := map[string]string{
            "message": ErrTextNotProvided.Error(),
        }
        errMapJSON, _ := json.MarshalIndent(errMap, "", " ")

        return events.APIGatewayProxyResponse{
            Body: string(errMapJSON),
            StatusCode: 400,
        }, nil
    }

    log.Printf("text: %s\n", text)

    textPlainHeaders := map[string]string{
        "content-type": "text/plain; charset=utf-8",
    }

    return events.APIGatewayProxyResponse{
        Body: pangu.SpacingText(text),
        Headers: textPlainHeaders,
        StatusCode: 200,
    }, nil
}

func main() {
    lambda.Start(Handler)
}

ref:
https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/
https://docs.aws.amazon.com/lambda/latest/dg/go-programming-model-handler-types.html
https://docs.aws.amazon.com/lambda/latest/dg/go-programming-model-errors.html

Your "Integration Request" configuration in API Gateway should look like this:

  • Integration type: Lambda Function
  • Use Lambda Proxy integration: Yes
  • Lambda Region: ap-northeast-1
  • Lambda Function: panguspace_spacing_text
  • Invoke with caller credentials: No
  • Credentials cache: Do not add caller credentials to cache key
  • Use Default Timeout: Yes

It's also worth noting that the API response is mainly defined by the APIGatewayProxyResponse in your Lambda function code; the "Integration Response" and "Method Response" configurations in API Gateway do not matter.

ref:
https://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-with-lambda-integration.html

Usage

Deploy all functions:

$ apex deploy

ref:
http://apex.run/#deploying-functions

Invoke a function:

# invoke a function directly
$ apex invoke spacing_text --logs
{
    "statusCode": 400,
    "headers": null,
    "body": "{\"message\": \"No text was provided in HTTP query string\"}"
}

# invoke a function with an API Gateway event
$ cat fixtures/spacing_text_event.json
{
    "queryStringParameters": {"t": "與PM戰鬥的人,應當小心自己不要成為PM"}
}
$ apex invoke spacing_text --logs < fixtures/spacing_text_event.json
{
    "statusCode": 200,
    "headers": {"content-type": "text/plain; charset=utf-8"},
    "body": "與 PM 戰鬥的人,應當小心自己不要成為 PM"
}

ref:
http://apex.run/#invoking-functions

View logs (which might be delayed by several seconds):

$ apex logs -f

Pack a function:

$ apex build spacing_text > spacing_text.zip

Configure API Gateway

Create API Keys

To set up API keys, do the following:

  1. Configure your API methods to require an API key
  2. Deploy your API
  3. Create an API key for the API in a region
  4. Create a Usage Plan and associate the API key with a certain Stage

In step 1, your "Method Request" configuration in API Gateway should look like this:

  • Authorization: NONE
  • Request Validator: NONE
  • API Key Required: true

Now you are able to call the API with an x-api-key header:

$ curl -H "x-api-key: YOUR-API-KEY" https://xxx.execute-api.ap-northeast-1.amazonaws.com/v1/your-endpoint/

ref:
https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-create-usage-plans-with-rest-api.html
https://docs.aws.amazon.com/apigateway/latest/developerguide/how-to-use-postman-to-call-api.html

Actually, you could release your APIs without API keys if you like.

Setup a Custom Domain

To set up a custom domain managed by Cloudflare, see the following link:
https://stackoverflow.com/a/46061708/885524

It is worth noting that although the Stack Overflow answer says to use the Full (Strict) SSL mode, Full also works.

Moreover, it might take a long time to generate the "Target Domain Name" (xxx.cloudfront.net).

Don't forget to add "Base Path Mappings" in API Gateway Custom Domain Names:

  • api.pangu.space
    • Target Domain Name: xxx.cloudfront.net
    • ACM Certificate: *.pangu.space
    • Base Path Mappings:
      • Path: /v1
      • Destination: Pangu:v1

Manage Infrastructures with Terraform

Terraform is a tool for managing your cloud infrastructure as code.

$ brew install terraform

$ tree .
.
├── functions
│   ├── introduce
│   │   └── main.go
│   └── spacing_text
│       └── main.go
└── infrastructure
    ├── main.tf
    └── variables.tf

Define variables and data sources:

# infrastructure/variables.tf
data "aws_caller_identity" "current" {}

variable "aws_region" {}
variable "apex_environment" {}
variable "apex_function_role" {}

variable "apex_function_arns" {
  type = "map"
}

variable "apex_function_names" {
  type = "map"
}

variable "apex_function_introduce" {}
variable "apex_function_spacing_text" {}

ref:
https://www.terraform.io/docs/providers/aws/d/caller_identity.html

Define AWS resources:

# infrastructure/main.tf
resource "aws_api_gateway_rest_api" "pangu" {
  name = "Pangu"
}

resource "aws_api_gateway_method" "pangu_root" {
  rest_api_id   = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id   = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  http_method   = "GET"
  authorization = "NONE"
}

resource "aws_api_gateway_integration" "pangu_root_get" {
  rest_api_id             = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id             = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  http_method             = "${aws_api_gateway_method.pangu_root.http_method}"
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = "arn:aws:apigateway:${var.aws_region}:lambda:path/2015-03-31/functions/${var.apex_function_introduce}/invocations"
}

resource "aws_api_gateway_method_response" "pangu_root_get_200" {
  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  http_method = "${aws_api_gateway_method.pangu_root.http_method}"
  status_code = "200"

  response_models = {
    "application/json" = "Empty"
  }

  response_parameters = {
    "method.response.header.Access-Control-Allow-Origin" = true
  }
}

resource "aws_api_gateway_resource" "pangu_spacing_text" {
  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  parent_id   = "${aws_api_gateway_rest_api.pangu.root_resource_id}"
  path_part   = "spacing-text"
}

resource "aws_api_gateway_method" "pangu_spacing_text_get" {
  rest_api_id      = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id      = "${aws_api_gateway_resource.pangu_spacing_text.id}"
  http_method      = "GET"
  authorization    = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "pangu_spacing_text_get" {
  rest_api_id             = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id             = "${aws_api_gateway_resource.pangu_spacing_text.id}"
  http_method             = "${aws_api_gateway_method.pangu_spacing_text_get.http_method}"
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = "arn:aws:apigateway:${var.aws_region}:lambda:path/2015-03-31/functions/${var.apex_function_spacing_text}/invocations"
}

resource "aws_api_gateway_method_response" "pangu_spacing_text_get_200" {
  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  resource_id = "${aws_api_gateway_resource.pangu_spacing_text.id}"
  http_method = "${aws_api_gateway_method.pangu_spacing_text_get.http_method}"
  status_code = "200"

  response_models = {
    "application/json" = "Empty"
  }

  response_parameters = {
    "method.response.header.Access-Control-Allow-Origin" = true
  }
}

resource "aws_api_gateway_deployment" "pangu" {
  depends_on = [
    "aws_api_gateway_method.pangu_root",
    "aws_api_gateway_integration.pangu_root_get",
    "aws_api_gateway_method_response.pangu_root_get_200",
    "aws_api_gateway_resource.pangu_spacing_text",
    "aws_api_gateway_method.pangu_spacing_text_get",
    "aws_api_gateway_integration.pangu_spacing_text_get",
    "aws_api_gateway_method_response.pangu_spacing_text_get_200",
  ]

  rest_api_id = "${aws_api_gateway_rest_api.pangu.id}"
  stage_name  = "v1"
}

resource "aws_lambda_permission" "pangu_root_get" {
  statement_id  = "AllowInvokeFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = "${var.apex_function_introduce}"
  principal     = "apigateway.amazonaws.com"

  source_arn = "arn:aws:execute-api:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${aws_api_gateway_rest_api.pangu.id}/*/${aws_api_gateway_integration.pangu_root_get.http_method}/"
}

resource "aws_lambda_permission" "pangu_spacing_text" {
  statement_id  = "AllowInvokeFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = "${var.apex_function_spacing_text}"
  principal     = "apigateway.amazonaws.com"

  source_arn = "arn:aws:execute-api:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${aws_api_gateway_rest_api.pangu.id}/*/${aws_api_gateway_integration.pangu_spacing_text_get.http_method}${aws_api_gateway_resource.pangu_spacing_text.path}"
}

ref:
https://www.terraform.io/docs/providers/aws/guides/serverless-with-aws-lambda-and-api-gateway.html

# download provider plugins
$ apex infra init

# view the generated execution plan
$ apex infra plan

# deploy your infrastructure
$ apex infra apply
$ apex infra apply -auto-approve

ref:
http://apex.run/#managing-infrastructure

MongoDB cookbook: Indexes

Indexes are crucial for the efficient execution of queries and aggregations in MongoDB. Without indexes, MongoDB must perform a collection scan, i.e., scan every document in a collection.

If a write operation modifies an indexed field, MongoDB updates all indexes that have the modified field as a key. So, be careful while choosing indexes.

Types Of Indexes

ref:
https://docs.mongodb.com/manual/indexes/
https://docs.mongodb.com/manual/applications/indexes/

Single Field Index

For a single-field index, the sort order (i.e. ascending or descending) of the index key doesn't matter for sort operations, because MongoDB can traverse the index in either direction. With index intersection, single-field indexes can be powerful.
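
For example, an ascending single-field index supports sorts in both directions (a minimal sketch; the scores collection is hypothetical):

db.scores.createIndex({score: 1})

db.scores.find().sort({score: 1})   // walks the index forward
db.scores.find().sort({score: -1})  // walks the index backward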

ref:
https://docs.mongodb.com/manual/core/index-single/

Compound Index

The order of the fields listed in a compound index is very important.
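
For example, the following index supports queries on its key prefixes, but not on trailing fields alone (a sketch reusing the message collection from later examples):

db.message.createIndex({_cls: 1, sender: 1, posted_at: 1})

// can use the index: {_cls}, {_cls, sender}, {_cls, sender, posted_at}
db.message.find({_cls: 'Message.ChatMessage', sender: ObjectId("582ee32a5b9c861c87dc297e")})

// cannot use the index efficiently: sender alone is not a prefix
db.message.find({sender: ObjectId("582ee32a5b9c861c87dc297e")})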

ref:
https://docs.mongodb.com/manual/core/index-compound/
https://docs.mongodb.com/manual/tutorial/create-indexes-to-support-queries/

Index Intersection

MongoDB can use multiple single field indexes to fulfill queries.

db.orders.createIndex({item: 1})
db.orders.createIndex({qty: 1})

db.orders.find({item: 'abc123', qty: {$gt: 15}})

ref:
https://docs.mongodb.com/manual/core/index-intersection/

Covered Queries
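
A covered query is one that can be satisfied entirely from an index, so MongoDB never needs to examine the documents themselves. A minimal sketch, reusing the orders collection from above:

db.orders.createIndex({item: 1, qty: 1})

// both the filter and the projection only touch indexed fields,
// and _id is excluded, so the query is covered by the index
db.orders.find({item: 'abc123'}, {_id: 0, item: 1, qty: 1})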

ref:
https://docs.mongodb.com/manual/core/query-optimization/#read-operations-covered-query

List Indexes

db.message.getIndexes()

// show collection statistics
db.message.stats()
db.message.stats().indexSizes

ref:
https://docs.mongodb.com/manual/tutorial/manage-indexes/

Add Indexes

An index which contains array fields might consume a lot of disk space.

db.message.createIndex({
    '_cls': 1,
    'sender': 1,
    'posted_at': 1
}, {'background': true, 'sparse': true})

db.message.createIndex({
    '_cls': 1,
    'includes': 1,
    'posted_at': 1
}, {'background': true, 'sparse': true})

db.getCollection('message').find({
    '$or': [
        // sent by cp
        {
            '_cls': 'Message.ChatMessage',
            'sender': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        // sent by payer
        {
            '_cls': 'Message.GiftMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        }
    ]
})

You can't index two array fields together in the same compound index; in this example, includes and unlocks.

// it doesn't work
db.message.createIndex({
    '_cls': 1,
    'sender': 1,
    'includes': 1,
    'unlocks': 1
}, {'background': true, 'sparse': true})

The Order Of Fields Of Compound Indexes

The order of fields in an index matters: you must consider index cardinality and selectivity. In contrast, the order of fields in a find() query or in a $match stage of an aggregation does not affect whether the query can use an index.

The order of fields in a compound index should be:

  • First, fields on which you will query for exact values.
  • Second, fields on which you will sort.
  • Finally, fields on which you will query for a range of values.
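
A hypothetical sketch applying this rule, with an equality match on status, a sort on posted_at, and a range on qty:

db.orders.createIndex({status: 1, posted_at: 1, qty: 1})

// equality first, then sort, then range
db.orders.find({status: 'shipped', qty: {$gt: 15}}).sort({posted_at: -1})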

ref:
https://blog.mlab.com/2012/06/cardinal-ins/
https://emptysqua.re/blog/optimizing-mongodb-compound-indexes/
https://stackoverflow.com/questions/5245737/mongodb-indexes-order-and-query-order-must-match
https://stackoverflow.com/questions/33545339/how-does-the-order-of-compound-indexes-matter-in-mongodb-performance-wise

Partial Indexes vs. Sparse Indexes

Partial indexes should be preferred over sparse indexes. However, partial indexes only support a very small set of filter operators:

  • $exists
  • $eq or field: value
  • $gt, $gte, $lt, $lte
  • $type
  • $and

If you use 'partialFilterExpression': {'includes': {'$exists': true}}, MongoDB also indexes documents whose includes field has a null value.

db.getCollection('message').createIndex(
    {'_cls': 1, 'includes': 1, 'posted_at': 1},
    {'background': true, 'partialFilterExpression': {'includes': {'$exists': true}}}
)

db.getCollection('message').createIndex(
  {'created_at': -1},
  {'background': true, 'partialFilterExpression': {'created_at': {'$gte': new Date("2018-01-01T16:00:00Z")}}}
)

ref:
https://docs.mongodb.com/manual/core/index-partial/
https://docs.mongodb.com/manual/core/index-sparse/

Create An Index On An Array Field

Querying an indexed array field is generally a lot easier than querying an object field with dynamic keys.
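
A sketch of the difference, assuming a tags array versus a hypothetical tags_map object with dynamic keys:

// an array of values: one multikey index serves membership queries on any value
db.user.createIndex({tags: 1})
db.user.find({tags: 'vip'})

// an object with dynamic keys: every key is its own path,
// so each one would need a separate index
db.user.find({'tags_map.vip': true})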

ref:
https://stackoverflow.com/questions/9589856/mongo-indexing-on-object-arrays-vs-objects

Create A Unique Index On An Array Field

Create a unique index on an array field.

The unique constraint applies across separate documents in the collection. That is, the unique index prevents separate documents from having the same value for the indexed key, but it allows a single document to contain multiple identical transaction IDs.

db.getCollection('test1').createIndex({'purchases.transaction_id': 1}, {unique: true})

db.getCollection('test1').insert({ _id: 1, purchases: [
    {transaction_id: 'A'}
]})

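// fails with a duplicate key error: another document already contains transaction_id 'A'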
db.getCollection('test1').insert({ _id: 5, purchases: [
    {transaction_id: 'A'}
]})

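// succeeds: the unique constraint does not apply within a single document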
db.getCollection('test1').update({ _id: 1}, {$push: {purchases: {transaction_id: 'A'}}})

To prevent a single document from having multiple identical transaction IDs, we can use atomic updates on single documents.

user = User(id=bson.ObjectId(user_id))
purchase = DirectPurchase(
    user=user,
    timestamp=timestamp,
    transaction_id=transaction_id,
)
MessagePackProduct.objects \
    .filter(id=message_pack_id, __raw__={
        'purchases': {'$not': {'$elemMatch': {
            '_cls': purchase._cls,
            'user': purchase.user.id,
        }}},
    }) \
    .update_one(push__purchases=purchase)

ref:
https://docs.mongodb.com/manual/core/index-unique/#unique-constraint-across-separate-documents

Sort With Indexes
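
MongoDB can return sorted results without an in-memory sort when the sort keys match the index key pattern. A minimal sketch: with equality on the prefix field, a sort on the following field can walk the index.

db.message.createIndex({_cls: 1, posted_at: -1})

// posted_at is already ordered within each _cls value, so no in-memory sort is needed
db.message.find({_cls: 'Message'}).sort({posted_at: -1})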

ref:
https://docs.mongodb.com/manual/tutorial/sort-results-with-indexes/

Drop Indexes

db.message.dropIndex({
    'includes': 1
})

db.message.dropIndex({
    '_cls': 1,
    'posted_at': 1,
    'includes': 1
})

Remove Unused Indexes

You can use db.getCollection('message').aggregate({$indexStats: {}}) to find unused indexes: the accesses.ops field is the number of operations that have used the index, so match {'accesses.ops': 0} to list indexes that have never been used. Also, you should remove indexes that are a prefix of another compound index, since the longer index can serve the same queries.

db.getCollection('message').aggregate(
    {
        '$indexStats': {}
    },
    {
        '$match': {
            'accesses.ops': {'$gt': 0}
        }
    }
)

Result:

{
    "name" : "_cls_1_sender_1_posted_at_1",
    "key" : {
        "_cls" : 1,
        "sender" : 1,
        "posted_at" : 1
    },
    "host" : "a6ea11893605:27017",
    "accesses" : {
        "ops" : 3,
        "since" : "2018-01-26T07:04:51.137Z"
    }
}

This manga is really something: a bit like a hardcore version of Zootopia, but both the depth of the story and the visual impact are so hardcore that a mild-mannered reader like me can barely handle it.

Also recommended: Designs and The Promised Neverland. The manga I've been reading lately is all pretty grotesque.

ref:
https://blog.mlab.com/2017/01/using-mongodb-indexstats-to-identify-and-remove-unused-indexes/
https://scalegrid.io/blog/how-to-find-unused-indexes-in-mongodb/

Profiling

// enable
db.setProfilingLevel(2)

// disable
db.setProfilingLevel(0)

// see profiling data after you issue some queries
db.system.profile.find().limit(10).sort( { ts : -1 } ).pretty()

// delete profiling data
db.system.profile.drop()

Query Explain

There are both collection.find().explain() and collection.explain().find(). It's recommended to use collection.find().explain('executionStats') to get more information, such as the total number of documents examined.

db.getCollection('message').find({
    '$or': [
        // sent by cp
        {
            '_cls': 'Message.ChatMessage',
            'sender': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        {
            '_cls': 'Message',
            'sender': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        // sent by payer
        {
            '_cls': 'Message.ChatMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        {
            '_cls': 'Message.ReplyMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        {
            '_cls': 'Message.GiftMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        }
    ]
})
// .explain()
// .explain('allPlansExecution')
.explain('executionStats')

ref:
https://docs.mongodb.com/manual/reference/method/cursor.explain/
https://docs.mongodb.com/manual/reference/method/db.collection.explain/#db.collection.explain

You could also explain a .update() query. However, .updateMany() and .updateOne() don't support .explain().

db.getCollection('user').explain().update(
    {'follows.user': ObjectId("57985b784af4124063f090d3")},
    {'$set': {'created_at': ISODate('2018-01-01 00:00:00.000Z')}},
    {'multi': true}
)

Some important fields to look at in the result of explain():

  • executionStats.totalKeysExamined
  • executionStats.totalDocsExamined
  • queryPlanner.winningPlan.stage
  • queryPlanner.winningPlan.inputStage.stage
  • queryPlanner.winningPlan.inputStage.indexName
  • queryPlanner.winningPlan.inputStage.direction

Possible values of stage:

  • COLLSCAN: scanning the entire collection
  • IXSCAN: scanning index keys
  • FETCH: retrieving documents
  • SHARD_MERGE: merging results from shards
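
An abridged, hypothetical explain('executionStats') result showing where these fields live:

{
    "queryPlanner" : {
        "winningPlan" : {
            "stage" : "FETCH",
            "inputStage" : {
                "stage" : "IXSCAN",
                "indexName" : "_cls_1_sender_1_posted_at_1",
                "direction" : "forward"
            }
        }
    },
    "executionStats" : {
        "totalKeysExamined" : 3,
        "totalDocsExamined" : 3
    }
}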

ref:
https://docs.mongodb.com/manual/reference/explain-results/

Aggregation Explain

db.getCollection('message').explain().aggregate()

ref:
https://stackoverflow.com/questions/12702080/mongodb-explain-for-aggregation-framework
https://docs.mongodb.com/manual/reference/method/db.collection.explain/

If $project, $unwind, or $group occurs prior to the $sort operation, $sort cannot use any indexes. Additionally, $sort can only use fields defined in a previous $project stage.

Basically, you could just consider the $match part when you want to create new indexes.
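
For example, putting $match and $sort at the front of the pipeline lets them use an index before any reshaping stages (a minimal sketch):

db.getCollection('message').aggregate([
    {'$match': {'_cls': 'Message'}},
    {'$sort': {'posted_at': -1}},  // can still use an index here
    {'$unwind': '$unlocks'}        // after this stage, a $sort could not
])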

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/sort/#sort-operator-and-performance

MongoEngine

_cls is automatically included in indexes if allow_inheritance is on. If you want to disable this behavior, pass cls: False in the index definition.

ref:
http://docs.mongoengine.org/guide/defining-documents.html#indexes

MongoDB cookbook: Queries and Aggregations

Frequently accessed items are cached in memory, so that MongoDB can provide optimal response time.

MongoDB Shell in JavaScript

Administration

db.currentOp();

// slow queries
db.currentOp({
    "active": true,
    "secs_running": {"$gt" : 3},
    "ns": /^swag\./
});

// queries not using any index
db.adminCommand({ 
    "currentOp": true,
    "op": "query", 
    "planSummary": "COLLSCAN"
});

// operations with high numYields
db.adminCommand({ 
    "currentOp": true, 
    "ns": /^swag\./, 
    "numYields": {"$gte": 100} 
}) 

db.serverStatus().connections
{
    "current" : 269,
    "available" : 838591,
    "totalCreated" : 417342
}

ref:
https://docs.mongodb.com/manual/reference/method/db.currentOp/
https://hackernoon.com/mongodb-currentop-18fe2f9dbd68
http://www.mongoing.com/archives/6246

BSON Types

ref:
https://docs.mongodb.com/manual/reference/bson-types/

Check If A Document Exists

It is significantly faster to use find() + limit(), because findOne() always reads and returns the document if it exists, whereas find() just returns a cursor and only reads the data when you iterate through it.

db.getCollection('message').find({_id: ObjectId("585836504b287b5022a3ae26"), delivered: false}, {_id: 1}).limit(1)

ref:
https://stackoverflow.com/questions/8389811/how-to-query-mongodb-to-test-if-an-item-exists
https://blog.serverdensity.com/checking-if-a-document-exists-mongodb-slow-findone-vs-find/

Find Documents

db.getCollection('user').find({username: 'nanababy520'})

db.getCollection('message').find({_id: ObjectId("5a6383b8d93d7a3fadf75af3")})

db.getCollection('message').find({_cls: 'Message'}).sort({posted_at: -1})

db.getCollection('message').find({sender: ObjectId("57aace67ac08e72acc3b265f"), pricing: {$ne: 0}})

db.getCollection('message').find({
    sender: ObjectId("5ac0f56038cfff013a123d85"),
    created_at: {
        $gte: ISODate('2018-04-21 12:00:00Z'),
        $lte: ISODate('2018-04-21 13:00:00Z')
    }
})
.sort({created_at: -1})

Find Documents With Regular Expression

db.getCollection('user').find({'username': /vicky/})

ref:
https://docs.mongodb.com/manual/reference/operator/query/regex/

Find Documents With An Array Field

  • $in: [...] means "intersection" or "any element in"
  • $all: [...] means "subset" or "contain"
  • $elemMatch: {...} means "any element match"
  • $not: {$elemMatch: {$nin: [...]}} means "subset" or "in"

The last one roughly means not any([False, False, False, False]), where each False indicates whether the corresponding array element is not in [...].
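
For example, to match documents whose tags array is a subset of a given list (a minimal sketch; note that it also matches documents whose tags field is empty or missing):

db.getCollection('user').find({
    tags: {$not: {$elemMatch: {$nin: ['beta', 'vip']}}}
})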

ref:
https://stackoverflow.com/questions/12223465/mongodb-query-subset-of-an-array

db.getCollection('message').find({includes: ObjectId("5a4bb448af9c462c610d0cc7")})

db.getCollection('user').find({gender: 'F', tags: 'promoted'})
db.getCollection('user').find({gender: 'F', 'tags.1': {$exists: true}})

ref:
https://docs.mongodb.com/manual/reference/operator/query/exists/#exists-true

Find Documents With An Array Field Of Embedded Documents

Usually, you could use $elemMatch.

{'the_array_field': {'$elemMatch': {
    'a_field_of_each_element': {'$lte': now},
    'another_field_of_each_element': 123
}}}

db.getCollection('message').find({
    unlocks: {
        $elemMatch: {
            _cls: 'PointsUnlock',
            user: ObjectId("57f662e727a79d07993faec5")
        }
    }
})

db.getCollection('feature.shop.product').find({
    purchases: {
        $elemMatch: {
            _cls: 'Purchase'
        }
    }
})

db.getCollection('feature.shop.product').find({
    '_id': 'prod_CWlSTXBEU4mhEu',
    'purchases': {'$not': {'$elemMatch': {
        '_cls': 'DirectPurchase',
        'user': ObjectId("58b61d9094ab56f912ba10a5")
    }}},
})

ref:
https://docs.mongodb.com/manual/reference/operator/query/elemMatch/

Find Documents With Existence Of Fields Or Values

  • .find({'field': {'$exists': true}}): the field exists
  • .find({'field': {'$exists': false}}): the field does not exist
  • .find({'field': {'$type': 10}}): the field exists with a null value
  • .find({'field': null}): the field exists with a null value or the field does not exist
  • .find({'field': {'$ne': null}}): the field exists and the value is not null

db.test.insert({'num': 1, 'check': 'value'})
db.test.insert({'num': 2, 'check': null})
db.test.insert({'num': 3})

db.test.find({});

db.test.find({'check': {'$exists': true}})
// return 1 and 2

db.test.find({'check': {'$exists': false}})
// return 3

db.test.find({'check': {'$type': 10}});
// return 2

db.test.find({'check': null})
// return 2 and 3

db.test.find({'check': {'$ne': null}});
// return 1

ref:
https://stackoverflow.com/questions/4057196/how-do-you-query-this-in-mongo-is-not-null
https://docs.mongodb.com/manual/tutorial/query-for-null-fields/

Find Documents Where An Array Field Does Not Contain A Certain Value

db.getCollection('user').update({_id: ObjectId("579994ac61ff217f96a585d9"), tags: {$ne: 'tag_to_add'}}, {$push: {tags: 'tag_to_add'}})

db.getCollection('user').update({_id: ObjectId("579994ac61ff217f96a585d9"), tags: {$nin: ['tag_to_add']}}, {$push: {tags: 'tag_to_add'}})

ref:
https://stackoverflow.com/questions/16221599/find-documents-with-arrays-not-containing-a-document-with-a-particular-field-val

Find Documents Where An Array Field Is Not Empty

db.getCollection('message').find({unlocks: {$exists: true, $not: {$size: 0}}})

ref:
https://stackoverflow.com/questions/14789684/find-mongodb-records-where-array-field-is-not-empty

Find Documents Where An Array Field's Size Is Greater Than 1

db.getCollection('user.inbox').find({
    'messages.0': {'$exists': true}
})

db.getCollection('message').find({
    '_cls': 'Message',
    'unlocks.10': {'$exists': true}
}).sort({'posted_at': -1})

db.getCollection('message').find({
    '_cls': 'Message.ChatMessage',
    'sender': ObjectId("582ee32a5b9c861c87dc297e"),
    'unlocks': {'$exists': true, '$not': {'$size': 0}}
})

ref:
https://stackoverflow.com/questions/7811163/query-for-documents-where-array-size-is-greater-than-1/15224544

Find Documents With Computed Values Using $expr

For instance, comparing two fields of a single document in a find() query:

db.getCollection('user').find({
    $expr: {
        $eq: [{$size: '$follows'}, {$size: '$blocks'}]
    }
})

ref:
https://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-lookup-expr
https://dzone.com/articles/expressive-query-language-in-mongodb-36-2

Project A Subset Of An Array Field With $filter

A sample document:

{
    "_id" : "message_unlock_pricing",
    "seed" : 42,
    "distributions" : {
        "a" : 0.5,
        "b" : 0.5
    },
    "whitelist" : [ 
        {
            "_id" : ObjectId("57dd071dd20fc40c0cbed6b7"),
            "variation" : "a"
        }, 
        {
            "_id" : ObjectId("5b1173a1487fbe2b2e9bba04"),
            "variation" : "b"
        }, 
        {
            "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
            "variation" : "b"
        }
    ]
}

var now = new Date();

db.getCollection('feature.ab.experiment').aggregate([
    {'$project': {
        '_id': 1,
        'seed': 1,
        'distributions': 1,
        'whitelist': {
            '$filter': {
               'input': {'$ifNull': ["$whitelist", []]},
               'as': "user",
               'cond': {'$eq': ['$$user._id', ObjectId("5a66d5c2af9c462c617ce552")]}
            }
         }
    }},
    {'$unwind': {
        'path': '$whitelist',
        'preserveNullAndEmptyArrays': true
    }}
])

ref:
https://stackoverflow.com/questions/42607221/mongodb-aggregation-project-check-if-array-contains

Insert Documents

db.getCollection('feature.launch').insert({
    'url': '//asia.public.swag.live/launchs/5a06b88aaf9c462c6146ce12.jpg',
    'user': {
        'id': ObjectId("5a06b88aaf9c462c6146ce12"),
        'username': 'luke0804',
        'tags': ["gender:male"]
    }
})

db.getCollection('feature.launch').insert({
    'url': '//asia.public.swag.live/launchs/57c16f5bb811055b66d8ef46.jpg',
    'user': {
        'id': ObjectId("57c16f5bb811055b66d8ef46"),
        'username': 'riva',
        'tags': ["gender:female"]
    }
})

Update Within A For Loop

var oldTags = ['famous', 'newstar', 'featured', 'western', 'recommended', 'popular'];
oldTags.forEach(function(tag) {
    db.getCollection('user').updateMany({tags: tag}, {$addToSet: {tags: 'badge:' + tag}});
});

Update An Array Field

You should use arrayFilters as much as possible.

The syntax of arrayFilters would be:

db.collection.update(
   {<query selector>},
   {<update operator>: {"array.$[<identifier>].field": value}},
   {arrayFilters: [{<identifier>: <condition>}]}
)

Inbox._get_collection().update_many(
    {'messages.id': message_id},
    {'$set': {'messages.$[message].tags': tags}},
    array_filters=[
        {'message.id': message_id},
    ],
)

ref:
https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/

Insert an element into an array field at a certain position.

db.getCollection('feature.forums.post').update(
   { _id: ObjectId("5b3c6a9c8433b15569cae54e") },
   {
     $push: {
        media: {
           $each: [{
                "mimetype" : "image/jpeg",
                "url" : "https://asia.uploads.swag.live/posts/5adb795b47d057338abe8910.jpg",
                "presets" : {}
            }],
           $position: 1
        }
     }
   }
)

Or use $set with an explicit array index.

media_id = 'xxx'
media_slot = 0

Post.objects \
    .filter(id=post_id, **{f'media__{media_slot}__id__ne': media_id}) \
    .update_one(__raw__={'$set': {f'media.{media_slot}': {'id': media_id}}})

ref:
https://docs.mongodb.com/manual/reference/operator/update/position/

Set an array field to empty.

db.getCollection('message').update(
    {'tags': 'pack:joycelai-1'},
    {'$set': {'unlocks': []}},
    {'multi': true}
)

db.getCollection('feature.shop.product').update(
    {},
    {'$set': {'purchases': []}},
    {'multi': true}
)

ref:
https://docs.mongodb.com/manual/reference/method/db.collection.update/
https://docs.mongodb.com/manual/reference/operator/update/set/

Remove elements from an array field.

var userId = ObjectId("57985b784af4124063f090d3");

db.getCollection('user').update(
    {'follows.user': userId},
    {'$pull': {'follows': {'user': userId}}},
    {
        'multi': true,
    }
);

db.getCollection('message').update(
    {'_id': {'$in': [
        ObjectId('5aca1ffc4271ab1624787ec4'),
        ObjectId('5aca31ab93ef2936291c3dd4'),
        ObjectId('5aca33d9b5eaef04943c0d0b'),
        ObjectId('5aca34e7a48c543b07fb0a0f'),
        ObjectId('5aca272d93ef296edc1c3dee'),
        ObjectId('5aca342aa48c54306dfb0a21'),
        ObjectId('5aca20756bd01023a8cb02e9')
    ]}},
    {'$pull': {'tags': 'pack:prod_D75YlDMzcCiAw3'}},
    {'multi': true}
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/pull/

Update Large Numbers Of Documents

Use Bulk.find.arrayFilters() and Bulk.find.update() together.

import datetime

import pymongo

expiration_time = datetime.datetime.utcnow() - datetime.timedelta(hours=48)

bulk = Outbox._get_collection().initialize_unordered_bulk_op()

for outbox in Outbox.objects.only('id').filter(messages__posted_at__lt=expiration_time):
    bulk.find({'_id': outbox.id}).update_one({
        '$pull': {'messages': {
            'posted_at': {'$lt': expiration_time},
        }},
    })

try:
    results = bulk.execute()
except pymongo.errors.InvalidOperation as err:
    if str(err) != 'No operations to execute':
        raise err

ref:
https://docs.mongodb.com/manual/reference/method/Bulk/
https://docs.mongodb.com/manual/reference/method/Bulk.find.arrayFilters/

Of course, you could also update the same document with multiple operations. However, that usually does not make sense.

from pymongo import UpdateOne
import bson

def _operations():
    # note: the walrus operator (:=) requires Python 3.8+
    if (title := payload.get('title')):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'title': title}})

    if (location := payload.get('location')):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'location': location}})

    if (pricing := payload.get('pricing')):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'pricing': pricing}})

    if (description := payload.get('description')):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'description': description}})

    yield UpdateOne(
        {
            '_id': bson.ObjectId(post_id),
            'media.0': {'$exists': True},
            'title': {'$ne': None},
            'location': {'$ne': None},
            'pricing': {'$ne': None},
            'posted_at': {'$eq': None},
        },
        {'$set': {'posted_at': utils.utcnow()}},
    )

operations = list(_operations())
result = Post._get_collection().bulk_write(operations, ordered=True)
print(result.bulk_api_result)

ref:
https://api.mongodb.com/python/current/examples/bulk.html

Remove items from an array field of documents.

var userId = ObjectId("57a42a779f22bb6bcc434520");

db.getCollection('user').update(
    {'follows.user': userId},
    {'$pull': {'follows': {'user': userId}}},
    {'multi': true}
)

ref:
https://stackoverflow.com/questions/33594397/how-to-update-a-large-number-of-documents-in-mongodb-most-effeciently

MongoEngine In Python

ref:
http://docs.mongoengine.org/guide/index.html
http://docs.mongoengine.org/apireference.html

Define Collections

It seems every collection in MongoEngine must have an id field.

ref:
http://docs.mongoengine.org/guide/defining-documents.html

Define A Field With Default EmbeddedDocument

The behavior of setting an EmbeddedDocument class as default works differently with and without only().

class User(ab.models.ABTestingMixin, db.Document):
    class UserSettings(db.EmbeddedDocument):
        reply_price = db.IntField(min_value=0, default=500, required=True)
        preferences = db.ListField(db.StringField())

    email = db.EmailField(max_length=255)
    created_at = db.DateTimeField(default=utils.now)
    last_active = db.DateTimeField(default=utils.now)
    settings = db.EmbeddedDocumentField(UserSettings, default=UserSettings)

If the user does not have a settings field in the database, here is the difference:

user = User.objects.get(username='gibuloto')
isinstance(user.settings, User.UserSettings) == True

user = User.objects.only('settings').get(username='gibuloto')
(user.settings is None) == True

user = User.objects.exclude('settings').get(username='gibuloto')
isinstance(user.settings, User.UserSettings) == True

Filter With Raw Queries

post = Post.objects \
    .no_dereference().only('posted_at') \
    .filter(__raw__={
        '_id': bson.ObjectId(post_id),
        'media.0': {'$exists': True},
        'title': {'$ne': None},
        'location': {'$ne': None},
        'gender': {'$ne': None},
        'pricing': {'$ne': None},
    }) \
    .modify(__raw__={'$min': {'posted_at': utils.utcnow()}}, new=True)

print(post.posted_at)

ref:
http://docs.mongoengine.org/guide/querying.html#raw-queries

Check If A Document Exists

Use .exists().

import datetime

now = datetime.datetime.now(datetime.timezone.utc)
if TagSchedule.objects.filter(user=user_id, tag=tag, started_at__gt=now).exists():
    return 'exists'

You have to use __raw__ if the field you want to query is a db.ListField(GenericEmbeddedDocumentField(XXX)) field.

if MessagePackProduct.objects.filter(id=message_pack_id, __raw__={'purchases.user': g.user.id}).exists():
    return 'exists'

Update With Conditions Of Field Values

You can update a field to a specified value only if the specified value is less than ($min) or greater than ($max) the current value of the field. The $min and $max operators can compare values of different types.

Only set posted_at to the current timestamp if it is absent. (Note that if the field exists with an explicit null value, $min will not replace it, because null sorts before dates in the BSON comparison order.)

Post.objects.update_one(
    {
        '_id': bson.ObjectId(post_id),
        'media.0': {'$exists': True},
        'title': {'$ne': None},
        'location': {'$ne': None},
        'gender': {'$ne': None},
        'pricing': {'$ne': None},
    },
    {
        '$min': {'posted_at': utils.utcnow()},
    },
)

ref:
https://docs.mongodb.com/manual/reference/operator/update/min/
https://docs.mongodb.com/manual/reference/operator/update/max/

Update An Array Field

Array update operators:

  • $: Acts as a placeholder to update the first element in an array for documents that match the query condition.
  • $[]: Acts as a placeholder to update all elements in an array for documents that match the query condition.
  • $[<identifier>]: Acts as a placeholder to update elements in an array that match the arrayFilters condition.
  • $addToSet: Adds elements to an array only if they do not already exist in the set.
  • $push: Adds an item to an array.
  • $pop: Removes the first or last item of an array.
  • $pull: Removes all array elements that match a specified query.
  • $pullAll: Removes all matching values from an array.

ref:
https://docs.mongodb.com/manual/reference/operator/update-array/
http://docs.mongoengine.org/guide/querying.html#atomic-updates
http://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-array-filters.html

Add an element to an array field.

user_id = '582ee32a5b9c861c87dc297e'
tag = 'my_tag'

updated = User.objects \
    .filter(id=user_id, tags__ne=tag) \
    .update_one(push__tags=tag)

updated = User.objects \
    .filter(id=user_id) \
    .update_one(add_to_set__schedules={
        'tag': tag,
        'nbf': datetime.datetime(2018, 6, 4, 0, 0),
        'exp': datetime.datetime(2019, 5, 1, 0, 0),
    })

Insert an element into an array at a certain position.

slot = 2
Post.objects.filter(id=post_id, media__id__ne=media_id).update_one(__raw__={
    '$push': {
        'media': {
            '$each': [{'id': bson.ObjectId(media_id)}],
            '$position': slot,
        }
    }
})

ref:
https://docs.mongodb.com/manual/reference/operator/update/position/
http://docs.mongoengine.org/guide/querying.html#querying-lists

Remove elements in an array field. It is also worth noting that update(pull__abc=xyz) always returns 1.

user_id = '582ee32a5b9c861c87dc297e'
tag = 'my_tag'

updated = User.objects \
    .filter(id=user_id) \
    .update_one(pull__tags=tag)

updated = User.objects \
    .filter(id=user_id) \
    .update_one(pull__schedules={'tag': tag})

Remove multiple embedded documents in an array field.

import bson

user_id = '5a66d5c2af9c462c617ce552'
tags = ['valid_tag_1', 'future_tag']

updated_result = User._get_collection().update_one(
    {'_id': bson.ObjectId(user_id)},
    {'$pull': {'schedules': {'tag': {'$in': tags}}}},
)
print(updated_result.raw_result)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
https://stackoverflow.com/questions/28102691/pullall-while-removing-embedded-objects

You could also use add_to_set to add an item to an array only if it is not already in the list; it always returns 1 if filter() matches any document. However, you can set full_result=True to get the detailed update result.

update_result = User.objects.filter(id=user_id).update_one(
    add_to_set__tags=tag,
    full_result=True,
)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
http://docs.mongoengine.org/guide/querying.html#atomic-updates

Update a multi-level nested array field. Yes, arrayFilters supports it.

ref:
https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/
https://stackoverflow.com/questions/23577123/updating-a-nested-array-with-mongodb

Update an embedded document in an array field.

MessagePackProduct.objects \
    .filter(id='prod_CR1u34BIpDbHeo', skus__id='sku_CR23rZOTLhYprP') \
    .update(__raw__={
        '$set': {'skus.$': {'id': 'sku_CR23rZOTLhYprP', 'test': 'test'}}
    })

ref:
https://stackoverflow.com/questions/9200399/replacing-embedded-document-in-array-in-mongodb
https://docs.mongodb.com/manual/reference/method/db.collection.update/#db.collection.update

Update specific embedded documents with arrayFilters in an array field.

User data:

{
    "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
    "username" : "gibuloto",
    "tags" : [
        "beta",
        "future_tag",
        "expired_tag"
    ],
    "schedules" : [
        {
            "tag" : "valid_tag",
            "nbf" : ISODate("2018-05-01T16:00:00.000Z"),
            "exp" : ISODate("2020-06-04T16:00:00.000Z")
        },
        {
            "tag" : "future_tag",
            "nbf" : ISODate("2020-01-28T16:00:00.000Z"),
            "exp" : ISODate("2020-12-14T16:00:00.000Z")
        },
        {
            "tag" : "expired_tag",
            "nbf" : ISODate("2016-02-12T16:00:00.000Z"),
            "exp" : ISODate("2016-04-21T16:00:00.000Z")
        }
    ],
}

It is worth noting that the <identifier> in arrayFilters must begin with a lowercase letter and contain only alphanumeric characters.

import bson

user_id = '5a66d5c2af9c462c617ce552'
tags = ['from_past_to_future']

updated_result = User._get_collection().update_one(
    {'_id': bson.ObjectId(user_id)},
    {
        '$addToSet': {'tags': {'$each': tags}},
        '$unset': {'schedules.$[schedule].nbf': True},
    },
    array_filters=[{'schedule.tag': {'$in': [tag for tag in tags]}}],
)
print(updated_result.raw_result)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
https://docs.mongodb.com/master/reference/operator/update/positional-filtered/

Update A Dictionary Field

Set a key/value in a dictionary field.

tutorial.data = {
    "price_per_message": 1200,
    "inbox": []
}

new_inbox = [
    {
        "id": "5af118c598eacb528e8fb8f9",
        "sender": "5a13239eaf9c462c611510fc"
    },
    {
        "id": "5af1117298eacb212a8fb8e9",
        "sender": "5a99554be9a21d5ff38b8ca5"
    }
]
tutorial.update(set__data__inbox=new_inbox)

ref:
https://stackoverflow.com/questions/21158028/updating-a-dictfield-in-mongoengine

Upsert: Update Or Create

You must use upsert=True with uniquely indexed fields. If you don't need the modified document, you should just use update_one(field1=123, field2=456, upsert=True).

Additionally, remember that modify() always reloads the whole object, even if the original one only loaded specific fields with only(). Try to avoid using document.DB_QUERY_METHOD(), and use User.objects.filter().only().modify() or User.objects.filter().update() when possible.

tag_schedule = TagSchedule.objects \
    .filter(user=user_id, tag='vip') \
    .modify(
        started_at=started_at,
        ended_at=ended_at,
        upsert=True
    )

user = User.objects \
    .filter(id=user.id, tutorials__buy_diamonds__version=None) \
    .modify(set__tutorials__buy_diamonds__version='v1')

updated = User.objects \
    .filter(user=user_id, tag=tag) \
    .update_one(
        push__followers=new_follower,
    )

ref:
https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-unique-indexes
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.modify
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.update_one

Upsert: Get Or Create

buy_diamonds = BuyDiamonds.objects.filter(user_id=user.id).upsert_one()

ref:
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.upsert_one

Store Files On GridFS

# models.py
class User(db.Document):
    username = db.StringField()
    image = db.ImageField(collection_name='user.images')

# tasks.py
import io

import bson
import celery
import gridfs
import mongoengine
from PIL import Image

@celery.shared_task(bind=True, ignore_result=True)
def gridfs_save(task, user_id, format='JPEG', raw_data: bytes=None, **kwargs):
    image_id = None

    if raw_data is None:
        user = User.objects.only('image').get(id=user_id)
        if user.image.grid_id:
            image_id, raw_data = user.image.grid_id, user.image.read()

    if not raw_data:
        return

    gf = gridfs.GridFS(mongoengine.connection.get_db(), User.image.collection_name)

    with io.BytesIO(raw_data) as raw_image:
        with Image.open(raw_image) as image:
            image = image.convert('RGB')
            with io.BytesIO() as buffer:
                image.save(buffer, format=format, quality=80, **kwargs)
                buffer.seek(0)
                grid_id = gf.put(buffer, format=format, width=image.width, height=image.height, thumbnail_id=None)

    # NOTE: If function was passed with raw_data, only override if ID is the same as the read
    query = mongoengine.Q(id=user_id)
    if image_id:
        query = query & mongoengine.Q(image=image_id)

    user = User.objects.only('image').filter(query).modify(
        __raw__={'$set': {'image': grid_id}},
        new=False,
    )

    def cleanup():
        # Delete the old image
        if user and user.image:
            yield user.image.grid_id

        # The user image was already changed before the scheduled optimization took place
        # Drop the optimized image
        if user is None and image_id:
            yield image_id

    gridfs_delete.apply_async(kwargs=dict(
        collection=User.image.collection_name,
        grid_ids=list(cleanup()),
    ))

@celery.shared_task(bind=True, ignore_result=True)
def gridfs_delete(task, collection, grid_ids):
    gf = gridfs.GridFS(mongoengine.connection.get_db(), collection)
    for grid_id in grid_ids:
        gf.delete(bson.ObjectId(grid_id))

ref:
http://docs.mongoengine.org/guide/gridfs.html

Store Datetime

MongoDB stores datetimes in UTC.
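
A quick check in the shell (a sketch; the events collection is hypothetical):

// new Date() is stored as UTC milliseconds since the Unix epoch
db.events.insert({created_at: new Date()})

// printed back as an ISODate in UTC, regardless of the server timezone
db.events.findOne().created_at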

ref:
https://docs.mongodb.com/manual/reference/method/Date/

Two-phase Commit

The easiest way to think about two-phase commit is idempotency, i.e., if you run an update many times, the result stays the same: initial -> pending -> applied -> done.
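
A sketch of such an idempotent transition, where transactionId is a placeholder: the update only matches while the document is still in the previous state, so running it again is harmless.

// moves pending -> applied exactly once; a second run matches nothing
db.transactions.update(
    {_id: transactionId, state: 'pending'},
    {$set: {state: 'applied'}}
)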

ref:
https://docs.mongodb.com/manual/tutorial/perform-two-phase-commits/

Aggregation Pipeline

  • $match: Filters documents.
  • $project: Modifies document fields.
  • $addFields: Adds or overrides document fields.
  • $group: Groups documents by fields.
  • $lookup: Joins another collection.
  • $replaceRoot: Promotes an embedded document field to the top level and replaces all other fields.
  • $unwind: Expands an array field into multiple documents, one per element, each keeping the rest of the original document.
  • $facet: Processes multiple pipelines within one stage and output to different fields.

There are special system variables, for instance, $$ROOT, $$REMOVE, $$PRUNE, which you could use in some stages of the aggregation pipeline.
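
For instance, $$ROOT refers to the whole document currently being processed, which is handy for keeping entire documents while grouping (a minimal sketch):

// keep the newest full document per sender
db.getCollection('message').aggregate([
    {'$sort': {'posted_at': -1}},
    {'$group': {'_id': '$sender', 'latest': {'$first': '$$ROOT'}}}
])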

ref:
https://docs.mongodb.com/manual/reference/aggregation-variables/#system-variables

Return Date As Unix Timestamp

import datetime

def stages():
    yield {'$project': {
        'createdAt': {'$floor': {'$divide': [{'$subtract': ['$created', datetime.datetime.utcfromtimestamp(0)]}, 1000]}},
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://stackoverflow.com/questions/39274311/convert-iso-date-to-timestamp-in-mongo-query

Match Multiple Conditions Stored In An Array Field

db.getCollection('feature.promotions').insert({
    "name": "女 / 六月排行榜:寶石獵人",
    "nbf": ISODate("2018-05-31 16:00:00.000Z"),
    "exp": ISODate("2018-06-30 15:59:00.001Z"),
    "positions": {
        "discover": {
            "urls": [
                "https://swag.live/promo/events/2018/Jun/female/banner.html"
            ]
        }
    },
    "requirements" : [
        {
            // users who like women and their app version is greater than v2.21
            "preferences" : [
                "gender:female"
            ],
            "version_major_min": 2.0,
            "version_minor_min": 21.0
        },
        {
            // female CPs
            "tags" : [
                "stats",
                "gender:female"
            ]
        }
    ]
});

import werkzeug

user_agent = werkzeug.UserAgent('swag/2.25.1 (iPhone; iOS 11.4.1; Scale/2.00; com.live.swag.enterprise; zh-tw)')
user_preferences = ['gender:female', 'gender:male']
user_tags = ['beta', 'vip']
user_platforms = [user_agent.platform]

def stages():
    now = utils.utcnow()

    yield {'$match': {
        '$and': [
            {'nbf': {'$lte': now}},
            {'exp': {'$gt': now}},
            {'requirements': {'$elemMatch': {
                'preferences': {'$not': {'$elemMatch': {'$nin': user_preferences}}},
                'tags': {'$not': {'$elemMatch': {'$nin': user_tags}}},
                'platforms': {'$not': {'$elemMatch': {'$nin': user_platforms}}},
                '$or': [
                    {'$and': [
                        {'version_major_min': {'$lte': user_agent.version.major}},
                        {'version_minor_min': {'$lte': user_agent.version.minor}},
                    ]},
                    {'$and': [
                        {'version_major_min': {'$exists': False}},
                        {'version_minor_min': {'$exists': False}},
                    ]},
                ],
            }}},
        ],
    }}
    yield {'$project': {
        'name': True,
        'nbf': True,
        'exp': True,
        'positions': {'$objectToArray': '$positions'},
    }}
    yield {'$unwind': '$positions'}
    yield {'$sort': {
        'exp': 1,
    }}
    yield {'$project': {
        '_id': False,
        'name': True,
        'position': '$positions.k',
        'url': {'$arrayElemAt': ['$positions.v.urls', 0]},
        'startedAt': {'$floor': {'$divide': [{'$subtract': ['$nbf', constants.UNIX_EPOCH]}, 1000]}},
        'endedAt': {'$floor': {'$divide': [{'$subtract': ['$exp', constants.UNIX_EPOCH]}, 1000]}},
    }}
    yield {'$group': {
        '_id': '$position',
        'items': {'$push': '$$ROOT'},
    }}

try:
    docs = Promotion.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    docs = list(docs)

ref:
https://docs.mongodb.com/manual/reference/operator/query/in/
https://docs.mongodb.com/manual/reference/operator/query/nin/
https://docs.mongodb.com/manual/reference/operator/aggregation/setIsSubset/

Do Distinct With $group

def stages():
    yield {'$match': {
        'tags': 'some_tag',
    }}
    yield {'$unwind': '$unlocks'}
    yield {'$replaceRoot': {'newRoot': '$unlocks'}}
    yield {'$match': {
        '_cls': 'MessagePackUnlock',
    }}
    yield {'$group': {
        '_id': '$user',
        'timestamp': {'$first': '$timestamp'},
    }}

for unlock in MessagePackMessage.objects.aggregate(*stages()):
    tasks.offline_purchase_pack.apply(kwargs=dict(
        user_id=unlock['_id'],
        message_pack_id=message_pack.id,
        timestamp=unlock['timestamp'],
    ))

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Slice Items In Each $group

import random

def stages():
    yield {'$match': {'tags': {'$regex': '^badge:'}}}
    yield {'$unwind': {'path': '$tags', 'includeArrayIndex': 'index'}}
    yield {'$match': {'tags': {'$regex': '^badge:'}}}
    yield {'$project': {'_id': True, 'tag': '$tags', 'index': {'$mod': ['$index', random.random()]}}}
    yield {'$sort': {'index': 1}}
    yield {'$group': {'_id': '$tag', 'users': {'$addToSet': '$_id'}}}
    yield {'$project': {'_id': True, 'users': {'$slice': ['$users', 1000]}}}

docs = User.objects.aggregate(*stages())
for doc in docs:
    badge, user_ids = doc['_id'], doc['users']

Collect Items With $group And $addToSet

User data:

{
    "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
    "username" : "gibuloto",
    "tags" : [ 
        "beta"
    ],
    "schedules" : [ 
        {
            "tag" : "stats",
            "nbf" : ISODate("2018-02-01T16:00:00.000Z"),
            "exp" : ISODate("2018-08-12T16:00:00.000Z")
        }, 
        {
            "tag" : "vip",
            "nbf" : ISODate("2018-05-13T16:00:00.000Z"),
            "exp" : ISODate("2018-05-20T16:00:00.000Z")
        }
    ]
}

def stages():
    now = utils.utcnow()

    yield {'$match': {
        'schedules': {'$elemMatch': {
            'nbf': {'$lte': now},
            'exp': {'$gte': now}
        }}
    }}
    yield {'$unwind': '$schedules'}
    yield {'$match': {
        'schedules.nbf': {'$lte': now},
        'schedules.exp': {'$gte': now}
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'username': True,
        'tag': '$schedules.tag',
        'nbf': '$schedules.nbf',
        'exp': '$schedules.exp'
    }}
    yield {'$group': {
        '_id': '$id',
        'tags': {'$addToSet': '$tag'},
    }}

for user_tag_schedule in User.objects.aggregate(*stages()):
    print(user_tag_schedule)

# output:
# {'_id': ObjectId('579b9387b7af8e1fd1635da9'), 'tags': ['stats']}
# {'_id': ObjectId('5a66d5c2af9c462c617ce552'), 'tags': ['stats', 'vip']}

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Project A New Field Based On Whether Elements Exist In Another Array Field

Use $addFields with $cond.

def stages():
    user_preferences = g.user.settings.preferences or ['gender:female']
    yield {'$match': {
        'gender': {'$in': [prefix_gender.replace('gender:', '') for prefix_gender in user_preferences]}
    }}

    yield {'$addFields': {
        'isPinned': {'$cond': {
            'if': {'$in': [constants.tags.HIDDEN, {'$ifNull': ['$badges', []]}]},
            'then': True,
            'else': False,
        }},
    }}
    yield {'$sort': {
        'isPinned': -1,
        'posted_at': -1,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': {'$ifNull': ['$comments', []]}},
        'badges': '$badges',
        'isPinned': '$isPinned',
    }}

try:
    results = Post.objects.aggregate(*stages()).next()
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://stackoverflow.com/questions/16512329/project-new-boolean-field-based-on-element-exists-in-an-array-of-a-subdocument
https://docs.mongodb.com/manual/reference/operator/aggregation/project/
https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
https://docs.mongodb.com/manual/reference/operator/aggregation/cond/
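
Since $in already evaluates to a boolean in aggregation expressions, the $cond wrapper is optional; a shorter sketch of the same stage (hypothetical helper name):

def is_pinned_stage():
    # $in returns true/false directly; it raises an error when its
    # second argument is not an array, hence the $ifNull guard
    return {'$addFields': {
        'isPinned': {'$in': [constants.tags.HIDDEN, {'$ifNull': ['$badges', []]}]},
    }}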

Project And Filter Out Elements Of An Array With $filter

Elements in details might have no value field.

def stages():
    yield {'$match': {
        '_id': bson.ObjectId(post_id),
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': {'$ifNull': ['$comments', []]}},
        'details': [
            {'key': 'gender', 'value': '$gender'},
            {'key': 'pricing', 'value': '$pricing'},
            {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
            {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
        ],
    }}
    yield {'$addFields': {
        'details': {
            '$filter': {
                'input': '$details',
                'as': 'detail',
                'cond': {'$ne': ['$$detail.value', None]},
            }
        }
    }}

try:
    post = next(Post.objects.aggregate(*stages()))
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/filter/#exp._S_filter
https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/

Project Specific Fields Of Elements Of An Array With $map

def stages():
    yield {'$match': {
        '_id': bson.ObjectId(post_id),
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': {'$ifNull': ['$comments', []]}},
        'details': [
            {'key': 'gender', 'value': '$gender'},
            {'key': 'pricing', 'value': '$pricing'},
            {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
            {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
        ],
        'media': {
            '$map': {
                'input': '$media',
                'as': 'transcoded_media',
                'in': {
                    'mimetype': '$$transcoded_media.mimetype',
                    'dash': '$$transcoded_media.presets.dash',
                    'hls': '$$transcoded_media.presets.hls',
                    'thumbnail': '$$transcoded_media.thumbnail',
                }
            }
        },
    }}
    yield {'$addFields': {
        'details': {
            '$filter': {
                'input': '$details',
                'as': 'detail',
                'cond': {'$ne': ['$$detail.value', None]},
            }
        }
    }}

try:
    post = next(Post.objects.aggregate(*stages()))
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://stackoverflow.com/questions/33831665/how-to-project-specific-fields-from-a-document-inside-an-array
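
You could also build and filter the details array in one expression instead of a separate $addFields stage, since an array literal is a valid $filter input; a sketch with a hypothetical helper:

def details_expression():
    # sketch: build the key/value pairs, then drop the null-valued ones
    details = [
        {'key': 'gender', 'value': '$gender'},
        {'key': 'pricing', 'value': '$pricing'},
        {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
        {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
    ]
    return {'$filter': {
        'input': details,
        'as': 'detail',
        'cond': {'$ne': ['$$detail.value', None]},
    }}

Then 'details': details_expression() can go straight into the $project stage.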

Do Advanced $project With $let

If you find yourself wanting to run $project twice just to post-process some fields, use $let instead.

def stages():
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'purchasedAt': {
            '$let': {
                'vars': {
                    'purchase': {
                        '$arrayElemAt': [
                            {
                                '$filter': {
                                    'input': '$purchases',
                                    'as': 'purchase',
                                    'cond': {
                                        '$and': [
                                            {'$eq': ['$$purchase.user', g.user.id]},
                                        ],
                                    },
                                },
                            },
                            0,
                        ],
                    },
                },
                'in': '$$purchase.timestamp',
            },
        },
    }}

docs = MessagePackProduct.objects.aggregate(*stages())
for doc in docs:
    print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/let/

Deconstruct An Array Field With $unwind And Query Its Elements With $match

def stages():
    category_tag = 'category:user'
    currency = 'usd'
    platform = 'ios'

    yield {'$match': {
        'active': True,
        'tags': category_tag,
        'total': {'$gt': 0},
        'preview_message': {'$exists': True},
    }}
    yield {'$unwind': '$skus'}
    yield {'$match': {
        'skus.attributes.platform': platform,
        'skus.attributes.currency': currency,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'caption': True,
        'description': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'sku': '$skus',
        'created_at': True,
        'is_purchased': {'$in': [g.user.id, {'$ifNull': ['$purchases.user', []]}]},
    }}
    yield {'$sort': {'is_purchased': 1, 'created_at': -1}}

docs = MessagePackProduct.objects.aggregate(*stages())
for doc in docs:
    print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/match/
https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/
https://docs.mongodb.com/manual/reference/operator/aggregation/project/
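
Adding an $elemMatch to the first $match lets the server discard products with no matching sku before $unwind fans documents out; a sketch with the same field names (hypothetical helper):

def prefilter_stage(platform, currency):
    # sketch: only keep products with at least one sku that could
    # survive the $match that follows $unwind
    return {'$match': {
        'skus': {'$elemMatch': {
            'attributes.platform': platform,
            'attributes.currency': currency,
        }},
    }}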

Query The First Element In An Array Field With $arrayElemAt And $filter

def stages():
    category_tag = 'category:user'
    currency = 'usd'
    platform = 'ios'

    yield {'$match': {
        'active': True,
        'tags': category_tag,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'caption': True,
        'description': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'preview_message': True,
        'metadata': True,
        'created_at': True,
        'updated_at': True,
        'active': True,
        'sku': {
            '$ifNull': [
                {
                    '$arrayElemAt': [
                        {
                            '$filter': {
                                'input': '$skus',
                                'as': 'sku',
                                'cond': {
                                    '$and': [
                                        {'$eq': ['$$sku.currency', currency]},
                                        {'$eq': ['$$sku.attributes.platform', platform]},
                                    ]
                                }
                            },
                        },
                        0
                    ]
                },
                None
            ],
        },
        'tags': True,
        'total': True,
        'is_bought': {'$in': [g.user.id, {'$ifNull': ['$purchases.user', []]}]},
    }}
    yield {'$sort': {'is_bought': 1, 'created_at': -1}}

docs = MessagePackProduct.objects.aggregate(*stages())
for doc in docs:
    print(doc)

ref:
https://docs.mongodb.com/master/reference/operator/aggregation/filter/
https://stackoverflow.com/questions/3985214/retrieve-only-the-queried-element-in-an-object-array-in-mongodb-collection

Join Another Collection Using $lookup

def stages():
    yield {'$match': {
        'tags': 'pack:prod_CR1u34BIpDbHeo',
    }}
    yield {'$lookup': {
        'from': 'user',
        'localField': 'sender',
        'foreignField': '_id',
        'as': 'sender_data',
    }}
    yield {'$unwind': '$sender_data'}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'sender': {
            'id': '$sender_data._id',
            'username': '$sender_data.username',
        },
        'caption': True,
        'posted_at': True,
    }}
    yield {'$sort': {'posted_at': -1}}

docs = Message.objects.aggregate(*stages())
for doc in docs:
    print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/
https://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-lookup-expr
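
Note that $unwind drops messages whose sender has no matching user document. If you want to keep them, $unwind accepts preserveNullAndEmptyArrays; a sketch:

def unwind_stage():
    # sketch: keep messages even when the sender lookup comes back empty
    return {'$unwind': {
        'path': '$sender_data',
        'preserveNullAndEmptyArrays': True,
    }}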

Join Another Collection With Multiple Conditions Using pipeline in $lookup

To access the let variables inside the $lookup pipeline, you can only use the $expr operator.

var start = ISODate('2018-09-22T00:00:00.000+08:00');

db.getCollection('feature.shop.order').aggregate([
    {'$match': {
        'payment.timestamp': {'$gte': start},
        'status': {'$in': ['paid']},
    }},
    {'$lookup': {
        'from': 'user',
        'localField': 'customer',
        'foreignField': '_id',
        'as': 'customer_data',
    }},
    {'$unwind': '$customer_data'},
    {'$project': {
        'variation': '$customer_data.experiments.message_unlock_price.variation',
        'amount_normalized': {'$divide': ['$amount', 100.0]},
    }},
    {'$addFields': {
        'amount_usd': {'$multiply': ['$amount_normalized', 0.033]},
    }},
    {'$group': {
       '_id': '$variation',
       'purchase_amount': {'$sum': '$amount_usd'},
       'paid_user_count': {'$sum': 1},
    }},
    {'$lookup': {
        'from': 'user',
        'let': {
            'variation': '$_id',
        },
        'pipeline': [
            {'$match': {
                'last_active': {'$gte': start},
                'experiments': {'$exists': true},
            }},
            {'$match': {
                '$expr': {
                    '$and': [
                         {'$eq': ['$experiments.message_unlock_price.variation', '$$variation']},
                    ],
                },
            }},
            {'$group': {
               '_id': '$experiments.message_unlock_price.variation',
               'count': {'$sum': 1},
            }},
        ],
        'as': 'variation_data',
    }},
    {'$unwind': '$variation_data'},
    {'$project': {
        '_id': 1,
        'purchase_amount': 1,
        'paid_user_count': 1,
        'total_user_count': '$variation_data.count',
    }},
    {'$addFields': {
        'since': start,
        'arpu': {'$divide': ['$purchase_amount', '$total_user_count']},
        'arppu': {'$divide': ['$purchase_amount', '$paid_user_count']},
    }},
    {'$sort': {'_id': 1}},
]);

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#join-conditions-and-uncorrelated-sub-queries

or

def stages():
    yield {'$match': {'_id': bson.ObjectId(message_id)}}
    yield {'$limit': 1}
    yield {'$project': {
        '_cls': 1,
        'sender': 1,
        'unlocks': 1,
    }}
    yield {'$unwind': '$unlocks'}
    yield {'$match': {
        'unlocks.user': bson.ObjectId(user_id),
        'unlocks.amount': {'$gt': 0},
    }}
    yield {'$lookup': {
        'from': 'user',
        'let': {
            'sender': '$sender',
            'unlocker': '$unlocks.user',
        },
        'pipeline': [
            {'$match': {
                '$expr': {
                    '$or': [
                        {'$eq': ['$_id', '$$sender']},
                        {'$eq': ['$_id', '$$unlocker']}
                    ]
                }
            }}
        ],
        'as': 'users',
    }}
    # note: the order of `users` is not guaranteed to be [sender, unlocker]
    yield {'$addFields': {
        'sender': {'$arrayElemAt': ['$users', 0]},
        'unlocker': {'$arrayElemAt': ['$users', 1]},
    }}
    yield {'$project': {
        '_id': 0,
        '_cls': 1,
        'id': '$_id',
        'sender': {
            'id': '$sender._id',
            'username': '$sender.username',
        },
        'unlocker': {
            'id': '$unlocker._id',
            'username': '$unlocker.username',
        },
        'amount': '$unlocks.amount',
    }}

try:
    context = Message.objects.aggregate(*stages()).next()
except StopIteration:
    context = None

ref:
https://stackoverflow.com/questions/37086387/multiple-join-conditions-using-the-lookup-operator
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#specify-multiple-join-conditions-with-lookup

Count Documents In Another Collection With $lookup (JOIN)

def stages():
    category_tag = f'category:{category}'
    yield {'$match': {
        'active': True,
        'tags': category_tag,
    }}
    yield {'$addFields': {
        'message_pack_id_tag': {'$concat': ['pack:', '$_id']},
    }}
    yield {'$lookup': {
        'from': 'message',
        'localField': 'message_pack_id_tag',
        'foreignField': 'tags',
        'as': 'total',
    }}
    yield {'$addFields': {
        'total': {'$size': '$total'}
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'total': True,
    }}

docs = MessagePackProduct.objects.aggregate(*stages())
for doc in docs:
    print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#equality-match
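
Materializing every joined message just to take its $size can be heavy for large packs. On MongoDB 3.6+ you could count inside the $lookup pipeline instead; a sketch under that assumption (hypothetical helper name):

def lookup_count_stages():
    # sketch: count matching messages inside the join instead of
    # pulling the full documents and measuring $size afterwards
    yield {'$lookup': {
        'from': 'message',
        'let': {'id_tag': '$message_pack_id_tag'},
        'pipeline': [
            {'$match': {'$expr': {'$in': ['$$id_tag', {'$ifNull': ['$tags', []]}]}}},
            {'$count': 'total'},
        ],
        'as': 'total',
    }}
    # `total` is now [] or [{'total': N}]; flatten it to a number
    yield {'$addFields': {
        'total': {'$ifNull': [{'$arrayElemAt': ['$total.total', 0]}, 0]},
    }}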

Use $lookup as findOne() Which Returns An Object

Use $lookup and $unwind.

import bson

def stages():
    yield {'$match': {'_id': bson.ObjectId(gift_id)}}
    yield {'$limit': 1}
    yield {'$lookup': {
        'from': 'user',
        'localField': 'sender',
        'foreignField': '_id',
        'as': 'sender',
    }}
    yield {'$unwind': '$sender'}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'sender': {
            'id': '$sender._id',
            'username': '$sender.username',
        },
        'product_id': '$product._id',
        'sent_at': '$sent_at',
        'amount': '$cost.amount',
    }}

try:
    _context = Gift.objects.aggregate(*stages()).next()
except StopIteration:
    _context = None

ref:
https://stackoverflow.com/questions/37691727/how-to-use-mongodbs-aggregate-lookup-as-findone

Collapse Documents In An Array

def stages():
    yield {'$match': {
        'tags': 'tutorial:buy-diamonds:v1',
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'caption.text': True,
        'sender': True,
        'media.type': '$media.mimetype',
    }}
    yield {'$facet': {
        'inbox': [
            {'$sort': {'created_at': -1}},
            {'$limit': 10}
        ],
    }}
    yield {'$project': {
        'inbox': True,
        'required_unlock_count': {'$literal': 5},
        'price_per_message': {'$literal': 1200},
    }}

try:
    result = Message.objects.aggregate(*stages()).next()
except StopIteration:
    result = {}

Note that the $sort inside $facet references created_at, which the $project stage above already dropped, so it has no effect; keep created_at in the projection if the inbox should actually be sorted by it.

JSON output:

{
    "inbox": [
        {
            "caption": {
                "text": "fuck yeah"
            },
            "id": "5aaba1e9593950337a90dcb3",
            "media": {
                "type": "video/mp4"
            },
            "sender": "5a66d5c2af9c462c617ce552"
        },
        {
            "caption": {
                "text": "test"
            },
            "id": "5ad549276b2c362a4efe5e21",
            "media": {
                "type": "image/jpeg"
            },
            "sender": "5a66d5c2af9c462c617ce552"
        }
    ],
    "price_per_message": 1200,
    "required_unlock_count": 5
}

Do Pagination With $facet And $project

def stages():
    # normal query
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'created_at': True,
        'meta': {
            'revision': '$revision',
            'tags': '$tags',
        },
    }}
    yield {'$sort': {'created_at': -1}}

    # pagination
    page = 0
    limit = 10
    yield {'$facet': {
        'meta': [
            {'$count': 'total'},
        ],
        'objects': [
            {'$skip': page * limit},
            {'$limit': limit},
        ]
    }}
    # JSON output:
    # {
    #    "meta": [
    #       {"total": 2}
    #    ],
    #    "objects": [
    #       {
    #          "id": "prod_CR1u34BIpDbHeo",
    #          "name": "Product Name 2"
    #       },
    #       {
    #          "id": "prod_Fkhf9JFK3Rdgk9",
    #          "name": "Product Name 1"
    #       }
    #    ]
    # }
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$meta', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}
    # JSON output:
    # {
    #    "total": 2,
    #    "objects": [
    #       {
    #          "id": "prod_CR1u34BIpDbHeo",
    #          "name": "Product Name 2"
    #       },
    #       {
    #          "id": "prod_Fkhf9JFK3Rdgk9",
    #          "name": "Product Name 1"
    #       }
    #    ]
    # }

try:
    output = MessagePackProduct.objects.aggregate(*stages()).next()
except StopIteration:
    output = {}
else:
    print(output)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/facet/
https://docs.mongodb.com/manual/reference/operator/aggregation/project/
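
When nothing matches, the meta facet is an empty array, $$meta.total resolves to missing, and total disappears from the output. A sketch that defaults it to 0 (hypothetical helper name):

def total_project_stage():
    # sketch: $ifNull turns the missing total into an explicit 0
    return {'$project': {
        'total': {'$ifNull': [
            {'$let': {
                'vars': {'meta': {'$arrayElemAt': ['$meta', 0]}},
                'in': '$$meta.total',
            }},
            0,
        ]},
        'objects': True,
    }}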

Perform $facet + $project => Unwrap with $unwind => Do $facet + $project Again

def stages():
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'created_at': True,
    }}
    yield {'$sort': {'created_at': -1}}

    # pagination
    page = 0
    limit = 10
    yield {'$facet': {
        'meta': [
            {'$count': 'total'},
        ],
        'objects': [
            {'$skip': page * limit},
            {'$limit': limit},
        ]
    }}
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$meta', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}

    # do $lookup after the pagination
    yield {'$unwind': '$objects'}
    yield {'$addFields': {
        'objects.message_pack_id_tag': {'$concat': ['pack:', '$objects.id']},
    }}
    yield {'$lookup': {
        'from': 'message',
        'localField': 'objects.message_pack_id_tag',
        'foreignField': 'tags',
        'as': 'objects.total',
    }}
    yield {'$addFields': {
        'objects.total': {'$size': '$objects.total'}
    }}

    # re-wrap into the pagination structure
    yield {'$facet': {
        'total_list': [
            {'$project': {
                'total': True,
            }},
        ],
        'objects': [
            {'$replaceRoot': {'newRoot': '$objects'}},
        ]
    }}
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$total_list', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}

try:
    output = MessagePackProduct.objects.aggregate(*stages()).next()
except StopIteration:
    output = {}
else:
    print(output)

Do $group First To Reduce The Number Of $lookup Calls

def stages():
    yield {'$match': {
        'tags': f'pack:{message_pack_id}',
    }}
    yield {'$group': {
        '_id': '$sender',
        'messages': {'$push': '$$ROOT'},
    }}
    yield {'$lookup': {
        'from': 'user',
        'localField': '_id',
        'foreignField': '_id',
        'as': 'sender_data',
    }}
    yield {'$unwind': '$messages'}
    yield {'$project': {
        '_id': False,
        'id': '$messages._id',
        'caption': {
            'text': '$messages.caption.text',
            'y': '$messages.caption.y',
        },
        'sender': {
            'id': {'$arrayElemAt': ['$sender_data._id', 0]},
            'username': {'$arrayElemAt': ['$sender_data.username', 0]},
        },
    }}

docs = Message.objects.aggregate(*stages())
for doc in docs:
    print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Useful Tools

Backup

$ mongodump -h 127.0.0.1:27017 --oplog -j=8 --gzip --archive=/data/mongodump.tar.gz

ref:
https://docs.mongodb.com/manual/reference/program/mongodump/

Restore

$ mongorestore --drop --gzip --archive=2018-08-12T03.tar.gz

An error like the following typically indicates data corruption, often caused by problems with the underlying storage device, file system, or network connection:

restoring indexes for collection swag.message from metadata
Failed: swag.message: error creating indexes for swag.message: createIndex error: BSONElement: bad type -47

ref:
https://docs.mongodb.com/manual/reference/program/mongorestore/

Profiling

You can set the profiling level to 2 to record every query:

db.setProfilingLevel(2);

db.getCollection('system.profile').find({
    'ns': { 
        '$nin' : ['swag.system.profile','swag.system.indexes', 'swag.system.js', 'swag.system.users']
    }
}).limit(5).sort({'ts': -1}).pretty();

ref:
https://docs.mongodb.com/manual/tutorial/manage-the-database-profiler/
https://stackoverflow.com/questions/15204341/mongodb-logging-all-queries
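
The same thing from Python; a sketch with pymongo, assuming a local swag database:

import pymongo

client = pymongo.MongoClient('mongodb://127.0.0.1:27017')
db = client.swag

# level 2 records every operation; prefer (1, slowms) in production
db.command('profile', 2)

# read back the five most recent profiled operations
for op in db.system.profile.find().sort('ts', -1).limit(5):
    print(op['ns'], op.get('millis'))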

$ pip install mongotail

# set the profiling level
$ mongotail 127.0.0.1:27017/swag -l 2

# tail logs
$ mongotail 127.0.0.1:27017/swag -f -m

ref:
https://github.com/mrsarm/mongotail

Monitoring

$ mongotop
$ mongostat

ref:
https://docs.mongodb.com/manual/reference/program/mongotop/
https://docs.mongodb.com/manual/reference/program/mongostat/

$ pip install mtools

$ mloginfo mongod.log

ref:
https://github.com/rueckstiess/mtools

Speed up Python and Node.js builds on Travis CI

Travis CI's caching archives all directories listed in the configuration and uploads them to Amazon S3. Cached contents are available to any build on the repository, including Pull Requests. For Python and Node.js projects, you could cache both site-packages and node_modules directories in every Travis CI build.

Here is an example of .travis.yml:

sudo: false

language: python

python:
  - "2.7"

node_js: 4

cache:
  directories:
    - $HOME/.cache/pip
    - $HOME/virtualenv/python2.7.9/lib/python2.7/site-packages
    - node_modules

before_install:
  - pip install -U pip

install:
  - pip install -r requirements.txt
  - pip install coverage --ignore-installed
  - npm install

script:
  - coverage run manage.py test

In my case, after applying these changes, pip and npm installation time dropped from about 180 seconds to 5 seconds.

One thing worth mentioning: since we didn't cache any bin directory in the configuration (and I don't think that's necessary), executables installed by pip, such as coverage or django-admin.py, will not exist in subsequent builds. If you need those commands, just force-install them by adding pip install some_package --ignore-installed.

References:

Caching Dependencies and Directories
https://docs.travis-ci.com/user/caching/

How to cache requirements for a Django project on Travis-CI?
http://stackoverflow.com/questions/19422229/how-to-cache-requirements-for-a-django-project-on-travis-ci

How to speed up Python unit tests on Travis CI (in Chinese)
https://tzangms.com/how-to-speed-up-python-unit-test-on-travis-ci/

Integrate with webpages using CasperJS (built on top of PhantomJS)

PhantomJS is a headless, scriptable WebKit runtime (aka a browser) with a JavaScript API.

Usage

in script.js

Log in and delete rarely used movie tags on Douban.

var casper = require('casper').create({
  pageSettings: {
    loadImages: true,
    loadPlugins: false
  },
  logLevel: 'debug',
  verbose: true
});

// save session cookies
var fs = require('fs');
var page = require('webpage').create();

var cookieFile = 'cookies.json';

var saveSessionCookie = function() {
  try {
    fs.statSync(cookieFile);
  } catch (e) {
    fs.write(cookieFile, JSON.stringify(phantom.cookies), 'w');
  }
}

if (fs.isFile(cookieFile)) {
  Array.prototype.forEach.call(JSON.parse(fs.read(cookieFile)), function(x) {
    phantom.addCookie(x);
  });
}

// script
var loginUrl = 'https://accounts.douban.com/login';
var startUrl = 'https://movie.douban.com/people/vinta/all';

var tags_do_not_delete = [
  '丹麦', '新西兰', '新加坡', '以色列', '印度', '意大利', '瑞典', '墨西哥', '俄罗斯', '西班牙', '比利时'
];

casper.start(loginUrl, function() {
  this.echo(this.getCurrentUrl());
  this.echo(this.getTitle());

  this.capture('login.png');

  var data = {
    form_email: 'xxx',
    form_password: 'xxx'
  };

  // Douban may ask you to enter a CAPTCHA
  // you can run: casperjs script.js --remote-debugger-port=9000
  // open login.png first to see the CAPTCHA,
  // then type it manually in the console at http://127.0.0.1:9000/
  // data['captcha-solution'] = '123';

  this.waitForSelector('form#lzform');
  this.fill('form#lzform', data, true);
});

casper.then(function() {
  this.echo(this.getCurrentUrl());
  this.echo(this.getTitle());

  saveSessionCookie();

  this.capture('all.png');

  this.open(startUrl).then(function() {
    this.waitForSelector('#open_tags', function() {
      this.click('#open_tags');
    });

    this.waitWhileSelector('#open_tags');
  });
});

casper.then(function() {
  this.echo(this.getCurrentUrl());
  this.echo(this.getTitle());

  var links = this.evaluate(function() {
    var tagList = document.querySelectorAll('ul.tag-list li a');
    var theLinks = Array.prototype.map.call(tagList, function(elem) {
        return {
          tag: elem.textContent.trim(),
          href: elem.getAttribute('href'),
          count: parseInt(elem.nextElementSibling.textContent, 10)
        };
    });

    return theLinks;
  });

  var filteredLinks = links.filter(function(link) {
    if (link.count < 5 && tags_do_not_delete.indexOf(link.tag) == -1) {
      return true;
    }
    return false;
  });

  this.each(filteredLinks, function(self, link) {
    this.echo(link.tag + ', ' + link.count);

    self.thenOpen(link.href, function() {
      this.echo(this.getCurrentUrl());
      this.echo(this.getTitle());

      this.waitForSelector('#tag-del', function() {
        this.click('#tag-del');
      });

      this.waitForSelector('input[name="del_submit"]', function() {
        this.click('input[name="del_submit"]');
      });
    });
  });
});

casper.run();

To evaluate JavaScript code in the context of the webpage, you must use the evaluate() function. The execution context is sandboxed.

ref:
http://docs.casperjs.org/en/latest/modules/index.html

ref:
https://github.com/vinta/playground/blob/master/casperjs/script.js

Save session cookies

--cookies-file=xxx.txt only stores non-session cookies, but it is usually the session cookies that keep you logged in or authenticated. You have to save them manually.

var casper = require('casper').create();

// save session cookies
var fs = require('fs');
var page = require('webpage').create();

var cookieFile = 'cookies.json';

var saveSessionCookie = function() {
  try {
    fs.statSync(cookieFile);
  } catch (e) {
    fs.write(cookieFile, JSON.stringify(phantom.cookies), 'w');
  }
}

if (fs.isFile(cookieFile)) {
  Array.prototype.forEach.call(JSON.parse(fs.read(cookieFile)), function(x) {
    phantom.addCookie(x);
  });
}

casper.start('yourUrl', function() {
  // do your shit
});

ref:
http://stackoverflow.com/questions/18739354/how-can-i-use-persisted-cookies-from-a-file-using-phantomjs

Run

$ docker run --rm -v `pwd`:/data vinta/casperjs:1.1.3 script.js

# or

$ brew install casperjs
$ casperjs script.js --disk-cache=true

ref:
https://hub.docker.com/r/vinta/casperjs/
https://hub.docker.com/r/zopanix/casperjs/

ref:
http://phantomjs.org/api/command-line.html

Run in debugging mode

$ casperjs script.js --remote-debugger-port=9000
$ open http://127.0.0.1:9000/
  • Click the first link (something like "file:///usr/local/Cellar/xxx").
  • In Sources tab, press "Enable Debugging" button.
  • In Console tab, type "__run();" to start.
  • Once breakpoints worked, you could go to Console tab to debug.

ref:
http://phantomjs.org/troubleshooting.html