The Incomplete Guide to Google Kubernetes Engine

Kubernetes is the de facto standard for container orchestration (deploying workloads on distributed systems). Google Kubernetes Engine (GKE) is the managed Kubernetes as a Service provided by Google Cloud Platform.

Currently, GKE is still your best choice compared to other managed Kubernetes services, such as Azure Kubernetes Service (AKS) and Amazon Elastic Container Service for Kubernetes (EKS).

ref:
https://kubernetes.io/
https://cloud.google.com/kubernetes-engine/

You could find the sample project on GitHub.
https://github.com/vinta/simple-project-on-k8s

Installation

Install gcloud to create Kubernetes clusters on Google Cloud Platform.

Install kubectl to interact with any Kubernetes cluster.

$ brew install kubernetes-cli
# or
$ gcloud components install kubectl
$ gcloud components update

ref:
https://cloud.google.com/sdk/docs/
https://kubernetes.io/docs/tasks/tools/install-kubectl/

Some useful tools:

https://github.com/johanhaleby/kubetail
https://github.com/pulumi/kubespy
https://github.com/kubermatic/fubectl

Concepts

Nodes

  • Cluster: A set of machines, called nodes, that run containerized applications.
  • Node: A single virtual or physical machine that provides hardware resources.
  • Edge Node: The node which is exposed to the Internet.
  • Master Node: The node which is responsible for managing the whole cluster.

Objects

  • Pod: A group of tightly related containers. Each Pod is like a logical host that has its own IP, hostname, and storage.
  • PodPreset: A set of pre-defined configurations that can be injected into Pods automatically.
  • Service: A load balancer for a set of Pods selected by labels; it also serves as the basis of service discovery.
  • Ingress: A reverse proxy that acts as an entry point to the cluster, which allows domain-based and path-based routing to different Services.
  • ConfigMap: Key-value configuration data that can be mounted into containers or consumed as environment variables.
  • Secret: Similar to ConfigMap but for storing sensitive data only.
  • Volume: An ephemeral file system whose lifetime is the same as the Pod's.
  • PersistentVolume: A persistent file system that can be mounted to the cluster, without being associated with any particular node.
  • PersistentVolumeClaim: A binding between a Pod and a PersistentVolume.
  • StorageClass: A storage provisioner which allows users to request storage dynamically.
  • Namespace: The way to partition a single cluster into multiple virtual groups.

Controllers

  • ReplicationController: Ensures that a specified number of Pods are always running.
  • ReplicaSet: The next-generation ReplicationController.
  • Deployment: The recommended way to deploy stateless Pods.
  • StatefulSet: Similar to Deployment but provides guarantees about the ordering and unique names of Pods.
  • DaemonSet: Ensures a copy of a Pod is running on every node.
  • Job: Creates Pods that run to completion (exit with 0).
  • CronJob: A Job which can run at a specific time or run regularly.
  • HorizontalPodAutoscaler: Automatically scales the number of Pods based on CPU and memory utilization or custom metric targets.

ref:
https://kubernetes.io/docs/concepts/
https://kubernetes.io/docs/reference/glossary/?all=true

Setup Google Cloud Accounts

Make sure you use the right Google Cloud Platform account.

$ gcloud init
# or
$ gcloud config configurations list
$ gcloud config configurations activate default
$ gcloud config set project simple-project-198818
$ gcloud config set compute/region asia-east1
$ gcloud config set compute/zone asia-east1-a
$ gcloud config list

Create Clusters

Create a regional cluster in the asia-east1 region, with 1 node in each of the asia-east1 zones, using --region=asia-east1 --num-nodes=1. By default, a cluster creates its cluster master and nodes in only a single compute zone.

# show available OSs and versions of Kubernetes
$ gcloud container get-server-config

# show available CPU platforms in the desired zone
$ gcloud compute zones describe asia-east1-a
availableCpuPlatforms:
- Intel Skylake
- Intel Broadwell
- Intel Haswell
- Intel Ivy Bridge

$ gcloud container clusters create demo \
--cluster-version=1.11.6-gke.6 \
--node-version=1.11.6-gke.6 \
--scopes=gke-default,cloud-platform,storage-full,compute-ro,pubsub,https://www.googleapis.com/auth/cloud_debugger \
--region=asia-east1 \
--num-nodes=1 \
--enable-autoscaling --min-nodes=1 --max-nodes=10 \
--maintenance-window=20:00 \
--machine-type=n1-standard-4 \
--min-cpu-platform="Intel Skylake" \
--enable-ip-alias \
--create-subnetwork="" \
--image-type=UBUNTU \
--node-labels=custom.kubernetes.io/fs-type=xfs

$ gcloud container clusters describe demo --region=asia-east1

$ kubectl version
Client Version: version.Info{Major:"1", Minor:"13", GitVersion:"v1.13.3", GitCommit:"721bfa751924da8d1680787490c54b9179b1fed0", GitTreeState:"clean", BuildDate:"2019-02-04T04:48:55Z", GoVersion:"go1.11.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"11+", GitVersion:"v1.11.5-gke.5", GitCommit:"9aba9c1237d9d2347bef28652b93b1cba3aca6d8", GitTreeState:"clean", BuildDate:"2018-12-11T02:36:50Z", GoVersion:"go1.10.3b4", Compiler:"gc", Platform:"linux/amd64"}

$ kubectl get nodes -o wide

You can only get a regional cluster by creating a whole new cluster; Google currently won't allow you to turn an existing cluster into a regional one.

ref:
https://cloud.google.com/sdk/gcloud/reference/container/clusters/create
https://cloud.google.com/compute/docs/machine-types
https://cloud.google.com/kubernetes-engine/docs/concepts/regional-clusters
https://cloud.google.com/kubernetes-engine/docs/how-to/min-cpu-platform
https://cloud.google.com/kubernetes-engine/docs/how-to/alias-ips

Google Kubernetes Engine clusters running Kubernetes version 1.8+ enable Role-Based Access Control (RBAC) by default. Therefore, you must explicitly provide the --enable-legacy-authorization option if you want to disable RBAC.
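
If you keep RBAC enabled, you may first need to grant your own Google account the cluster-admin role before you can create other RBAC objects, as suggested by the GKE docs:

$ kubectl create clusterrolebinding cluster-admin-binding \
--clusterrole=cluster-admin \
--user=$(gcloud config get-value account)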

ref:
https://cloud.google.com/kubernetes-engine/docs/how-to/role-based-access-control

Delete the cluster. After you delete the cluster, you might also need to manually delete persistent disks (under Compute Engine), load balancers (under Network services) and static IPs (under VPC network) which belong to the cluster on Google Cloud Platform Console.

$ gcloud container clusters delete demo --region=asia-east1

Create Node Pools

Create a node pool of preemptible VMs, which are much cheaper than regular instances, using --preemptible.

You might receive a "The connection to the server x.x.x.x was refused - did you specify the right host or port?" error while the cluster is upgrading, which includes adding new node pools.

$ gcloud container node-pools create n1-standard-4-pre \
--cluster=demo \
--node-version=1.11.6-gke.6 \
--scopes=gke-default,storage-full,compute-ro,pubsub,https://www.googleapis.com/auth/cloud_debugger \
--region=asia-east1 \
--num-nodes=1 \
--enable-autoscaling --min-nodes=1 --max-nodes=10 \
--machine-type=n1-standard-4 \
--min-cpu-platform="Intel Skylake" \
--node-labels=custom.kubernetes.io/scopes-storage-full=true \
--enable-autorepair \
--preemptible

$ gcloud container node-pools list --cluster=demo --region=asia-east1

$ gcloud container operations list

ref:
https://cloud.google.com/sdk/gcloud/reference/container/node-pools/create
https://cloud.google.com/kubernetes-engine/docs/concepts/preemptible-vm
https://cloud.google.com/compute/docs/regions-zones/

Build Docker Images

You could use Google Cloud Build or any Continuous Integration (CI) service to automatically build Docker images and push them to Google Container Registry.

Furthermore, you need to tag your Docker images appropriately with the registry name format: region_name.gcr.io/your_project_id/your_image_name:version.

ref:
https://cloud.google.com/container-builder/
https://cloud.google.com/container-registry/

An example of cloudbuild.yaml:

substitutions:
  _REPO_NAME: simple-api
steps:
- id: pull-image
  name: gcr.io/cloud-builders/docker
  entrypoint: "/bin/sh"
  args: [
    "-c",
    "docker pull asia.gcr.io/$PROJECT_ID/$_REPO_NAME:$BRANCH_NAME || true"
  ]
  waitFor: [
    "-"
  ]
- id: build-image
  name: gcr.io/cloud-builders/docker
  args: [
    "build",
    "--cache-from", "asia.gcr.io/$PROJECT_ID/$_REPO_NAME:$BRANCH_NAME",
    "--label", "git.commit=$SHORT_SHA",
    "--label", "git.branch=$BRANCH_NAME",
    "--label", "ci.build-id=$BUILD_ID",
    "-t", "asia.gcr.io/$PROJECT_ID/$_REPO_NAME:$SHORT_SHA",
    "simple-api/"
  ]
  waitFor: [
    "pull-image",
  ]
images:
  - asia.gcr.io/$PROJECT_ID/$_REPO_NAME:$SHORT_SHA

ref:
https://cloud.google.com/container-builder/docs/build-config
https://cloud.google.com/container-builder/docs/create-custom-build-steps

Of course, you could also manually push Docker images to Google Container Registry.

$ gcloud auth configure-docker && \
gcloud config set project simple-project-198818 && \
export PROJECT_ID="$(gcloud config get-value project -q)"

$ docker build --rm -t asia.gcr.io/${PROJECT_ID}/simple-api:v1 simple-api/

$ docker push asia.gcr.io/${PROJECT_ID}/simple-api:v1

$ gcloud container images list --repository=asia.gcr.io/${PROJECT_ID}

ref:
https://cloud.google.com/container-registry/docs/pushing-and-pulling

Moreover, you should always adopt Multi-Stage builds for your Dockerfiles.

FROM python:3.6.8-alpine3.7 AS builder

ENV PATH=$PATH:/root/.local/bin
ENV PIP_DISABLE_PIP_VERSION_CHECK=1

WORKDIR /usr/src/app/

RUN apk add --no-cache --virtual .build-deps \
        build-base \
        linux-headers \
        openssl-dev \
        zlib-dev

COPY requirements.txt .

RUN pip install --user -r requirements.txt && \
    find $(python -m site --user-base) -type f -name "*.pyc" -delete && \
    find $(python -m site --user-base) -type f -name "*.pyo" -delete && \
    find $(python -m site --user-base) -type d -name "__pycache__" -delete

###

FROM python:3.6.8-alpine3.7

ENV PATH=$PATH:/root/.local/bin
ENV FLASK_APP=app.py

WORKDIR /usr/src/app/

RUN apk add --no-cache --virtual .run-deps \
    ca-certificates \
    curl \
    openssl \
    zlib

COPY --from=builder /root/.local/ /root/.local/
COPY . .

EXPOSE 8000

CMD ["uwsgi", "--ini", "config/uwsgi.ini", "--single-interpreter", "--enable-threads", "--http", ":8000"]

ref:
https://medium.com/@tonistiigi/advanced-multi-stage-build-patterns-6f741b852fae

Create Pods

No, you should never create Pods directly; such Pods are so-called naked Pods. Use a Deployment instead.

ref:
https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/

Pods have the following lifecycle phases (states):

  • Pending
  • Running
  • Succeeded
  • Failed
  • Unknown
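
For instance, you could check a Pod's current phase with jsonpath (the Pod name here is from the examples below):

$ kubectl get pod simple-api-5bbf4dd4f9-8b4c9 -o jsonpath="{.status.phase}"
Running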

ref:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/

Inspect Pods

Show information about Pods.

$ kubectl get all

$ kubectl get deploy

$ kubectl get pods
$ kubectl get pods -l app=simple-api

$ kubectl describe pod simple-api-5bbf4dd4f9-8b4c9
$ kubectl get pod simple-api-5bbf4dd4f9-8b4c9 -o yaml

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#describe
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#get

Execute a command in a container.

$ kubectl exec -i -t simple-api-5bbf4dd4f9-8b4c9 -- sh

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#exec

Tail Pod logs. It is also recommended to use kubetail.

$ kubectl logs simple-api-5bbf4dd4f9-8b4c9 -f
$ kubectl logs deploy/simple-api -f
$ kubectl logs statefulset/mongodb-rs0 -f

$ kubetail simple-api
$ kubetail simple-worker
$ kubetail mongodb-rs0 -c db

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#logs
https://github.com/johanhaleby/kubetail

List all Pods on a certain node.

$ kubectl describe node gke-demo-default-pool-fb33ac26-frkw
...
Non-terminated Pods:         (7 in total)
  Namespace                  Name                                              CPU Requests  CPU Limits  Memory Requests  Memory Limits
  ---------                  ----                                              ------------  ----------  ---------------  -------------
  default                    mongodb-rs0-1                                     2100m (53%)   4 (102%)    4G (30%)         4G (30%)
  default                    simple-api-84554476df-w5b5g                       500m (25%)    1 (51%)     1G (16%)         1G (16%)
  default                    simple-worker-6495b6b74b-rqplv                    500m (25%)    1 (51%)     1G (16%)         1G (16%)
  kube-system                fluentd-gcp-v3.0.0-848nq                          100m (2%)     0 (0%)      200Mi (1%)       300Mi (2%)
  kube-system                heapster-v1.5.3-6447d67f78-7psb2                  138m (3%)     138m (3%)   301856Ki (2%)    301856Ki (2%)
  kube-system                kube-dns-788979dc8f-5zvfk                         260m (6%)     0 (0%)      110Mi (0%)       170Mi (1%)
  kube-system                kube-proxy-gke-demo-default-pool-3c058fcf-x7cv    100m (2%)     0 (0%)      0 (0%)           0 (0%)
...

$ kubectl get pods --all-namespaces -o wide --sort-by="{.spec.nodeName}"

Check resource usage.

$ kubectl top pods
$ kubectl top nodes

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#top
https://kubernetes.io/docs/tasks/debug-application-cluster/

Restart Pods.

# you could simply delete Pods; they will be recreated automatically if they are managed by a Deployment
$ kubectl delete pods -l app=simple-worker

# you could replace a resource by providing a manifest
$ kubectl replace --force -f simple-api/

ref:
https://stackoverflow.com/questions/40259178/how-to-restart-kubernetes-pods

Completely delete resources.

$ kubectl delete -f simple-api/ -R
$ kubectl delete deploy simple-api
$ kubectl delete deploy -l app=simple,role=worker

# delete a Pod forcefully
$ kubectl delete pod simple-api-668d465985-886h5 --grace-period=0 --force
$ kubectl delete deploy simple-api --grace-period=0 --force

# delete all resources under a namespace
$ kubectl delete daemonsets,deployments,services,statefulset,pvc,pv --all --namespace tick

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#delete

Create ConfigMaps

Create an environment-variable-like ConfigMap.

kind: ConfigMap
apiVersion: v1
metadata:
  name: simple-api
data:
  FLASK_ENV: production
  MONGODB_URL: mongodb://mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local,mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local,mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local/demo?readPreference=secondaryPreferred&maxPoolSize=10
  CACHE_URL: redis://redis-cache.default.svc.cluster.local/0
  CELERY_BROKER_URL: redis://redis-broker.default.svc.cluster.local/0
  CELERY_RESULT_BACKEND: redis://redis-broker.default.svc.cluster.local/1

Load environment variables from a ConfigMap:

kind: Deployment
apiVersion: apps/v1
metadata:
  name: simple-api
  labels:
    app: simple-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-api
  template:
    metadata:
      labels:
        app: simple-api
    spec:
      containers:
      - name: simple-api
        image: asia.gcr.io/simple-project-198818/simple-api:4fc4199
        command: ["uwsgi", "--ini", "config/uwsgi.ini", "--single-interpreter", "--enable-threads", "--http", ":8000"]
        envFrom:
        - configMapRef:
            name: simple-api
        ports:
        - containerPort: 8000

Create a file-like ConfigMap.

kind: ConfigMap
apiVersion: v1
metadata:
  name: redis-cache
data:
  redis.conf: |-
    maxmemory-policy allkeys-lfu
    appendonly no
    save ""

Mount files from a ConfigMap:

kind: Deployment
apiVersion: apps/v1
metadata:
  name: redis-cache
  labels:
    app: redis-cache
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis-cache
  template:
    metadata:
      labels:
        app: redis-cache
    spec:
      volumes:
      - name: config
        configMap:
          name: redis-cache
      containers:
      - name: redis
        image: redis:4.0.10-alpine
        command: ["redis-server"]
        args: ["/etc/redis/redis.conf", "--loglevel", "verbose", "--maxmemory", "1g"]
        volumeMounts:
        - name: config
          mountPath: /etc/redis
        ports:
        - containerPort: 6379

ref:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/

Mount only a single file with subPath.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: redis-cache
  labels:
    app: redis-cache
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis-cache
  template:
    metadata:
      labels:
        app: redis-cache
    spec:
      volumes:
      - name: config
        configMap:
          name: redis-cache
      containers:
      - name: redis
        image: redis:4.0.10-alpine
        command: ["redis-server"]
        args: ["/etc/redis/redis.conf", "--loglevel", "verbose", "--maxmemory", "1g"]
        volumeMounts:
        - name: config
          mountPath: /etc/redis/redis.conf
          subPath: redis.conf
        ports:
        - containerPort: 6379

ref:
https://github.com/kubernetes/kubernetes/issues/44815#issuecomment-297077509

It is worth noting that changing a ConfigMap or Secret won't trigger a re-deployment of the Deployments that consume it. A workaround might be changing the name of the ConfigMap every time you change its content. If you consume a ConfigMap as environment variables, you must trigger a re-deployment explicitly.
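
Another common trick is to patch a throwaway annotation on the Pod template, which forces a rollout; a minimal sketch (the annotation name is hypothetical):

$ kubectl patch deploy simple-api -p "{\"spec\":{\"template\":{\"metadata\":{\"annotations\":{\"config-reloaded-at\":\"$(date +%s)\"}}}}}"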

ref:
https://github.com/kubernetes/kubernetes/issues/22368

Create Secrets

First of all, Secrets are only base64 encoded, not encrypted.

Encode and decode a Secret value.

$ echo -n 'YOUR_SECRET_KEY' | base64
WU9VUl9TRUNSRVRfS0VZ

$ echo 'WU9VUl9TRUNSRVRfS0VZ' | base64 --decode
YOUR_SECRET_KEY

Create an environment-variable-like Secret.

kind: Secret
apiVersion: v1
metadata:
  name: simple-api
data:
  SECRET_KEY: WU9VUl9TRUNSRVRfS0VZ
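
Alternatively, you could let kubectl do the base64 encoding for you:

$ kubectl create secret generic simple-api --from-literal=SECRET_KEY=YOUR_SECRET_KEY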

Export data (base64-encoded) from a Secret.

$ kubectl get secret simple-project-com --export=true -o yaml

ref:
https://kubernetes.io/docs/concepts/configuration/secret/

Create Deployments With Probes

Deployments are designed for stateless (or nearly stateless) services. A Deployment manages a ReplicaSet, and the ReplicaSet manages Pods.

ref:
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/

livenessProbe can be used to determine when an application must be restarted by Kubernetes, while readinessProbe can be used to determine when a container is ready to accept traffic.

ref:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/

It is also a best practice to always specify resource limits: resources.requests and resources.limits.

ref:
https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/

Create a Deployment with probes.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: simple-api
  labels:
    app: simple-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-api
  template:
    metadata:
      labels:
        app: simple-api
    spec:
      containers:
      - name: simple-api
        image: asia.gcr.io/simple-project-198818/simple-api:4fc4199
        command: ["uwsgi", "--ini", "config/uwsgi.ini", "--single-interpreter", "--enable-threads", "--http", ":8000"]
        envFrom:
        - configMapRef:
            name: simple-api
        ports:
        - containerPort: 8000
        livenessProbe:
          exec:
            command: ["curl", "-fsS", "-m", "0.1", "-H", "User-Agent: KubernetesHealthCheck/1.0", "http://127.0.0.1:8000/health"]
          initialDelaySeconds: 5
          periodSeconds: 1
          successThreshold: 1
          failureThreshold: 5
        readinessProbe:
          exec:
            command: ["curl", "-fsS", "-m", "0.1", "-H", "User-Agent: KubernetesHealthCheck/1.0", "http://127.0.0.1:8000/health"]
          initialDelaySeconds: 3
          periodSeconds: 1
          successThreshold: 1
          failureThreshold: 3
        resources:
          requests:
            cpu: 500m
            memory: 1G
          limits:
            cpu: 1000m
            memory: 1G

Create another Deployment of Celery workers.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: simple-worker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: simple-worker
  template:
    metadata:
      labels:
        app: simple-worker
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: simple-worker
        image: asia.gcr.io/simple-project-198818/simple-api:4fc4199
        command: ["celery", "-A", "app:celery", "worker", "--without-gossip", "-Ofair", "-l", "info"]
        envFrom:
        - configMapRef:
            name: simple-api
        readinessProbe:
          exec:
            command: ["sh", "-c", "celery inspect -q -A app:celery -d celery@$(hostname) --timeout 10 ping"]
          initialDelaySeconds: 15
          periodSeconds: 15
          timeoutSeconds: 10
          successThreshold: 1
          failureThreshold: 3
        resources:
          requests:
            cpu: 500m
            memory: 1G
          limits:
            cpu: 1000m
            memory: 1G

$ kubectl apply -f simple-api/ -R
$ kubectl get pods

The minimum value of timeoutSeconds is 1, so you might need to use exec.command to run arbitrary shell commands with custom timeout settings, like curl -m 0.1 above.

ref:
https://cloudplatform.googleblog.com/2018/05/Kubernetes-best-practices-Setting-up-health-checks-with-readiness-and-liveness-probes.html

Create Deployments With InitContainers

If multiple Init Containers are specified for a Pod, those Containers are run one at a time in sequential order. Each must succeed before the next can run. When all of the Init Containers have run to completion, Kubernetes initializes regular containers as usual.

kind: Service
apiVersion: v1
metadata:
  name: gcs-proxy-media-simple-project-com
spec:
  type: NodePort
  selector:
    app: gcs-proxy-media-simple-project-com
  ports:
    - name: http
      port: 80
      targetPort: 80
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: google-cloud-storage-proxy
data:
  nginx.conf: |-
    worker_processes auto;

    events {}

    http {
      include mime.types;
      default_type application/octet-stream;

      server {
        listen 80;

        if ( $http_user_agent ~* (GoogleHC|KubernetesHealthCheck) ) {
          return 200;
        }

        root /usr/share/nginx/html;
        open_file_cache max=10000 inactive=10m;
        open_file_cache_valid 1m;
        open_file_cache_min_uses 1;
        open_file_cache_errors on;

        include /etc/nginx/conf.d/*.conf;
      }
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gcs-proxy-media-simple-project-com
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gcs-proxy-media-simple-project-com
  template:
    metadata:
      labels:
        app: gcs-proxy-media-simple-project-com
    spec:
      volumes:
      - name: nginx-config
        configMap:
          name: google-cloud-storage-proxy
      - name: nginx-config-extra
        emptyDir: {}
      initContainers:
      - name: create-robots-txt
        image: busybox
        command: ["sh", "-c"]
        args:
        - |
            set -euo pipefail
            cat << 'EOF' > /etc/nginx/conf.d/robots.txt
            User-agent: *
            Disallow: /
            EOF
        volumeMounts:
        - name: nginx-config-extra
          mountPath: /etc/nginx/conf.d/
      - name: create-nginx-extra-conf
        image: busybox
        command: ["sh", "-c"]
        args:
        - |
            set -euo pipefail
            cat << 'EOF' > /etc/nginx/conf.d/extra.conf
            location /robots.txt {
              alias /etc/nginx/conf.d/robots.txt;
            }
            EOF
        volumeMounts:
        - name: nginx-config-extra
          mountPath: /etc/nginx/conf.d/
      containers:
      - name: http
        image: swaglive/openresty:gcsfuse
        imagePullPolicy: Always
        args: ["nginx", "-c", "/usr/local/openresty/nginx/conf/nginx.conf", "-g", "daemon off;"]
        ports:
        - containerPort: 80
        securityContext:
          privileged: true
          capabilities:
            add: ["CAP_SYS_ADMIN"]
        env:
          - name: GCSFUSE_OPTIONS
            value: "--debug_gcs --implicit-dirs --stat-cache-ttl 1s --type-cache-ttl 24h --limit-bytes-per-sec -1 --limit-ops-per-sec -1 -o ro,allow_other"
          - name: GOOGLE_CLOUD_STORAGE_BUCKET
            value: asia.contents.simple-project.com
        volumeMounts:
        - name: nginx-config
          mountPath: /usr/local/openresty/nginx/conf/nginx.conf
          subPath: nginx.conf
          readOnly: true
        - name: nginx-config-extra
          mountPath: /etc/nginx/conf.d/
          readOnly: true
        readinessProbe:
          httpGet:
            port: 80
            path: /
            httpHeaders:
            - name: User-Agent
              value: "KubernetesHealthCheck/1.0"
          timeoutSeconds: 1
          initialDelaySeconds: 5
          periodSeconds: 5
          failureThreshold: 1
          successThreshold: 1
        resources:
          requests:
            cpu: 0m
            memory: 500Mi
          limits:
            cpu: 1000m
            memory: 500Mi

$ kubectl exec -i -t simple-api-5968cfc48d-8g755 -- sh
> curl http://gcs-proxy-media-simple-project-com/robots.txt
User-agent: *
Disallow: /

ref:
https://kubernetes.io/docs/concepts/workloads/pods/init-containers/
https://blog.percy.io/tuning-nginx-behind-google-cloud-platform-http-s-load-balancer-305982ddb340

Create Deployments With Canary Deployment

TODO

ref:
https://kubernetes.io/docs/concepts/cluster-administration/manage-deployment/#canary-deployments
https://medium.com/google-cloud/kubernetes-canary-deployments-for-mere-mortals-13728ce032fe

Rollback A Deployment

Yes, you could publish a deployment with kubectl apply --record and roll it back with kubectl rollout undo. However, the simplest way might be to just git checkout the previous commit and deploy again with kubectl apply.

The formal way.

$ kubectl apply -f simple-api/ -R --record
$ kubectl rollout history deploy/simple-api
$ kubectl rollout undo deploy/simple-api --to-revision=2

The git way.

$ git checkout b7ed8d5
$ kubectl apply -f simple-api/ -R
$ kubectl get pods

ref:
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#rolling-back-a-deployment

Scale A Deployment

Simply increase the number of spec.replicas and deploy again.

$ kubectl apply -f simple-api/ -R
# or
$ kubectl scale --replicas=10 deploy/simple-api

$ kubectl get pods

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#scale
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#scaling-a-deployment

Create HorizontalPodAutoscalers (HPA)

The Horizontal Pod Autoscaler automatically scales the number of Pods in a Deployment based on observed CPU utilization, memory usage, or custom metric targets. Yes, HPA only applies to scalable controllers such as Deployments and ReplicationControllers; it cannot be used with DaemonSets.

kind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2beta1
metadata:
  name: simple-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: simple-api
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      targetAverageUtilization: 80
  - type: Resource
    resource:
      name: memory
      targetAverageValue: 800M
---
kind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2beta1
metadata:
  name: simple-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: simple-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      targetAverageUtilization: 80
  - type: Resource
    resource:
      name: memory
      targetAverageValue: 500M

$ kubectl apply -f simple-api/hpa.yaml

$ kubectl get hpa --watch
NAME            REFERENCE                  TARGETS                   MINPODS   MAXPODS   REPLICAS   AGE
simple-api      Deployment/simple-api      18685952/800M, 4%/80%     2         20        3          10m
simple-worker   Deployment/simple-worker   122834944/500M, 11%/80%   2         10        3          10m

ref:
https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/

You could run some load testing.

ref:
https://medium.com/@jonbcampos/kubernetes-horizontal-pod-scaling-190e95c258f5

There is also Cluster Autoscaler in Google Kubernetes Engine.

$ gcloud container clusters update demo \
--enable-autoscaling --min-nodes=1 --max-nodes=10 \
--node-pool=default-pool

ref:
https://cloud.google.com/kubernetes-engine/docs/concepts/cluster-autoscaler

Create VerticalPodAutoscalers (VPA)

TODO

ref:
https://medium.com/@Mohamed.ahmed/kubernetes-autoscaling-101-cluster-autoscaler-horizontal-pod-autoscaler-and-vertical-pod-2a441d9ad231

Create PodDisruptionBudget (PDB)

  • Voluntary disruptions: actions initiated by application owners or admins.
  • Involuntary disruptions: unavoidable cases like hardware failures or system software error.

PodDisruptionBudgets only account for voluntary disruptions; something like a hardware failure will not take a PodDisruptionBudget into account. PDBs cannot prevent involuntary disruptions from occurring, but such disruptions do count against the budget.

Create a PodDisruptionBudget for a stateless application.

kind: PodDisruptionBudget
apiVersion: policy/v1beta1
metadata:
  name: simple-api
spec:
  minAvailable: 90%
  selector:
    matchLabels:
      app: simple-api

Create a PodDisruptionBudget for a multiple-instance stateful application.

kind: PodDisruptionBudget
apiVersion: policy/v1beta1
metadata:
  name: mongodb-rs0
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: mongodb-rs0

$ kubectl apply -f simple-api/pdb.yaml
$ kubectl apply -f mongodb/pdb.yaml

$ kubectl get pdb
NAME          MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
mongodb-rs0   2               N/A               1                     48m
simple-api    90%             N/A               0                     48m

ref:
https://kubernetes.io/docs/concepts/workloads/pods/disruptions/
https://kubernetes.io/docs/tasks/run-application/configure-pdb/

Actually, you could achieve similar functionality using .spec.strategy.rollingUpdate.

  • maxUnavailable: The maximum number of Pods that can be unavailable during the update process.
  • maxSurge: The maximum number of Pods that can be created over the desired number of Pods.

This makes sure that total ready Pods >= total desired Pods - maxUnavailable, and total Pods <= total desired Pods + maxSurge.
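
For example, a minimal sketch of such an update strategy on the simple-api Deployment (the numbers are only illustrative):

kind: Deployment
apiVersion: apps/v1
metadata:
  name: simple-api
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 2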

ref:
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#writing-a-deployment-spec
https://cloud.google.com/kubernetes-engine/docs/how-to/updating-apps

Create Services

A Service is basically a load balancer for a set of Pods which are selected by labels. Since you can't rely on any Pod's IP, which changes every time the Pod is created and destroyed, you should always provide a Service as an entry point for your Pods, or so-called microservice.

Typically, containers you run in the cluster are not accessible from the Internet, because they do not have external IP addresses. You must explicitly expose your application by creating a Service or an Ingress.

There are following Service types:

  • ClusterIP: A virtual IP which is only reachable from within the cluster. Also, the default Service type.
  • NodePort: It opens a specific port on all Nodes, and any traffic sent to the specific port on any node is forwarded to the Service.
  • LoadBalancer: It builds on NodePorts by additionally configuring the cloud provider to create an external load balancer.
  • ExternalName: It maps the Service to an external CNAME record, e.g., your MySQL RDS on AWS (see the sketch below).
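
For instance, a minimal ExternalName sketch (the hostname is hypothetical); Pods in the cluster could then reach the database at mysql.default.svc.cluster.local:

kind: Service
apiVersion: v1
metadata:
  name: mysql
spec:
  type: ExternalName
  externalName: your-db.abcdefg.ap-northeast-1.rds.amazonaws.com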

Create a Service.

kind: Service
apiVersion: v1
metadata:
  name: simple-api
spec:
  type: NodePort
  selector:
    app: simple-api
  ports:
    - name: http
      port: 80
      targetPort: 8000

type: NodePort is enough in most cases. spec.selector must match the labels defined in the corresponding Deployment, and spec.ports.targetPort and spec.ports.protocol must match the container's port and protocol.

$ kubectl apply -f simple-api/ -R

$ kubectl get svc,endpoints

$ kubespy trace service simple-api
[ADDED v1/Service] default/simple-api
[ADDED v1/Endpoints] default/simple-api
    Directs traffic to the following live Pods:
        - [Ready] simple-api-6b4b4c4bfb-g5dln @ 10.28.1.42
        - [Ready] simple-api-6b4b4c4bfb-h66dg @ 10.28.8.24

ref:
https://kubernetes.io/docs/concepts/services-networking/service/
https://medium.com/google-cloud/kubernetes-nodeport-vs-loadbalancer-vs-ingress-when-should-i-use-what-922f010849e0

After a Service is created, kube-dns creates a corresponding DNS A record named your-service.your-namespace.svc.cluster.local which resolves to an internal IP in the cluster. In this case: simple-api.default.svc.cluster.local. Headless Services (without a cluster IP) are also assigned a DNS A record of the same form. Unlike normal Services, this A record resolves directly to the set of IPs of the Pods selected by the Service. Clients are expected to consume the set of IPs or use round-robin selection from the set.

You should always prefer DNS names of a Service over injected environment variables, e.g., FOO_SERVICE_HOST and FOO_SERVICE_PORT.
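
To see what actually gets injected, you could list a container's environment (the Pod name is from the examples above; the addresses are only illustrative):

$ kubectl exec -i -t simple-api-5bbf4dd4f9-8b4c9 -- env | grep SERVICE
SIMPLE_API_SERVICE_HOST=10.63.241.137
SIMPLE_API_SERVICE_PORT=80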

ref:
https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/

For more detail about Kubernetes networking, go to:
https://github.com/hackstoic/kubernetes_practice/blob/master/%E7%BD%91%E7%BB%9C.md
https://containerops.org/2017/01/30/kubernetes-services-and-ingress-under-x-ray/
https://www.safaribooksonline.com/library/view/kubernetes-up-and/9781491935668/ch07.html

Configure Services With Google Cloud CDN

kind: BackendConfig
apiVersion: cloud.google.com/v1beta1
metadata:
  name: cdn
spec:
  cdn:
    enabled: true
    cachePolicy:
      includeHost: false
      includeProtocol: false
      includeQueryString: false
---
kind: Service
apiVersion: v1
metadata:
  name: gcs-proxy-media-simple-project-com
  annotations:
    beta.cloud.google.com/backend-config: '{"ports": {"http":"cdn"}}'
    cloud.google.com/neg: '{"ingress": true}'
spec:
  selector:
    app: gcs-proxy-media-simple-project-com
  ports:
    - name: http
      port: 80
      targetPort: 80

ref:
https://cloud.google.com/kubernetes-engine/docs/concepts/backendconfig

Configure Services With Network Endpoint Groups (NEGs)

To use container-native load balancing, you must create a cluster with the --enable-ip-alias flag and then just add an annotation to your Services. However, the load balancer is not created until you create an Ingress for the Service.

kind: Service
apiVersion: v1
metadata:
  name: simple-api
  annotations:
    cloud.google.com/neg: '{"ingress": true}'
spec:
  selector:
    app: simple-api
  ports:
    - name: http
      port: 80
      targetPort: 8000

ref:
https://cloud.google.com/kubernetes-engine/docs/how-to/container-native-load-balancing

Create An Internal Load Balancer
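
On Google Kubernetes Engine, you could expose a Service through an internal load balancer with the cloud.google.com/load-balancer-type annotation; a minimal sketch (the Service name is hypothetical):

kind: Service
apiVersion: v1
metadata:
  name: simple-api-internal
  annotations:
    cloud.google.com/load-balancer-type: "Internal"
spec:
  type: LoadBalancer
  selector:
    app: simple-api
  ports:
  - name: http
    port: 80
    targetPort: 8000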

ref:
https://medium.com/@johnjjung/creating-an-inter-kubernetes-cluster-services-using-an-internal-loadbalancer-137f768bb3fc

Use Port Forwarding

Access a Service or a Pod on your local machine with port forwarding.

# 8080 is the local port and 80 is the remote port
$ kubectl port-forward svc/simple-api 8080:80

# port forward to a Pod directly
$ kubectl port-forward mongo-rs0-0 27017:27017

$ open http://127.0.0.1:8080/

ref:
https://kubernetes.io/docs/tasks/access-application-cluster/port-forward-access-application-cluster/

Create An Ingress

Pods in Kubernetes are not reachable from outside the cluster, so you need a way to expose your Pods to the Internet. Even though you could associate Pods with a Service of the right type, i.e., NodePort or LoadBalancer, the recommended way to expose services is using Ingress. You can do a lot of different things with an Ingress, and there are many types of Ingress controllers that have different capabilities.

There are some reasons to choose Ingress over Service:

  • Service is an internal load balancer, while Ingress is a gateway for external access to Services
  • Service is an L4 load balancer, while Ingress is an L7 load balancer
  • Ingress allows domain-based and path-based routing to different Services
  • It is not efficient to create a cloud provider's load balancer for each Service you want to expose

Create an Ingress which is implemented using Google Cloud Load Balancing (L7 HTTP load balancer). You should make sure Services exist before creating the Ingress.

kind: Ingress
apiVersion: extensions/v1beta1
metadata:
  name: simple-project
  annotations:
    kubernetes.io/ingress.class: "gce"
    # kubernetes.io/tls-acme: "true"
    # ingress.kubernetes.io/ssl-redirect: "true"
spec:
  # tls:
  # - secretName: simple-project-com-tls
  #   hosts:
  #   - simple-project.com
  #   - www.simple-project.com
  #   - api.simple-project.com
  rules:
  - host: simple-project.com
    http:
      paths:
      - path: /*
        backend:
          serviceName: simple-frontend
          servicePort: 80
  - host: www.simple-project.com
    http:
      paths:
      - path: /*
        backend:
          serviceName: simple-frontend
          servicePort: 80
  - host: api.simple-project.com
    http:
      paths:
      - path: /*
        backend:
          serviceName: simple-api
          servicePort: 80
  - host: asia.contents.simple-project.com
    http:
      paths:
      - path: /*
        backend:
          serviceName: gcs-proxy-media-simple-project-com
          servicePort: 80
  backend:
    serviceName: simple-api
    servicePort: 80

It might take several minutes to spin up a Google HTTP load balancer (including acquiring the public IP), and at least 5 minutes before the GCE API starts health-checking backends. After getting your public IP, you could go to your domain provider and create new DNS records which point to that IP.

$ kubectl apply -f ingress.yaml

$ kubectl describe ing simple-project

ref:
https://kubernetes.io/docs/concepts/services-networking/ingress/
https://kubernetes.io/docs/tasks/access-application-cluster/create-external-load-balancer/
https://www.joyfulbikeshedding.com/blog/2018-03-26-studying-the-kubernetes-ingress-system.html

To read more about Google Load balancer, go to:
https://cloud.google.com/kubernetes-engine/docs/tutorials/http-balancer
https://cloud.google.com/compute/docs/load-balancing/http/backend-service

Setup The Ingress With TLS Certificates

To automatically create HTTPS certificates for your domains, you could use a tool such as cert-manager.

ref:
https://github.com/jetstack/cert-manager

Create Ingress Controllers

Kubernetes supports multiple Ingress controllers, for instance, the GCE Ingress controller (the default on Google Kubernetes Engine) and the NGINX Ingress controller.

ref:
https://container-solutions.com/production-ready-ingress-kubernetes/

Create StorageClasses

StorageClass provides a way to define different available storage types, for instance, ext4 SSD, XFS SSD, CephFS, NFS. You could specify what you want in PersistentVolumeClaim or StatefulSet.

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: ssd
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: ssd-xfs
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  fsType: xfs
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: ssd-regional
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
  zones: asia-east1-a, asia-east1-b, asia-east1-c
  replication-type: regional-pd

$ kubectl apply -f storageclass.yaml
$ kubectl get sc
NAME                 PROVISIONER            AGE
ssd                  kubernetes.io/gce-pd   5s
ssd-regional         kubernetes.io/gce-pd   4s
ssd-xfs              kubernetes.io/gce-pd   3s
standard (default)   kubernetes.io/gce-pd   1h

ref:
https://kubernetes.io/docs/concepts/storage/storage-classes/#gce

Create PersistentVolumeClaims

A Volume is just a directory which you could mount into containers, and it is shared by all containers inside the same Pod. Also, it has an explicit lifetime: the same as the Pod that encloses it. Sources of a Volume vary: it could be a remote Git repo, a file path on the host machine, a folder from a PersistentVolumeClaim, or data from a ConfigMap or a Secret.

PersistentVolumes are used to manage durable storage in a cluster. Unlike Volumes, PersistentVolumes have a lifecycle independent of any individual Pod. On Google Kubernetes Engine, PersistentVolumes are typically backed by Google Compute Engine Persistent Disks. Typically, you don't have to create PersistentVolumes explicitly. In Kubernetes 1.6 and later versions, you only need to create PersistentVolumeClaim, and the corresponding PersistentVolume would be dynamically provisioned with StorageClasses. Pods use PersistentVolumeClaims as Volumes.

Be careful when creating a Deployment with a PersistentVolumeClaim. In most cases, you don't want multiple replicas of a Deployment to write data into the same PersistentVolumeClaim.
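
For instance, a minimal PersistentVolumeClaim sketch which dynamically provisions a disk through the ssd StorageClass defined above (the claim name and size are only illustrative):

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: simple-data
spec:
  accessModes: ["ReadWriteOnce"]
  storageClassName: ssd
  resources:
    requests:
      storage: 10G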

ref:
https://kubernetes.io/docs/concepts/storage/volumes/
https://kubernetes.io/docs/concepts/storage/persistent-volumes/
https://cloud.google.com/kubernetes-engine/docs/concepts/persistent-volumes

Also, IOPS is based on the disk size and node size. You need to claim a large disk size if you want high IOPS, even if you only use a small portion of the disk.

ref:
https://cloud.google.com/compute/docs/disks/performance

On Kubernetes v1.10+, it is possible to create local PersistentVolumes for your StatefulSets. Previously, PersistentVolumes only supported remote volume types, for instance, GCE's Persistent Disk and AWS's EBS. However, using local storage ties your applications to that specific node, making your application harder to schedule.

ref:
https://kubernetes.io/blog/2018/04/13/local-persistent-volumes-beta/

Create A StatefulSet

Pods created under a StatefulSet have a few unique attributes: the name of the pod is not random, instead each pod gets an ordinal name. In addition, Pods are created one at a time instead of all at once, which can help when bootstrapping a stateful system. StatefulSet also deletes/updates one Pod at a time, in reverse order with respect to its ordinal index, and it waits for each to be completely shutdown before deleting the next.
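
For instance, the Pods of the mongodb-rs0 StatefulSet below get ordinal names (the output is only illustrative):

$ kubectl get pods -l app=mongodb-rs0
NAME            READY   STATUS    RESTARTS   AGE
mongodb-rs0-0   2/2     Running   0          5m
mongodb-rs0-1   2/2     Running   0          4m
mongodb-rs0-2   2/2     Running   0          3m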

Rule of thumb: once you find out that you need PersistentVolume for the component, you might just consider using StatefulSet.

ref:
https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/
https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/
https://akomljen.com/kubernetes-persistent-volumes-with-deployment-and-statefulset/

Create a StatefulSet of a three-node MongoDB replica set.

kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: default-view
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: view
subjects:
  - kind: ServiceAccount
    name: default
    namespace: default
---
kind: Service
apiVersion: v1
metadata:
  name: mongodb-rs0
spec:
  clusterIP: None
  selector:
    app: mongodb-rs0
  ports:
    - port: 27017
      targetPort: 27017
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: mongodb-rs0
spec:
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  serviceName: mongodb-rs0
  selector:
    matchLabels:
      app: mongodb-rs0
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: ssd-xfs
      resources:
        requests:
          storage: 100G
  template:
    metadata:
      labels:
        app: mongodb-rs0
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: custom.kubernetes.io/fs-type
                operator: In
                values:
                - "xfs"
              - key: cloud.google.com/gke-preemptible
                operator: NotIn
                values:
                - "true"
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - topologyKey: "kubernetes.io/hostname"
              labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - mongodb-rs0
      terminationGracePeriodSeconds: 10
      containers:
      - name: db
        image: mongo:3.6.5
        command: ["mongod"]
        args: ["--bind_ip_all", "--replSet", "rs0"]
        ports:
        - containerPort: 27017
        volumeMounts:
        - name: data
          mountPath: /data/db
        readinessProbe:
          exec:
            command: ["mongo", --eval, "db.adminCommand('ping')"]
        resources:
          requests:
            cpu: 2
            memory: 4G
          limits:
            cpu: 4
            memory: 4G
      - name: sidecar
        image: cvallance/mongo-k8s-sidecar
        env:
          - name: MONGO_SIDECAR_POD_LABELS
            value: app=mongodb-rs0
          - name: KUBE_NAMESPACE
            value: default
          - name: KUBERNETES_MONGO_SERVICE_NAME
            value: mongodb-rs0

$ kubectl apply -f storageclass.yaml
$ kubectl apply -f mongodb/ -R

$ kubectl get pods

$ kubetail mongodb -c db
$ kubetail mongodb -c sidecar

$ kubectl scale statefulset mongodb-rs0 --replicas=4

The purpose of cvallance/mongo-k8s-sidecar is to automatically add new Pods to the replica set and remove Pods from it while you scale the MongoDB StatefulSet up or down.

ref:
https://github.com/cvallance/mongo-k8s-sidecar
https://kubernetes.io/blog/2017/01/running-mongodb-on-kubernetes-with-statefulsets/
https://medium.com/@thakur.vaibhav23/scaling-mongodb-on-kubernetes-32e446c16b82

Create A Headless Service For A StatefulSet

Headless Services (clusterIP: None) are just like normal Kubernetes Services, except they don’t do any load balancing for you. For a typical StatefulSet component, for instance, a database with Master-Slave replication, you don't want Kubernetes load balancing in order to prevent writing data to slaves accidentally.

When headless Services are combined with StatefulSets, they can give you unique DNS addresses: A records that point directly to the Pods themselves. DNS names are in the format of static-pod-name.headless-service-name.namespace.svc.cluster.local.

kind: Service
apiVersion: v1
metadata:
  name: redis-broker
spec:
  clusterIP: None
  selector:
    app: redis-broker
  ports:
  - port: 6379
    targetPort: 6379
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: redis-broker
spec:
  replicas: 1
  serviceName: redis-broker
  selector:
    matchLabels:
      app: redis-broker
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: ssd
      resources:
        requests:
          storage: 32Gi
  template:
    metadata:
      labels:
        app: redis-broker
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: cloud.google.com/gke-preemptible
                operator: NotIn
                values:
                - "true"
      volumes:
      - name: config
        configMap:
          name: redis-broker
      containers:
      - name: redis
        image: redis:4.0.10-alpine
        command: ["redis-server"]
        args: ["/etc/redis/redis.conf", "--loglevel", "verbose", "--maxmemory", "1g"]
        ports:
        - containerPort: 6379
        volumeMounts:
        - name: data
          mountPath: /data
        - name: config
          mountPath: /etc/redis
        readinessProbe:
          exec:
            command: ["sh", "-c", "redis-cli -h $(hostname) ping"]
          initialDelaySeconds: 5
          timeoutSeconds: 1
          periodSeconds: 1
          successThreshold: 1
          failureThreshold: 3
        resources:
          requests:
            cpu: 250m
            memory: 1G
          limits:
            cpu: 1000m
            memory: 1G

If redis-broker has 2 replicas, nslookup redis-broker.default.svc.cluster.local returns multiple A records for a single DNS lookup, which is commonly known as round-robin DNS.

$ kubectl run -i -t --image busybox dns-test --restart=Never --rm /bin/sh

> nslookup redis-broker.default.svc.cluster.local
Server: 10.63.240.10
Address 1: 10.63.240.10 kube-dns.kube-system.svc.cluster.local
Name: redis-broker.default.svc.cluster.local
Address 1: 10.60.6.2 redis-broker-0.redis-broker.default.svc.cluster.local
Address 2: 10.60.6.7 redis-broker-1.redis-broker.default.svc.cluster.local

> nslookup redis-broker-0.redis-broker.default.svc.cluster.local
Server: 10.63.240.10
Address 1: 10.63.240.10 kube-dns.kube-system.svc.cluster.local
Name: redis-broker-0.redis-broker.default
Address 1: 10.60.6.2 redis-broker-0.redis-broker.default.svc.cluster.local

ref:
https://kubernetes.io/docs/concepts/services-networking/service/#headless-services
https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#services
https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#using-stable-network-identities

Moreover, there is no port re-mapping for a headless Service, since the DNS name resolves directly to Pod IPs.

kind: Service
apiVersion: v1
metadata:
  namespace: tick
  name: influxdb
spec:
  clusterIP: None
  selector:
    app: influxdb
  ports:
  - name: api
    port: 4444
    targetPort: 8086
  - name: admin
    port: 8083
    targetPort: 8083

$ kubectl apply -f tick/ -R
$ kubectl get svc --namespace tick
NAME       TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
influxdb   ClusterIP   None         <none>        4444/TCP,8083/TCP   1h

$ curl http://influxdb.tick.svc.cluster.local:4444/ping
curl: (7) Failed to connect to influxdb.tick.svc.cluster.local port 4444: Connection refused

$ curl -I http://influxdb.tick.svc.cluster.local:8086/ping
HTTP/1.1 204 No Content
Content-Type: application/json
Request-Id: 7fc09a56-8538-11e8-8d1d-000000000000

Create A DaemonSet

Create a DaemonSet which changes OS kernel configurations on each node.

kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: thp-disabler
spec:
  selector:
    matchLabels:
      app: thp-disabler
  template:
    metadata:
      labels:
        app: thp-disabler
    spec:
      hostPID: true
      containers:
      - name: configurer
        image: gcr.io/google-containers/startup-script:v1
        securityContext:
          privileged: true
        env:
        - name: STARTUP_SCRIPT
          value: |
            #! /bin/bash
            set -o errexit
            set -o pipefail
            set -o nounset

            echo 'never' > /sys/kernel/mm/transparent_hugepage/enabled
            echo 'never' > /sys/kernel/mm/transparent_hugepage/defrag

ref:
https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/

Create A CronJob

Backup your MongoDB database every hour.

kind: CronJob
apiVersion: batch/v1beta1
metadata:
  name: backup-mongodb-rs0
spec:
  suspend: false
  schedule: "30 * * * *"
  startingDeadlineSeconds: 600
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: custom.kubernetes.io/scopes-storage-full
                    operator: In
                    values:
                    - "true"
          volumes:
          - name: backups-dir
            emptyDir: {}
          initContainers:
          - name: clean
            image: busybox
            command: ["rm", "-rf", "/backups/*"]
            volumeMounts:
            - name: backups-dir
              mountPath: /backups
          - name: backup
            image: vinta/mongodb-tools:4.0.1
            workingDir: /backups
            command: ["sh", "-c"]
            args:
            - mongodump --host=$MONGODB_URL --readPreference=secondaryPreferred --oplog --gzip --archive=$(date +%Y-%m-%dT%H-%M-%S).tar.gz
            env:
            - name: MONGODB_URL
              value: mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local,mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local,mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local
            volumeMounts:
            - name: backups-dir
              mountPath: /backups
            resources:
              requests:
                cpu: 2
                memory: 2G
          containers:
          - name: upload
            image: google/cloud-sdk:alpine
            workingDir: /backups
            command: ["sh", "-c"]
            args:
            - gsutil -m cp -r . gs://$(GOOGLE_CLOUD_STORAGE_BUCKET)
            env:
            - name: GOOGLE_CLOUD_STORAGE_BUCKET
              value: simple-project-backups
            volumeMounts:
            - name: backups-dir
              mountPath: /backups
              readOnly: true

Note: The environment variable appears in parentheses, $(VAR); this syntax is required for the variable to be expanded in the command or args field.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: simple-api-send-email
spec:
  schedule: "*/30 * * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: simple-api-send-email
            image: asia.gcr.io/simple-project-198818/simple-api:4fc4199
            command: ["flask", "shell", "-c"]
            args:
            - |
              from bar.tasks import send_email
              send_email.delay('Hey!', 'Stand up!', to=['[email protected]'])
            envFrom:
            - configMapRef:
                name: simple-api

You could just write a simple Python script as a CronJob since everything is containerized.

ref:
https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/

Define NodeAffinity And PodAffinity

Prevent Pods from being scheduled on preemptible nodes. Also, you should always prefer nodeAffinity over nodeSelector.

kind: StatefulSet
apiVersion: apps/v1
spec:
  template:
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: cloud.google.com/gke-preemptible
                operator: NotIn
                values:
                - "true"

ref:
https://medium.com/google-cloud/using-preemptible-vms-to-cut-kubernetes-engine-bills-in-half-de2481b8e814

podAntiAffinity ensures that Pods of the same Deployment or StatefulSet do not co-locate on a single node.

kind: StatefulSet
apiVersion: apps/v1
spec:
  template:
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - topologyKey: "kubernetes.io/hostname"
              labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - mongodb-rs0

ref:
https://kubernetes.io/docs/concepts/configuration/assign-pod-node/

Migrate Pods from Old Nodes to New Nodes

  • Cordon marks old nodes as unschedulable
  • Drain evicts all Pods on old nodes

for node in $(kubectl get nodes -l cloud.google.com/gke-nodepool=n1-standard-4-pre -o=name); do
  kubectl cordon "$node";
done

for node in $(kubectl get nodes -l cloud.google.com/gke-nodepool=n1-standard-4-pre -o=name); do
  kubectl drain --ignore-daemonsets --delete-local-data --grace-period=2 "$node";
done

$ kubectl get nodes
NAME                                       STATUS                     ROLES     AGE       VERSION
gke-demo-default-pool-3c058fcf-x7cv        Ready                      <none>    2h        v1.11.6-gke.6
gke-demo-default-pool-58da1098-1h00        Ready                      <none>    2h        v1.11.6-gke.6
gke-demo-default-pool-fc34abbf-9dwr        Ready                      <none>    2h        v1.11.6-gke.6
gke-demo-n1-standard-4-pre-1a54e45a-0m7p   Ready,SchedulingDisabled   <none>    58m       v1.11.6-gke.6
gke-demo-n1-standard-4-pre-1a54e45a-mx3h   Ready,SchedulingDisabled   <none>    58m       v1.11.6-gke.6
gke-demo-n1-standard-4-pre-1a54e45a-qhdz   Ready,SchedulingDisabled   <none>    58m       v1.11.6-gke.6

ref:
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#cordon
https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#drain
https://cloud.google.com/kubernetes-engine/docs/tutorials/migrating-node-pool

Show Objects' Events

$ kubectl get events -w
$ kubectl get events -w --sort-by=.metadata.creationTimestamp
$ kubectl get events -w --sort-by=.metadata.creationTimestamp | grep mongo

ref:
https://kubernetes.io/docs/tasks/debug-application-cluster/

You could find more comprehensive logs on Google Cloud Stackdriver Logging if you are using GKE.

View Pods' Logs on Stackdriver Logging

You could use the following search formats.

textPayload:"OBJECT_FINALIZE"

logName="projects/simple-project-198818/logs/worker"
textPayload:"Added media preset"

logName="projects/simple-project-198818/logs/beat"
textPayload:"backend_cleanup"

resource.labels.pod_id="simple-api-6744bf74db-529qf"
textPayload:"5adb2bd460d6487649fe82ea"
timestamp>="2018-04-21T12:00:00Z"
timestamp<="2018-04-21T16:00:00Z"

resource.type="k8s_container"
resource.labels.cluster_name="production"
resource.labels.namespace_id="default"
resource.labels.pod_id:"simple-worker"
textPayload:"ConcurrentObjectUseError"

resource.type="k8s_node"
resource.labels.location="asia-east1"
resource.labels.cluster_name="production"
logName="projects/simple-project-198818/logs/node-problem-detector"

# see a Pod's logs
resource.type="k8s_container"
resource.labels.cluster_name="production"
resource.labels.namespace_id="default"
resource.labels.pod_name="cache-redis-0"
"start"

# see a Node's logs
resource.type="k8s_node"
resource.labels.location="asia-east1"
resource.labels.cluster_name="production"
resource.labels.node_name="gke-production-n1-highmem-32-p0-2bd334ec-v4ng"
"start"

ref:
https://kubernetes.io/docs/tasks/debug-application-cluster/logging-stackdriver/
https://cloud.google.com/logging/docs/view/advanced-filters

Best Practices

ref:
https://cloud.google.com/solutions/best-practices-for-building-containers
https://medium.com/@sachin.arote1/kubernetes-best-practices-9b1435a4cb53
https://medium.com/@brendanrius/scaling-kubernetes-for-25m-users-a7937e3536a0

Common Issues

Switch Contexts

Get authentication credentials to allow your kubectl to interact with the cluster.

$ gcloud container clusters get-credentials demo --project simple-project-198818

ref:
https://cloud.google.com/sdk/gcloud/reference/container/clusters/get-credentials
https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/

A Context is roughly a configuration profile which indicates the cluster, the namespace, and the user you use. Contexts are stored in ~/.kube/config.

$ kubectl config get-contexts
$ kubectl config use-context gke_simple-project-198818_asia-east1_demo
$ kubectl config view

ref:
https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/

The recommended way to switch contexts is using fubectl.

$ kcs

ref:
https://github.com/kubermatic/fubectl

Pending Pods

One of the most common reasons for Pending Pods is a lack of resources.

$ kubectl describe pod mongodb-rs0-1
...
Events:
Type       Reason              Age                  From                 Message
----       ------              ----                 ----                 -------
Warning    FailedScheduling    3m (x739 over 1d)    default-scheduler    0/3 nodes are available: 1 ExistingPodsAntiAffinityRulesNotMatch, 1 MatchInterPodAffinity, 1 NodeNotReady, 2 NoVolumeZoneConflict, 3 Insufficient cpu, 3 Insufficient memory, 3 MatchNodeSelector.
...

You could resize nodes in the cluster.

$ gcloud container clusters resize demo --node-pool=n1-standard-4-pre --size=5 --region=asia-east1

ref:
https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/

Init:Error Pods

$ kubectl describe pod mongodump-sh0-1543978800-bdkhl
$ kubectl logs mongodump-sh0-1543978800-bdkhl -c mongodump

ref:
https://kubernetes.io/docs/tasks/debug-application-cluster/debug-init-containers/#accessing-logs-from-init-containers

CrashLoopBackOff Pods

CrashLoopBackOff means the Pod is starting, then crashing, then starting again and crashing again.

When in doubt, kubectl describe.

$ kubectl describe pod the-pod-name
$ kubectl logs the-pod-name --previous

ref:
https://www.krenger.ch/blog/crashloopbackoff-and-how-to-fix-it/
https://sysdig.com/blog/debug-kubernetes-crashloopbackoff/

MongoDB Change Stream: react to real-time data changes

What is Change Stream?

Change Stream is a Change Data Capture (CDC) feature provided by MongoDB since v3.6. In layman's terms, it's a high-level API that allows you to subscribe to real-time notifications whenever there is a change in your MongoDB collections, databases, or the entire cluster, in an event-driven fashion.

Change Stream uses information stored in the oplog (operations log) to produce the change event. The oplog.rs is a special capped collection that keeps a rolling record of all insert, update, and remove operations that come into your MongoDB so other members of the Replica Set can copy them. Since Change Stream is built on top of the oplog, it is only available for Replica Sets and Sharded clusters.

The problem with most databases' replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API (Martin Kleppmann, 2017).

Change Stream comes to the rescue!

Change Stream in a Sharded cluster

MongoDB has a global logical clock that enables the server to order all changes across a Sharded cluster.

To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see if the shard has seen more recent changes. Sharded clusters with one or more shards that have little or no activity for the collection, or are "cold", can negatively affect the response time of the change stream as the mongos must still check with those cold shards to guarantee total ordering of changes.

What can Change Stream do?

There are some typical use cases of Change Stream:

  • Syncing fields between the source and denormalized collections to mitigate the data consistency issue.
  • Invalidating the cache.
  • Updating the search index.
  • Replicating data to a data warehouse.
  • Hooking up Change Stream to a generic streaming processing pipeline, e.g., Kafka or Spark Streaming.
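
For example, here is a minimal cache-invalidation sketch using the watch() API shown in the next section; the cache client is a hypothetical stand-in:

const { MongoClient } = require('mongodb');

// hypothetical stand-in for a real cache client, e.g., a Redis wrapper
const cache = {
    del: (key) => console.log(`invalidate ${key}`),
};

(async () => {
    const mongoClient = await MongoClient.connect('mongodb://127.0.0.1:27017/', {
        useNewUrlParser: true,
    });
    const db = mongoClient.db('test');
    const changeStream = db.collection('user').watch();

    changeStream.on('change', (event) => {
        // drop the cache entry whenever a user document changes
        if (['update', 'replace', 'delete'].includes(event.operationType)) {
            cache.del(`user:${event.documentKey._id}`);
        }
    });
})();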

How to open a Change Stream?

First of all, you must have a Replica Set or a Sharded cluster for your MongoDB deployment and make sure you are using the WiredTiger storage engine. If you don't, you might be using MongoDB all wrong.

All code samples below are written in Node.js.

const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        appname: 'test',
        readPreference: ReadPreference.PRIMARY,
        useNewUrlParser: true,
    });
    const db = await mongoClient.db('test');
    const changeStream = db.collection('user').watch([], {'fullDocument': 'updateLookup'});

    changeStream.on('change', (event) => {
        console.log(event);
    });
})();

You could also enable 'fullDocument': 'updateLookup', which includes the entire (post-update) document in each update event. But, as the name says, it does a lookup, which has an overhead, and the returned document might exceed the 16MB limit on BSON documents.

Also, the content of fullDocument may differ from the updateDescription if other majority-committed operations modified the document between the original update operation and the full document lookup. Be cautious when you use it.

Besides the regular insert, update, and delete events, there is also a replace event, which is triggered by an update operation that replaces the whole document.

ref:
https://docs.mongodb.com/manual/reference/change-events/
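
As a sketch, a handler that dispatches on operationType might look like this (reusing the changeStream from the example above):

changeStream.on('change', (event) => {
    switch (event.operationType) {
        case 'insert':
        case 'replace': // a replace is an update which overwrites the whole document
            console.log(event.fullDocument);
            break;
        case 'update':
            console.log(event.updateDescription.updatedFields);
            break;
        case 'delete':
            console.log(event.documentKey); // only the _id is available
            break;
    }
});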

How to aggregate Change Stream events?

One of the advantages of Change Stream is that you are able to leverage MongoDB's powerful aggregation framework - allowing you to filter and modify the output of Change Stream.

However, there is a tricky part in update events: field names and their contents in updateDescription.updatedFields might vary if the updated field is an array field. Assume we have a tags field, a list of strings, in the user collection. You could try running the following code in the mongo shell:

  • $addToSet produces complete items of the array field
  • $push produces only the inserted item of the array field
  • $pull produces complete items of the array field

var userId = ObjectId();
db.getCollection('user').insert({
    "_id" : userId,
    "username" : "vinta",
    "tags" : ["tag1"]
});

db.getCollection('user').updateOne({_id: userId}, {
    '$addToSet': {'tags': 'tag2'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags": ["tag1", "tag2"]
//         }
//     }
//     ...
// }

db.getCollection('user').updateOne({_id: userId}, {
    '$push': {'tags': 'tag3'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags.2": "tag3"
//         }
//     }
//     ...
// }

db.getCollection('user').updateOne({_id: userId}, {
    '$pull': {'tags': 'tag1'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags": ["tag2", "tag3"]
//         }
//     }
//     ...
// }

Fortunately, to mitigate the tags and tags.2 problem, we could do some aggregation to $project and $match change events if we only want to listen to the change of the tags field:

const pipeline = [
    {'$project': {
        '_id': 1,
        'operationType': 1,
        'documentKey': 1,
        'changedDocument': {
            '$objectToArray': {
                '$mergeObjects': ['$updateDescription.updatedFields', '$fullDocument'],
            },
        },
        'removedFields': '$updateDescription.removedFields',
    }},
    {'$match': {
        '$or': [
            {'changedDocument.k': /^tags$/},
{'changedDocument.k': /^tags\./},
            {'removedFields': {'$in': ['tags']}},
            {'operationType': 'delete'},
        ],
    }},
    {'$addFields': {
        'changedDocument': {'$arrayToObject': '$changedDocument'},
    }},
];
const changeStream = db.collection('user').watch(pipeline, {});

How to resume a Change Stream?

Another critical feature of Change Stream is Resumability. Since any service will inevitably get restarted or crash, it is essential that we can resume from the point in time at which Change Stream was interrupted.

There are two options in watch() we can use:

  • resumeAfter: A resume token from any change event.
  • startAtOperationTime: A starting timestamp for Change Stream.

resumeAfter

Before using a resumeAfter token, there is a MongoDB configuration you might need to deal with: featureCompatibilityVersion.

db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
db.adminCommand({setFeatureCompatibilityVersion: "4.0"});

A resumeAfter token is carried by every Change Stream event: the _id field whose value looks like {'_data': '825C4607870000000129295A1004AF1EE5355B7344D6B25478700E75259D46645F696400645C42176528578222B13ADEAA0004'}. In other words, the {'_data': 'a hex string'} is your resumeAfter token.

In practice, you should store each resumeAfter token somewhere, for instance, in Redis, so that you can resume after a blackout or a restart. It is also a good idea to wrap the store function with debounce, as in the sketch below.
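
A minimal sketch, assuming ioredis as the Redis client and lodash's debounce; the Redis key name is arbitrary:

const _ = require('lodash');
const Redis = require('ioredis');
const { MongoClient } = require('mongodb');

(async () => {
    const redis = new Redis(); // defaults to 127.0.0.1:6379
    const mongoClient = await MongoClient.connect('mongodb://127.0.0.1:27017/', {
        useNewUrlParser: true,
    });
    const db = mongoClient.db('test');

    // resume from the stored token if there is one
    const stored = await redis.get('changestream:test.user');
    const options = stored ? { resumeAfter: JSON.parse(stored) } : {};
    const changeStream = db.collection('user').watch([], options);

    // persist the token at most once per second instead of on every event
    const storeResumeToken = _.debounce((token) => {
        redis.set('changestream:test.user', JSON.stringify(token));
    }, 1000);

    changeStream.on('change', (event) => {
        storeResumeToken(event._id); // event._id is the resumeAfter token
        // ... handle the event
    });
})();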

Another unusual (and not so reliable) way to get a resumeAfter token is composing one from the oplog.rs collection:

const _ = require('lodash');
const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        appname: 'test',
        replicaSet: 'rs0',
        readPreference: ReadPreference.PRIMARY,
        useNewUrlParser: true,
    });

    // cannot use 'local' database through mongos
    const localDb = await mongoClient.db('local');

    // querying oplog.rs might take seconds
    const doc = await localDb.collection('oplog.rs')
        .findOne(
            {'ns': 'test.user'}, // dbName.collectionName
            {'sort': {'$natural': -1}},
        );

    // https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change
    // https://github.com/mongodb/mongo/blob/master/src/mongo/db/storage/key_string.cpp
    // https://github.com/mongodb/mongo/blob/master/src/mongo/bson/bsontypes.h
    const resumeAfterData = [
        '82', // unknown
        doc.ts.toString(16), // timestamp
        '29', // unknown
        '29', // unknown
        '5A', // CType::BinData
        '10', // length (16)
        '04', // BinDataType of newUUID
        doc.ui.toString('hex'), // the collection uuid (see `db.getCollectionInfos({name: 'user'})`)
        '46', // CType::Object
        '64', // CType::OID (vary from the type of the collection primary key)
        '5F', // _ (vary from the field name of the collection primary key)
        '69', // i
        '64', // d
        '00', // null
        '64', // CType::OID (vary from the type of document primary key)
        _.get(doc, 'o2._id', _.get(doc, 'o._id')).toString('hex'), // ObjectID, update operations have `o2` field and others have `o` field
        '00', // null
        '04', // unknown
    ].join('').toUpperCase();

    const options = {
        'resumeAfter': {
            '_data': resumeAfterData,
        },
    };
    console.log(options);

    const db = await mongoClient.db('test');
    const changeStream = db.collection('user').watch([], options);

    changeStream.on('change', (event) => {
        console.log(event);
    });
})();

startAtOperationTime

The startAtOperationTime is only available in MongoDB 4.0+. It simply represents a starting point of time for the Change Stream. Also, you must make sure that the specified starting point is in the time range of the oplog if it is in the past.

The tricky part is that this option only accepts a MongoDB Timestamp object. You could also retrieve the latest timestamp directly from db.adminCommand({replSetGetStatus: 1}).

const { MongoClient, ReadPreference, Timestamp } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        appname: 'test',
        readPreference: ReadPreference.PRIMARY,
        useNewUrlParser: true,
    });

    const options = {
        'startAtOperationTime': Timestamp(1, Math.floor(Date.now() / 1000)),
    };
    console.log(options);

    const db = await mongoClient.db('test');
    const changeStream = db.collection('user').watch([], options);

    changeStream.on('change', (event) => {
        console.log(event);
    });
})();

MongoDB operations: Replica Set

A replica set is a group of servers (mongod actually) that maintain the same data set, with one primary which takes client requests, and multiple secondaries that keep copies of the primary's data. If the primary crashes, secondaries can elect a new primary from amongst themselves.

Replication from primary to secondaries is asynchronous.

ref:
https://docs.mongodb.com/v3.6/replication/
https://www.safaribooksonline.com/library/view/mongodb-the-definitive/9781491954454/ch08.html
https://www.percona.com/blog/2018/10/10/mongodb-replica-set-scenarios-and-internals/

Concepts

  • Primary: A node that accepts writes and is the leader for voting. There can be only one primary.
  • Secondary: A node that replicates from the primary or another secondary and can be used for reads. A replica set can have up to 50 members, of which at most 7 can vote.
  • Arbiter: The node does not hold data and only participates in the voting. Also, it cannot be elected as the primary.
    • In the event your node count is an even number, add one of these to break the tie. Never add one where it would make the count even.
  • Priority 0 node: The node cannot be selected as the primary. You might want to lower the priority of some slow nodes.
    • Priority allows you to prefer that specific nodes become primary.
  • Vote 0 node: The node does not participate in the voting.
    • In some cases, having more than eight nodes means additional nodes must not vote.
  • Hidden node: The hidden node must be a priority 0 node and is invisible to drivers, so it cannot take queries from clients.
  • Delayed node: The delayed node must be a hidden node, and its data lags behind the primary by a configured amount of time.
  • Tags: Grants special ability to make queries directly to specific nodes. Useful for BI, geo-locality, and other advanced functions.

ref:
https://docs.mongodb.com/manual/core/replica-set-elections/
https://docs.mongodb.com/manual/core/replica-set-priority-0-member/
https://docs.mongodb.com/manual/core/replica-set-hidden-member/
https://docs.mongodb.com/manual/core/replica-set-delayed-member/
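
For instance, you could turn member 2 into a hidden, delayed, priority 0 node in the mongo shell (the member index is just for illustration):

cfg = rs.conf()
cfg.members[2].priority = 0
cfg.members[2].hidden = true
cfg.members[2].slaveDelay = 3600 // lag one hour behind the primary
rs.reconfig(cfg)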

Common Architectures

ref:
https://docs.mongodb.com/v3.6/core/replica-set-architectures/
https://www.percona.com/blog/2018/03/22/the-anatomy-of-a-mongodb-replica-set/

Three-Node Replica Set: Primary, Secondary, Secondary

ref:
https://docs.mongodb.com/v3.6/tutorial/deploy-replica-set/
https://docs.mongodb.com/v3.6/tutorial/expand-replica-set/

If you are running MongoDB cluster on Kubernetes, PLEASE USE THE FULL DNS NAME (FQDN). DO NOT use something like pod-name.service-name.

$ mongo mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local
> rs.initiate({
   _id : "rs0",
   members: [
      {_id: 0, host: "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017"},
      {_id: 1, host: "mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017"},
      {_id: 2, host: "mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017"}
   ]
})
{
    "ok" : 1,
    "operationTime" : Timestamp(1531223087, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1531223087, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
rs0:PRIMARY> db.isMaster()

ref:
https://docs.mongodb.com/v3.6/reference/method/rs.initiate/

$ mongo mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local
rs0:SECONDARY> rs.slaveOk()
rs0:SECONDARY> show dbs
rs0:SECONDARY> rs.conf()
{
    "_id" : "rs0",
    "version" : 1,
    "protocolVersion" : NumberLong(1),
    "members" : [
        {
            "_id" : 0,
            "host" : "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {

            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        },
        {
            "_id" : 1,
            "host" : "mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {

            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        },
        {
            "_id" : 2,
            "host" : "mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {

            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        }
    ],
    "settings" : {
        "chainingAllowed" : true,
        "heartbeatIntervalMillis" : 2000,
        "heartbeatTimeoutSecs" : 10,
        "electionTimeoutMillis" : 10000,
        "catchUpTimeoutMillis" : -1,
        "catchUpTakeoverDelayMillis" : 30000,
        "getLastErrorModes" : {

        },
        "getLastErrorDefaults" : {
            "w" : 1,
            "wtimeout" : 0
        },
        "replicaSetId" : ObjectId("5b449c2f9269bb1a807a8cdf")
    }
}
rs0:SECONDARY> rs.status()
{
    "set" : "rs0",
    "date" : ISODate("2018-07-10T11:47:48.474Z"),
    "myState" : 1,
    "term" : NumberLong(1),
    "heartbeatIntervalMillis" : NumberLong(2000),
    "optimes" : {
        "lastCommittedOpTime" : {
            "ts" : Timestamp(1531223260, 1),
            "t" : NumberLong(1)
        },
        "readConcernMajorityOpTime" : {
            "ts" : Timestamp(1531223260, 1),
            "t" : NumberLong(1)
        },
        "appliedOpTime" : {
            "ts" : Timestamp(1531223260, 1),
            "t" : NumberLong(1)
        },
        "durableOpTime" : {
            "ts" : Timestamp(1531223260, 1),
            "t" : NumberLong(1)
        }
    },
    "members" : [
        {
            "_id" : 0,
            "name" : "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017",
            "health" : 1,
            "state" : 1,
            "stateStr" : "PRIMARY",
            "uptime" : 381,
            "optime" : {
                "ts" : Timestamp(1531223260, 1),
                "t" : NumberLong(1)
            },
            "optimeDate" : ISODate("2018-07-10T11:47:40Z"),
            "electionTime" : Timestamp(1531223098, 1),
            "electionDate" : ISODate("2018-07-10T11:44:58Z"),
            "configVersion" : 1,
            "self" : true
        },
        {
            "_id" : 1,
            "name" : "mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017",
            "health" : 1,
            "state" : 2,
            "stateStr" : "SECONDARY",
            "uptime" : 181,
            "optime" : {
                "ts" : Timestamp(1531223260, 1),
                "t" : NumberLong(1)
            },
            "optimeDurable" : {
                "ts" : Timestamp(1531223260, 1),
                "t" : NumberLong(1)
            },
            "optimeDate" : ISODate("2018-07-10T11:47:40Z"),
            "optimeDurableDate" : ISODate("2018-07-10T11:47:40Z"),
            "lastHeartbeat" : ISODate("2018-07-10T11:47:46.599Z"),
            "lastHeartbeatRecv" : ISODate("2018-07-10T11:47:47.332Z"),
            "pingMs" : NumberLong(0),
            "syncingTo" : "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017",
            "configVersion" : 1
        },
        {
            "_id" : 2,
            "name" : "mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017",
            "health" : 1,
            "state" : 2,
            "stateStr" : "SECONDARY",
            "uptime" : 181,
            "optime" : {
                "ts" : Timestamp(1531223260, 1),
                "t" : NumberLong(1)
            },
            "optimeDurable" : {
                "ts" : Timestamp(1531223260, 1),
                "t" : NumberLong(1)
            },
            "optimeDate" : ISODate("2018-07-10T11:47:40Z"),
            "optimeDurableDate" : ISODate("2018-07-10T11:47:40Z"),
            "lastHeartbeat" : ISODate("2018-07-10T11:47:46.599Z"),
            "lastHeartbeatRecv" : ISODate("2018-07-10T11:47:47.283Z"),
            "pingMs" : NumberLong(0),
            "syncingTo" : "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017",
            "configVersion" : 1
        }
    ],
    "ok" : 1,
    "operationTime" : Timestamp(1531223260, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1531223260, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

Three-Node Replica Set: Primary, Secondary, Arbiter

If your replica set has an even number of members, add an arbiter to obtain a majority of votes in an election for primary. Arbiters do not require dedicated hardware.
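
A minimal sketch (the arbiter's hostname is hypothetical):

$ mongo mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local
rs0:PRIMARY> rs.addArb("mongodb-arbiter-0.mongodb-arbiter.default.svc.cluster.local:27017")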

ref:
https://docs.mongodb.com/v3.6/tutorial/add-replica-set-arbiter/

Issues

InvalidReplicaSetConfig: Our replica set configuration is invalid or does not include us

$ kubectl logs -f mongodb-rs0-0
REPL_HB [replexec-10] Error in heartbeat (requestId: 20048) to mongodb-rs0-2.mongodb-rs0:27017, response status: InvalidReplicaSetConfig: Our replica set configuration is invalid or does not include us
$ mongo mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local
rs0:OTHER> rs.status()
{
    "state" : 10,
    "stateStr" : "REMOVED",
    "uptime" : 631,
    "optime" : {
        "ts" : Timestamp(1531224140, 1),
        "t" : NumberLong(1)
    },
    "optimeDate" : ISODate("2018-07-10T12:02:20Z"),
    "ok" : 0,
    "errmsg" : "Our replica set config is invalid or we are not a member of it",
    "code" : 93,
    "codeName" : "InvalidReplicaSetConfig",
    "operationTime" : Timestamp(1531224140, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1531224790, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

$ mongo mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local
rs0:PRIMARY> rs.conf() 
{
    "_id" : "rs0",
    "version" : 9,
    "protocolVersion" : NumberLong(1),
    "members" : [
        {
            "_id" : 0,
            "host" : "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {

            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        },
        {
            "_id" : 1,
            "host" : "mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {

            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        },
        {
            "_id" : 2,
            "host" : "mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {

            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        }
    ],
    "settings" : {
        "chainingAllowed" : true,
        "heartbeatIntervalMillis" : 2000,
        "heartbeatTimeoutSecs" : 10,
        "electionTimeoutMillis" : 10000,
        "catchUpTimeoutMillis" : -1,
        "catchUpTakeoverDelayMillis" : 30000,
        "getLastErrorModes" : {

        },
        "getLastErrorDefaults" : {
            "w" : 1,
            "wtimeout" : 0
        },
        "replicaSetId" : ObjectId("5b449c2f9269bb1a807a8cdf")
    }
}

The faulty member's state is REMOVED (it was once in a replica set but was subsequently removed) and shows Our replica set config is invalid or we are not a member of it. In fact, the real issue is that the removed node is still in the list of replica set members.

You could just manually remove the broken node from the replica set on the primary, restart the node, and re-add the node.

$ mongo mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local
rs0:PRIMARY> rs.remove("mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017")

# restart the Pod
$ kubectl delete pod mongodb-rs0-2

$ mongo mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local
rs0:PRIMARY> rs.add("mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017")

ref:
https://stackoverflow.com/questions/47439781/mongodb-replica-set-member-state-is-other
https://docs.mongodb.com/v3.6/tutorial/remove-replica-set-member/
https://docs.mongodb.com/manual/reference/replica-states/

db.isMaster(): Does not have a valid replica set config

rs0:OTHER> db.isMaster()
{
    "hosts" : [
        "mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017",
        "mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017",
        "mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local27017"
    ],
    "setName" : "rs0",
    "ismaster" : false,
    "secondary" : false,
    "info" : "Does not have a valid replica set config",
    "isreplicaset" : true,
    "maxBsonObjectSize" : 16777216,
    "maxMessageSizeBytes" : 48000000,
    "maxWriteBatchSize" : 100000,
    "localTime" : ISODate("2018-07-10T14:34:48.640Z"),
    "logicalSessionTimeoutMinutes" : 30,
    "minWireVersion" : 0,
    "maxWireVersion" : 6,
    "readOnly" : false,
    "ok" : 1,
    "operationTime" : Timestamp(1531232610, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1531232610, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

You could just re-configure the replica set and only keep reachable members.

rs0:OTHER> oldConf = rs.conf()
rs0:OTHER> oldConf.members = [oldConf.members[0]]
rs0:OTHER> rs.reconfig(oldConf, {force: true})
rs0:PRIMARY> rs.add("mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017")
rs0:PRIMARY> rs.add("mongodb-rs0-2.mongodb-rs0.default.svc.cluster.local:27017")

ref:
https://docs.mongodb.com/v3.6/tutorial/reconfigure-replica-set-with-unavailable-members/

Change Replica Set Name

  1. Stop mongod
  2. Start mongod --bind_ip_all --port 27017 --dbpath /data/db without --replSet
  3. Remove the old Replica Set name in the mongo shell:

use admin
db.getCollection('system.version').remove({_id: 'shardIdentity'})

use local
db.getCollection('system.replset').remove({_id: 'rs0'})

  4. Start mongod --bind_ip_all --port 27017 --dbpath /data/db --shardsvr --replSet sh0

ref:
https://stackoverflow.com/questions/33400607/how-do-i-rename-a-mongodb-replica-set

Connect To A Replica Set Cluster

ref:
https://api.mongodb.com/python/current/examples/high_availability.html
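
The referenced doc is for PyMongo; as a sketch, the Node.js equivalent is to list several members and name the replica set in the connection string so the driver can discover the rest of the topology:

const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://mongodb-rs0-0.mongodb-rs0.default.svc.cluster.local:27017,' +
    'mongodb-rs0-1.mongodb-rs0.default.svc.cluster.local:27017/test?replicaSet=rs0';

(async () => {
    const mongoClient = await MongoClient.connect(MONGO_URL, {
        readPreference: ReadPreference.SECONDARY_PREFERRED,
        useNewUrlParser: true,
    });
    const db = mongoClient.db('test');
    console.log(await db.collection('user').countDocuments());
})();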

Use Connection Pools

ref:
https://api.mongodb.com/python/current/faq.html#how-does-connection-pooling-work-in-pymongo
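
In the Node.js 3.x driver, for instance, the pool belongs to the MongoClient, so create one client and share it across the whole process; poolSize below is the 3.x option name (renamed maxPoolSize in 4.x):

const { MongoClient } = require('mongodb');

(async () => {
    const mongoClient = await MongoClient.connect('mongodb://127.0.0.1:27017/', {
        poolSize: 50, // the default is 5
        useNewUrlParser: true,
    });
    // reuse mongoClient everywhere instead of connecting per request
})();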

MongoDB cookbook: Indexes

Indexes are crucial for the efficient execution of queries and aggregations in MongoDB. Without indexes, MongoDB must perform a collection scan, i.e., scan every document in a collection.

If a write operation modifies an indexed field, MongoDB updates all indexes that have the modified field as a key. So, be careful while choosing indexes.

Types Of Indexes

ref:
https://docs.mongodb.com/manual/indexes/
https://docs.mongodb.com/manual/applications/indexes/

Single Field Index

For a single field index and sort operations, the sort order (i.e. ascending or descending) of the index key doesn't matter. With index intersection, single field indexes can be powerful.

ref:
https://docs.mongodb.com/manual/core/index-single/

Compound Index

The order of the fields listed in a compound index is very important.

ref:
https://docs.mongodb.com/manual/core/index-compound/
https://docs.mongodb.com/manual/tutorial/create-indexes-to-support-queries/

TTL Index

When the TTL thread is active, a background thread in mongod reads the values in the index and removes expired documents from the collection. You will see delete operations in the output of db.currentOp().

TTL indexes are single-field indexes. Compound indexes do not support TTL and ignore the expireAfterSeconds option.

import datetime

class JournalEntry(db.Document):
    users = db.ListField(db.ReferenceField('User'))
    event = db.StringField()
    context = db.DynamicField()
    timestamp = db.DateTimeField(default=datetime.datetime.utcnow)

    meta = {
        'index_background': True,
        'indexes': [
            {
                'fields': ['timestamp'],
                'cls': False,
                'expireAfterSeconds': int(datetime.timedelta(days=90).total_seconds()),
            },
        ],
    }

ref:
https://docs.mongodb.com/manual/core/index-ttl/

Index Intersection

MongoDB can use multiple single field indexes to fulfill queries.

db.orders.createIndex({item: 1});
db.orders.createIndex({qty: 1});

db.orders.find({item: 'abc123', qty: {$gt: 15}});

ref:
https://docs.mongodb.com/manual/core/index-intersection/

Covered Queries

ref:
https://docs.mongodb.com/manual/core/query-optimization/#read-operations-covered-query
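
A query is covered when the index contains all the fields in both the filter and the projection, and the projection excludes _id. A sketch:

db.getCollection('user').createIndex({username: 1, gender: 1})

// covered: the filter and the projection only touch indexed fields
db.getCollection('user').find(
    {username: 'vinta'},
    {_id: 0, username: 1, gender: 1}
)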

Index Limits

The size of an index entry for an indexed field must be less than 1024 bytes. For instance, an arbitrary URL field can easily exceed 1024 bytes.

MongoDB will not insert into an indexed collection any document with an indexed field whose corresponding index entry would exceed the index key limit, and instead, will return an error; updates to the indexed field will error if the updated value causes the index entry to exceed the index key limit.

ref:
https://docs.mongodb.com/manual/reference/limits/#indexes

List Indexes

db.message.getIndexes()

// show collection statistics
db.message.stats()
db.message.stats().indexSizes

// scale defaults to 1 to return size data in bytes
// 1024 * 1024 means MB
db.getCollection('message').stats({'scale': 1024 * 1024}).indexSizes

ref:
https://docs.mongodb.com/manual/tutorial/manage-indexes/

Add Indexes

TODO:
It seems like creating indexes on an empty collection, even with background: true, can cause DB latency.

An index which contains array fields might consume a lot of disk space.

db.message.createIndex({
    '_cls': 1,
    'sender': 1,
    'posted_at': 1
}, {'background': true, 'sparse': true})

db.message.createIndex({
    '_cls': 1,
    'includes': 1,
    'posted_at': 1
}, {'background': true, 'sparse': true})

db.getCollection('message').find({
    '$or': [
        // sent by cp
        {
            '_cls': 'Message.ChatMessage',
            'sender': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        // sent by payer
        {
            '_cls': 'Message.GiftMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        }
    ]
})
import pymongo
from your_app.models import YourModel

YourModel._get_collection().create_index(
    [
        ('users', pymongo.ASCENDING),
        ('timestamp', pymongo.DESCENDING),
    ], 
    background=True,
    partialFilterExpression={'timestamp': {'$exists': True}},
)

ref:
http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.create_index

You can't index two arrays together, in this example: includes and unlocks.

// it doesn't work
db.message.createIndex({
    '_cls': 1,
    'sender': 1,
    'includes': 1,
    'unlocks': 1
}, {'background': true, 'sparse': true})

The Order Of Fields of Compound Indexes

The order of fields in an index matters: you must consider index cardinality and selectivity. In contrast, the order of fields in a find() query or in a $match stage of an aggregation doesn't affect whether it can use an index or not.

The order of fields in a compound index should be:

  • First, fields on which you will query for exact values.
  • Second, fields on which you will sort.
  • Finally, fields on which you will query for a range of values.
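
For instance, for a query with an exact match on _cls, a sort on created_at, and a range on posted_at, that rule suggests an index like the following (a sketch borrowing field names from the examples above):

db.getCollection('message').find({
    '_cls': 'Message.ChatMessage',                          // exact value
    'posted_at': {'$gte': ISODate('2018-01-08T00:00:00Z')}  // range
}).sort({'created_at': -1})                                 // sort

db.getCollection('message').createIndex({
    '_cls': 1,        // 1. exact match first
    'created_at': -1, // 2. then sort
    'posted_at': 1    // 3. range last
}, {'background': true})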

ref:
https://docs.mongodb.com/manual/core/index-compound/#prefixes
https://emptysqua.re/blog/optimizing-mongodb-compound-indexes/
https://blog.mlab.com/2012/06/cardinal-ins/
https://stackoverflow.com/questions/33545339/how-does-the-order-of-compound-indexes-matter-in-mongodb-performance-wise
https://stackoverflow.com/questions/5245737/mongodb-indexes-order-and-query-order-must-match

Partial Indexes v.s. Sparse Indexes

Partial indexes should be preferred over sparse indexes. However, partial indexes only support a very small set of filter operators:

  • $exists
  • $eq or field: value
  • $gt, $gte, $lt, $lte
  • $type
  • $and

If you use 'partialFilterExpression': {'includes': {'$exists': true}}, MongoDB also indexes documents whose includes field has a null value.

db.collection('message').createIndex(
    {'_cls': 1, 'includes': 1, 'posted_at': 1},
    {'background': true, 'partialFilterExpression': {'includes': {'$exists': true}}}
)

db.collection('message').createIndex(
  {'created_at': -1},
  {'background': true, 'partialFilterExpression': {'created_at': {'$gte': new Date("2018-01-01T16:00:00Z")}}}
)

ref:
https://docs.mongodb.com/manual/core/index-partial/
https://docs.mongodb.com/manual/core/index-sparse/

Create An Index On An Array Field

Querying an indexed array field is generally a lot easier than querying an object field.

ref:
https://stackoverflow.com/questions/9589856/mongo-indexing-on-object-arrays-vs-objects

Create A Unique Index On An Array Field

The unique constraint applies to separate documents in the collection. That is, the unique index prevents separate documents from having the same value for the indexed key: it prevents different documents from having the same transaction ID, but allows one document to have multiple identical transaction IDs.

db.getCollection('test1').createIndex({'purchases.transaction_id': 1}, {unique: true})

db.getCollection('test1').insert({ _id: 1, purchases: [
    {transaction_id: 'A'}
]})

db.getCollection('test1').insert({ _id: 5, purchases: [
    {transaction_id: 'A'}
]})

db.getCollection('test1').update({ _id: 1}, {$push: {purchases: {transaction_id: 'A'}}})

To prevent one document from having multiple identical transaction IDs, we can use atomic updates on single documents.

user = User(id=bson.ObjectId(user_id))
purchase = DirectPurchase(
    user=user,
    timestamp=timestamp,
    transaction_id=transaction_id,
)
MessagePackProduct.objects \
    .filter(id=message_pack_id, __raw__={
        'purchases': {'$not': {'$elemMatch': {
            '_cls': purchase._cls,
            'user': purchase.user.id,
        }}},
    }) \
    .update_one(push__purchases=purchase)

ref:
https://docs.mongodb.com/manual/core/index-unique/#unique-constraint-across-separate-documents

Sort With Indexes

ref:
https://docs.mongodb.com/manual/tutorial/sort-results-with-indexes/
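
For instance, with an equality match on the index prefix, the suffix key can serve the sort in either direction (a sketch):

db.getCollection('message').createIndex({'sender': 1, 'posted_at': -1})

// both sorts can walk the index (forward or backward) instead of sorting in memory
db.getCollection('message').find({'sender': ObjectId('582ee32a5b9c861c87dc297e')}).sort({'posted_at': -1})
db.getCollection('message').find({'sender': ObjectId('582ee32a5b9c861c87dc297e')}).sort({'posted_at': 1})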

Drop Indexes

db.message.dropIndex({
    'includes': 1
})

db.message.dropIndex({
    '_cls': 1,
    'posted_at': 1,
    'includes': 1
})

Remove Unused Indexes

You can use db.getCollection('COLLECTION_NAME').aggregate([{$indexStats: {}}]) to find unused indexes: there is an accesses.ops field which indicates the number of operations that have used the index. Also, you might want to remove indexes which share the same prefix.

db.getCollection('message').aggregate([
    {
        '$indexStats': {}
    },
    {
        '$match': {
            'accesses.ops': {'$gt': 0}
        }
    }
]);

Result:

{
    "name" : "_cls_1_sender_1_posted_at_1",
    "key" : {
        "_cls" : 1,
        "sender" : 1,
        "posted_at" : 1
    },
    "host" : "a6ea11893605:27017",
    "accesses" : {
        "ops" : 3,
        "since" : "2018-01-26T07:04:51.137Z"
    }
}

ref:
https://blog.mlab.com/2017/01/using-mongodb-indexstats-to-identify-and-remove-unused-indexes/
https://scalegrid.io/blog/how-to-find-unused-indexes-in-mongodb/

Profiling

// enable
db.setProfilingLevel(2)

// disable
db.setProfilingLevel(0)

// see profiling data after you issue some queries
db.system.profile.find().limit(10).sort( { ts : -1 } ).pretty()

// delete profiling data
db.system.profile.drop()

Query Explain

There are both collection.find().explain() and collection.explain().find(). It's recommended to use collection.find().explain('executionStats') for getting more information, like total documents examined.

db.getCollection('message').find({
    '$or': [
        // sent by cp
        {
            '_cls': 'Message.ChatMessage',
            'sender': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        {
            '_cls': 'Message',
            'sender': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        // sent by payer
        {
            '_cls': 'Message.ChatMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        {
            '_cls': 'Message.ReplyMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        },
        {
            '_cls': 'Message.GiftMessage',
            'includes': ObjectId('582ee32a5b9c861c87dc297e'),
            'posted_at': {
                '$gte': ISODate('2018-01-08T00:00:00.000Z'),
                '$lt': ISODate('2018-01-14T00:00:00.000Z')
            }
        }
    ]
})
// .explain()
// .explain('allPlansExecution')
.explain('executionStats')

ref:
https://docs.mongodb.com/manual/reference/method/cursor.explain/
https://docs.mongodb.com/manual/reference/method/db.collection.explain/#db.collection.explain

You could also explain a .update() query. However, .updateMany() and .updateOne() don't support .explain().

db.getCollection('user').explain().update(
    {'follows.user': ObjectId("57985b784af4124063f090d3")},
    {'$set': {'created_at': ISODate('2018-01-01 00:00:00.000Z')}},
    {'multi': true}
)

Some important fields to look at in the result of explain():

  • executionStats.totalKeysExamined
  • executionStats.totalDocsExamined
  • queryPlanner.winningPlan.stage
  • queryPlanner.winningPlan.inputStage.stage
  • queryPlanner.winningPlan.inputStage.indexName
  • queryPlanner.winningPlan.inputStage.direction

Possible values of stage:

  • COLLSCAN: scanning the entire collection
  • IXSCAN: scanning index keys
  • FETCH: retrieving documents
  • SHARD_MERGE: merging results from shards

ref:
https://docs.mongodb.com/manual/reference/explain-results/

Aggregation Explain

db.getCollection('message').explain().aggregate()

ref:
https://stackoverflow.com/questions/12702080/mongodb-explain-for-aggregation-framework
https://docs.mongodb.com/manual/reference/method/db.collection.explain/

If $project, $unwind, or $group occur prior to the $sort operation, $sort cannot use any indexes. Additionally, $sort can only use fields defined in previous $project stage.

Basically, you could just consider the $match part when you want to create new indexes.

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/sort/#sort-operator-and-performance

MongoEngine

_cls is automatically included in indexes if allow_inheritance is on. If you want to disable this behavior, set the cls kwarg to False.

ref:
http://docs.mongoengine.org/guide/defining-documents.html#indexes

MongoDB cookbook: Queries and Aggregations

Frequently accessed items are cached in memory, so that MongoDB can provide optimal response time.

MongoDB Shell in JavaScript

Administration

db.currentOp();

// slow queries
db.currentOp({
    "active": true,
    "secs_running": {"$gt" : 3},
    "ns": /^test\./
});

// queries not using any index
db.adminCommand({ 
    "currentOp": true,
    "op": "query", 
    "planSummary": "COLLSCAN"
});

// operations with high numYields
db.adminCommand({ 
    "currentOp": true, 
    "ns": /^test\./, 
    "numYields": {"$gte": 100} 
}) 

db.serverStatus().connections
{
    "current" : 269,
    "available" : 838591,
    "totalCreated" : 417342
}

ref:
https://docs.mongodb.com/manual/reference/method/db.currentOp/
https://hackernoon.com/mongodb-currentop-18fe2f9dbd68
http://www.mongoing.com/archives/6246

BSON Types

ref:
https://docs.mongodb.com/manual/reference/bson-types/

Check If A Document Exists

It is significantly faster to use find() + limit() because findOne() will always read + return the document if it exists. find() just returns a cursor (or not) and only reads the data if you iterate through the cursor.

db.getCollection('message').find({_id: ObjectId("585836504b287b5022a3ae26"), delivered: false}, {_id: 1}).limit(1)

ref:
https://stackoverflow.com/questions/8389811/how-to-query-mongodb-to-test-if-an-item-exists
https://blog.serverdensity.com/checking-if-a-document-exists-mongodb-slow-findone-vs-find/

Find Documents

db.getCollection('user').find({username: 'nanababy520'})

db.getCollection('message').find({_id: ObjectId("5a6383b8d93d7a3fadf75af3")})

db.getCollection('message').find({_cls: 'Message'}).sort({posted_at: -1})

db.getCollection('message').find({sender: ObjectId("57aace67ac08e72acc3b265f"), pricing: {$ne: 0}})

db.getCollection('message').find({
    sender: ObjectId("5ac0f56038cfff013a123d85"),
    created_at: {
        $gte: ISODate('2018-04-21 12:00:00Z'),
        $lte: ISODate('2018-04-21 13:00:00Z')
    }
})
.sort({created_at: -1})

Find Documents With Regular Expression

db.getCollection('user').find({'username': /vicky/})

ref:
https://docs.mongodb.com/manual/reference/operator/query/regex/

Find Documents With An Array Field

  • $in: [...] means "intersection" or "any element in"
  • $all: [...] means "subset" or "contain"
  • $elemMatch: {...} means "any element match"
  • $not: {$elemMatch: {$nin: [...]}} means "subset" or "in"

The last one matches documents where no array element falls outside [...], i.e., the array is a subset of [...].

ref:
https://stackoverflow.com/questions/12223465/mongodb-query-subset-of-an-array

db.getCollection('message').find({includes: ObjectId("5a4bb448af9c462c610d0cc7")})

db.getCollection('user').find({gender: 'F', tags: 'promoted'})
db.getCollection('user').find({gender: 'F', 'tags.1': {$exists: true}})

ref:
https://docs.mongodb.com/manual/reference/operator/query/exists/#exists-true

Find Documents With An Array Field Of Embedded Documents

Usually, you could use $elemMatch.

{'the_array_field': {'$elemMatch': {
    'a_field_of_each_element': {'$lte': now},
    'another_field_of_each_element': 123
}}}

db.getCollection('message').find({
    unlocks: {
        $elemMatch: {
            _cls: 'PointsUnlock',
            user: ObjectId("57f662e727a79d07993faec5")
        }
    }
})

db.getCollection('feature.shop.product').find({
    purchases: {
        $elemMatch: {
            _cls: 'Purchase'
        }
    }
})

db.getCollection('feature.shop.product').find({
    '_id': 'prod_CWlSTXBEU4mhEu',
    'purchases': {'$not': {'$elemMatch': {
        '_cls': 'DirectPurchase',
        'user': ObjectId("58b61d9094ab56f912ba10a5")
    }}},
})

ref:
https://docs.mongodb.com/manual/reference/operator/query/elemMatch/

Find Documents With Existence Of Fields Or Values

  • .find({'field': {'$exists': true}}): the field exists
  • .find({'field': {'$exists': false}}): the field does not exist
  • .find({'field': {'$type': 10}}): the field exists with a null value
  • .find({'field': null}): the field exists with a null value or the field does not exist
  • .find({'field': {'$ne': null}}): the field exists and the value is not null
  • .find({'array_field': {'$in': [null, []]}})

db.test.insert({'num': 1, 'check': 'value'})
db.test.insert({'num': 2, 'check': null})
db.test.insert({'num': 3})

db.test.find({});

db.test.find({'check': {'$exists': true}})
// return 1 and 2

db.test.find({'check': {'$exists': false}})
// return 3

db.test.find({'check': {'$type': 10}});
// return 2

db.test.find({'check': null})
// return 2 and 3

db.test.find({'check': {'$ne': null}});
// return 1

ref:
https://stackoverflow.com/questions/4057196/how-do-you-query-this-in-mongo-is-not-null
https://docs.mongodb.com/manual/tutorial/query-for-null-fields/

Find Documents Where An Array Field Does Not Contain A Certain Value

db.getCollection('user').update({_id: ObjectId("579994ac61ff217f96a585d9"), tags: {$ne: 'tag_to_add'}}, {$push: {tags: 'tag_to_add'}})

db.getCollection('user').update({_id: ObjectId("579994ac61ff217f96a585d9"), tags: {$nin: ['tag_to_add']}}, {$push: {tags: 'tag_to_add'}})

ref:
https://stackoverflow.com/questions/16221599/find-documents-with-arrays-not-containing-a-document-with-a-particular-field-val

Find Documents Where An Array Field Is Not Empty

db.getCollection('message').find({unlocks: {$exists: true, $not: {$size: 0}}})

ref:
https://stackoverflow.com/questions/14789684/find-mongodb-records-where-array-field-is-not-empty

Find Documents Where An Array Field's Size Is Greater Than 1

db.getCollection('user.inbox').find({
    'messages.0': {'$exists': true}
})

db.getCollection('message').find({
    '_cls': 'Message',
    'unlocks.10': {'$exists': true}
}).sort({'posted_at': -1})

db.getCollection('message').find({
    '_cls': 'Message.ChatMessage',
    'sender': ObjectId("582ee32a5b9c861c87dc297e"),
    'unlocks': {'$exists': true, '$not': {'$size': 0}}
})

ref:
https://stackoverflow.com/questions/7811163/query-for-documents-where-array-size-is-greater-than-1/15224544

Find Documents With Computed Values Using $expr

For instance, compare 2 fields from a single document in a find() query.

db.getCollection('user').find({
    $expr: {
        $eq: [{$size: '$follows'}, {$size: '$blocks'}]
    }
})

ref:
https://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-lookup-expr
https://dzone.com/articles/expressive-query-language-in-mongodb-36-2

Project A Subset Of An Array Field With $filter

A sample document:

{
    "_id" : "message_unlock_pricing",
    "seed" : 42,
    "distributions" : {
        "a" : 0.5,
        "b" : 0.5
    },
    "whitelist" : [ 
        {
            "_id" : ObjectId("57dd071dd20fc40c0cbed6b7"),
            "variation" : "a"
        }, 
        {
            "_id" : ObjectId("5b1173a1487fbe2b2e9bba04"),
            "variation" : "b"
        }, 
        {
            "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
            "variation" : "b"
        }
    ]
}

db.getCollection('feature.ab.experiment').aggregate([
    {'$project': {
        '_id': 1,
        'seed': 1,
        'distributions': 1,
        'whitelist': {
            '$filter': {
               'input': {'$ifNull': ["$whitelist", []]},
               'as': "user",
               'cond': {'$eq': ['$$user._id', ObjectId("5a66d5c2af9c462c617ce552")]}
            }
         }
    }},
    {'$unwind': {
        'path': '$whitelist',
        'preserveNullAndEmptyArrays': true
    }}
])

ref:
https://stackoverflow.com/questions/42607221/mongodb-aggregation-project-check-if-array-contains

Insert Documents

db.getCollection('feature.launch').insert({
    'url': '//example.com/launchs/5a06b88aaf9c462c6146ce12.jpg',
    'user': {
        'id': ObjectId("5a06b88aaf9c462c6146ce12"),
        'username': 'luke0804',
        'tags': ["gender:male"]
    }
})

db.getCollection('feature.launch').insert({
    'url': '//example.com/launchs/57c16f5bb811055b66d8ef46.jpg',
    'user': {
        'id': ObjectId("57c16f5bb811055b66d8ef46"),
        'username': 'riva',
        'tags': ["gender:female"]
    }
})

Update Within A For Loop

var oldTags = ['famous', 'newstar', 'featured', 'western', 'recommended', 'popular'];
oldTags.forEach(function(tag) {
    db.getCollection('user').updateMany({tags: tag}, {$addToSet: {tags: 'badge:' + tag}});
});

Update With Conditions Of Field Values

You could update the value of the field to a specified value if the specified value is less than or greater than the current value of the field. The $min and $max operators can compare values of different types.

Only set posted_at to current timestamp if its current value is None or absent.

Post._get_collection().update_one(
    {
        '_id': bson.ObjectId(post_id),
        'media.0': {'$exists': True},
        'title': {'$ne': None},
        'location': {'$ne': None},
        'gender': {'$ne': None},
        'pricing': {'$ne': None},
    },
    {
        '$min': {'posted_at': utils.utcnow()},
    },
)

ref:
https://docs.mongodb.com/manual/reference/operator/update/min/
https://docs.mongodb.com/manual/reference/operator/update/max/

Update An Array Field

Array update operators:

  • $: Acts as a placeholder to update the first element in an array for documents that matches the query condition.
  • $[]: Acts as a placeholder to update all elements in an array for documents that match the query condition.
  • $[<identifier>]: Acts as a placeholder to update elements in an array that match the arrayFilters condition.
  • $addToSet: Adds elements to an array only if they do not already exist in the set.
  • $push: Adds an item to an array.
  • $pop: Removes the first or last item of an array.
  • $pull: Removes all array elements that match a specified query.
  • $pullAll: Removes all matching values from an array.

ref:
https://docs.mongodb.com/manual/reference/operator/update-array/
http://docs.mongoengine.org/guide/querying.html#atomic-updates
http://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-array-filters.html

Add an element to an array field.

user_id = '582ee32a5b9c861c87dc297e'
tag = 'my_tag'

updated = User.objects \
    .filter(id=user_id, tags__ne=tag) \
    .update_one(push__tags=tag)

updated = User.objects \
    .filter(id=user_id) \
    .update_one(add_to_set__schedules={
        'tag': tag,
        'nbf': datetime.datetime(2018, 6, 4, 0, 0),
        'exp': datetime.datetime(2019, 5, 1, 0, 0),
    })

Insert an element into an array at a certain position.

slot = 2
Post.objects.filter(id=post_id, media__id__ne=media_id).update_one(__raw__={
    '$push': {
        'media': {
            '$each': [{'id': bson.ObjectId(media_id)}],
            '$position': slot,
        }
    }
})

ref:
https://docs.mongodb.com/manual/reference/operator/update/position/
http://docs.mongoengine.org/guide/querying.html#querying-lists

Remove elements in an array field. It is also worth noting that update(pull__abc=xyz) always returns 1.

user_id = '582ee32a5b9c861c87dc297e'
tag = 'my_tag'

updated = User.objects \
    .filter(id=user_id) \
    .update_one(pull__tags=tag)

updated = User.objects \
    .filter(id=user_id) \
    .update_one(pull__schedules={'tag': tag})

Remove multiple embedded documents in an array field.

import bson

user_id = '5a66d5c2af9c462c617ce552'
tags = ['valid_tag_1', 'future_tag']

updated_result = User._get_collection().update_one(
    {'_id': bson.ObjectId(user_id)},
    {'$pull': {'schedules': {'tag': {'$in': tags}}}},
)
print(updated_result.raw_result)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
https://stackoverflow.com/questions/28102691/pullall-while-removing-embedded-objects

db.getCollection('feature.feeds').updateMany(
    {
        'aliases': {'$exists': true},
        'exp': {'$gte': ISODate('2019-03-21T00:00:00.000+08:00')},
        'items': {'$elemMatch': {'username': 'engine'}},
    },
    {
        '$pull': {
            'items': {'username': 'engine'},
        }
    }
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/pull/

You could also use add_to_set to add an item to an array only if it is not in the list, which always returns 1 if filter() matches any document. However, you are able to set full_result=True to get the detailed update result.

update_result = User.objects.filter(id=user_id).update_one(
    add_to_set__tags=tag,
    full_result=True,
)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
http://docs.mongoengine.org/guide/querying.html#atomic-updates

Update a multi-level nested array field. Yes, arrayFilters supports it.

ref:
https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/
https://stackoverflow.com/questions/23577123/updating-a-nested-array-with-mongodb

Update an embedded document in an array field.

MessagePackProduct.objects \
    .filter(id='prod_CR1u34BIpDbHeo', skus__id='sku_CR23rZOTLhYprP') \
    .update(__raw__={
        '$set': {'skus.$': {'id': 'sku_CR23rZOTLhYprP', 'test': 'test'}}
    })

ref:
https://stackoverflow.com/questions/9200399/replacing-embedded-document-in-array-in-mongodb
https://docs.mongodb.com/manual/reference/method/db.collection.update/#db.collection.update

Update specific embedded documents with arrayFilters in an array field.

User data:

{
    "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
    "username" : "gibuloto",
    "tags" : [
        "beta",
        "future_tag",
        "expired_tag"
    ],
    "schedules" : [
        {
            "tag" : "valid_tag",
            "nbf" : ISODate("2018-05-01T16:00:00.000Z"),
            "exp" : ISODate("2020-06-04T16:00:00.000Z")
        },
        {
            "tag" : "future_tag",
            "nbf" : ISODate("2020-01-28T16:00:00.000Z"),
            "exp" : ISODate("2020-12-14T16:00:00.000Z")
        },
        {
            "tag" : "expired_tag",
            "nbf" : ISODate("2016-02-12T16:00:00.000Z"),
            "exp" : ISODate("2016-04-21T16:00:00.000Z")
        }
    ]
}

It is worth noting that the <identifier> in arrayFilters must begin with a lowercase letter and contain only alphanumeric characters.

import bson

user_id = '5a66d5c2af9c462c617ce552'
tags = ['from_past_to_future']

updated_result = User._get_collection().update_one(
    {'_id': bson.ObjectId(user_id)},
    {
        '$addToSet': {'tags': {'$each': tags}},
        '$unset': {'schedules.$[schedule].nbf': True},
    },
    array_filters=[{'schedule.tag': {'$in': tags}}],
)
print(updated_result.raw_result)
# {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}

ref:
https://docs.mongodb.com/master/reference/operator/update/positional-filtered/

Update An Array Field With arrayFilters

You should use arrayFilters whenever you need to update specific elements of an array.

The syntax of arrayFilters would be:

db.collection.update(
   {<query selector>},
   {<update operator>: {'array.$[<identifier>].field': value}},
   {arrayFilters: [{<identifier>: <condition>}]}
)

Inbox._get_collection().update_many(
    {'messages.id': message_id},
    {'$set': {'messages.$[message].tags': tags}},
    array_filters=[
        {'message.id': message_id},
    ],
)

ref:
https://docs.mongodb.com/manual/reference/operator/update/positional-filtered/

Insert an element into an array field at a certain position.

db.getCollection('feature.forums.post').update(
   { _id: ObjectId("5b3c6a9c8433b15569cae54e") },
   {
     $push: {
        media: {
           $each: [{
                "mimetype" : "image/jpeg",
                "url" : "https://example.com/posts/5adb795b47d057338abe8910.jpg",
                "presets" : {}
            }],
           $position: 1
        }
     }
   }
)

Or use $set with an explicit array index.

media_id = 'xxx'
media_slot = 0

Post.objects \
    .filter(id=post_id, **{f'media__{media_slot}__id__ne': media_id}) \
    .update_one(__raw__={'$set': {f'media.{media_slot}': {'id': media_id}}})

ref:
https://docs.mongodb.com/manual/reference/operator/update/position/

Set an array field to empty.

db.getCollection('message').update(
    {'tags': 'pack:joycelai-1'},
    {'$set': {'unlocks': []}},
    {'multi': true}
)

db.getCollection('feature.shop.product').update(
    {},
    {'$set': {'purchases': []}},
    {'multi': true}
)

ref:
https://docs.mongodb.com/manual/reference/method/db.collection.update/
https://docs.mongodb.com/manual/reference/operator/update/set/

Remove elements from an array field.

var userId = ObjectId("57985b784af4124063f090d3");

db.getCollection('user').update(
    {'follows.user': userId},
    {'$pull': {'follows': {'user': userId}}},
    {
        'multi': true,
    }
);

db.getCollection('message').update(
    {'_id': {'$in': [
        ObjectId('5aca1ffc4271ab1624787ec4'),
        ObjectId('5aca31ab93ef2936291c3dd4'),
        ObjectId('5aca33d9b5eaef04943c0d0b'),
        ObjectId('5aca34e7a48c543b07fb0a0f'),
        ObjectId('5aca272d93ef296edc1c3dee'),
        ObjectId('5aca342aa48c54306dfb0a21'),
        ObjectId('5aca20756bd01023a8cb02e9')
    ]}},
    {'$pull': {'tags': 'pack:prod_D75YlDMzcCiAw3'}},
    {'multi': true}
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/pull/

Update A Dictionary Field

Set a key/value in a dictionary field.

tutorial.data = {
    "price_per_message": 1200,
    "inbox": []
}

new_inbox = [
    {
        "id": "5af118c598eacb528e8fb8f9",
        "sender": "5a13239eaf9c462c611510fc"
    },
    {
        "id": "5af1117298eacb212a8fb8e9",
        "sender": "5a99554be9a21d5ff38b8ca5"
    }
]
tutorial.update(set__data__inbox=new_inbox)

ref:
https://stackoverflow.com/questions/21158028/updating-a-dictfield-in-mongoengine

Upsert: Update Or Create

To avoid duplicate documents from concurrent upserts, only use upsert=True when the query fields are uniquely indexed. If you don't need the modified document, you should just use update_one(field1=123, field2=456, upsert=True).

Additionally, remember that modify() always reloads the whole document, even if the original query only loaded specific fields with only(). Try to avoid document.DB_QUERY_METHOD(), and use User.objects.filter().only().modify() or User.objects.filter().update() when possible.

tag_schedule = TagSchedule.objects \
    .filter(user=user_id, tag='vip') \
    .modify(
        started_at=started_at,
        ended_at=ended_at,
        upsert=True
    )

user = User.objects \
    .filter(id=user.id, tutorials__buy_diamonds__version=None) \
    .modify(set__tutorials__buy_diamonds__version='v1')

updated = User.objects \
    .filter(user=user_id, tag=tag) \
    .update_one(
        push__followers=new_follower,
    )

ref:
https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-unique-indexes
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.modify
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.update_one

Rename A Field

Simply rename a field with $rename.

db.getCollection('user').updateMany(
    {
        'phone_no': {'$exists': true},
        'social.phone-number.uid': {'$exists': false},
    },
    {'$rename': {
        'phone_no': 'social.phone-number.uid',
    }}
);

ref:
https://docs.mongodb.com/manual/reference/operator/update/rename/

Do some extra data conversion and rename the field manually.

db.getCollection('user').aggregate([
    {'$match': {
        'twitter.id': {'$exists': true},
        'social.twitter.uid': {'$exists': false},
    }},
    {'$project': {
        'twitter_id': '$twitter.id',
        'twitter_id_str': {'$toString': '$twitter.id'},
    }},
]).forEach(function (document) {
    printjson({
        'id': document._id,
    });

    db.getCollection('user').updateOne(
      {
          'twitter.id': document.twitter_id,
          'social.twitter.uid': {'$exists': false},
      },
      {
          '$unset': {'twitter.id': true},
          '$set': {'social.twitter.uid': document.twitter_id_str}
      }
    )
})

Insert/Replace Large Numbers Of Documents

const operations = contracts.map((contract) => {
    // TODO: should create a new contract if there is any change of the contract?
    // use MongoDB transaction to change the new one and old one
    return {
        'replaceOne': {
            'filter': {'settlement_datetime': currentSettlementMonth.toDate(), 'user': contract.user},
            'replacement': contract,
            'upsert': true,
        },
    };
});

db.collection('user.contract').bulkWrite(
    operations,
    {ordered: true},
    (bulkError, result) => {
        if (bulkError) {
            return next(bulkError, null);
        }

        logger.info('Finished importing all contracts');
        return next(null, result);
    },
);

Update Large Numbers Of Documents

Use Bulk.find.arrayFilters() and Bulk.find.update() together.

In Python:

import datetime

import pymongo

expiration_time = datetime.datetime.utcnow() - datetime.timedelta(hours=48)

bulk = Outbox._get_collection().initialize_unordered_bulk_op()

for outbox in Outbox.objects.only('id').filter(messages__posted_at__lt=expiration_time):
    bulk.find({'_id': outbox.id}).update_one({
        '$pull': {'messages': {
            'posted_at': {'$lt': expiration_time},
        }},
    })

try:
    results = bulk.execute()
except pymongo.errors.InvalidOperation as err:
    if str(err) != 'No operations to execute':
        raise err

In JavaScript:

const operations = docs.map((doc) => {
    logger.debug(doc, 'Revenue');

    const operation = {
        'updateOne': {
            'filter': {
                '_id': doc._id,
            },
            'update': {
                '$set': {
                    'tags': doc.contract.tags,
                },
            },
        },
    };
    return operation;
});

db.collection('user.revenue').bulkWrite(
    operations,
    {ordered: false},
    (bulkError, bulkResult) => {
        if (bulkError) {
            return next(bulkError, null);
        }

        logger.info(bulkResult, 'Saved tags');
        return next(null, true);
    },
);

ref:
https://docs.mongodb.com/manual/reference/method/Bulk/
https://docs.mongodb.com/manual/reference/method/Bulk.find.arrayFilters/

Of course, you could also update the same document with multiple operations. However, it usually does not make sense; combine them into a single update when you can.

from pymongo import UpdateOne
import bson

def _operations():
    if title := payload.get('title'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'title': title}})

    if location := payload.get('location'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'location': location}})

    if pricing := payload.get('pricing'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'pricing': pricing}})

    if description := payload.get('description'):
        yield UpdateOne({'_id': bson.ObjectId(post_id)}, {'$set': {'description': description}})

    # Mark the post as published once all of the required fields are set.
    yield UpdateOne(
        {
            '_id': bson.ObjectId(post_id),
            'media.0': {'$exists': True},
            'title': {'$ne': None},
            'location': {'$ne': None},
            'pricing': {'$ne': None},
            'posted_at': {'$eq': None},
        },
        {'$set': {'posted_at': utils.utcnow()}},
    )

operations = list(_operations())
result = Post._get_collection().bulk_write(operations, ordered=True)
print(result.bulk_api_result)

ref:
https://api.mongodb.com/python/current/examples/bulk.html

Remove items from an array field of documents.

var userId = ObjectId("57a42a779f22bb6bcc434520");

db.getCollection('user').update(
    {'follows.user': userId},
    {'$pull': {'follows': {'user': userId}}},
    {'multi': true}
)

ref:
https://stackoverflow.com/questions/33594397/how-to-update-a-large-number-of-documents-in-mongodb-most-effeciently

Remove Large Numbers Of Documents

In the mongo shell:

var bulk = db.getCollection('feature.journal.v2').initializeUnorderedBulkOp()
bulk.find({}).remove()
bulk.execute()

// or

var bulk = db.getCollection('feature.journal.v2').initializeUnorderedBulkOp()
bulk.find({event: 'quest.rewarded'}).remove()
bulk.find({event: 'message.sent'}).remove()
bulk.execute()

ref:
https://docs.mongodb.com/manual/reference/method/Bulk.find.remove/#bulk-find-remove

MongoEngine In Python

ref:
http://docs.mongoengine.org/guide/index.html
http://docs.mongoengine.org/apireference.html

Define Collections

It seems every collection in MongoEngine must have an id field.
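
For instance, a plain Document automatically gets an id field mapped to MongoDB's _id, unless you mark one of your own fields with primary_key=True. A minimal sketch (both models are hypothetical, using the db alias from this document's other examples):

import mongoengine as db

class Article(db.Document):
    # No explicit primary key: MongoEngine adds an implicit `id`
    # ObjectIdField which maps to the `_id` field in MongoDB.
    title = db.StringField(required=True)

class Page(db.Document):
    # With primary_key=True, `slug` is stored as `_id` and
    # `page.id` becomes an alias for it.
    slug = db.StringField(primary_key=True)
    title = db.StringField()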

ref:
http://docs.mongoengine.org/guide/defining-documents.html

Define A Field With Default EmbeddedDocument

Setting an EmbeddedDocument class as the default behaves differently with and without only().

class User(ab.models.ABTestingMixin, db.Document):
    class UserSettings(db.EmbeddedDocument):
        reply_price = db.IntField(min_value=0, default=500, required=True)
        preferences = db.ListField(db.StringField())

    email = db.EmailField(max_length=255)
    created_at = db.DateTimeField(default=utils.now)
    last_active = db.DateTimeField(default=utils.now)
    settings = db.EmbeddedDocumentField(UserSettings, default=UserSettings)

If the user does not have a settings field in the DB, here is the difference:

user = User.objects.get(username='gibuloto')
isinstance(user.settings, User.UserSettings) == True

user = User.objects.only('settings').get(username='gibuloto')
(user.settings is None) == True

user = User.objects.exclude('settings').get(username='gibuloto')
isinstance(user.settings, User.UserSettings) == True

Filter With Raw Queries

post = Post.objects \
    .no_dereference().only('posted_at') \
    .filter(__raw__={
        '_id': bson.ObjectId(post_id),
        'media.0': {'$exists': True},
        'title': {'$ne': None},
        'location': {'$ne': None},
        'gender': {'$ne': None},
        'pricing': {'$ne': None},
    }) \
    .modify(__raw__={'$min': {'posted_at': utils.utcnow()}}, new=True)

print(post.posted_at)

ref:
http://docs.mongoengine.org/guide/querying.html#raw-queries

Check If A Document Exists

Use .exists().

import datetime

now = datetime.datetime.now(datetime.timezone.utc)
if TagSchedule.objects.filter(user=user_id, tag=tag, started_at__gt=now).exists():
    return 'exists'

You have to use __raw__ if the field you want to query is a db.ListField(GenericEmbeddedDocumentField(XXX)) field.

if MessagePackProduct.objects.filter(id=message_pack_id, __raw__={'purchases.user': g.user.id}).exists():
    return 'exists'

Upsert: Get Or Create

buy_diamonds = BuyDiamonds.objects.filter(user_id=user.id).upsert_one()

ref:
http://docs.mongoengine.org/apireference.html#mongoengine.queryset.QuerySet.upsert_one

Store Files On GridFS

# models.py
class User(db.Document):
    username = db.StringField()
    image = db.ImageField(collection_name='user.images')

# tasks.py
import io

import bson
import celery
import gridfs
import mongoengine
from PIL import Image

@celery.shared_task(bind=True, ignore_result=True)
def gridfs_save(task, user_id, format='JPEG', raw_data: bytes=None, **kwargs):
    image_id = None

    if raw_data is None:
        user = User.objects.only('image').get(id=user_id)
        if user.image.grid_id:
            image_id, raw_data = user.image.grid_id, user.image.read()

    if not raw_data:
        return

    gf = gridfs.GridFS(mongoengine.connection.get_db(), User.image.collection_name)

    with io.BytesIO(raw_data) as raw_image:
        with Image.open(raw_image) as image:
            image = image.convert('RGB')
            with io.BytesIO() as buffer:
                image.save(buffer, format=format, quality=80, **kwargs)
                buffer.seek(0)
                grid_id = gf.put(buffer, format=format, width=image.width, height=image.height, thumbnail_id=None)

    # NOTE: when the image was read from the user above, only overwrite it
    # if the stored image is still the one we read
    query = mongoengine.Q(id=user_id)
    if image_id:
        query = query & mongoengine.Q(image=image_id)

    user = User.objects.only('image').filter(query).modify(
        __raw__={'$set': {'image': grid_id}},
        new=False,
    )

    def cleanup():
        # Delete the old image
        if user and user.image:
            yield user.image.grid_id

        # The user image was already changed before the scheduled optimization took place
        # Drop the optimized image
        if user is None and image_id:
            yield image_id

    gridfs_delete.apply_async(kwargs=dict(
        collection=User.image.collection_name,
        grid_ids=list(cleanup()),
    ))

@celery.shared_task(bind=True, ignore_result=True)
def gridfs_delete(task, collection, grid_ids):
    gf = gridfs.GridFS(mongoengine.connection.get_db(), collection)
    for grid_id in grid_ids:
        gf.delete(bson.ObjectId(grid_id))

ref:
http://docs.mongoengine.org/guide/gridfs.html

Store Datetime

MongoDB stores datetimes in UTC.
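
A minimal sketch of what that means in practice, reusing the User model from earlier examples: PyMongo converts timezone-aware datetimes to UTC on write and, by default, returns naive datetimes whose values are in UTC.

import datetime

user_id = '5a66d5c2af9c462c617ce552'

# Write a timezone-aware UTC datetime; it is stored as a BSON UTC datetime.
now = datetime.datetime.now(datetime.timezone.utc)
User.objects.filter(id=user_id).update_one(set__last_active=now)

# By default the driver returns naive datetimes representing UTC;
# create the connection with tz_aware=True to get aware ones back.
user = User.objects.only('last_active').get(id=user_id)
print(user.last_active.tzinfo)  # None, but the value is UTC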

ref:
https://docs.mongodb.com/manual/reference/method/Date/

2-phase Commit

The easiest way to think about 2-phase commit is idempotency, i.e., if you run an update many times, the result stays the same. Each transaction moves through the states: initial -> pending -> applied -> done.
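
A minimal sketch of that idea with PyMongo, guarding every transition on the current state; the transactions collection and state names follow the MongoDB tutorial, while the connection details are assumptions:

import bson
import pymongo

db = pymongo.MongoClient('mongodb://127.0.0.1:27017')['test']
transaction_id = '5b3c6a9c8433b15569cae54e'

def advance(transaction_id, from_state, to_state):
    # The filter only matches while the document is still in from_state,
    # so replaying a step matches zero documents and changes nothing.
    result = db.transactions.update_one(
        {'_id': bson.ObjectId(transaction_id), 'state': from_state},
        {'$set': {'state': to_state}},
    )
    return result.modified_count == 1

advance(transaction_id, 'initial', 'pending')
# ... apply the changes to the affected documents here ...
advance(transaction_id, 'pending', 'applied')
advance(transaction_id, 'applied', 'done')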

ref:
https://docs.mongodb.com/manual/tutorial/perform-two-phase-commits/

Aggregation Pipeline

  • $match: Filters documents.
  • $project: Modifies document fields.
  • $addFields: Adds or overrides document fields.
  • $group: Groups documents by fields.
  • $lookup: Joins another collection.
  • $replaceRoot: Promotes an embedded document field to the top level, replacing all other fields.
  • $unwind: Expands an array field into multiple documents, one per element, each keeping the original document's other fields.
  • $facet: Processes multiple pipelines within one stage and outputs each result to a different field.

There are special system variables, for instance $$ROOT, $$REMOVE, and $$PRUNE, which you can use in some stages of the aggregation pipeline.
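
For example, $$REMOVE (MongoDB 3.6+) lets a $project stage drop a field conditionally instead of projecting a null. A sketch; the verified and email fields are hypothetical:

def stages():
    yield {'$project': {
        'username': True,
        # Keep email only for verified users; $$REMOVE drops the field
        # entirely instead of projecting a null value.
        'email': {'$cond': {
            'if': {'$eq': ['$verified', True]},
            'then': '$email',
            'else': '$$REMOVE',
        }},
    }}

for doc in User.objects.aggregate(*stages()):
    print(doc)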

ref:
https://docs.mongodb.com/manual/reference/aggregation-variables/#system-variables

Return Date As Unix Timestamp

import datetime

def stages():
    yield {'$project': {
        'createdAt': {'$floor': {'$divide': [{'$subtract': ['$created_at', datetime.datetime.utcfromtimestamp(0)]}, 1000]}},
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://stackoverflow.com/questions/39274311/convert-iso-date-to-timestamp-in-mongo-query

Match Multiple Conditions Stored In An Array Field

db.getCollection('feature.promotions').insert({
    "name": "TEST",
    "nbf": ISODate("2018-05-31 16:00:00.000Z"),
    "exp": ISODate("2018-06-30 15:59:00.001Z"),
    "positions": {
        "discover": {
            "urls": [
                "https://example.com/events/2018/jun/event1/banner.html"
            ]
        }
    },
    "requirements" : [
        {
            // users who like women and their app version is greater than v2.21
            "preferences" : [
                "gender:female"
            ],
            "version_major_min": 2.0,
            "version_minor_min": 21.0
        },
        {
            // female CPs
            "tags" : [
                "stats",
                "gender:female"
            ]
        }
    ]
});

import werkzeug

user_agent = werkzeug.UserAgent('hello-world/2.25.1 (iPhone; iOS 11.4.1; Scale/2.00; com.example.app; zh-tw)')
user_preferences = ['gender:female', 'gender:male']
user_tags = ['beta', 'vip']
user_platforms = [user_agent.platform]

def stages():
    now = utils.utcnow()

    yield {'$match': {
        '$and': [
            {'nbf': {'$lte': now}},
            {'exp': {'$gt': now}},
            {'requirements': {'$elemMatch': {
                'preferences': {'$not': {'$elemMatch': {'$nin': user_preferences}}},
                'tags': {'$not': {'$elemMatch': {'$nin': user_tags}}},
                'platforms': {'$not': {'$elemMatch': {'$nin': user_platforms}}},
                '$or': [
                    {'$and': [
                        {'version_major_min': {'$lte': user_agent.version.major}},
                        {'version_minor_min': {'$lte': user_agent.version.minor}},
                    ]},
                    {'$and': [
                        {'version_major_min': {'$exists': False}},
                        {'version_minor_min': {'$exists': False}},
                    ]},
                ],
            }}},
        ],
    }}
    yield {'$project': {
        'name': True,
        'nbf': True,
        'exp': True,
        'positions': {'$objectToArray': '$positions'},
    }}
    yield {'$unwind': '$positions'}
    yield {'$sort': {
        'exp': 1,
    }}
    yield {'$project': {
        '_id': False,
        'name': True,
        'position': '$positions.k',
        'url': {'$arrayElemAt': ['$positions.v.urls', 0]},
        'startedAt': {'$floor': {'$divide': [{'$subtract': ['$nbf', constants.UNIX_EPOCH]}, 1000]}},
        'endedAt': {'$floor': {'$divide': [{'$subtract': ['$exp', constants.UNIX_EPOCH]}, 1000]}},
    }}
    yield {'$group': {
        '_id': '$position',
        'items': {'$push': '$$ROOT'},
    }}

try:
    docs = Promotion.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    docs = list(docs)

ref:
https://docs.mongodb.com/manual/reference/operator/query/in/
https://docs.mongodb.com/manual/reference/operator/query/nin/
https://docs.mongodb.com/manual/reference/operator/aggregation/setIsSubset/

Do Distinct With $group

def stages():
    yield {'$match': {
        'tags': 'some_tag',
    }}
    yield {'$unwind': '$unlocks'}
    yield {'$replaceRoot': {'newRoot': '$unlocks'}}
    yield {'$match': {
        '_cls': 'MessagePackUnlock',
    }}
    yield {'$group': {
        '_id': '$user',
        'timestamp': {'$first': '$timestamp'},
    }}

for unlock in MessagePackMessage.objects.aggregate(*stages()):
    tasks.offline_purchase_pack.apply(kwargs=dict(
        user_id=unlock['_id'],
        message_pack_id=message_pack.id,
        timestamp=unlock['timestamp'],
    ))

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Slice Items In Each $group

import random

def stages():
    yield {'$match': {'tags': {'$regex': '^badge:'}}}
    yield {'$unwind': {'path': '$tags', 'includeArrayIndex': 'index'}}
    yield {'$match': {'tags': {'$regex': '^badge:'}}}
    yield {'$project': {'_id': True, 'tag': '$tags', 'index': {'$mod': ['$index', random.random()]}}}
    yield {'$sort': {'index': 1}}
    yield {'$group': {'_id': '$tag', 'users': {'$addToSet': '$_id'}}}
    yield {'$project': {'_id': True, 'users': {'$slice': ['$users', 1000]}}}

docs = User.objects.aggregate(*stages())
for doc in docs:
    badge, user_ids = doc['_id'], doc['users']

Collect Items With $group And $addToSet

User data:

{
    "_id" : ObjectId("5a66d5c2af9c462c617ce552"),
    "username" : "gibuloto",
    "tags" : [ 
        "beta"
    ],
    "schedules" : [ 
        {
            "tag" : "stats",
            "nbf" : ISODate("2018-02-01T16:00:00.000Z"),
            "exp" : ISODate("2018-08-12T16:00:00.000Z")
        }, 
        {
            "tag" : "vip",
            "nbf" : ISODate("2018-05-13T16:00:00.000Z"),
            "exp" : ISODate("2018-05-20T16:00:00.000Z")
        }
    ]
}

def stages():
    now = utils.utcnow()

    yield {'$match': {
        'schedules': {'$elemMatch': {
            'nbf': {'$lte': now},
            'exp': {'$gte': now}
        }}
    }}
    yield {'$unwind': '$schedules'}
    yield {'$match': {
        'schedules.nbf': {'$lte': now},
        'schedules.exp': {'$gte': now}
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'username': True,
        'tag': '$schedules.tag',
        'nbf': '$schedules.nbf',
        'exp': '$schedules.exp'
    }}
    yield {'$group': {
        '_id': '$id',
        'tags': {'$addToSet': '$tag'},
    }}

for user_tag_schedule in User.objects.aggregate(*stages()):
    print(user_tag_schedule)

# output:
# {'_id': ObjectId('579b9387b7af8e1fd1635da9'), 'tags': ['stats']}
# {'_id': ObjectId('5a66d5c2af9c462c617ce552'), 'tags': ['chat', 'vip']}

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Project A New Field Based On Whether Elements Exist In Another Array Field

Use $addFields with $cond.

def stages():
    user_preferences = g.user.settings.preferences or ['gender:female']
    yield {'$match': {
        'gender': {'$in': [prefix_gender.replace('gender:', '') for prefix_gender in user_preferences]}
    }}

    yield {'$addFields': {
        'isPinned': {'$cond': {
            'if': {'$in': [constants.tags.HIDDEN, '$badges']},
            'then': True,
            'else': False,
        }},
    }}
    yield {'$sort': {
        'isPinned': -1,
        'posted_at': -1,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': {'$ifNull': ['$comments', []]}},
        'badges': '$badges',
        'isPinned': '$isPinned',
    }}

try:
    results = Post.objects.aggregate(*stages()).next()
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://stackoverflow.com/questions/16512329/project-new-boolean-field-based-on-element-exists-in-an-array-of-a-subdocument
https://docs.mongodb.com/manual/reference/operator/aggregation/project/
https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
https://docs.mongodb.com/manual/reference/operator/aggregation/cond/

Project And Filter Out Elements Of An Array With $filter

Elements in details might have no value field.

def stages():
    yield {'$match': {
        '_id': bson.ObjectId(post_id),
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': '$comments'},
        'details': [
            {'key': 'gender', 'value': '$gender'},
            {'key': 'pricing', 'value': '$pricing'},
            {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
            {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
        ],
    }}
    yield {'$addFields': {
        'details': {
            '$filter': {
                'input': '$details',
                'as': 'detail',
                'cond': {'$ne': ['$$detail.value', None]},
            }
        }
    }}

try:
    post = next(Post.objects.aggregate(*stages()))
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/filter/#exp._S_filter
https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/

Project Specific Fields Of Elements Of An Array With $map

def stages():
    yield {'$match': {
        '_id': bson.ObjectId(post_id),
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'author': '$author',
        'title': '$title',
        'location': '$location',
        'postedAt': {'$floor': {'$divide': [{'$subtract': ['$posted_at', constants.UNIX_EPOCH]}, 1000]}},
        'viewCount': '$view_count',
        'commentCount': {'$size': '$comments'},
        'details': [
            {'key': 'gender', 'value': '$gender'},
            {'key': 'pricing', 'value': '$pricing'},
            {'key': 'lineId', 'value': {'$ifNull': ['$lineId', None]}},
            {'key': 'description', 'value': {'$ifNull': ['$description', None]}},
        ],
        'media': {
            '$map': {
                'input': '$media',
                'as': 'transcoded_media',
                'in': {
                    'mimetype': '$$transcoded_media.mimetype',
                    'dash': '$$transcoded_media.presets.dash',
                    'hls': '$$transcoded_media.presets.hls',
                    'thumbnail': '$$transcoded_media.thumbnail',
                }
            }
        },
    }}
    yield {'$addFields': {
        'details': {
            '$filter': {
                'input': '$details',
                'as': 'detail',
                'cond': {'$ne': ['$$detail.value', None]},
            }
        }
    }}

try:
    post = next(Post.objects.aggregate(*stages()))
except StopIteration:
    return Response(status=http.HTTPStatus.NOT_FOUND)

ref:
https://stackoverflow.com/questions/33831665/how-to-project-specific-fields-from-a-document-inside-an-array

Do Advanced $project With $let

If you find yourself wanting to do $project twice to tackle some fields, you should use $let instead.

def stages():
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'purchasedAt': {
            '$let': {
                'vars': {
                    'purchase': {
                        '$arrayElemAt': [
                            {
                                '$filter': {
                                    'input': '$purchases',
                                    'as': 'purchase',
                                    'cond': {
                                        '$and': [
                                            {'$eq': ['$$purchase.user', g.user.id]},
                                        ],
                                    },
                                },
                            },
                            0,
                        ],
                    },
                },
                'in': '$$purchase.timestamp',
            },
        },
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/let/

Deconstruct An Array Field With $unwind And Query The Elements With $match

def stages():
    category_tag = 'category:user'
    currency = 'usd'
    platform = 'ios'

    yield {'$match': {
        'active': True,
        'tags': category_tag,
        'total': {'$gt': 0},
        'preview_message': {'$exists': True},
    }}
    yield {'$unwind': '$skus'}
    yield {'$match': {
        'skus.attributes.platform': platform,
        'skus.attributes.currency': currency,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'caption': True,
        'description': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'sku': '$skus',
        'created_at': True,
        'is_purchased': {'$in': [g.user.id, {'$ifNull': ['$purchases.user', []]}]},
    }}
    yield {'$sort': {'is_purchased': 1, 'created_at': -1}}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/match/
https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/
https://docs.mongodb.com/manual/reference/operator/aggregation/project/

Query The First Element In An Array Field With $arrayElemAt And $filter

def stages():
    category_tag = 'category:user'
    currency = 'usd'
    platform = 'ios'

    yield {'$match': {
        'active': True,
        'tags': category_tag,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'caption': True,
        'description': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'preview_message': True,
        'metadata': True,
        'created_at': True,
        'updated_at': True,
        'active': True,
        'sku': {
            '$ifNull': [
                {
                    '$arrayElemAt': [
                        {
                            '$filter': {
                                'input': '$skus',
                                'as': 'sku',
                                'cond': {
                                    '$and': [
                                        {'$eq': ['$$sku.currency', currency]},
                                        {'$eq': ['$$sku.attributes.platform', platform]},
                                    ]
                                }
                            },
                        },
                        0
                    ]
                },
                None
            ],
        },
        'tags': True,
        'total': True,
        'is_bought': {'$in': [g.user.id, {'$ifNull': ['$purchases.user', []]}]},
    }}
    yield {'$sort': {'is_bought': 1, 'created_at': -1}}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/master/reference/operator/aggregation/filter/
https://stackoverflow.com/questions/3985214/retrieve-only-the-queried-element-in-an-object-array-in-mongodb-collection

Join Another Collection Using $lookup

def stages():
    yield {'$match': {
        'tags': 'pack:prod_CR1u34BIpDbHeo',
    }}
    yield {'$lookup': {
        'from': 'user',
        'localField': 'sender',
        'foreignField': '_id',
        'as': 'sender_data',
    }}
    yield {'$unwind': '$sender_data'}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'sender': {
            'id': '$sender_data._id',
            'username': '$sender_data.username',
        },
        'caption': True,
        'posted_at': True,
    }}
    yield {'$sort': {'posted_at': -1}}

try:
    docs = Message.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/
https://thecodebarbarian.com/a-nodejs-perspective-on-mongodb-36-lookup-expr

Join Another Collection With Multiple Conditions Using pipeline in $lookup

To access the let variables in the $lookup pipeline, you can only use the $expr operator.

var start = ISODate('2018-09-22T00:00:00.000+08:00');

db.getCollection('feature.shop.order').aggregate([
    {'$match': {
        'payment.timestamp': {'$gte': start},
        'status': {'$in': ['paid']},
    }},
    {'$lookup': {
        'from': 'user',
        'localField': 'customer',
        'foreignField': '_id',
        'as': 'customer_data',
    }},
    {'$unwind': '$customer_data'},
    {'$project': {
        'variation': '$customer_data.experiments.message_unlock_price.variation',
        'amount_normalized': {'$divide': ['$amount', 100.0]},
    }},
    {'$addFields': {
        'amount_usd': {'$multiply': ['$amount_normalized', 0.033]},
    }},
    {'$group': {
       '_id': '$variation',
       'purchase_amount': {'$sum': '$amount_usd'},
       'paid_user_count': {'$sum': 1},
    }},
    {'$lookup': {
        'from': 'user',
        'let': {
            'variation': '$_id',
        },
        'pipeline': [
            {'$match': {
                'last_active': {'$gte': start},
                'experiments': {'$exists': true},
            }},
            {'$match': {
                '$expr': {
                    '$and': [
                         {'$eq': ['$experiments.message_unlock_price.variation', '$$variation']},
                    ],
                },
            }},
            {'$group': {
               '_id': '$experiments.message_unlock_price.variation',
               'count': {'$sum': 1},
            }},
        ],
        'as': 'variation_data',
    }},
    {'$unwind': '$variation_data'},
    {'$project': {
        '_id': 1,
        'purchase_amount': 1,
        'paid_user_count': 1,
        'total_user_count': '$variation_data.count',
    }},
    {'$addFields': {
        'since': start,
        'arpu': {'$divide': ['$purchase_amount', '$total_user_count']},
        'arppu': {'$divide': ['$purchase_amount', '$paid_user_count']},
    }},
    {'$sort': {'_id': 1}},
]);

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#join-conditions-and-uncorrelated-sub-queries

or

def stages():
    yield {'$match': {'_id': bson.ObjectId(message_id)}}
    yield {'$limit': 1}
    yield {'$project': {
        '_cls': 1,
        'sender': 1,
        'unlocks': 1,
    }}
    yield {'$unwind': '$unlocks'}
    yield {'$match': {
        'unlocks.user': bson.ObjectId(user_id),
        'unlocks.amount': {'$gt': 0},
    }}
    yield {'$lookup': {
        'from': 'user',
        'let': {
            'sender': '$sender',
            'unlocker': '$unlocks.user',
        },
        'pipeline': [
            {'$match': {
                '$expr': {
                    '$or': [
                        {'$eq': ['$_id', '$$sender']},
                        {'$eq': ['$_id', '$$unlocker']}
                    ]
                }
            }}
        ],
        'as': 'users',
    }}
    yield {'$addFields': {
        'sender': {'$arrayElemAt': ['$users', 0]},
        'unlocker': {'$arrayElemAt': ['$users', 1]},
    }}
    yield {'$project': {
        '_id': 0,
        '_cls': 1,
        'id': '$_id',
        'sender': {
            'id': '$sender._id',
            'username': '$sender.username',
        },
        'unlocker': {
            'id': '$unlocker._id',
            'username': '$unlocker.username',
        },
        'amount': '$unlocks.amount',
    }}

try:
    context = Message.objects.aggregate(*stages()).next()
except StopIteration:
    pass

ref:
https://stackoverflow.com/questions/37086387/multiple-join-conditions-using-the-lookup-operator
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#specify-multiple-join-conditions-with-lookup

Count Documents In Another Collection With $lookup (JOIN)

def stages():
    category_tag = f'category:{category}'
    yield {'$match': {
        'active': True,
        'tags': category_tag,
    }}
    yield {'$addFields': {
        'message_pack_id_tag': {'$concat': ['pack:', '$_id']},
    }}
    yield {'$lookup': {
        'from': 'message',
        'localField': 'message_pack_id_tag',
        'foreignField': 'tags',
        'as': 'total',
    }}
    yield {'$addFields': {
        'total': {'$size': '$total'}
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'total': True,
    }}

try:
    docs = MessagePackProduct.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#equality-match

Use $lookup as findOne() Which Returns An Object

Use $lookup and $unwind.

import bson

def stages():
    yield {'$match': {'_id': bson.ObjectId(gift_id)}}
    yield {'$limit': 1}
    yield {'$lookup': {
        'from': 'user',
        'localField': 'sender',
        'foreignField': '_id',
        'as': 'sender',
    }}
    yield {'$unwind': '$sender'}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'sender': {
            'id': '$sender._id',
            'username': '$sender.username',
        },
        'product_id': '$product._id',
        'sent_at': '$sent_at',
        'amount': '$cost.amount',
    }}

try:
    _context = Gift.objects.aggregate(*stages()).next()
except StopIteration:
    pass

ref:
https://stackoverflow.com/questions/37691727/how-to-use-mongodbs-aggregate-lookup-as-findone

Collapse Documents In An Array

def stages():
    yield {'$match': {
        'tags': 'tutorial:buy-diamonds:v1',
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'caption.text': True,
        'sender': True,
        'media.type': '$media.mimetype',
    }}
    yield {'$facet': {
        'inbox': [
            {'$sort': {'created_at': -1}},
            {'$limit': 10}
        ],
    }}
    yield {'$project': {
        'inbox': True,
        'required_unlock_count': {'$literal': 5},
        'price_per_message': {'$literal': 1200},
    }}

try:
    result = Message.objects.aggregate(*stages()).next()
except StopIteration:
    result = {}

JSON output:

{
    "inbox": [
        {
            "caption": {
                "text": "fuck yeah"
            },
            "id": "5aaba1e9593950337a90dcb3",
            "media": {
                "type": "video/mp4"
            },
            "sender": "5a66d5c2af9c462c617ce552"
        },
        {
            "caption": {
                "text": "test"
            },
            "id": "5ad549276b2c362a4efe5e21",
            "media": {
                "type": "image/jpeg"
            },
            "sender": "5a66d5c2af9c462c617ce552"
        }
    ],
    "price_per_message": 1200,
    "required_unlock_count": 5
}

Do Pagination With $facet And $project

def stages():
    # normal query
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'created_at': True,
        'meta': {
            'revision': '$revision',
            'tags': '$tags',
        },
    }}
    yield {'$sort': {'created_at': -1}}

    # pagination
    page = 0
    limit = 10
    yield {'$facet': {
        'meta': [
            {'$count': 'total'},
        ],
        'objects': [
            {'$skip': page * limit},
            {'$limit': limit},
        ]
    }}
    # JSON output:
    # {
    #    "meta": [
    #       {"total": 2}
    #    ],
    #    "objects": [
    #       {
    #          "id": "prod_CR1u34BIpDbHeo",
    #          "name": "Product Name 2"
    #       },
    #       {
    #          "id": "prod_Fkhf9JFK3Rdgk9",
    #          "name": "Product Name 1"
    #       }
    #    ]
    # }
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$meta', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}
    # JSON output:
    # {
    #    "total": 2,
    #    "objects": [
    #       {
    #          "id": "prod_CR1u34BIpDbHeo",
    #          "name": "Product Name 2"
    #       },
    #       {
    #          "id": "prod_Fkhf9JFK3Rdgk9",
    #          "name": "Product Name 1"
    #       }
    #    ]
    # }

try:
    output = MessagePackProduct.objects.aggregate(*stages()).next()
except StopIteration:
    output = {}
else:
    print(output)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/facet/
https://docs.mongodb.com/manual/reference/operator/aggregation/project/

Perform $facet + $project => Unwrap with $unwind => Do $facet + $project Again

def stages():
    yield {'$match': {
        'purchases.user': g.user.id,
    }}
    yield {'$project': {
        '_id': False,
        'id': '$_id',
        'name': True,
        'image': {
            '$ifNull': [{'$arrayElemAt': ['$images', 0]}, None],
        },
        'created_at': True,
    }}
    yield {'$sort': {'created_at': -1}}

    # pagination
    page = 0
    limit = 10
    yield {'$facet': {
        'meta': [
            {'$count': 'total'},
        ],
        'objects': [
            {'$skip': page * limit},
            {'$limit': limit},
        ]
    }}
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$meta', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}

    # do $lookup after the pagination
    yield {'$unwind': '$objects'}
    yield {'$addFields': {
        'objects.message_pack_id_tag': {'$concat': ['pack:', '$objects.id']},
    }}
    yield {'$lookup': {
        'from': 'message',
        'localField': 'objects.message_pack_id_tag',
        'foreignField': 'tags',
        'as': 'objects.total',
    }}
    yield {'$addFields': {
        'objects.total': {'$size': '$objects.total'}
    }}

    # re-wrap into the pagination structure
    yield {'$facet': {
        'total_list': [
            {'$project': {
                'total': True,
            }},
        ],
        'objects': [
            {'$replaceRoot': {'newRoot': '$objects'}},
        ]
    }}
    yield {'$project': {
        'total': {'$let': {
            'vars': {
                'meta': {'$arrayElemAt': ['$total_list', 0]},
            },
            'in': '$$meta.total',
        }},
        'objects': True,
    }}

try:
    output = MessagePackProduct.objects.aggregate(*stages()).next()
except StopIteration:
    output = {}
else:
    print(output)

Do $group First To Reduce The Number Of $lookup Calls

def stages():
    yield {'$match': {
        'tags': f'pack:{message_pack_id}',
    }}
    yield {'$group': {
        '_id': '$sender',
        'messages': {'$push': '$$ROOT'},
    }}
    yield {'$lookup': {
        'from': 'user',
        'localField': '_id',
        'foreignField': '_id',
        'as': 'sender_data',
    }}
    yield {'$unwind': '$messages'}
    yield {'$project': {
        '_id': False,
        'id': '$messages._id',
        'caption': {
            'text': '$messages.caption.text',
            'y': '$messages.caption.y',
        },
        'sender': {
            'id': {'$arrayElemAt': ['$sender_data._id', 0]},
            'username': {'$arrayElemAt': ['$sender_data.username', 0]},
        },
    }}

try:
    docs = Message.objects.aggregate(*stages())
except StopIteration:
    docs = []
else:
    for doc in docs:
        print(doc)

ref:
https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Copy Collections To Another Database

var bulk = db.getSiblingDB('target_db')['target_collection'].initializeOrderedBulkOp();
db.getCollection('source_collection').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

var bulk = db.getSiblingDB('test')['company.revenue'].initializeOrderedBulkOp();
db.getCollection('company.revenue').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

var bulk = db.getSiblingDB('test')['user.contract'].initializeOrderedBulkOp();
db.getCollection('user.contract').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

var bulk = db.getSiblingDB('test')['user.revenue'].initializeOrderedBulkOp();
db.getCollection('user.revenue').find().forEach(function(d) {
    bulk.insert(d);
});
bulk.execute();

ref:
https://stackoverflow.com/questions/11554762/how-to-copy-a-collection-from-one-database-to-another-in-mongodb

Sadly, cloneCollection() cannot clone collections from one local database to another local database.
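
If you would rather do it from Python, a hedged pymongo sketch that copies a collection between two databases on the same server in batches:

import pymongo

client = pymongo.MongoClient('mongodb://127.0.0.1:27017')
source = client['test']['user.revenue']
target = client['target_db']['user.revenue']

# Insert in batches instead of one document at a time.
batch = []
for doc in source.find():
    batch.append(doc)
    if len(batch) >= 1000:
        target.insert_many(batch, ordered=True)
        batch = []

if batch:
    target.insert_many(batch, ordered=True)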

ref:
https://docs.mongodb.com/manual/reference/command/cloneCollection/

Useful Tools

Backup

$ mongodump -h 127.0.0.1:27017 --oplog -j=8 --gzip --archive=/data/mongodump.tar.gz

ref:
https://docs.mongodb.com/manual/reference/program/mongodump/

Restore

$ mongorestore --drop --gzip --archive=2018-08-12T03.tar.gz

An error like the following typically indicates data corruption, which is often caused by problems with the underlying storage device, file system, or network connection:

restoring indexes for collection test.message from metadata
Failed: test.message: error creating indexes for test.message: createIndex error: BSONElement: bad type -47

ref:
https://docs.mongodb.com/manual/reference/program/mongorestore/

Profiling

You can set the profiling level to 2 to record every query.

db.setProfilingLevel(2);

db.getCollection('system.profile').find({
    'ns': { 
        '$nin' : ['test.system.profile', 'test.system.indexes', 'test.system.js', 'test.system.users']
    }
}).limit(5).sort({'ts': -1}).pretty();

ref:
https://docs.mongodb.com/manual/tutorial/manage-the-database-profiler/
https://stackoverflow.com/questions/15204341/mongodb-logging-all-queries

$ pip install mongotail

# set the profiling level
$ mongotail 127.0.0.1:27017/test -l 2

# tail logs
$ mongotail 127.0.0.1:27017/test -f -m

ref:
https://github.com/mrsarm/mongotail

Monitoring

$ mongotop
$ mongostat

ref:
https://docs.mongodb.com/manual/reference/program/mongotop/
https://docs.mongodb.com/manual/reference/program/mongostat/

$ pip install mtools

$ mloginfo mongod.log

ref:
https://github.com/rueckstiess/mtools