Cascading support

Cascading is a data processing API and processing query planner used for defining, sharing, and executing data-processing workflows on a single computing node or distributed computing cluster.

-- Cascading website

Cascading abstracts the Map/Reduce API and focuses on data processing in terms of tuples flowing through pipes between taps, from input (called a SourceTap) to output (called a SinkTap). As the data flows, various operations are applied to the tuples; at runtime, the whole flow is translated into Map/Reduce operations. With elasticsearch-hadoop, Elasticsearch can be plugged into Cascading flows as a SourceTap or SinkTap through EsTap, with data to/from Elasticsearch transparently converted from/to Cascading `tuple`s.
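For example, a minimal sketch of such a flow (the index names and query below are illustrative) can use EsTap as both source and sink, copying documents matching a query from one index to another:

// EsTap as a SourceTap - read documents matching an (illustrative) query
Tap source = new EsTap("radio/artists", "?q=me*");
// EsTap as a SinkTap - write the documents to another (illustrative) index
Tap sink = new EsTap("radio-backup/artists");
new HadoopFlowConnector().connect(source, sink, new Pipe("copy")).complete();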

Installation

Just like any other library, elasticsearch-hadoop needs to be available on the job classpath (either by being manually deployed in the cluster or by being shipped along with the Hadoop job).
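For example, when using Maven, the dependency can be declared as below (the version is a placeholder to be filled in for your deployment):

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>...</version>
</dependency>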

Configuration

Global configuration

Cascading is configured through a Map<Object, Object>, typically a Properties object, which holds the various Cascading settings as well as the application jar:

Properties props = new Properties();
AppProps.setApplicationJarClass(props, Main.class);
FlowConnector flow = new HadoopFlowConnector(props);

elasticsearch-hadoop options can be specified in the same way, these being picked up automatically by all `EsTap`s down the flow:

Properties props = new Properties();
// set an elasticsearch-hadoop option
props.setProperty("es.index.auto.create", "false");
...
FlowConnector flow = new HadoopFlowConnector(props);

This approach can be used for local and remote/Hadoop flows - simply use the appropriate FlowConnector.

Per-Tap configuration

If a flow contains multiple Elasticsearch taps, the global approach does not work since the settings would clash with each other. For such scenarios, elasticsearch-hadoop allows per-Tap configuration:

Tap books = new EsTap("es-server", 9200, "my-col/books", "?q=potter");
Tap movies = new EsTap("es-server", 9200, "my-col/movies", "?q=terminator");

Note that the Tap configuration is merged with the global one, so one can mix and match accordingly - for example, specify the defaults in the global configuration and declare only the specifics on the Tap instance. Additionally, for maximum flexibility, EsTap allows multiple arguments to be passed in, including a Properties object covering the full range of options, as sketched below.
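As a sketch, assuming (per the above) an EsTap overload that accepts a trailing Properties argument, each Tap can carry its own settings on top of the global ones:

// per-Tap settings, merged on top of the global configuration
Properties booksCfg = new Properties();
booksCfg.setProperty("es.index.read.missing.as.empty", "true");

// the exact overload may vary; the Properties object is passed as the last argument
Tap books = new EsTap("es-server", 9200, "my-col/books", "?q=potter", booksCfg);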

Mapping

By default, elasticsearch-hadoop uses the Cascading tuple to map the data in Elasticsearch, using both the field names and types in the process. There are cases, however, when the field names cannot be used directly with Elasticsearch (a common case when working with an existing flow). For such cases, one can use the es.mapping.names setting, which accepts a comma-separated list of mappings in the following format: Cascading field name:Elasticsearch field name

To wit:

Properties myTapCfg = new Properties();
// map the Cascading field "date" to the Elasticsearch field "@timestamp"
myTapCfg.setProperty("es.mapping.names", "date:@timestamp");
Tap myTap = new EsTap(..., myTapCfg);
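Multiple mappings can be declared in the same setting, separated by commas; for instance (the target field names here are purely illustrative):

// illustrative: map two Cascading fields to differently named Elasticsearch fields
myTapCfg.setProperty("es.mapping.names", "date:@timestamp, url:url_123");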

Since elasticsearch-hadoop 2.1, the connector preserves the case sensitivity of tuple names.

Writing data to Elasticsearch

Simply hook EsTap into the Cascading flow:

Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")),
                 "/resources/artists.dat");
// "radio/artists" is the elasticsearch-hadoop resource (index and type);
// the Fields declare the Cascading tuple to be written
Tap out = new EsTap("radio/artists",
                    new Fields("name", "url", "picture"));
new HadoopFlowConnector().connect(in, out, new Pipe("write-to-Es")).complete();

For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field id as the document id, update the Tap configuration:

Properties myTapCfg = new Properties();
myTapCfg.setProperty("es.mapping.id", "id");
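The Properties object is then attached to the Tap, just as in the mapping example above (a sketch, assuming the same trailing-Properties overload):

// the "id" field must be part of the tuple so its value can become the document id
Tap out = new EsTap("radio/artists",
                    new Fields("id", "name", "url", "picture"), myTapCfg);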

Writing existing JSON to Elasticsearch

When the job input data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter. elasticsearch-hadoop then expects to receive a tuple with a single field (representing the JSON document); the library recognizes Text or BytesWritable types and otherwise simply calls toString to get hold of the JSON content.

Make sure the data is properly encoded as UTF-8. The job output is considered the final form of the document sent to Elasticsearch.

Properties props = new Properties();
...
// indicate the input is raw JSON
props.setProperty("es.input.json", "true");
// load the (JSON) data as a single field ("line")
Tap in = new Lfs(new TextLine(new Fields("line")), "/resources/artists.json");
Tap out = new EsTap("json-cascading-local/artists");
FlowConnector flow = new HadoopFlowConnector(props);
flow.connect(in, out, new Pipe("import-json")).complete();

Writing to dynamic/multi-resources

One can index the data to a different resource, depending on the tuple being read, by using patterns. For example, with documents describing various types of media (books, movies, and so on), one could configure the sink as follows:

// resource pattern using the field media.type; any of the fields
// declared in the Tap schema can be used in the pattern
Tap out = new EsTap("my-collection-{media.type}/doc",
                    new Fields("name", "media.type", "year"));

For each tuple about to be written, elasticsearch-hadoop will extract the media.type entry and use its value to determine the target resource.

The functionality is available when dealing with raw JSON as well - in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:

{
    "media_type":"book",
    "title":"Harry Potter",
    "year":"2010"
}

Here, the media_type field within the JSON document will be used by the resource pattern.

the Tap declaration can be as follows:

props.setProperty("es.input.json", "true");
Tap in = new Lfs(new TextLine(new Fields("line")), "/archives/collection.json");
// the resource pattern relies on a field within the JSON document, not on the Tap schema;
// since JSON input is used, the schema is simply a holder for the raw data
Tap out = new EsTap("my-collection-{media_type}/doc",
                    new Fields("line"));

Reading data from Elasticsearch

Just the same, add an EsTap on the other end of a pipe to read from (rather than write to) Elasticsearch:

// "radio/artists" is the elasticsearch-hadoop resource (index and type),
// "?q=me*" the elasticsearch-hadoop query
Tap in = new EsTap("radio/artists",
                   "?q=me*");
Tap out = new StdOut(new TextLine());
new LocalFlowConnector().connect(in, out, new Pipe("read-from-Es")).complete();

Type conversion

Depending on the platform used, Cascading can internally use either Writable or JDK types for its tuples. elasticsearch-hadoop handles both transparently (see the Map/Reduce conversion section), though we recommend using the same types (if possible) in both cases to avoid the overhead of maintaining two different versions.

If automatic index creation is used, please review this section for more information.

Cascading Lingual

elasticsearch-hadoop also provides integration with Lingual, a Cascading extension that provides an ANSI SQL interface for Apache Hadoop. That is, one can execute SQL queries in Hadoop directly against Elasticsearch.

Below is a quick setup for using elasticsearch-hadoop with Lingual (1.1) - for detailed information please refer to the Lingual user guide:

export LINGUAL_PLATFORM=hadoop
# register Elasticsearch as a provider
lingual catalog --init
lingual catalog --provider --add ./elasticsearch-hadoop-<version>.jar
# add a custom schema (called 'titles') for querying
lingual catalog --schema es-test --add
lingual catalog --schema es-test --stereotype titles --add \
    --columns emp_no,title,from_date,to_date --types int,string,date,date
lingual catalog --schema es-test --format es --add --provider es
lingual catalog --schema es-test --protocol es --add --provider es \
    --properties=host=es-server
lingual catalog --schema es-test --table titles --stereotype titles \
    --add employees/titles --format es --provider es --protocol es

Once the desired catalog has been declared and elasticsearch-hadoop registered with it, one can start querying the data:

lingual shell
(shell) select count(*) from "es-test"."titles" where "title" = 'Engineer';
115003