Apache Storm support

Added in 2.1.

Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.

-- Storm website

With Storm, one can compute, transform and filter data, typically in a streaming scenario. As opposed to the rest of the libraries mentioned in this documentation, Apache Storm is a computational framework that is not tied to Map/Reduce itself; it does, however, integrate with Hadoop, mainly through HDFS.

Installation

In order to use elasticsearch-hadoop, its jar needs to be available in Storm’s classpath. The Storm documentation covers this in detail but, in short, one can either make the jar available on all Storm nodes or make elasticsearch-hadoop part of the jar being deployed (which we recommend). The latter approach allows isolation between jobs and, since the jar is self-contained, it can easily be moved across environments without additional setup, making it much more robust.

Configuration

The Storm integration supports the configuration options described in the Configuration chapter plus a few more that are specific to Storm, listed below (a short configuration sketch follows each list):

Spout specific

es.storm.spout.reliable (default false)
Indicates whether the dedicated EsSpout is reliable, that is, whether it replays the documents in case of failure. By default it is set to false since replaying requires the documents to be kept in memory until they are acknowledged.
es.storm.spout.fields (default "")
Specifies what fields EsSpout will declare in its topology and extract from the returned documents. By default it is unset, meaning the documents are returned as Maps under the default field doc.
es.storm.spout.reliable.queue.size (default 0)
Applicable only if es.storm.spout.reliable is true. Sets the size of the queue which holds documents in memory to be replayed until they are acknowledged. By default, the queue is unbounded (0); however, in a production environment it is recommended to bound the queue in order to limit memory consumption. If the queue is full, the spout drops any incoming Tuples and throws an exception.
es.storm.spout.reliable.retries.per.tuple (default 5)
Applicable only if es.storm.spout.reliable is true. Sets the number of retries (replays) of a failed tuple before giving up. Setting it to a negative value causes the tuple to be replayed until it is acknowledged.
es.storm.spout.reliable.handle.tuple.failure (default abort)

Applicable only if es.storm.spout.reliable is true. Indicates how to handle failing tuples after the number of retries is depleted. Possible values are:

ignore
the tuple is discarded
warn
a warning message is logged and the tuple is discarded
strict
an exception is thrown, aborting the current job
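
Put together, a reliable EsSpout might be configured along these lines - a sketch only, with made-up values for the queue size and retry count and the storm/docs resource reused from the examples later in this chapter:

Map conf = new HashMap();
conf.put("es.storm.spout.reliable", "true");
// keep at most 100 documents in memory for replay (made-up value)
conf.put("es.storm.spout.reliable.queue.size", "100");
conf.put("es.storm.spout.reliable.retries.per.tuple", "3");
// log a warning and discard tuples that still fail after the retries
conf.put("es.storm.spout.reliable.handle.tuple.failure", "warn");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("storm/docs", conf));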

Bolt specific

es.storm.bolt.write.ack (default false)
Indicates whether the dedicated EsBolt is reliable, that is, whether it acknowledges the Tuple after it has been written to Elasticsearch or the instant it receives it. By default it is false. Note that turning this on increases the memory requirements of the Bolt since it has to keep the data in memory until it is fully written.
es.storm.bolt.flush.entries.size (default 1000)
The number of entries that trigger a micro-batch write to Elasticsearch. By default, it uses the same value as es.batch.size.entries, which by default is 1000.
es.storm.bolt.tick.tuple.flush (default true)
Whether or not to flush the existing data if the Bolt receives a Tick tuple. This heart-beat-like mechanism goes hand in hand with the flush limit above to create both a time and size trigger. When using Storm’s internal ticks, remember to set the tick interval:
// tick every 2 seconds
builder.setBolt(...).addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
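
As a sketch only (the resource name, batch size and component names below are illustrative, reusing the topology style of the examples in this chapter), the bolt settings above might be combined per component as follows:

Map conf = new HashMap();
// acknowledge tuples only after they have been written to Elasticsearch
conf.put("es.storm.bolt.write.ack", "true");
// flush a micro-batch every 500 entries (illustrative value)
conf.put("es.storm.bolt.flush.entries.size", "500");

TopologyBuilder builder = new TopologyBuilder();
builder.setBolt("es-bolt", new EsBolt("storm/docs", conf))
       // tick every 2 seconds so the time trigger complements the size trigger
       .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2)
       .shuffleGrouping("spout");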

Setting the configuration

The configuration can be set through Storm’s Config class, in which case the properties are available globally, or individually for each EsSpout and EsBolt instance. In general, it is recommended to set globally only the properties that apply to all the components and are unlikely to change:

Config conf = new Config();
conf.put("es.index.auto.create", "true");
StormSubmitter.submitTopology("myTopology", conf, topology);

For this reason, one should typically use the per-component configuration model since it allows different configurations to be used within the same Storm topology:

Map conf = new HashMap();
conf.put("es.index.auto.create", "true");
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("esSpout", new EsSpout("index/type", conf));

Writing data to Elasticsearch

elasticsearch-hadoop exposes Elasticsearch to Storm through a native Bolt, namely org.elasticsearch.storm.EsBolt, which writes the Storm Tuples to Elasticsearch:

import org.elasticsearch.storm.EsBolt; 

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 10);
builder.setBolt(
    "es-bolt",
    new EsBolt(
        "storm/docs"
    ),
    5)
    .shuffleGrouping("spout");

elasticsearch-hadoop EsBolt package import

EsBolt declaration

Various constructors are available for EsBolt - at the very least, the Elasticsearch resource under which the data is indexed is required

The number of EsBolt instances for this topology

The number of bolt instances depends highly on your topology and environment. In general, a good rule of thumb is to take into account the number of shards of the target index as well as the number of spouts sending data to it - a good formula is to take the minimum between the number of source spouts and the number of index shards; in this example, 5. A high number of Bolts does not translate to higher throughput - increase the number only if the Bolts are the bottleneck, since otherwise the extra instances simply waste cycles.
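
Expressed in code, reusing the builder and spout names from the example above (the counts are illustrative):

// 10 spout instances feeding the bolt, target index with 5 shards (illustrative values)
int spoutInstances = 10;
int indexShards = 5;
// rule of thumb: use the minimum of the two as the bolt parallelism - 5 in this example
int boltInstances = Math.min(spoutInstances, indexShards);

builder.setBolt("es-bolt", new EsBolt("storm/docs"), boltInstances)
       .shuffleGrouping("spout");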

For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Thus, assuming the documents contain a field called sentenceId which is unique and suitable as an identifier, one can update the job configuration as follows:

Map conf = new HashMap();
conf.put("es.mapping.id", "sentenceId");
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("esSpout", new EsSpout("index/type", conf));

Writing existing JSON to Elasticsearch

If the data passed to Storm is already in JSON format, EsBolt can pass it directly to Elasticsearch without any transformation; the data is taken as is and sent over the wire. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter to true. Furthermore, the Bolt expects the receiving Tuple to contain only one value/field representing the JSON document. By default, common textual types are recognized, such as String or byte[]; otherwise it falls back to calling toString to get hold of the JSON content.

String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

Map conf = new HashMap();
conf.put("es.input.json", "true"); 

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("json-spout", new StringSpout(Arrays.asList(json1, json2)); 
builder.setBolt("es-bolt", new EsBolt("storm/json-trips", conf)) 
                                    .shuffleGrouping("json-spout");

JSON document represented as a String

Option indicating the input is in JSON format

Basic Spout which replays the given Strings as Tuples with only one value

Configure EsBolt to process JSON - the same setting can be passed through the global Conf object; however, it is typically more convenient to define it locally
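
Note that StringSpout above is not part of elasticsearch-hadoop; a minimal sketch of such a helper, which simply replays the given Strings as single-value Tuples, could look like the one below (the package names assume Storm 1.x or later - earlier releases use the backtype.storm packages instead):

import java.util.List;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class StringSpout extends BaseRichSpout {
    private final List<String> documents;
    private transient SpoutOutputCollector collector;
    private int index = 0;

    public StringSpout(List<String> documents) {
        this.documents = documents;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // replay the given strings, one JSON document per tuple
        collector.emit(new Values(documents.get(index)));
        index = (index + 1) % documents.size();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // a single field holding the raw JSON string
        declarer.declare(new Fields("json"));
    }
}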

Writing to dynamic/multi-resources

In cases where the data needs to be indexed based on its content, one can choose the target index based on a Tuple field. Reusing the aforementioned media example, one can partition the documents based on their type. Assuming the document tuple contains the fields media_type, title and year, one can index it as follows:

builder.setBolt("es-bolt",
    new EsBolt("my-collection/{media_type}") 
).shuffleGrouping("spout");

Resource pattern using the field media_type

For each tuple about to be written, elasticsearch-hadoop will extract the media_type field and use its value to determine the target resource.

The functionality is also available when dealing with raw JSON - in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:

{
    "media_type":"game",
    "title":"Final Fantasy VI",
    "year":"1994"
}

Field within the JSON document that will be used by the resource pattern

the EsBolt would be configured as follows:

Map conf = new HashMap();
conf.put("es.input.json", "true"); 

builder.setBolt("es-bolt",
    new EsBolt(
        "my-collection-{year}/{media_type}",
        conf) 
    ).shuffleGrouping("spout");

Option indicating the input is in JSON format

Resource pattern - notice how the pattern is used both in the index and the type

Pass configuration to EsBolt to indicate the JSON input

Reading data from Elasticsearch

As you can expect, for reading data (typically executing queries) elasticsearch-hadoop offers a dedicated Spout, org.elasticsearch.storm.EsSpout, which executes the query in Elasticsearch and streams the results back to Apache Storm:

import org.elasticsearch.storm.EsSpout; 

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(
    "es-spout",
    new EsSpout(
        "storm/docs",
        "?q=me*"),
    5);
builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("es-spout");

elasticsearch-hadoop EsSpout package import

EsSpout declaration

The source Elasticsearch resource (index and type) for the data

The query to execute (optional) - if no query is specified, the entire indexed data is streamed

The number of EsSpout instances for this topology. The number should not be greater than the number of shards available for the index; if it is, the extra instances just waste CPU cycles without improving performance.

The number of Spout instances depends highly on your topology and environment. Typically you should use the number of shards of your target data as an indicator - if your index has 5 shards, create 5 EsSpouts; however, sometimes the number of shards might be considerably bigger than the number of Spouts you can add to your Apache Storm cluster; in that case, it is better to limit the number of EsSpout instances. Last but not least, adding more EsSpout instances than the number of shards of the source index does not improve performance; in fact, the extra instances will just waste resources without processing anything.
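
Similarly, PrinterBolt in the example above is not provided by elasticsearch-hadoop; a minimal sketch (again assuming the org.apache.storm packages of Storm 1.x or later) could simply print each incoming document:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // each document returned by EsSpout is available under the default doc field
        System.out.println(tuple.getValueByField("doc"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt - nothing to declare
    }
}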

Customizing EsSpout fields

Since Storm requires each Spout to declare its fields when creating a topology, by default EsSpout declares for its tuples a generic doc field containing the documents returned (one per tuple) from Elasticsearch. When dealing with structured data (documents sharing the same fields), one can configure EsSpout to declare the document properties as fields, effectively unwrapping the document into a Tuple. By setting es.storm.spout.fields, EsSpout will use the given names to declare the tuple content to the Storm topology and to extract those properties from the returned documents.

For example, if the Elasticsearch documents contain the three fields name, age and gender, setting es.storm.spout.fields to name, age, gender causes each tuple to contain those three named fields instead of a single doc field holding the whole document.
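
A sketch of how this might look, reusing the storm/docs resource from the reading example above (the parallelism hint is illustrative):

Map conf = new HashMap();
// unwrap each document into a tuple with the fields name, age and gender
conf.put("es.storm.spout.fields", "name, age, gender");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("storm/docs", conf), 5);
// downstream bolts can now read the values directly, e.g. tuple.getStringByField("name")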