Customers

Using Elasticsearch and Logstash to Serve Billions of Searchable Events for Customers

We're pleased to welcome our first guest blogger, Ralph Meijer, Software Developer at Rackspace working on Mailgun. Ralph graciously offered to send us a write up of Mailgun's use of Elasticsearch after we met up at Monitorama EU. He has been an enthusiastic member of the Elasticsearch community for some time now and frequently attends our Elasticsearch Netherlands Meetup.

Needle in the Haystack
Mailgun sends and receives a lot of emails and we track and store every event that happens to every email. It adds up to billions of events per month that we need to make available to our customers, along with the ability to easily parse through this data with full text search. Below is a detailed account of this challenge and how we solved it with the help of Elasticsearch and Logstash (note: we were delighted to hear about Logstash joining Elasticsearch shortly after completing this project).

Events

When thinking of events in Mailgun, consider the following: an incoming message is accepted or rejected, an outgoing message is delivered or rejected (spam? bounce?), the message is opened or a link in the message is clicked on, the recipient wants to unsubscribe. Besides the kind of event, each of these events can have metadata that helps our customers figure out what happened to their messages, why, and when it happened. This metadata includes the message sender, recipient address, the message-id, SMTP error codes, link URLs, geographic information, etc.

Every event basically comes down to a timestamp and a number of fields with their values. A typical representation of an event is an associative array, also known as a dictionary or hash table.
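As a rough illustration, such an event could look like the Python dictionary below. The field names and values here are made up for the example; real events carry many more fields.

```python
# A hypothetical "delivered" event represented as a dictionary.
# Field names are illustrative, not our exact schema.
event = {
    "timestamp": "2013-09-30T11:13:47Z",
    "event": "delivered",
    "recipient": "alice@example.com",
    "message-id": "<20130930111347.12345.67890@samples.mailgun.org>",
    "smtp-code": 250,
}
```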

Giving Access To Events

Assuming we have all these events, we need a way for our customers to use them. In the Mailgun control panel, there is a Logs tab that shows log events in reverse chronological order and allows one to filter the view by domain and severity. An example log line is this:

Mailgun Logs UI

In this case, the event has severity 'warn' as the SMTP code signifies a temporary problem and we will reattempt delivery. There are two fields here, a timestamp and a pre-formatted, unstructured text message. In addition, we color code the event based on severity.

Besides this web interface, we also have an API for retrieving logs programmatically, and a way to set up webhooks for getting notifications when events happen. The latter notifications are already structured documents with many metadata fields formatted as JSON. For example, the SMTP code is in its own field, as are the sender and recipient address and the message subject.

Unfortunately, the old logs API was limited. It only returned events for message delivery (no tracking) and the contents were a pre-formatted string as shown in the control panel. There was no way to get or search in individual fields (like those in the webhook notifications) or do any kind of full text search. Similarly, the control panel lacked full text search.

Elasticsearch For Storage And Querying

To provide an API and access via the control panel, we wanted a new backend that would address these shortcomings and meet several additional requirements:

  • Allow filtering on most properties.
  • Allow full text search.
  • Capable of storing events for at least 30 days, but with limited retention.
  • Easily extendable by adding nodes.
  • Resilient to failing nodes.

Enter Elasticsearch, a search engine that can index new documents in near real-time and make them immediately available for querying. Elasticsearch is based on Apache Lucene and allows for setting up clusters of nodes that store any number of indices in a distributed, fault-tolerant way. If a node disappears, the cluster will rebalance the (shards of) indices over the remaining nodes. You can configure how many shards make up each index and how many replicas of these shards there should be. If a primary shard goes offline, one of the replicas is promoted to primary and used to repopulate another node.
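For example, the shard and replica layout is set when an index is created. Here is a minimal sketch of creating an index with 5 shards and 1 replica per shard (the numbers we use, as described further down), assuming a node on the default local port; the index name is hypothetical.

```python
import json
import requests

# Create an index with 5 shards, each with 1 replica.
settings = {
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1,
    }
}
resp = requests.put("http://localhost:9200/events-2013.09.30",
                    data=json.dumps(settings))
print(resp.json())
```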

Elasticsearch is document-oriented and optionally schema-free. This means that you can throw any JSON document at it and have it indexed as individual fields. This is a perfect match for our events.

Elasticsearch also has a powerful querying/filtering interface for searching in particular fields or doing full text searches.
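To give a flavour of that interface, here is a sketch of a search that combines a filter on a structured field with a full text query, using the query DSL of the Elasticsearch versions current at the time of writing. The index, field names and query text are hypothetical.

```python
import json
import requests

# Full text search for "mailbox full", restricted to failed events,
# newest first.
query = {
    "query": {
        "filtered": {
            "query": {"query_string": {"query": "mailbox full"}},
            "filter": {"term": {"event": "failed"}},
        }
    },
    "sort": [{"timestamp": {"order": "desc"}}],
    "size": 25,
}
resp = requests.get("http://localhost:9200/events-2013.09.30/_search",
                    data=json.dumps(query))
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"])
```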

Getting Events Into Elasticsearch

There are quite a few projects and services out there focused on logging events. We ultimately picked Logstash, a tool for collecting, parsing, mangling and passing on logs.

Internally, the events pushed out via our webhooks are also used in other parts of our system. We currently use Redis for this. Logstash has a Redis input plugin that retrieves log events from a Redis list. After some minor filtering, the events can then be sent out via an output plugin. A very commonly used output plugin is the Elasticsearch plugin.
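Conceptually, the pipeline does something like the following sketch: pop JSON events from a Redis list and index them into a daily index. This is only an illustration in Python of the data flow, not how Logstash itself is implemented; the Redis key, host names and document type are hypothetical, and Logstash's plugins add batching, retries and filtering on top.

```python
import json
import redis
import requests

r = redis.StrictRedis(host="localhost", port=6379)

while True:
    _, raw = r.blpop("events")           # blocking pop from the Redis list
    event = json.loads(raw)              # events are already JSON documents
    # Daily index name in Logstash's default "logstash-YYYY.MM.dd" style.
    index = "logstash-" + event["timestamp"][:10].replace("-", ".")
    requests.post("http://localhost:9200/%s/event" % index,
                  data=json.dumps(event))
```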

A great way to use Elasticsearch's very rich API is by setting up Kibana, a tool to "make sense of a mountain of logs". The new incarnation, Kibana 3, is fully client-side JavaScript, and will be the default interface for Logstash. Unlike previous versions, it no longer depends on a Logstash-like schema, but is now usable for any Elasticsearch index.

Mailgun Kibana Dashboard

Authorization

With this setup we have a full solution for putting our events in a central place and a rich API to dive into these logs. However, we don't want to expose all the events to just anybody, so we need a way to add authorization to the equation. Currently, neither Elasticsearch nor Kibana offers authentication or authorization, so directly exposing the Elasticsearch API is not an option.

We opted for building a two-stage proxy setup: one proxy for authorization and rate-limiting, and one for translating requests to our new Events API into Elasticsearch requests. We have made the former proxy available under the Apache 2.0 license as vulcan on GitHub. We also ported our old, now deprecated, Logs API to use Elasticsearch instead of the old system.

Index Design

There are various ways to organize your indices, based on the number of documents (per time period) and your querying patterns.

Logstash, by default, creates a new index per day, which then contains all events received from its inputs. You can configure it to change the rotation period (if any) or make separate indices per user or other property (like the type of event).

In our case we have about 1500 events per second and we wanted retention periods configurable per account. The options:

  • One big index.
  • One index per day.
  • One index per user account.

Of course, each of these could be split up further by sharding on, for example, the event type, if needed.

One way to manage retention in Elasticsearch is by setting TTLs on each document. Elasticsearch will then periodically delete expired documents in bulk. This makes it easy to do custom retention per account. However, it turns out to be a rather expensive (repetitive) operation.

A much lighter-weight operation is dropping entire indices. This is why Logstash defaults to daily indices. When an index falls outside your retention period, you can simply drop the index from a cron job.
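A sketch of that generic approach (which, as explained next, is not what we ended up doing) could be a small script run from cron, assuming Logstash's default daily index names and a fixed retention:

```python
import datetime
import requests

RETENTION_DAYS = 30
cutoff = datetime.date.today() - datetime.timedelta(days=RETENTION_DAYS)

# List all index names and delete the daily ones older than the cutoff.
resp = requests.get("http://localhost:9200/_aliases")
for name in resp.json():
    try:
        day = datetime.datetime.strptime(name, "logstash-%Y.%m.%d").date()
    except ValueError:
        continue                          # not a daily log index
    if day < cutoff:
        requests.delete("http://localhost:9200/" + name)
```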

However, this doesn't give us custom retention periods. Looking at our numbers, it was more efficient for us to keep all events and instead restrict how far back queries can reach at the API layer. We have many user accounts, and keeping an index per day and per user would become impractical. Of course, keeping all logs for all accounts in daily indices means that we are storing all of that on disk. However, if an account has a two day retention, restricting its queries to just those days does limit the amount of data kept in caches and the processing power needed for queries that span multiple days. On top of that, for our own internal uses, it is very helpful for debugging to still have older logs and access them through Kibana.
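Enforcing retention at query time then comes down to deciding which daily indices an account is allowed to hit. A minimal sketch, with hypothetical index naming:

```python
import datetime

def indices_for_account(retention_days, today=None):
    """Return the comma-separated daily index names an account may query,
    given its retention period."""
    today = today or datetime.date.today()
    return ",".join(
        "logstash-" + (today - datetime.timedelta(days=n)).strftime("%Y.%m.%d")
        for n in range(retention_days)
    )

# An account with a two day retention only ever queries two indices, e.g.
# "logstash-2013.09.30,logstash-2013.09.29".
print(indices_for_account(2))
```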

Mappings

To define how the fields in a document should be massaged, indexed and stored in an index, Elasticsearch keeps a so-called mapping. For each field it defines the type, how values are analyzed and tokenized for indexing and querying, whether the value needs to be stored, and some other settings. By default, the mapping is dynamic, i.e. Elasticsearch will try to determine the type of a field from the first value it gets and apply settings to it accordingly.

If you have homogeneous data, this works quite well. However, if you have data coming from different sources, or different types of logs as in our case, a field with the same name might represent something else and even have a different type. As Elasticsearch will reject an attempt to index a document with a type mismatch, we need to define a mapping explicitly.

I defined a single mapping for the types of log events exposed via our Events API. Not all event types have all fields, but a field with a particular name always means the same thing.
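For illustration, putting such a mapping in place could look like the sketch below. The document type, index name, fields and types are hypothetical, not our actual mapping.

```python
import json
import requests

# Explicit mapping for a hypothetical "event" document type.
mapping = {
    "event": {
        "properties": {
            "timestamp": {"type": "date"},
            "event": {"type": "string", "index": "not_analyzed"},
            "recipient": {"type": "string"},
            "smtp-code": {"type": "integer"},
        }
    }
}
resp = requests.put("http://localhost:9200/events-2013.09.30/event/_mapping",
                    data=json.dumps(mapping))
print(resp.json())
```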

Analysis

By default, fields are set up with the standard analyzer as part of the mapping. In general, this means that a string value is lowercased and split up into words. These tokens are then put in the index, pointing to this document for this field.

In some cases you want something different, because the default has undesirable effects. For identifiers like account IDs, e-mail addresses or URLs, for example, the default tokenizer will split on dashes and doesn't treat the domain name as a single token (only its constituent domain labels). If you then facet on the domain name field, you get (parts of) the individual domain labels as facets.

To resolve this, you can set the index attribute for such fields to not_analyzed. This causes the value to be indexed as is, without any case mapping or tokenization. When applying this to the domain.name field, for example, each domain becomes a single facet.

If you still want the document to be findable by parts of such fields, you can use the so-called multi-field type. It allows you to map the same value to different core types and/or different mapping attributes and make them available under different names. We used this technique for IP addresses, where the default field (e.g. sending-ip) has the type ip, and a non-default field (e.g. sending-ip.untouched) is configured to be not_analyzed and of type string. You can then use the default field for doing range queries on IP addresses, and the .untouched one for faceting.
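A sketch of that multi-field mapping for the sending-ip example, using the multi_field syntax of the Elasticsearch versions current at the time (the surrounding mapping boilerplate is omitted):

```python
# "sending-ip" is usable for IP range queries; "sending-ip.untouched"
# keeps the raw, untokenized string for faceting.
sending_ip_mapping = {
    "sending-ip": {
        "type": "multi_field",
        "fields": {
            "sending-ip": {"type": "ip"},
            "untouched": {"type": "string", "index": "not_analyzed"},
        },
    }
}
```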

For most fields, though, we haven't done this (yet). We are looking into applying the pattern capture token filter to some of these fields (like e-mail addresses) in the near future, in combination with the multi-field type trick.

Monitoring

To see how your cluster is holding up, you really want to set up monitoring for it. Elasticsearch has a great API for retrieving the cluster state and node statistics. We have set up Graphite for storing these metrics and made comprehensive dashboards out of them; below is one of the panels.

Mailgun Graphite Monitoring Dashboard

To poll these metrics and pass them to Graphite I created Vör, which was made available under the MIT/X11 License by Mochi Media. An additional poller for keeping track of the size of Redis lists is also in the works.
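The core idea of such a poller is simple: read values from the Elasticsearch monitoring APIs and push them to Graphite's plaintext port. Below is a minimal sketch in that spirit (not Vör itself); the Graphite host and metric paths are hypothetical.

```python
import socket
import time
import requests

# Read cluster health and forward a couple of values to Graphite.
health = requests.get("http://localhost:9200/_cluster/health").json()
now = int(time.time())

lines = [
    "elasticsearch.cluster.active_shards %d %d"
        % (health["active_shards"], now),
    "elasticsearch.cluster.relocating_shards %d %d"
        % (health["relocating_shards"], now),
]

# Graphite's plaintext protocol: "path value timestamp\n" over TCP port 2003.
sock = socket.create_connection(("graphite.example.com", 2003))
sock.sendall(("\n".join(lines) + "\n").encode("utf-8"))
sock.close()
```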

On top of that, we count things like messages received, clicks, opens and API requests, and time those requests, via StatsD, and have added these metrics to our Graphite dashboards, too.
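For the counting and timing side, the pattern looks roughly like the sketch below, assuming the Python statsd client package; the metric names and host are illustrative.

```python
import time
import statsd

client = statsd.StatsClient("statsd.example.com", 8125)

# Counters for events as they happen.
client.incr("mailgun.messages.received")
client.incr("mailgun.events.clicked")

# Timer around handling an API request.
with client.timer("mailgun.api.events.get"):
    time.sleep(0.05)    # stand-in for the actual request handling
```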

Mailgun Graphite Event Dashboard

This is a great way to see what is happening. Graphite has a large number of functions you can apply to (series of) metrics before graphing or returning the result as a JSON document. For example, it is really easy to create a graph that shows the number of API requests against server load or indexing speed.

Current Setup

Some statistics on our setup:

  • Between 40 and 60 million log events per day.
  • 30 days of log retention.
  • 30 indices.
  • 5 shards per index.
  • 1 replica for each shard.
  • 2x 50 to 80 GB per index (times two because of the replica shards).

For this we currently have set up a total of 9 Rackspace cloud servers, in various configurations, based on our findings from the dashboards above:

  • 6x 30GB RAM, 8 vCPUs, 1TB disk: Elasticsearch data nodes.
  • 2x 8GB RAM, 4 vCPUs: Elasticsearch proxy node, Logstash, Graphite and StatsD.
  • 2x 4GB RAM, 2 cores: Elasticsearch proxy node, Vulcan and API server.

Most of these will eventually move to dedicated boxes, while retaining the ability to create additional cloud servers to scale up.

The Elasticsearch data nodes are configured to take and lock 16GB of RAM for the JVM heap. The rest of the configuration is pretty standard. We set the maximum field cache size to 40% of the heap to make sure the cluster does not break down on expensive and ill-advised faceting or sorting on fields with many different values. We also increased a few cluster-wide settings to speed up recovery and rebalancing. In particular, indices.recovery.max_bytes_per_sec is set very low by default for the number of documents we store.
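The recovery settings are dynamic, so they can be changed on a running cluster through the cluster settings API. A sketch of such an update follows; the values shown are illustrative, not our production numbers.

```python
import json
import requests

# Raise the recovery throttle cluster-wide; persists across restarts.
settings = {
    "persistent": {
        "indices.recovery.max_bytes_per_sec": "100mb",
    }
}
resp = requests.put("http://localhost:9200/_cluster/settings",
                    data=json.dumps(settings))
print(resp.json())
```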

Conclusion

We are very happy with Elasticsearch for keeping our log events and have been getting very positive feedback from our users about the new Events API and the improvements in the Logs tab of the control panel. Digging through our logs by searching in almost any field is a significant improvement for them, and Elasticsearch makes this fast and painless. This tool chain of Logstash, Elasticsearch and Kibana is also ideal for internal application logging.

If you want to know more about how we use Elasticsearch or questions about the Events API, be sure to drop us a line! You can read more details about the Events API itself on the Mailgun blog.

Happy logging, er, emailing!

Many thanks to Ralph for sharing his experiences with the wider community. If you're interested in being a guest blogger or have other Elasticsearch related write ups you'd like us to help you reshare, let me know.