Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

What If...

"What if" questions are fun. "What if you had a time machine: when would you visit?" "What if you had one wish and it would come true: what would it be?" They're the types of hypothetical questions you can ask at a dinner party and get insights about what interests and motives somebody has if barriers were broken down. A few years ago at Elastic, we asked ourselves a "what if" that we knew would lead to interesting insights:

"What if we had total ordering on index operations in Elasticsearch? What could we build?"

The answers were far ranging:

All of this requires one little barrier to be broken down: adding sequence numbers to index operations. Easy: we just need to add a counter to every operation in the primary! So easy, we saw several iterations of community members and employees trying their hand at it. But as we peeled back the layers of the onion, we discovered it was much, much more complicated than it first appeared. Almost 6 years after we first discussed how useful a Changes API would be, we still don't have one. What gives?! The purpose of this blog is to share what happened behind the scenes and give some insight into the answer to this question.

In the last two years, we practically rewrote the replication logic from the ground up. We took the best things from known academic algorithms, while making sure we could still ensure the parallelism that makes Elasticsearch so fast: something many if not all traditional consensus algorithms can't do. We collaborated with distributed systems specialists and built a TLA+ specification for our replication model. We added lots of tests and test infrastructure.

This blog is necessarily technical, as we dig into some of the guts of how Elasticsearch does replication. However, we've aimed to make it accessible to a wider audience by explaining/defining/linking to some terminology, even if you may already understand it. First, let's dive into some of the challenges Elasticsearch deals with.


Before we go much further, we have to talk a bit about our replication model and why it matters. In an Elasticsearch index, data is split up into what are called "shards", which are basically sub-divisions of the index. You may have 5 primary shards, and each primary shard may have any number of copies (called replicas). But it's important that there's only 1 "primary shard" (often shortened to "primary") for each sub-division. The primary shard accepts indexing operations (things like "add a document" or "delete a document") first and replicates them to its replicas. It is therefore fairly straightforward to keep incrementing a counter and assign each operation a sequence number before it is forwarded to the replicas. And as long as nobody ever restarts a server, you have a network with 100% uptime, your hardware doesn't fail, you don't have any long Java garbage collection events, and nobody ever upgrades the software, this straightforward approach actually works.
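To make the "just add a counter" idea concrete, here is a minimal sketch of it. This is an illustration only, not Elasticsearch's code: the `PrimaryShard` class, its `assign` method, and the `seq_no` key are hypothetical names chosen for this example.

```python
import itertools
import threading


class PrimaryShard:
    """Hypothetical primary that stamps each operation with the next sequence number."""

    def __init__(self):
        self._next_seq_no = itertools.count()  # yields 0, 1, 2, ...
        self._lock = threading.Lock()          # indexing is concurrent, so guard the counter

    def assign(self, operation: dict) -> dict:
        with self._lock:
            seq_no = next(self._next_seq_no)
        # The stamped operation is what would be forwarded to the replicas.
        return {**operation, "seq_no": seq_no}


primary = PrimaryShard()
stamped = [primary.assign({"action": "index", "doc": {"bar": "baz"}}) for _ in range(3)]
print([op["seq_no"] for op in stamped])  # [0, 1, 2]
```

The counter itself is the easy part. Everything that follows is about what happens to those numbers when the primary holding the counter goes away.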

But we live in the real world, and it's when these assumptions break that Elasticsearch can end up in a "failure" mode and a "recovery" process. If such an event affects a node running a primary shard, it may require the primary to step down and a replica to take its place. Since the change can come abruptly, it is possible that some ongoing indexing operations were not fully replicated yet. If you had 2 or more replicas, some of those operations may have reached one and not the other. Even worse, because Elasticsearch indexes documents concurrently (which is part of why Elasticsearch is so fast!), each of those replicas can have a different set of operations that do not exist in the other one. Even if you run with one replica (the default setting in Elasticsearch) there might be trouble. If an old primary copy comes back and is added as a replica, it may contain operations that were never replicated to the new primary. All of these scenarios have one thing in common: the history of operations on shards can diverge upon primary failure and we need some way to fix it.

Primary Terms & Sequence Numbers

The first step we took was to be able to distinguish between old and new primaries. We have to have a way to identify operations that came from an older primary vs operations that come from a newer primary. On top of that, the entire cluster needs to be in agreement as to that so there's no contention in the event of a problem. This drove us to implement primary terms. These primary terms are incremental and change when a primary is promoted. They're persisted in the cluster state, thus representing a sort of “version” or “generation” of primaries that the cluster is on. With primary terms in place, any collision in the operation history can be resolved by looking at the operations’ primary term. New terms win over older terms. We can even start rejecting operations that are too old and avoid messy situations.
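Here is a minimal sketch of the "new terms win, old terms get rejected" rule. It is a simplification under stated assumptions: the `Operation` and `ShardCopy` names are hypothetical, and the copy here learns about a new term from the operation itself, which glosses over how the term is actually distributed through the cluster state.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Operation:
    primary_term: int  # term of the primary that issued the operation
    doc_id: str


@dataclass
class ShardCopy:
    """Hypothetical shard copy that only accepts operations from the newest primary it knows."""
    current_term: int = 1
    applied: list = field(default_factory=list)

    def apply(self, op: Operation) -> bool:
        if op.primary_term < self.current_term:
            # The operation comes from a primary that has since been replaced: reject it.
            return False
        # A higher term means a new primary was promoted; remember it.
        self.current_term = op.primary_term
        self.applied.append(op)
        return True


replica = ShardCopy()
print(replica.apply(Operation(primary_term=1, doc_id="a")))  # True
print(replica.apply(Operation(primary_term=2, doc_id="b")))  # True, term advances to 2
print(replica.apply(Operation(primary_term=1, doc_id="c")))  # False, stale primary
```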

Once we had the protection of primary terms in place, we added a simple counter and started issuing each operation a sequence number from that counter. These sequence numbers thus allow us to understand the specific order of index operations that happened on the primary, which we can use for a variety of purposes we'll talk about in the next few sections. You can see the assigned sequence number and the primary term in the response:

$ curl -H 'Content-Type: application/json' -XPOST -d '{ "bar": "baz" }'
{
  "_index" : "foo",
  "_type" : "doc",
  "_id" : "MlDBm10BditXXu4kjj5E",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 19,
  "_primary_term" : 1
}

Notice the _seq_no and _primary_term which are now returned with the response.

Local and Global Checkpoints

With primary terms and sequence numbers, we had the tools we needed to detect, at least in theory, differences between shards and realign them when a primary fails. An old primary with primary term x can be recovered by removing its operations with primary term x that don't exist in the new primary's history, and by indexing into it the operations with a higher primary term that it is missing.

Sadly, comparing histories of millions of operations is just impractical when you're simultaneously indexing hundreds of thousands or millions of events per second. Storage costs become prohibitive and a straightforward comparison simply takes too long. To deal with this, Elasticsearch maintains a safety marker called the global checkpoint. The global checkpoint is a sequence number for which we know that the histories of all active shards are aligned at least up to it. Said differently, all operations with a sequence number at or below the global checkpoint are guaranteed to have been processed by all active shards and are equal in their respective histories. This means that after a primary fails, we only need to compare operations above the last known global checkpoint between the new primary and any remaining replicas. When the old primary comes back, we take the global checkpoint it last knew about and compare operations above it with the new primary. This way, we only compare operations that need comparing rather than the complete history.
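The comparison described above can be sketched in a few lines. This is an illustration rather than the real recovery code: histories are modelled as plain dictionaries mapping a sequence number to a `(primary_term, payload)` tuple, and `plan_realignment` is a hypothetical helper name.

```python
def plan_realignment(old_history: dict, new_history: dict, global_checkpoint: int):
    """Compare two histories, but only above the global checkpoint.

    Each history maps seq_no -> (primary_term, payload).
    Returns (seq_nos to remove from the old copy, seq_nos to index into it).
    """
    to_remove = sorted(
        seq_no for seq_no, op in old_history.items()
        if seq_no > global_checkpoint and new_history.get(seq_no) != op
    )
    to_index = sorted(
        seq_no for seq_no, op in new_history.items()
        if seq_no > global_checkpoint and old_history.get(seq_no) != op
    )
    return to_remove, to_index


# The old primary (term 1) failed after writing seq_no 3 locally but before replicating it.
old_primary = {0: (1, "a"), 1: (1, "b"), 2: (1, "c"), 3: (1, "d")}
# The new primary (term 2) reused seq_no 3 and carried on.
new_primary = {0: (1, "a"), 1: (1, "b"), 2: (1, "c"), 3: (2, "e"), 4: (2, "f")}

print(plan_realignment(old_primary, new_primary, global_checkpoint=1))
# ([3], [3, 4])
```

Operations 0 and 1 are never looked at, because the global checkpoint already vouches for them. Only the tail of the history needs comparing.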

Advancing the global checkpoint is the responsibility of the primary shard. It does so by keeping track of completed operations on the replicas. Once it detects that all replicas have advanced beyond a given sequence number, it updates the global checkpoint accordingly. Instead of keeping track of all operations all the time, the shard copies maintain a local variant of the global checkpoint called the local checkpoint. The local checkpoint is a sequence number for which it and all lower sequence numbers have been processed on the shard copy. Whenever the replicas acknowledge (or ack) a write operation to the primary, they also give it an updated local checkpoint. Using the local checkpoints, the primary is able to update the global checkpoint, which is then sent to all shard copies during the next indexing operation.
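A rough sketch of the bookkeeping just described, with hypothetical names (`LocalCheckpointTracker`, `mark_processed`, `global_checkpoint`) and the assumption that a checkpoint of -1 means "nothing processed yet". Because operations arrive concurrently and possibly out of order, the local checkpoint can only advance across a gap-free run of sequence numbers, and the global checkpoint is taken as the minimum of the local checkpoints reported by the copies.

```python
class LocalCheckpointTracker:
    """Tracks the highest seq_no such that it and every lower seq_no were processed."""

    def __init__(self):
        self.checkpoint = -1   # assumption: -1 means no operation processed yet
        self._pending = set()  # processed seq_nos that sit above a gap

    def mark_processed(self, seq_no: int) -> int:
        self._pending.add(seq_no)
        # Advance over any contiguous run that is now complete.
        while self.checkpoint + 1 in self._pending:
            self.checkpoint += 1
            self._pending.remove(self.checkpoint)
        return self.checkpoint


def global_checkpoint(local_checkpoints) -> int:
    """The primary can only vouch for what every copy has processed."""
    return min(local_checkpoints)


replica = LocalCheckpointTracker()
for seq_no in (0, 2, 3):             # operation 1 is still in flight
    replica.mark_processed(seq_no)
print(replica.checkpoint)            # 0, because there is a gap at 1
replica.mark_processed(1)
print(replica.checkpoint)            # 3, the gap is filled
print(global_checkpoint([5, 3, 4]))  # 3
```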

The following animation shows what happens as sequence numbers and global/local checkpoints are increased given concurrency challenges in the face of things like lossy networks and sudden failures:


As index operations are sent from the primary to the replicas concurrently, we keep track of the highest sequence number that every replica has acknowledged receipt of and call this the global checkpoint. The primary tells all replicas what the global checkpoint is. Thus, if a primary switches, we need to compare and potentially re-process only the operations since the last global checkpoint rather than all the files on disk.

The global checkpoint has another nice property: it represents the boundary between operations that are guaranteed to stay (they are in the histories of all active shards) and the region that can contain operations that may be rolled back if the primary happened to fail before they were fully replicated and acknowledged to the user. This is a subtle but important property, which will be crucial to a future Changes API or Cross-Datacenter Replication feature.

The First Benefit: Faster Recovery

We skipped over how the actual recovery process worked prior to Elasticsearch 6.0. When Elasticsearch recovers a replica after it has been offline, it has to make sure that the replica is identical to the active primary. Inactive shards have synced flush markers that make this validation quick, but shards with active indexing simply have no guarantees. If a shard goes offline while there is still active indexing, the new primary shard then copies Lucene segments (which are files on disk) across the network. This can be a heavy, time-consuming operation if those shards are large. This had to happen because we weren't keeping track of individual write operations (sequence numbers) until 6.0, and behind the scenes Lucene merges all the adds/updates/deletes into larger segments in a way that doesn't let you recover the individual operations that made up the changes… that is, unless you keep the transaction log (or translog) around for a period of time.

That's what we now do: we keep the translog until it grows "too large" or "too old" to warrant keeping any more of it. If a replica needs to be "brought up to date" we use the last global checkpoint known to that replica and just replay the relevant changes from the primary translog rather than an expensive large file copy. If the primary's translog was "too large" or "too old" to be able to re-play to the replica, then we fall back to the old file-based recovery.
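The decision described above boils down to one question: does the primary's retained translog still contain every operation above the replica's last known global checkpoint? A minimal sketch, assuming the translog can be modelled as a set of retained sequence numbers (`choose_recovery` and its parameters are hypothetical names, not an Elasticsearch API):

```python
def choose_recovery(replica_global_checkpoint: int,
                    translog_seq_nos: set,
                    max_seq_no: int) -> str:
    """Pick a recovery strategy for a returning replica (illustrative only)."""
    # Every operation above the replica's checkpoint has to be replayed.
    needed = range(replica_global_checkpoint + 1, max_seq_no + 1)
    if all(seq_no in translog_seq_nos for seq_no in needed):
        return "replay translog"
    # Part of the needed history was already trimmed: fall back to copying files.
    return "copy segment files"


# The primary has processed operations 0..100 but only retains 40..100 in its translog.
retained = set(range(40, 101))

print(choose_recovery(59, retained, max_seq_no=100))  # replay translog
print(choose_recovery(10, retained, max_seq_no=100))  # copy segment files
```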

If you've been operating a large cluster that has real network disconnects, restarts, upgrades, etc., we expect this will make you significantly happier, as you won't be waiting for long periods while shards recover.

Things to Know

As mentioned in the last section, a translog is kept until it's "too large" or "too old" to warrant keeping it. How do we determine what's too large or too old? It's configurable, of course! In 6.0, we're introducing 2 new translog settings:

  • index.translog.retention.size: defaults to 512mb. If the translog grows past this, we only keep this amount around.
  • index.translog.retention.age: defaults to 12h. We don't keep translog files past this age.

These settings are important because they affect how well the new, faster recovery works as well as the disk usage. A larger translog retention size or a longer age will mean that you'll have a higher chance of recovering with the new faster recovery vs relying on the older file-based recovery. However, they come with a cost: these increase disk utilization, and remember that transaction logs are per-shard. As a working example, if you have 20 indices and each has 5 primary shards, and you're writing lots of data over a 12 hour period, it's possible to have 20*5*512mb = 50GB of extra disk utilized by the translog until that 12 hour window rolls off. You can tune this up or down on a per-index basis if you have different recovery and sizing needs on different indices. For example, you may want to consider any adjustments to the translog retention windows if you expect machine or node maintenance.
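The arithmetic in the working example can be wrapped in a small helper for back-of-the-envelope sizing. The function name and parameters are made up for this sketch. The `replicas_per_primary` parameter is an assumption added here, on the basis that every shard copy keeps its own translog; it defaults to 0 so that the result matches the figure above.

```python
def max_retained_translog_gb(indices: int,
                             primaries_per_index: int,
                             retention_size_mb: int = 512,
                             replicas_per_primary: int = 0) -> float:
    """Upper bound on disk held by retained translogs, in GB (1 GB = 1024 MB)."""
    shard_copies = indices * primaries_per_index * (1 + replicas_per_primary)
    return shard_copies * retention_size_mb / 1024


print(max_retained_translog_gb(20, 5))                          # 50.0
print(max_retained_translog_gb(20, 5, replicas_per_primary=1))  # 100.0
```

This is a worst case: it is only reached if every shard actually writes at least the retention size within the retention age.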

Note: prior to 6.0 the translog size could grow to 512MB (by default) under indexing as well per the index.translog.flush_threshold_size setting. This means that the new retention policy will not change the storage requirements for active shards. The change does impact shards from which indexing stops. Instead of cleaning up the translog, we now keep it around for another 12 hours.

The Next Benefit: Cross-Datacenter Replication

As mentioned at the beginning of the post, there are lots of wonderful things we could do in Elasticsearch if we had ordered index operations. It took a while, but now we do. Faster recovery is the first use case we decided to build in: it allows us to test the waters of the new functionality we added.

But we know that cross-datacenter replication is also a common ask from our enterprise customers, so that's another feature we're going to be adding soon. This requires new APIs to be built, additional monitoring functionality on top of the replication, and yes, a lot more testing and documentation.

There's still more to do

As you can see on the Sequence Numbers GitHub issue, we're off to a good start with the sequence numbers feature, but there is still work to be done! We think the work done so far represents a major step forward even if it doesn't cover everything we can build up/around sequence numbers. If you're interested in following our work as we continue, feel free to follow that ticket or PRs with the label :Sequence IDs or just ping us on discuss!

The framework for us to build on is there in 6.0 and we're excited by the next "what if" question we can ask and what kind of answers it may bring. For now, if you want to try out the new sequence numbers recovery, download 6.0.0-beta1 and become a pioneer!