Apache Spark support

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs.

-- Spark website

Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is a computing framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly with HDFS.

Installation

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath. As Spark has multiple deployment modes, this can translate to the target classpath, whether it is on only one node (as is the case with the local mode, which will be used throughout the documentation) or per node, depending on the desired infrastructure.

Configuration

Through elasticsearch-hadoop, Spark can integrate with Elasticsearch through its dedicated InputFormat and, in the case of writing, through its OutputFormat. These are described at length in the Map/Reduce chapter, so please refer to that for an in-depth explanation.

In short, one needs to set up a basic Hadoop Configuration object with the target Elasticsearch cluster and index, potentially a query, and she’s good to go.

From Spark’s perspective, the only thing required is setting up serialization. Spark relies by default on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types, namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand. The good news is that one can easily enable a different serialization (Kryo), which handles the conversion automatically and also does this quite efficiently.

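import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;

SparkConf sc = new SparkConf(); //.setMaster("local");
// enable the Kryo serialization support with Spark
sc.set("spark.serializer", KryoSerializer.class.getName());

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);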

Or, if you prefer Scala:

val sparkConf = new SparkConf(...)
// enable the Kryo serialization support with Spark
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)

Note that Kryo serialization is used as a workaround for dealing with Writable types. One can choose to convert the types directly (from Writable to Serializable types), which is fine; however, for getting started, the one-liner above seems to be the most effective.
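The direct conversion mentioned above can be sketched without Spark or Hadoop on the classpath. In the example below, FakeText is a hypothetical stand-in for a Hadoop Writable such as Text (which does not implement java.io.Serializable); the class and method names are illustrative assumptions and not part of elasticsearch-hadoop. It shows why plain Java serialization rejects such values and how copying them into standard Java types avoids the problem.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

final class WritableConversionSketch {

    // Hypothetical stand-in for a Hadoop Writable: holds data but is NOT java.io.Serializable.
    static final class FakeText {
        private final String value;
        FakeText(String value) { this.value = value; }
        @Override public String toString() { return value; }
    }

    // Returns true if Java serialization accepts the object, false if it rejects it.
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Copies a document made of Writable-like keys and values into plain Strings,
    // which Java serialization handles without any extra configuration.
    static Map<String, String> toSerializable(Map<FakeText, FakeText> doc) {
        Map<String, String> plain = new HashMap<>();
        for (Map.Entry<FakeText, FakeText> e : doc.entrySet()) {
            plain.put(e.getKey().toString(), e.getValue().toString());
        }
        return plain;
    }
}
```

In a real job the same idea would presumably be applied to the Text keys and MapWritable values inside a map step over the RDD, before any operation that ships the data between nodes.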

Reading data from Elasticsearch

To read data, simply pass in the org.elasticsearch.hadoop.mr.EsInputFormat class. Since it supports both the old and the new Map/Reduce APIs, you are free to use either method on SparkContext: hadoopRDD (which we recommend for conciseness reasons) or newAPIHadoopRDD. Whichever you choose, stick with it to avoid confusion and problems down the road.

Old (org.apache.hadoop.mapred) API

JobConf conf = new JobConf();                   // create the Hadoop object (use the old API)
conf.set("es.resource", "radio/artists");       // configure the source (index)
conf.set("es.query", "?q=me*");                 // set up the query (optional)

// create a Spark RDD on top of Elasticsearch through EsInputFormat;
// the key represents the doc id, the value the doc itself
JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class,
                                  Text.class, MapWritable.class);
long docCount = esRDD.count();

The Scala version is below:

val conf = new JobConf()                         // create the Hadoop object (use the old API)
conf.set("es.resource", "radio/artists")         // configure the source (index)
conf.set("es.query", "?q=me*")                   // set up the query (optional)
// create a Spark RDD on top of Elasticsearch through EsInputFormat
val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
                         classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

New (org.apache.hadoop.mapreduce) API

As expected, the mapreduce API version is strikingly similar - replace hadoopRDD with newAPIHadoopRDD and JobConf with Configuration. That’s about it.

Configuration conf = new Configuration();       // create the Hadoop object (use the new API)
conf.set("es.resource", "radio/artists");       // configure the source (index)
conf.set("es.query", "?q=me*");                 // set up the query (optional)

// create a Spark RDD on top of Elasticsearch through EsInputFormat;
// the key represents the doc id, the value the doc itself
JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class,
                                        Text.class, MapWritable.class);
long docCount = esRDD.count();

The Scala version is below:

val conf = new Configuration()                   // create the Hadoop object (use the new API)
conf.set("es.resource", "radio/artists")         // configure the source (index)
conf.set("es.query", "?q=me*")                   // set up the query (optional)
// create a Spark RDD on top of Elasticsearch through EsInputFormat
val esRDD = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
                               classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()
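Since the old and new API examples differ only in the configuration class and the RDD method, the es.* settings themselves can be kept in one place. The following is a minimal sketch of that idea using only the Java standard library; EsSettingsSketch, esSettings and applyTo are hypothetical names introduced here for illustration and are not part of elasticsearch-hadoop.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

final class EsSettingsSketch {

    // Builds the es.* settings used in the examples above; the query is optional.
    static Map<String, String> esSettings(String resource, String query) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("es.resource", resource);
        if (query != null && !query.isEmpty()) {
            settings.put("es.query", query);
        }
        return settings;
    }

    // Applies every setting through a (key, value) setter supplied by the caller.
    static void applyTo(Map<String, String> settings, BiConsumer<String, String> setter) {
        settings.forEach(setter);
    }
}
```

With Hadoop on the classpath, a call such as EsSettingsSketch.applyTo(settings, conf::set) should work for either a JobConf or a Configuration, since both expose set(String, String).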