This is the second post in a three-part series about Instant Aggregations. Read how it all started in The Tale of Caching and Why It Matters from Simon Willnauer and Shay Banon and how the magnificent end to the story in Instant Aggregations: Rewriting Queries for Fun and Profit. Enjoy the trilogy!
When writing software, adding cool new features is of course always great fun. But sometimes it’s also important to work on internal changes in the code base that enable those shiny new additions in the future. For example, there were a number of cool new ideas for Elasticsearch floating around that were essentially blocked by the lack of having a good intermediate representation for search requests arriving through the REST layer, which prevented early query optimizations and delayed parsing to the shard level.
For the upcoming release of Elasticsearch 5.0 we embarked on a large refactoring to change the way search requests work internally. In this blog post we want to highlight some of the changes and challenges that came along with the refactoring of queries and the search request, how it helped us improve our testing and how it enables great new features like "Instant Aggregations", that will be highlighted in a following blog post.
How search requests were sent across the cluster prior to the Refactoring
When you send a search request to a node in the cluster, the node receiving the request coordinates the search request from then on. The coordinating node identifies what shards the search request needs to be executed on, and forwards it to the nodes that hold those shards via the internal node-to-node transport layer. Each shard returns a set of documents, whose ids will be sent back to the coordinating node that is responsible for reducing the matching documents obtained to the top matching ones that need to be returned to the client. Once those documents are identified, they are fetched as part of the fetch phase and finally returned.
Before Elasticsearch 5.0, each node received the original search request, parsed the query and used other information available from the shard (like mappings) to actually create the Lucene query that was then executed as part of the query phase.
The body of the search request is not parsed on the coordinating node but rather serialized via the transport layer untouched, as an opaque byte array which holds nothing more than its json representation. This was historically done to avoid having to write serialization code for every single query and every single section of a search request, as elasticsearch uses its own binary serialization protocol. We eventually came to the conclusion that this was the wrong trade-off.
So what's the problem with this?
While simply forwarding incoming requests on the coordinating node as described above is simple and fast (at least on the coordinating node), there are some drawbacks:
- Each search request gets parsed multiple times, once for each shard of the target index. That means that even if with the default of 5 shards, a query will be parsed 5 times across the cluster (even multiple times in the same node if that node holds more than one shard that is relevant for the query). This is a potential waste of cpu-cycles as the search request could be parsed earlier and only once.
- For the same reason, if a query is malformed, it will be rejected with an error by each shard. This is why in Elasticsearch 1.x you get multiple parse failures in your response when you make a mistake in your query. It is all the same error, but it gets thrown by each shard. On 2.x we already deduplicate those kind of errors, but they are still thrown from all shards.
- The search request cannot be rewritten for optimizations on the coordinating node without impacting performance, as this would mean having to parse it twice.
- Parsing and query creation are tightly coupled, making unit testing of each individual step difficult without having the whole context of a shard.
All these drawbacks arise because before 5.0 there is no intermediate representation of a query within elasticsearch, only the raw JSON request and the final Lucene query. The latter can only be created on the shards, because Lucene queries are not serializable and often depend on context information only available on the shard. By the way, this is actually true not only for the "query" part of the search request, but for many other things like aggregations, suggestions etc.. that can be part of a search request.
So, wouldn't it be nice to be able to parse queries to an intermediate representation once and early? That way we would be able to eventually rewrite them, and optimize their execution, for example by shortcutting them to queries that are less expensive.
What we did
For the queries we achieved this by splitting the query parsing into two phases:
- Json/Yaml parsing that happens on the coordinating node, which allows us to represent a search request in a serializable intermediate format. This phase is independent from mappings and data present in the shards, to make sure that it can be performed on any node. This happens in a method called “fromXContent()” that every query has and which produces a new intermediate query object (all implementing the QueryBuilder interface).
- Conversion from elasticsearch serializable queries to lucene queries ready to be executed on the shard. This phase depends on mappings and information present in the shards, so it has to happen separately on each shard to be able to run the query against its data. The method handling this in the QueryBuilder interface is called `toQuery()`.
As a result of the split, the code is better organized as parsing is decoupled from lucene query generation. Having a serializable intermediate format for queries meant that we had to write serialization code for all of the queries and search sections supported in elasticsearch. Also, every query implements `equals()` and `hashCode()`, so that they can be compared with each other for easier caching and to aid their testing.
Once we moved parsing to the coordinating node, we could also move some validation to earlier stages of a search request and throw one single error earlier. Validation applies to malformed queries in terms of invalid json, queries that are missing some of their required values, or queries with invalid values provided.
In order to make all those changes in a safe way that doesn’t break existing behavior, we wrote extensive unit tests. For each query, we added randomized tests to verify that it can be properly parsed and serialized, added tests that the resulting lucene query is the expected one and also test explicitly that all of the required values are checked and validated correctly. In order to do so, we created a base test class which provides the code necessary for test setup and has some abstract methods that only need to be filled in for each individual query. This base class is shipped as part of our test framework helping downstream developers of custom queries with testing.
Most of our query parsing code had low unit test coverage before the refactoring. For example, the `org.elasticsearch.index.query` package that contains all the QueryBuilder and QueryParser classes had only 47% test coverage (actually, with randomized testing we cover more over time, these numbers refer to the coverage in one CI run) before the refactoring, going to above 77% coverage after the refactoring branch was merged.
How we did it
This query refactoring was really a long running task that needed a proper feature branch. Initially we thought it could be enough to only refactor the query part of the search request, but in the end we went for refactoring almost all parts of the search request. The branch stayed alive for several months and we took care of merging master in daily (or at least twice a week). In some cases we found bugs that we fixed upstream rather than on the branch, to get them out with regular releases as soon as possible.
As a team working on the feature branch, we decided to essentially replicate the development model of how we work on Elasticsearch master: instead of trying to plow through all changes needed and having an enormous bulk review at the very end, each change was submitted and reviewed as a small, manageable pull request against the branch. Having a closely knit team dedicated to this work, we could agree early on the goals of the changes, coding and architectural standards. This later made finding a reviewer familiar with those goals easy and made review cycles faster.
Given the amount of code that this refactoring touched, the challenging part was to keep the branch healthy while the task was in progress. We had to sit down and come up with some incremental steps to run tests while the refactoring was still in progress. It was all a matter of introducing the new functionality gradually, while leaving the old infrastructure alive until there was a complete replacement for it. Initially we introduced the two phase parsing but it all still happened on the data nodes rather than on the coordinating node. When all queries were migrated, we were able to move parsing to the coordinating node and start working on the remaining sections of the search request (e.g. rescore, suggestions, highlighting and so on).
After several months of wading through code that was originally written by multiple authors and at very different times through the evolution of the code base, we learned a couple of things:
- just because you’ve refactored a handful of queries already doesn’t mean you have any idea of what the other about 50 classes look like.
- in order to keep your branch healthy, it is vital to use CI to frequently run your test suite and deliver improvements in small incremental steps, even though this means doing more work in total.
- changing architectural decisions years into development does have a non-negligible cost attached, but it’s the right thing to do, pays off in terms of maintainability and enables new features.
All in all, the improvements made with this large refactoring become obvious when we need to go back to the old 2.x branch now and then to backport something from the current main development line. Not only has the parsing infrastructure become much more efficient, but introducing the intermediate query representation and decoupling parsing from query creation has lead to much cleaner code and the new test infrastructure makes debugging of parsing-related problems and writing new test much easier.
But the most important thing is, that we are now able to analyze, rewrite and optimize search request on the coordinating node much more easily. Features like “instant aggregations” that were long talked about, but never tackled because of the months of work required, suddenly became possible to implement quickly. In an upcoming blog post we will shed some more light on what this feature is and how it works.