News

Elasticsearch for Apache Hadoop 2.1 GA: Spark, Storm and More

Elasticsearch for Apache Hadoop, affectionately known as ES-Hadoop, enables Hadoop users and data-hungry businesses to enhance their workflows with a full-blown search and analytics engine, in real time. We are thrilled to announce the GA release of ES-Hadoop 2.1.

This GA release is the culmination of four successful Betas: Beta1, Beta2, Beta3, and Beta4, plus RC1, over the last 10 months. We would like to thank the community and all our users for their valuable feedback throughout the Beta process.

Version 2.1 embraces the emerging real-time components of the Hadoop ecosystem, in particular Apache Spark and Apache Storm, beefs up security by adding SSL/TLS transport encryption along with both HTTP and PKI authentication, and introduces a YARN module. All this while preserving backwards compatibility, extending the number of supported Hadoop runtimes, and gaining new certifications. Plus, we still bring you all of this functionality in a single JAR file to download, with no dependencies required.

In fact, upgrading to 2.1 is simply a matter of updating the ES-Hadoop JAR. However, to help those with longer release cycles, we have also released ES-Hadoop 2.0.3, the last maintenance release for the 2.0.x branch. It contains only important bug-fixes without introducing any new features.
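
For those pulling the connector through a build tool instead of downloading the JAR directly, the upgrade is a one-line change. A minimal sbt sketch, assuming the standard org.elasticsearch:elasticsearch-hadoop Maven coordinates and the 2.1.0 GA version number:

// sbt: point the build at the ES-Hadoop 2.1 GA artifact
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0"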

Native Integration with Spark and Spark SQL

Since its announcement, Apache Spark has taken the 'Big Data' world by storm. While ES-Hadoop 2.0 provides support for Spark through its Map/Reduce functionality, 2.1 goes beyond that, providing native Java and Scala APIs tightly integrated with Spark Core and Spark SQL. This means one can easily index and analyze data through Elasticsearch in real time, directly from Spark. Simply put, through ES-Hadoop, Spark treats Elasticsearch indexes as RDDs (Resilient Distributed Datasets) and DataFrames.

Furthermore, the connector can take any Spark RDD and index it, or run SQL queries on top of Elasticsearch indexes.

In good ol' ES-Hadoop tradition, the developer never has to leave her environment; the same Spark API, conventions and data types work transparently on top of Elasticsearch.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf().setAppName("es-spark")
val sc = new SparkContext(conf)

// collections of data
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

// case classes
case class Trip(departure: String, arrival: String, days: Long)
val upcomingTrip = Trip("OTP", "SFO", 10)
val lastWeekTrip = Trip("MUC", "OTP", 3)
val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd.saveToEs("spark/trips")
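
Reading is just as concise: the same import enriches the SparkContext with an esRDD method that returns documents as an RDD of (id, document) pairs, optionally filtered by a query. A minimal sketch against the index written above:

// read the trips back; each element is an (id, field map) pair
val trips = sc.esRDD("spark/trips", "?q=arrival:OTP")
trips.collect().foreach { case (id, doc) => println(s"$id -> $doc") }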

The same data is also available in Spark SQL as a DataFrame (or SchemaRDD, for those on Spark 1.1-1.2):

// as a DataFrame
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("spark/trips")
df.printSchema()
// root
//  |-- departure: string (nullable = true)
//  |-- arrival: string (nullable = true)
//  |-- days: long (nullable = true)
val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3)))

And also in pure SQL:

CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" AND days > 3
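
Both statements are regular Spark SQL, executed through the same SQLContext; a short sketch, reusing the table and index names above:

// register the Elasticsearch index as a temporary table, then query it
sqlContext.sql("CREATE TEMPORARY TABLE trips " +
  "USING org.elasticsearch.spark.sql OPTIONS (path 'spark/trips')")
sqlContext.sql("SELECT departure FROM trips WHERE arrival = 'OTP' AND days > 3").show()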

The integration is not only elegant and out of the developer's way, but also quite efficient: the ES-Hadoop connector pushes Spark SQL queries down, translating them into Query DSL and effectively executing them directly on Elasticsearch, leveraging its lightning-fast search capabilities. Both of the queries above are translated by the connector at runtime into:

{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}
      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}

Note that, as with all the other integrations, all the reads and writes through ES-Hadoop in the examples above are executed in parallel across the index shards, in what we call a partition-to-partition architecture.

2.1 supports Spark 1.0 through 1.4, in other words every stable release to date, so whatever Spark version is targeted, ES-Hadoop can be used right away.

Storm Integration

In addition to Apache Spark, ES-Hadoop 2.1 also provides native integration with Apache Storm, exposing Elasticsearch both as a Spout that streams search results and as a Bolt that indexes the Tuples flowing through your topology, making them available for analysis immediately. Indexing is a one-liner:

import backtype.storm.topology.TopologyBuilder;
import org.elasticsearch.storm.EsBolt;

TopologyBuilder builder = new TopologyBuilder();
builder.setBolt("esBolt", new EsBolt("twitter/tweets"));

Executing queries in Elasticsearch for Storm is yet another one-liner:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("twitter/tweets", "?q=nfl*"), 5);
builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("es-spout");

Under the covers, ES-Hadoop uses its parallelized infrastructure to map the Spout and Bolt instances across the index shards.

Low-latency, high-performance patterns such as micro-batching and tick tuples are supported out of the box, providing excellent throughput and closely integrating the real-time capabilities of Storm and Elasticsearch.
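
These behaviors are driven by plain connector settings; a sketch of the relevant knobs, assuming the documented es.storm.* configuration namespace (values illustrative):

import java.util.{HashMap => JHashMap}
import org.elasticsearch.storm.EsBolt

// micro-batching: tuples are buffered and written in bulk; tick tuples flush any
// partial batch so documents become searchable without waiting for a full batch
val boltConf = new JHashMap[String, String]()
boltConf.put("es.storm.bolt.flush.entries.size", "100")  // bulk size, in tuples
boltConf.put("es.storm.bolt.tick.tuple.flush", "true")   // flush on Storm tick tuples

// reusing the builder from the topology above
builder.setBolt("esBolt", new EsBolt("twitter/tweets", boltConf))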

Security Enhancements

2.1 introduces official support for encrypted connections between Elasticsearch and Hadoop clusters through SSL/TLS, enabling data-sensitive environments to transparently encrypt data at the transport level, thus preventing snooping and preserving data confidentiality. Furthermore, in addition to HTTP authentication, 2.1 introduces Public Key Infrastructure (PKI) authentication as well.
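
Security, too, is exposed through regular connector settings, so enabling it requires no code changes. A sketch of the relevant properties, assuming the es.net.* configuration namespace (paths and credentials illustrative):

import org.apache.spark.SparkConf

// SSL/TLS transport encryption plus authentication, as plain connector properties
val secureConf = new SparkConf()
  .set("es.net.ssl", "true")                                       // enable SSL/TLS
  .set("es.net.ssl.truststore.location", "file:///opt/es/truststore.jks")
  .set("es.net.http.auth.user", "hadoop")                          // HTTP basic authentication
  .set("es.net.http.auth.pass", "secret")
  // for PKI, point the connector at a keystore holding the client certificate instead
  .set("es.net.ssl.keystore.location", "file:///opt/es/keystore.jks")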

Elasticsearch on YARN

Another major addition in 2.1 is the introduction of the Elasticsearch-on-YARN (aka ES-Yarn) project for running an Elasticsearch cluster within a YARN environment. Similar to the repository-hdfs plugin, ES-Yarn is distributed as part of the ES-Hadoop project, but is independent and has no dependencies outside YARN itself.

With ES-Yarn, one can now provision, start, and stop Elasticsearch directly on a YARN cluster. In YARN lingo, ES-Yarn bootstraps a client that deploys a dedicated ApplicationMaster into YARN which, in turn, creates one container for each Elasticsearch node required.

For the user, ES-Yarn is a straightforward CLI (command-line interface) for deploying and managing the life cycle of the Elasticsearch cluster within YARN.

Simply download elasticsearch-yarn-2.1.jar and run:

$ hadoop jar elasticsearch-yarn-2.1.jar
No command specified
Usage:
 -download-es  : Downloads Elasticsearch.zip
 -install      : Installs/Provisions Elasticsearch-YARN into HDFS
 -install-es   : Installs/Provisions Elasticsearch into HDFS
 -start        : Starts provisioned Elasticsearch in YARN
 -status       : Reports status of Elasticsearch in YARN
 -stop         : Stops Elasticsearch in YARN
 -help         : Prints this help
Configuration options can be specified _after_ each command; see the documentation for more information.
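
A typical first run simply chains the commands above: download Elasticsearch, provision ES-Yarn and Elasticsearch into HDFS, then start and check the cluster (the containers option below is merely illustrative of the configuration options just mentioned):

$ hadoop jar elasticsearch-yarn-2.1.jar -download-es
$ hadoop jar elasticsearch-yarn-2.1.jar -install
$ hadoop jar elasticsearch-yarn-2.1.jar -install-es
$ hadoop jar elasticsearch-yarn-2.1.jar -start containers=2
$ hadoop jar elasticsearch-yarn-2.1.jar -status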

Enhanced Functionality

2.1 introduces a number of enhancements to existing features (a combined sketch follows the list):

  • JSON results - Documents from Elasticsearch can now be returned in raw form, as JSON documents. (As an implementation note, the data from Elasticsearch is passed to the client verbatim, without any processing.)
  • Document metadata - In addition to its content, a document's metadata can now be returned without any extra network cost.
  • Inclusion/exclusion of fields - On the mapping front, it is now possible to specify which fields should be included or excluded when writing data to Elasticsearch. This comes in quite handy not only for quick transformations of the data but also for specifying document metadata without storing it.
  • Client-node routing - For clusters in restricted environments, it is now possible to use the connector through client nodes only. That is, rather than accessing the data nodes directly, the connector talks to the client nodes (which do need the HTTP(S) port open) and asks those to do the work on its behalf. This reduces parallelism, since the connector no longer communicates with the data nodes directly; however, the performance penalty is insignificant unless a lot of data is read or written, or data locality is important.
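
A combined sketch of these options in Spark, assuming the es.* settings named below and reusing the Trip case class from earlier (index names illustrative):

import org.elasticsearch.spark._

// JSON results: documents come back verbatim, as raw JSON strings
val json = sc.esJsonRDD("spark/trips")

// document metadata (_id and friends) returned alongside the content
val withMeta = sc.esRDD("spark/trips", Map("es.read.metadata" -> "true"))

// field inclusion/exclusion and client-node routing, as plain connector settings
val cfg = Map(
  "es.mapping.include"   -> "departure, arrival",  // write only these fields
  "es.nodes.client.only" -> "true"                 // route requests through client nodes
)
sc.makeRDD(Seq(Trip("OTP", "SFO", 10))).saveToEs("spark/trips", cfg)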

Certifications

While certifications are not as cool to talk about as new features, we know customers love peace of mind as much as we do. We increased the number of supported platforms in 2.1, reaching out to our partners to make sure ES-Hadoop works properly out of the box. We are happy to report that 2.1 is certified on CDH 5.x, HDP 2.x, and MapR 4.x, and is Databricks Spark certified as well.

We look forward to your feedback on Elasticsearch for Apache Hadoop 2.1 GA! You can find the binaries on the download page and the sources on GitHub; the new features are explained in the reference documentation. As always, you can file bugs or feature requests on our issue tracker and ask questions on the forum or IRC.