14 September 2017 Engineering

Distributed Watch Execution in Elasticsearch 6.0

By Alexander Reelsen

Try this feature today. Download the 6.0.0 preview release to become a Pioneer.

Ever since introducing watcher in Elasticsearch 1.x we had one big issue on our roadmap: Making the execution of watches distributed. With the Elasticsearch 6.0 release, we will finally do it. This blog post describes the journey to distributed watch execution.

How it all started

Everything should start simple. New features should start simple and become powerful over time, once the simple solution has proven to be useful. This is also how Watcher started. A set of small features, that allows you to execute an input, check if it matches a condition and then optionally have a set of actions being executed. All good so far.

But how do we manage those executions? Where are they executed? How do we prevent parallel executions of the same watch? We decided for the simple solution first. Preventing parallel executions in the cluster can be solved easily when you decide to just run on the master node. Preventing parallel executions on a node can be solved with some smart locking. And all of this can run in a dedicated thread pool just for watcher executions.

This solution helped us to get off the ground, but came with some drawbacks. First and foremost, we are using the master node for a task that is usually not a master node task. This implies that additional resources are required. You need some memory to hold the watches in memory for execution, you will have higher IO due to the watch execution itself and this node will execute searches and index operations on the local cluster. Those are not typical tasks for a dedicated master node. On top of that this also means there is a scalability limit, as we can only have a single master in our cluster and all the other resources would go unused.

So while this solution worked for some time, we knew we had to improve.

Laying out a path

One of the good things of this implementation was the fact, that it is very internal, so the user does not really notice, where a watch is executed or loaded. The user is just putting watches and expects them to be executed. This means, we did not have to do any API changes for this feature.

So, how did we approach the problem? Should we introduce some fancy consistent hashing algorithm to scale the watch execution across all the nodes? That might work, but it turns out we already have a great mechanism to distribute data across a cluster, so why not piggyback on that one? This mechanism is called a shard and is basically the unit of scale in Elasticsearch. What if we execute the watches where the data is? So if you have one shard, you would only have one node where watches are executed. If you have 2 shards and 1 replica, watches would be executed across four nodes. Each shard would be responsible for executing 1/nth of the data, with n being the number of total shards to store watches.


By using shards as our base for watch execution we get a few things completely for free.

First, we now have data locality. As the execution always happens on the node that is holding a .watches shard, there is no network traffic happening. When a watch execution starts, the watch is always fetched locally and everything happens on that node.

Second, scaling out becomes simple. We can scale by adding more primaries or replicas. In the first graphic you would see, that only two nodes are executing watches, by adding a replica, you could make sure that all three nodes are actually executing watches.


Third, we can now easily control where the execution of watches should happen, without having to write any code for this. Elasticsearch itself already supports this, thanks to shard allocation filtering. We could run a command like this to ensure watch shards are only placed on nodes with a certain attribute

PUT .watches/_settings
{
  "index.routing.allocation.include.role": "watcher"
}


That's almost it, folks!

While working on this feature we got rid of some locking code, which should improve single node scalability, and reduced memory usage by removing an additional in memory watch representation (we only need to keep the information in memory, when a watch should be executed next). On top of that, no APIs have changed for distributed watch execution.

If you want to know where a watch was executed on, you will find this information now in the watch history, as an additional node_id field is stored.

Last but not least, there is the case of the rolling upgrade from 5.x to 6.x. Again, we tried to do the simple thing here. Distributed watch execution will only ever be enabled, if all nodes in a cluster are on 6.x, otherwise the execution will still happen on the master node.

Hopefully this article gives you a good overview of distributed watch execution and encourages you to try it out in Elasticsearch 6.0. So please download 6.0.0-beta2 and become a pioneer!