19 November 2013

Keeping Elasticsearch in Sync

By Andrew Cholakian

UPDATE: This article refers to our hosted Elasticsearch offering by an older name, Found. Please note that Found is now known as Elastic Cloud.

One of the trickiest parts of integrating Elasticsearch into an existing app is figuring out how to manage the flow of data from an authoritative data source, such as an SQL database, into Elasticsearch. In most cases this means utilizing Elasticsearch's bulk API. It also implies designing an application to effectively make data available in an efficient, robust, on-time manner. This usually requires modifying an application's workflow to replicate data in batches. This article is a survey of various patterns for accomplishing this task.

The Bulk API, a Must for Most Applications

This article presumes that Elasticsearch is being used as a secondary store, that is to say that some other database holds the canonical version of all data to be indexed in Elasticsearch. Whether the authoritative datastore is a traditional SQL database such as PostgreSQL, or a NoSQL database such as Riak, the underlying concepts at the application level are overwhelmingly similar.

The bulk API is a key part of most Elasticsearch replication stories for multiple reasons. First, using the bulk API to update multiple documents is much faster for both Elasticsearch and a client application than issuing multiple HTTP requests to update those same documents. Second, using batch update strategies in the application can help reduce repeat updating of documents; it is often the case that some frequently changing fields in a document, pageviews or ‘likes’ for example, need only occasional updates. Finally, thinking in terms of batches means coming to terms with inconsistency of data between Elasticsearch and the authoritative data source. It is rare to have an application that actually requires tight coupling, and the tradeoffs are seldom worth it. Generally speaking, working effectively with Elasticsearch means tolerating small delays between the authoritative dataset and Elasticsearch. It is of course possible to eliminate such delays, but the price is high.

In most cases, when designing a replication strategy, the two most important concerns to consider are acceptable replication delay and data consistency. These are business requirements that must be gathered up front. Perfect synchronization between an application’s authoritative datastore and Elasticsearch is rarely needed, and seldom possible, though it may be requested by product owners. In fact, most applications can tolerate multiple seconds, minutes, hours, or even days of delay. Even in the case where only a small delay is considered acceptable the bulk API can be used to great effect.

The Problems of Too-Frequent Updates and Non-Batch Updates

A sound Elasticsearch replication strategy seeks both to minimize the number of updates and to group updates into larger batches. Opting not to use the bulk API prevents Elasticsearch from internally optimizing the execution of the request and forces it to expend more time processing the overhead from a greater number of HTTP requests. Sending individual update requests can also slow down the client application itself, for many of the same reasons Elasticsearch can be internally slowed: HTTP overhead and less potential for efficient use of shared data and tasks during the update process.

Given that Elasticsearch is based on Lucene, the write performance of Lucene is an important consideration. Lucene makes the tradeoff of fast and flexible reads at the expense of cheap writes. Under the hood an Elasticsearch index is actually composed of multiple Lucene indexes. Each Lucene index is in turn composed of multiple ‘segments’ inside of which documents reside. Lucene segments are essentially immutable collections of documents. Documents inside a segment may not be modified. When an update is made to a document the old document is marked as deleted in its existing segment and the new document is buffered and used to create a new segment. This results in substantial costs for the modification of content since nothing can be changed in-place. Further worsening the performance picture, all analyzers must be re-run for documents whose values change, incurring potentially high CPU utilization. It should also be noted that when the number of segments in the index has grown excessively and/or the ratio of deleted documents in a segment is high, multiple segments are merged into a new single segment by copying documents out of old segments and into a new one, after which the old segments are deleted. It is for these reasons that frequent updates should be avoided. For those interested in more of the internals of Elasticsearch, and adjusting knobs that affect the performance of these internals, Elasticsearch From the Bottom Up is a recommended starting point.

The following scenario will illustrate these problems. Assume a blog application with a ‘posts’ table in a SQL database that should be replicated to Elasticsearch. The ‘posts’ table in this example has a ‘pageviews’ property that is updated every time someone views a post. This table is replicated to a ‘posts’ index in Elasticsearch, where the ‘pageviews’ field is used as an index-time boost: the more pageviews a post receives, the higher its document scores. In this scenario it is easy to see how a simple strategy of performing individual updates, where a separate update request is sent each time the document is updated in the authoritative datastore, would have catastrophic consequences. If an article appears on the front page of a popular site, thousands of requests per minute could hit the page. This would trigger thousands of update requests to the authoritative SQL datastore, each of which would in turn be replicated to Elasticsearch. Since updates to Elasticsearch are expensive, especially given that each counter update forces Elasticsearch to re-analyze all the text of the stored blog post, resource utilization could climb to the point where search and indexing performance suffer. The authoritative SQL database will likely be fine so long as the counter is incremented with an efficient query, but Elasticsearch will have far more work to do.

Using Queues to Manage Batches

There are many ways to bulk import documents into Elasticsearch, but the most versatile way is to use a queue with some sort of uniqueness constraint. The exact technology underlying the queue is unimportant except in cases of high load. Whether SQL tables, AMQP message queues, Redis sorted sets, or something else entirely are used as a queue implementation, the general workings of such a system are universal. The basic idea is to define an acceptable interval between document updates and to update the document no more frequently than that interval. Using such a queue, when data changes in the authoritative data source a queue entry that points to the resource is created.

So, in our previous posts example, we would create a queue entry with the data {record_type: "post", record_id: 123}. If the document is modified a second time, it is important that no duplicate queue entry be made. This deduplication of multiple changes over a given time period effectively insulates Elasticsearch from excessive write churn. This sort of behavior generally requires a unique primary key index on the queue. To execute items on the queue, a periodically scheduled worker task can, for instance, dequeue the first 1000 such queue entries, look up the source record with an SQL query, build up 1000 bulk entries in memory, then issue the update for all 1000 using a bulk API call. An example of this pattern can be seen in the pseudocode below.

// Our ORM Model, linked to a table in our database
class PostModel < ORMModel {
  function after_update(function (instance) {
    elasticsearch_queue.add(instance);
  })
}

// Our queue implementation, it formats the queue entry for an instance
// of the ORM model. The queue server could be anything from an SQL table to an AMQP queue
class ElasticsearchQueue < Queue {
  function add(instance) {
    queue_server.insert_unless_exists(record_type: instance.class, record_id: instance.id);
  }
}

// Our worker, scheduled to run every N minutes.
// It will read 1000 entries off the queue, issue a single batch request
// to elasticsearch, and repeat if more items are still on the queue
class QueueWorker < Worker {
  function work() {
    errorCount = 0;
    // Transactionally reserve 1000 entries at a time, but don't delete them from the queue yet
    // The entries will only be fully deleted once the bulk operation has successfully completed
    while ((queue_entries = Queue.reserve(limit: 1000)) && queue_entries.length > 0) {
      try {
        // In this example we will build up the JSON-like body for the 
        // elasticsearch bulk API request. It consists of newline separated
        // JSON documents comprising action metadata, and document values
        bulk_body = "";
        i = 0;  
        queue_entries.each(function (queue_entry) {
          i++;
          // Resolve the model class per entry, since the queue may hold mixed record types
          record_class = Class.named(queue_entry.record_type);
          record = record_class.find(id: queue_entry.record_id);
       
          // Note, a production ready version of this code would double-check that the document still exists
          // and would create a bulk delete request if the record was no longer present
          action_metadata = {index: {
                              _index: record_class.elasticsearch_index,
                              _type: record_class.elasticsearch_type,
                              _id: record.id}}
       
          bulk_body += "\n" unless i == 1;
          bulk_body += encode_json(action_metadata);
          bulk_body += "\n";            
          bulk_body += encode_json(record);
        });
        
        bulk_body += "\n"; // The bulk API requires termination by a newline  
        http_client.post("http://host.for.elasticsearch:9200/_bulk", body: bulk_body);
        // Now that we're sure processing has succeeded we can fully delete the queue entries
        queue_entries.delete
      } catch (StandardError ex) {
        queue_entries.unreserve;
        
        // Simply let the while loop retry up to 5 times.
        errorCount += 1
        if (errorCount >= 5) {
          throw(CannotReplicateElasticsearchError);
        } else {
          Logger.warn("Error processing elasticsearchqueue, will retry. Attempt: $errorCount", ex)
        }
      }
    }
  }
}
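
The insert_unless_exists call in the pseudocode above is where deduplication happens. As a minimal sketch of one way to get that behavior, assuming the queue lives in an SQL table, the following Python uses SQLite with a composite primary key. The table name es_queue and the helper names are hypothetical, chosen only for illustration.

```python
import sqlite3


def open_queue(path=":memory:"):
    """Open (or create) a queue table keyed on (record_type, record_id)."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS es_queue ("
        " record_type TEXT NOT NULL,"
        " record_id INTEGER NOT NULL,"
        " PRIMARY KEY (record_type, record_id))"
    )
    return conn


def enqueue(conn, record_type, record_id):
    """Add a queue entry unless one already exists for this record."""
    # INSERT OR IGNORE skips rows that would violate the primary key,
    # so repeated changes to one record collapse into a single entry.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO es_queue (record_type, record_id) VALUES (?, ?)",
            (record_type, record_id),
        )


def pending(conn, limit=1000):
    """Return up to `limit` queue entries in insertion order."""
    cursor = conn.execute(
        "SELECT record_type, record_id FROM es_queue ORDER BY rowid LIMIT ?",
        (limit,),
    )
    return cursor.fetchall()
```

Enqueuing the same post twice leaves a single row, so a burst of pageview updates costs Elasticsearch one write per worker run rather than one per view.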

This sort of pattern has a number of desirable properties. For one thing, it mostly isolates the workings of replication from other parts of the codebase. The queuing system can operate independently of other components, and the background worker can shift the computational expense of the Elasticsearch lifecycle away from the main execution flow. Additionally, the use of a queue and workers puts a throttle on the rate of insertion, a desirable property, and simultaneously decouples replication error handling from save operations on ORM models. By tuning the number of workers the maximum insertion speed into Elasticsearch can be constrained, even if every object in the database suddenly inserts an entry into the queue. The queue also isolates insertion errors from the main model lifecycle, meaning that in a scenario where Elasticsearch is overloaded, the ORM’s lifecycle still works. This system will degrade softly, experiencing adverse effects in the form of prolonged delays in new data appearing in Elasticsearch rather than full system failure. This is a small price to pay for avoiding slowdowns or worse.

A full implementation of this system handles deletion elegantly as well. When an item is deleted from the main datastore a queue entry is created pointing at the now missing item. The same worker code can then notice that the item is missing and issue a bulk delete request instead of a bulk index request. In the bulk API delete requests can be interleaved with index requests, simplifying the code. Implementing this logic is left as an exercise for the reader.
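
As a rough sketch of that exercise, the Python below builds a bulk body in which delete actions are interleaved with index actions. The function name build_bulk_body and the load_record callback are hypothetical stand-ins for the application's own lookup code, and this sketch omits the _type field that the pseudocode above includes.

```python
import json


def build_bulk_body(entries, load_record, index_name):
    """Build a newline-delimited bulk body from (record_type, record_id) entries.

    load_record(record_type, record_id) is assumed to return a dict for a
    record that still exists, or None if it was deleted from the source.
    """
    lines = []
    for record_type, record_id in entries:
        record = load_record(record_type, record_id)
        if record is None:
            # A delete action is a single line with no document after it.
            lines.append(json.dumps({"delete": {"_index": index_name, "_id": record_id}}))
        else:
            # An index action is followed by the document source on the next line.
            lines.append(json.dumps({"index": {"_index": index_name, "_id": record_id}}))
            lines.append(json.dumps(record))
    if not lines:
        return ""
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"
```

Because a missing record simply produces a different action line, the worker needs no separate code path for deletions.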

There are some issues with this system however. Queues are remarkably poor at handling a full-reindex of data. This requires inserting a queue entry for every record in the database. If the database is large, this can be a very expensive operation in and of itself. However, inserts into almost any queue will almost always be faster than indexing a document to Elasticsearch.

Batching Based on Ranges

Not every dataset needs to deal with updates to documents. Some datasets only insert, and never update data. An example of this is a logging application. Logs are generally immutable. While a queue is still an acceptable method of dealing with the recording of logs, batching based on ranges of immutable data tends to be a superior option. Let’s take the example of an application that logs a history of a user’s geographic location for a Foursquare-like app. In this case the location history of a user is immutable, and we can opt to remove some of the ceremony of using a queue.

Let us assume that we have the business constraint of making this data available at most twenty seconds after a user has posted a location update. Replication can be implemented by scheduling a periodic task that adds to Elasticsearch all records created since the last time the task ran. Since Elasticsearch by default makes data available for search within 1 second of indexing (see the text concerning refresh_interval in Elasticsearch from the Bottom Up), and we’ll have to allow for application-side processing time, this task has been set to run every ten seconds to give a ten second margin of error in hitting the twenty second deadline. The logic for this sort of system can be seen in the following pseudocode.

// Gets executed every ten seconds
class LocationImportWorker < PeriodicWorker {
    function work() {
        // If a job runs long, we want the next one to block waiting on this one
        // This is a naive way of handling this situation, ideally both jobs could run concurrently using 
        // more careful state tracking that is beyond the scope of this example
        lock = acquire_lock("location_import_work"); 
        
        last_job_range_end = get_last_job_range_end();
        // Use the datastore's clock to prevent any bugs from clock skew
        // Choose 1 second ago, since the current second may yet see more records!
        this_job_range_end = Datastore.query("now() - '1 second'"); 

        // Half-open range: a record stamped exactly on a range boundary is indexed once, by the later run
        locations = UserGeoLocations.where("timestamp >= $last_job_range_end AND timestamp < $this_job_range_end");
        // In this example the boilerplate for generating a bulk HTTP request has been omitted
        elasticsearch_client.bulk_index(locations); 
        
        set_last_job_range_end(this_job_range_end);
        
        lock.release();
    }
}
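
The same idea can be sketched in Python, with the storage and Elasticsearch calls passed in as plain functions. Everything here is an assumption made for illustration: run_range_job, fetch_rows, bulk_index and the state dictionary are hypothetical names, and the current time is passed in as an argument, whereas a real worker would read it from the datastore's clock as the pseudocode does. The sketch uses a half-open range so that a record stamped exactly on a boundary is picked up by exactly one run.

```python
from datetime import datetime, timedelta


def run_range_job(fetch_rows, bulk_index, state, now):
    """Index every row with range_start <= timestamp < range_end, then advance.

    fetch_rows(start, end) returns the rows in that half-open range.
    bulk_index(rows) sends them to Elasticsearch and raises on failure.
    state["last_range_end"] holds the datetime where the previous run stopped.
    """
    range_start = state["last_range_end"]
    # Stop one second in the past: the current second may still receive rows.
    range_end = now - timedelta(seconds=1)
    if range_end <= range_start:
        return 0

    rows = fetch_rows(range_start, range_end)
    if rows:
        bulk_index(rows)

    # Only advance the marker after the bulk call has returned without error,
    # so a failed run can simply be repeated over the same range.
    state["last_range_end"] = range_end
    return len(rows)
```

If bulk_index raises, the marker is left untouched and the next run covers the same range again, which is safe as long as documents are indexed under stable IDs.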

Notice that this is fairly simple logic, and requires minimal infrastructure, nothing more than a simple, single worker process. The simplicity of this architecture makes it an ideal choice when dealing with immutable datasets that only grow. This simplicity has a powerful secondary benefit as well: increased performance, since there is no queue and therefore no cost for enqueue and dequeue operations. Note that given a transactional datastore it is possible for slow transactions or transactions with inaccurate timestamp fields to be missed by the background job. This can be ameliorated by guaranteeing transaction runtimes with an enforced timeout, by waiting longer to index data so that slow transactions have a chance to finish, or by relying on DB specific features, such as Postgres txids.

A powerful enhancement to this replication scheme is time based index quantization. What this involves is creating a new index for every N time periods. Indexes could be named something like user_geolocations_YYYY_MM. Since Elasticsearch queries can run across multiple indexes with practically no performance penalties (so long as the number of indexes is not absurd), this can be a great strategy for improving query speed so long as queries can be isolated to specific time periods. The other situation where time based index quantization can be useful is when data is only retained temporarily. If in this scenario millions of records are recorded each month, but only the last six months are needed, past months can be easily removed by deleting the full index for that month and issuing queries only to the indexes for the most recent six months. Deleting a full index is much faster than deleting individual documents, as it is essentially a matter of deleting a relatively small number of files on the filesystem, rather than dealing with the slow to update segment files.
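
A small Python sketch of the monthly naming scheme follows. The helper names index_for and recent_indexes are hypothetical. The first picks the index a record belongs in, and the second lists the indexes covering the most recent N months, which is the set a query or a retention job would work with.

```python
from datetime import date


def index_for(day, prefix="user_geolocations"):
    """Return the monthly index name for a given date."""
    return f"{prefix}_{day.year:04d}_{day.month:02d}"


def recent_indexes(today, months, prefix="user_geolocations"):
    """Return index names for the last `months` months, newest first."""
    names = []
    year, month = today.year, today.month
    for _ in range(months):
        names.append(f"{prefix}_{year:04d}_{month:02d}")
        month -= 1
        if month == 0:
            # Roll back from January to December of the previous year.
            year, month = year - 1, 12
    return names
```

Elasticsearch accepts a comma-separated list of index names in a search path, so ",".join(recent_indexes(today, 6)) can be used to restrict a query to the retained months.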

Combining Queues and Ranges

The queue pattern's weakness, that a full reindex requires inserting a queue entry for each row in the database, can be fixed by combining the queue and range based approaches. The key here is to retroactively update rows via the range based approach for infrequent full table ‘scrubs’, while relying on hooks in the ORM lifecycle to account for any changes since then. That approach allows one to periodically issue full table reindexes with little cost. There is a slight risk of a race condition between the two systems, but the probability of it being an actual issue is slim, and it can be avoided with careful locking if required.

Error Handling in Replication

In either strategy error handling is of paramount importance. A replication strategy that does not ensure the replication of data is generally not a very useful one. All sorts of things may happen to interrupt replication: the authoritative database may be down, the elasticsearch cluster may not work, etc. It is for this reason that all replication operations should be idempotent. In other words, running an operation 1,000 times should leave the system in the same state as running it a single time. Idempotence simplifies error handling by making it a simple matter of re-running the standard code, rather than having a special error handling execution path.

The queue strategy above is inherently idempotent; all that is needed is a mechanism to detect and retry failed runs. This can be seen in the try/catch block in the queue pseudocode. Additionally, it is important to note that the queue entries should be transactionally locked during processing, and only deleted once all records have been successfully sent to the bulk API. To achieve this sort of behavior the queue in use should have some sort of transactional capabilities. Alternatively, ensuring that only a single worker is running removes the need for any sort of locking. In that case, queue entries need not be explicitly reserved; they can simply be read at the start of the run, then deleted after the bulk API operation has completed.

The range based worker operates on similar principles. It is, in fact, simpler than the queue based one since there is only a single value to work with, the last_range_end value. So long as that value is never updated until the bulk operation fully completes, the job may simply be re-run to fix any inconsistent state. Note that it is possible for there to be a partial write with the bulk API, meaning that if an error is returned it is not always possible to know how many items were or were not written to Elasticsearch.
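
When the bulk request does come back with a response body, that body carries a top-level errors flag and one result entry per action, which is enough to tell which items failed. The Python below is a minimal sketch of reading it. The function name failed_items is hypothetical, and the response is assumed to be already parsed from JSON into a dictionary. It does not help when the request itself times out or the connection drops, which is the case where re-running the idempotent job is the only safe option.

```python
def failed_items(bulk_response):
    """Return (action, _id, status) for each item that reported an error."""
    # The top-level flag is true if at least one action in the batch failed.
    if not bulk_response.get("errors"):
        return []

    failures = []
    for item in bulk_response.get("items", []):
        # Each item is a one-key dict such as {"index": {...}} or {"delete": {...}}.
        for action, result in item.items():
            if "error" in result:
                failures.append((action, result.get("_id"), result.get("status")))
    return failures
```

A worker could log the returned list and leave the matching queue entries in place for the next run, while deleting the entries that succeeded.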

Why Marking Source Records is an Anti-Pattern

One other approach is to add a special marker to source records in the authoritative database indicating that they are in need of an update. This is an approach to be avoided in most scenarios. The workings of such a system generally involve adding a field or fields to the source datastore that can be used to determine whether the record needs indexing or not. These fields can take the form of incrementing counters, timestamps, or even a simple boolean flag. While these systems are fine from a functional standpoint, there are a few properties that make them problematic at mid to large scale.

The primary issue is that when isolation between the replication state of a source record and the actual data in the source record is removed, severe performance problems can appear. In the queuing system previously described, for instance, inserts into the queue have no impact on the performance of the table within the authoritative datastore. If that state is kept within the rows, however, high write throughput can create an effective lock on the row given long transactions, especially in an MVCC system such as PostgreSQL. This should not be too bad in the general case, but if one needs to re-index the entire table, an UPDATE SET reindex=TRUE across millions of rows could be disastrous. These systems also require a greater amount of storage, since at least one additional column must be added to the source table. A queue architecture, by contrast, only needs temporary storage for records in a state of flux. Additionally, if the records are particularly large, say blog posts or other such things, modifying the source records can be less performant if full rows need to be internally rewritten by the underlying database engine. Finally, it should be noted that from an architectural standpoint such systems are harder to maintain by virtue of the tighter coupling between replication behavior and the individual rows of the source datastore.

Going Forward from Here

The two advised strategies in this article should work well for the majority of applications. Variations on these patterns abound, and many details may change when tailoring a solution for a particular scenario. Generally it is best to use a good Elasticsearch binding that helps to build the somewhat awkwardly formatted bulk API requests. Newline separated JSON can be cumbersome to deal with efficiently. For further information on the Elasticsearch bulk API, please read the official docs.