What is Change Stream?
Change Stream is a Change Data Capture (CDC) feature provided by MongoDB since v3.6. In layman's terms, it's a high-level API that allows you to subscribe to real-time notifications whenever there is a change in your MongoDB collections, databases, or the entire cluster, in an event-driven fashion.
Change Stream uses information stored in the oplog (operations log) to produce change events. The `oplog.rs` is a special capped collection that keeps a rolling record of all insert, update, and remove operations applied to your MongoDB deployment, so that other members of the Replica Set can copy them. Since Change Stream is built on top of the oplog, it is only available for Replica Sets and Sharded clusters.
The problem with most databases' replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API (Martin Kleppmann, 2017).
Change Stream comes to the rescue!
Change Stream in a Sharded cluster
MongoDB has a global logical clock that enables the server to order all changes across a Sharded cluster.
To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see whether that shard has seen more recent changes. Sharded clusters in which one or more shards have little or no activity for the collection, or are "cold", can therefore suffer worse change stream response times, since the mongos must still check with those cold shards to guarantee total ordering of changes.
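The effect of cold shards can be pictured with a simplified watermark model: mongos may only emit an event once every shard has reported its cluster time past that event's cluster time, so one silent shard holds everything back. The sketch below is a plain-JavaScript mental model of that idea, not mongos's actual implementation:

```javascript
// Simplified model: an event is emittable only once every shard's reported
// clusterTime has passed the event's clusterTime. NOT real mongos code.
function emittableEvents(shardReports) {
  // shardReports: {shardName: {clusterTime, events: [{clusterTime, doc}]}}
  const watermark = Math.min(
    ...Object.values(shardReports).map((s) => s.clusterTime),
  );
  return Object.values(shardReports)
    .flatMap((s) => s.events)
    .filter((e) => e.clusterTime <= watermark)
    .sort((a, b) => a.clusterTime - b.clusterTime);
}

const reports = {
  shardA: {
    clusterTime: 105,
    events: [{clusterTime: 101, doc: 'a1'}, {clusterTime: 104, doc: 'a2'}],
  },
  // a "cold" shard that has seen no writes and lags behind:
  shardB: {clusterTime: 102, events: []},
};

// Only events at or below the slowest shard's clusterTime (102) get out.
console.log(emittableEvents(reports).map((e) => e.doc)); // [ 'a1' ]
```

Once the cold shard reports a newer cluster time, the held-back events become emittable, which is why mongos keeps pinging idle shards.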
What can Change Stream do?
There are some typical use cases of Change Stream:
- Syncing fields between the source and denormalized collections to mitigate the data consistency issue.
- Invalidating the cache.
- Updating the search index.
- Replicating data to a data warehouse.
- Hooking up Change Stream to a generic streaming processing pipeline, e.g., Kafka or Spark Streaming.
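Most of these use cases boil down to routing each change event by its `operationType`. A minimal dispatcher sketch (the handler bodies here are hypothetical placeholders for cache invalidation, search indexing, and so on):

```javascript
// Route change events to use-case-specific handlers by operationType.
// The handlers are made-up stand-ins that just record what they would do.
const handled = [];

const handlers = {
  insert: (event) => handled.push(`index ${event.documentKey._id}`),
  update: (event) => handled.push(`invalidate ${event.documentKey._id}`),
  delete: (event) => handled.push(`purge ${event.documentKey._id}`),
};

function dispatch(event) {
  const handler = handlers[event.operationType];
  if (handler) {
    handler(event);
  }
}

// In real code these events would come from changeStream.on('change', dispatch).
dispatch({operationType: 'update', documentKey: {_id: 'u1'}});
dispatch({operationType: 'delete', documentKey: {_id: 'u2'}});
console.log(handled); // [ 'invalidate u1', 'purge u2' ]
```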
How to open a Change Stream?
First of all, you must have a Replica Set or a Sharded cluster for your MongoDB deployment, and make sure you are using the WiredTiger storage engine. If you don't, you might be using MongoDB all wrong.
All code samples below are written in Node.js.
```javascript
const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
  const mongoClient = await MongoClient.connect(MONGO_URL, {
    appname: 'test',
    readPreference: ReadPreference.PRIMARY,
    useNewUrlParser: true,
  });

  const db = mongoClient.db('test');
  const changeStream = db.collection('user').watch();
  changeStream.on('change', (event) => {
    console.log(event);
  });
})();
```
You could also pass the option `{'fullDocument': 'updateLookup'}` to `watch()`, which makes each `update` event include the entire document. But as the name says, it performs a lookup, which has an overhead, and the returned document might exceed the 16 MB BSON document size limit. Also, the content of `fullDocument` may differ from `updateDescription` if other majority-committed operations modified the document between the original update operation and the full document lookup. Be cautious when you use it.
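If that lookup overhead or staleness is a concern, one alternative is to maintain your own copy of the document and apply `updateDescription` to it yourself. Below is a simplified sketch that only handles top-level fields (dotted paths like `tags.2` would need extra work):

```javascript
// Apply an update event's updateDescription to a locally cached document,
// instead of relying on fullDocument: 'updateLookup'.
// Simplified: only top-level updatedFields/removedFields are handled.
function applyUpdateDescription(cachedDoc, updateDescription) {
  const doc = {...cachedDoc};
  const updated = updateDescription.updatedFields || {};
  for (const [field, value] of Object.entries(updated)) {
    doc[field] = value;
  }
  for (const field of updateDescription.removedFields || []) {
    delete doc[field];
  }
  return doc;
}

const cached = {_id: 'u1', username: 'vinta', nickname: 'v'};
const next = applyUpdateDescription(cached, {
  updatedFields: {username: 'vinta2'},
  removedFields: ['nickname'],
});
console.log(next); // { _id: 'u1', username: 'vinta2' }
```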
References:
- Change Events
- Besides the regular `insert`, `update`, and `delete` events, there is also a `replace` event, which is triggered by an update operation that replaces the whole document.
How to aggregate Change Stream events?
One of the advantages of Change Stream is that you are able to leverage MongoDB's powerful aggregation framework - allowing you to filter and modify the output of Change Stream.
However, there is a tricky part of `update` events: the field names and their contents in `updateDescription.updatedFields` might vary if the updated field is an array field. Assume we have a `tags` field, a list of strings, in the `user` collection. You could try running the following code in the `mongo` shell:
- `$addToSet` produces the complete content of the array field
- `$push` produces only the inserted item of the array field
- `$pull` produces the complete content of the array field
```javascript
var userId = ObjectId();

db.getCollection('user').insert({
    "_id" : userId,
    "username" : "vinta",
    "tags" : ["tag1"]
});

db.getCollection('user').updateOne({_id: userId}, {
    '$addToSet': {'tags': 'tag2'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags": ["tag1", "tag2"]
//         }
//     }
//     ...
// }

db.getCollection('user').updateOne({_id: userId}, {
    '$push': {'tags': 'tag3'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags.2": "tag3"
//         }
//     }
//     ...
// }

db.getCollection('user').updateOne({_id: userId}, {
    '$pull': {'tags': 'tag1'},
});
// the change event output would look like:
// {
//     ...
//     "operationType": "update",
//     "updateDescription": {
//         "updatedFields": {
//             "tags": ["tag2", "tag3"]
//         }
//     }
//     ...
// }
```
Fortunately, to mitigate the `tags` vs `tags.2` problem, we can do some aggregation to `$project` and `$match` change events if we only want to listen to changes of the `tags` field:
```javascript
const pipeline = [
    {'$project': {
        '_id': 1,
        'operationType': 1,
        'documentKey': 1,
        'changedDocument': {
            '$objectToArray': {
                '$mergeObjects': ['$updateDescription.updatedFields', '$fullDocument'],
            },
        },
        'removedFields': '$updateDescription.removedFields',
    }},
    {'$match': {
        '$or': [
            {'changedDocument.k': /^tags$/},
            {'changedDocument.k': /^tags\./}, // escape the dot to match dotted paths like "tags.2"
            {'removedFields': {'$in': ['tags']}},
            {'operationType': 'delete'},
        ],
    }},
    {'$addFields': {
        'changedDocument': {'$arrayToObject': '$changedDocument'},
    }},
];

const changeStream = db.collection('user').watch(pipeline, {});
```
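To see what this pipeline does to the two `update` shapes above, here is a plain-JavaScript re-implementation of its `$mergeObjects` / `$objectToArray` / `$match` steps. This is a simulation for illustration only, not driver code:

```javascript
// Simulate the aggregation pipeline's matching logic in plain JavaScript.
function matchesTagsChange(event) {
  // $mergeObjects of updatedFields and fullDocument (later wins)
  const merged = {
    ...(event.updateDescription ? event.updateDescription.updatedFields : {}),
    ...(event.fullDocument || {}),
  };
  const changedKeys = Object.keys(merged); // like $objectToArray's "k" values
  const removedFields = event.updateDescription
    ? event.updateDescription.removedFields || []
    : [];
  return (
    changedKeys.some((k) => /^tags$/.test(k) || /^tags\./.test(k)) ||
    removedFields.includes('tags') ||
    event.operationType === 'delete'
  );
}

// $addToSet / $pull style event: the whole array shows up under "tags"
console.log(matchesTagsChange({
  operationType: 'update',
  updateDescription: {updatedFields: {tags: ['tag1', 'tag2']}},
})); // true

// $push style event: only a dotted path like "tags.2" shows up
console.log(matchesTagsChange({
  operationType: 'update',
  updateDescription: {updatedFields: {'tags.2': 'tag3'}},
})); // true

// unrelated field change
console.log(matchesTagsChange({
  operationType: 'update',
  updateDescription: {updatedFields: {username: 'vinta2'}},
})); // false
```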
How to resume a Change Stream?
Another critical feature of Change Stream is resumability. Since any service inevitably restarts or crashes, it is essential that we can resume from the point at which the Change Stream was interrupted.
There are two options in `watch()` we can use:
- `resumeAfter`: a resume token from any change event.
- `startAtOperationTime`: a starting timestamp for the Change Stream.
resumeAfter
Before using a `resumeAfter` token, there is a MongoDB configuration you might need to deal with: `featureCompatibilityVersion`.

```javascript
db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
db.adminCommand({setFeatureCompatibilityVersion: "4.0"});
```
A `resumeAfter` token is carried by every Change Stream event: the `_id` field, whose value looks like `{'_data': '825C4607870000000129295A1004AF1EE5355B7344D6B25478700E75259D46645F696400645C42176528578222B13ADEAA0004'}`. In other words, `{'_data': '<a hex string>'}` is your `resumeAfter` token.

In practice, you should store each `resumeAfter` token somewhere, for instance in Redis, so that you can resume after a blackout or a restart. It is also a good idea to wrap the store function with a debounce.
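A sketch of such a debounced store function (the persistence callback is stubbed out; in real code `saveToken` would write to Redis, and the delay is an arbitrary choice):

```javascript
// Debounce wrapper: persist only the latest resume token once writes have
// been quiet for `delayMs`, instead of hitting storage on every event.
function debouncedTokenStore(saveToken, delayMs) {
  let timer = null;
  let latestToken = null;
  return (token) => {
    latestToken = token;
    clearTimeout(timer);
    timer = setTimeout(() => saveToken(latestToken), delayMs);
  };
}

// Usage: inside changeStream.on('change', ...), call storeToken(event._id).
const saved = [];
const storeToken = debouncedTokenStore((token) => saved.push(token), 50);
storeToken({_data: 'AAA'});
storeToken({_data: 'BBB'}); // only this latest token will be persisted

setTimeout(() => {
  console.log(saved); // [ { _data: 'BBB' } ]
}, 150);
```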
Another unusual (and not so reliable) way to get a `resumeAfter` token is to compose one from the `oplog.rs` collection:
```javascript
const _ = require('lodash');
const { MongoClient, ReadPreference } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
  const mongoClient = await MongoClient.connect(MONGO_URL, {
    appname: 'test',
    replicaSet: 'rs0',
    readPreference: ReadPreference.PRIMARY,
    useNewUrlParser: true,
  });

  // cannot use the 'local' database through mongos
  const localDb = mongoClient.db('local');

  // querying oplog.rs might take seconds
  const doc = await localDb.collection('oplog.rs').findOne(
    {'ns': 'test.user'}, // dbName.collectionName
    {'sort': {'$natural': -1}},
  );

  // https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change
  // https://github.com/mongodb/mongo/blob/master/src/mongo/db/storage/key_string.cpp
  // https://github.com/mongodb/mongo/blob/master/src/mongo/bson/bsontypes.h
  const resumeAfterData = [
    '82', // unknown
    doc.ts.toString(16), // timestamp
    '29', // unknown
    '29', // unknown
    '5A', // CType::BinData
    '10', // length (16)
    '04', // BinDataType of newUUID
    doc.ui.toString('hex'), // the collection uuid (see `db.getCollectionInfos({name: 'user'})`)
    '46', // CType::Object
    '64', // CType::OID (varies with the type of the collection primary key)
    '5F', // _ (varies with the field name of the collection primary key)
    '69', // i
    '64', // d
    '00', // null
    '64', // CType::OID (varies with the type of the document primary key)
    _.get(doc, 'o2._id', _.get(doc, 'o._id')).toString('hex'), // ObjectID: update operations have an `o2` field, others have `o`
    '00', // null
    '04', // unknown
  ].join('').toUpperCase();

  const options = {
    'resumeAfter': {
      '_data': resumeAfterData,
    },
  };
  console.log(options);

  const db = mongoClient.db('test');
  const changeStream = db.collection('user').watch([], options);
  changeStream.on('change', (event) => {
    console.log(event);
  });
})();
```
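Going the other direction, you can sanity-check the byte layout above by decoding the timestamp back out of a token's `_data` hex string: byte 0 is the `82` marker, and the next 8 hex characters are the big-endian seconds of the cluster timestamp. This is a hand-rolled decoder for illustration only; the token format is undocumented and may change between server versions:

```javascript
// Decode the cluster timestamp's seconds from a resume token's _data hex.
// Observed layout: 1 marker byte '82', then 4 bytes of big-endian seconds,
// then 4 bytes of increment, then the rest of the key string.
function decodeTokenSeconds(data) {
  return parseInt(data.slice(2, 10), 16);
}

const data = '825C4607870000000129295A1004AF1EE5355B7344D6B25478700E75259D46645F696400645C42176528578222B13ADEAA0004';
const seconds = decodeTokenSeconds(data);
console.log(seconds, new Date(seconds * 1000).toISOString());
// 1548093319 '2019-01-21T17:55:19.000Z'
```

Decoding the sample token from earlier in this post shows it was produced in January 2019, which is a quick way to tell whether a stored token is still within your oplog window.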
startAtOperationTime
The `startAtOperationTime` option is only available in MongoDB 4.0+. It simply represents a starting point in time for the Change Stream. You must also make sure that the specified starting point, if it is in the past, is within the time range of the oplog.
The tricky part is that this option only accepts a MongoDB `Timestamp` object. You could also retrieve the latest timestamp directly from `db.adminCommand({replSetGetStatus: 1})`.
```javascript
const { MongoClient, ReadPreference, Timestamp } = require('mongodb');

const MONGO_URL = 'mongodb://127.0.0.1:27017/';

(async () => {
  const mongoClient = await MongoClient.connect(MONGO_URL, {
    appname: 'test',
    readPreference: ReadPreference.PRIMARY,
    useNewUrlParser: true,
  });

  const options = {
    // Timestamp takes (increment, seconds); seconds must be an integer
    'startAtOperationTime': new Timestamp(1, Math.floor(Date.now() / 1000)),
  };
  console.log(options);

  const db = mongoClient.db('test');
  const changeStream = db.collection('user').watch([], options);
  changeStream.on('change', (event) => {
    console.log(event);
  });
})();
```