Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.
|-- Pig website|
It provides a high-level, powerful, scripting-like transformation language which gets compiled into Map/Reduce jobs at runtime by the Pig compiler. To simplify working with arbitrary data, Pig associates a schema (or type information) with each data set for validation and performance. This in turn, breaks it down into discrete data types that can be transformed through various operators or custom functions (or UDFs). Data can be loaded from and stored to various storages such as the local file-system or HDFS, and with elasticsearch-hadoop into Elasticsearch as well.
In order to use elasticsearch-hadoop, its jar needs to be in Pig’s classpath. There are various ways of making that happen though typically the REGISTER command is used:
The command expects a proper URI that can be found either on the local file-system or remotely. Typically it’s best to use a distributed file-system (like HDFS or Amazon S3) and use that since the script might be executed on various machines.
As an alternative, when using the command-line, one can register additional jars through the
-Dpig.additional.jars option (that accepts an URI as well):
$ pig -Dpig.additional.jars=/path/elasticsearch-hadoop.jar:<other.jars> script.pig
or if the jars are on HDFS
$ pig \ -Dpig.additional.jars=hdfs://<cluster-name>:<cluster-port>/<path>/elasticsearch-hadoop.jar:<other.jars> script.pig
With Pig, one can specify the configuration properties (as an alternative to Hadoop
Configuration object) as a constructor parameter when declaring
elasticsearch-hadoop configuration (target resource)
elasticsearch-hadoop option (http timeout)
another elasticsearch-hadoop configuration (disable automatic index creation)
To avoid having to specify the fully qualified class name (
org.elasticsearch.hadoop.pig.EsStorage), consider using a shortcut through
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();
Do note that it is possible (and recommended) to specify the configuration parameters to reduce script duplication, such as
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('my.cfg.param=value');
Pig definitions are replaced as are; even though the syntax allows parametrization, Pig will silently ignore any parameters outside the
Tuple field namesedit
Among the various types available in Pig,
tuples are used the most. Tuples are defined as “ordered sets of fields” (e.g.
(19,2)) however structurally they are shaped
as ordered maps since each field has a name, which may be defined or not (e.g.
(field:19, another:2)). The ordered aspect is important and forces elasticsearch-hadoop to use JSON arrays for tuples (using JSON objects is not an option as it does not preserve ordering besides the fact that it requires keys/names which might be or not available in a tuple).
Obeying the rule of least surprise, elasticsearch-hadoop by default will disregard a tuple’s field names, both when writing and reading.
To change this behavior (which in effect means treating tuples as arrays of maps instead of arrays), use the boolean property
es.mapping.pig.tuple.use.field.names (by default
false) and set it to
The table below illustrates the difference between the two settings:
|Tuple schema||Tuple value||Resulting JSON representation|
When using tuples, it is highly recommended to create the index mapping before-hand as it is quite common for tuples to contain mixed types (numbers, strings, other tuples, etc…) which, when mapped as an array (the default) can cause parsing errors (as the automatic mapping can infer the fields to be numbers instead of strings, etc…). In fact, the example above falls in this category since the tuple contains both a number (
1) and a string (
"kimchy"), which will the auto-detection to map both
bar as a number and thus causing an exception when encountering
"kimchy". Please refer to this for more information.
flattening the tuple into primitive/data atoms before sending the data off to Elasticsearch.
By default, Pig will only use one reducer per job which in most cases is inefficient. To address these issue:
- Use the Parallel Features
As explained in the reference docs, out of the box Pig expects each reducer to process about 1 GB of data; unfortunately if the data is scattered
around the network this becomes inefficient as the entire job is effectively serialized. Change this by increasing the number of reducers to map that of your shards through the
-- launch the Map/Reduce job with 5 reducers SET default_parallel 5;
or by using the
PARALLEL keyword with
B = GROUP A BY t PARALLEL 18;
- Disable split combination
- Out of the box Pig over-eagerly combines its input splits even if it does not know how big they are. This again kills parallelism since it serializes the queries to Elasticsearch ; typically this looks as follows in the logs:
20yy-mm-dd hh:mm:ss,mss [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 25 20yy-mm-dd hh:mm:ss,mss [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
Avoid this by setting
true (one can also use
false however we recommend the former) either by setting the property before invoking the script:
pig -Dpig.noSplitCombination=true myScript.pig
in the Pig script itself:
SET pig.noSplitCombination TRUE;
or through the global
pig.properties configuration in your Pig install:
Unfortunately elasticsearch-hadoop cannot set these properties automatically so the user has to do that manually per script or making them global through the Pig configuration as described above.
Out of the box, elasticsearch-hadoop uses the Pig schema to map the data in Elasticsearch, using both the field names and types in the process. There are cases however when the names in Pig cannot
be used with Elasticsearch (invalid characters, existing names with different layout, etc…). For such cases, one can use the
es.mapping.names setting which accepts a comma-separated list of mapped names in the following format:
Pig field name :
Elasticsearch field name
name mapping for two fields
Since elasticsearch-hadoop 2.1, the Pig schema case sensitivity is preserved to Elasticsearch and back.
Writing data to Elasticsearchedit
Elasticsearch is exposed as a native
Storage to Pig so it can be used to store data into it:
-- load data from HDFS into Pig using a schema A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name, url:chararray, picture: chararray); -- transform data B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links; -- save the result to Elasticsearch STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage();
Elasticsearch resource (index and type) associated with the given storage
additional configuration parameters can be passed here - in this case the defaults are used
For cases where the id (or other metadata fields like
timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely
es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field
id as the document id, update the
STORE B INTO 'radio/artists USING org.elasticsearch.hadoop.pig.EsStorage('es.mapping.id=id'...);
Writing existing JSON to Elasticsearchedit
When the job input data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the json input by setting
es.input.json parameter. As such, in this case elasticsearch-hadoop expects to receive a tuple with a single field (representing the JSON document); the library will recognize common textual types such as
bytearray otherwise it just calls
toString to get a hold of the JSON content.
Table 4. Pig types to use for JSON representation
use this when the JSON data is represented as a
use this if the JSON data is represented as a
make sure the
Make sure the data is properly encoded, in
UTF-8. The field content is considered the final form of the document sent to Elasticsearch.
Writing to dynamic/multi-resourcesedit
One can index the data to a different resource, depending on the row being read, by using patterns. Reusing the aforementioned media example, one could configure it as follows:
Tuple field used by the resource pattern. Any of the declared fields can be used.
Resource pattern using field
For each tuple about to be written, elasticsearch-hadoop will extract the
type field and use its value to determine the target resource.
The functionality is also available when dealing with raw JSON - in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:
the table declaration can be as follows:
Schema declaration for the tuple. Since JSON input is used, the schema is simply a holder to the raw data
Resource pattern relying on fields within the JSON document and not on the table schema
Reading data from Elasticsearchedit
As you would expect, loading the data is straight forward:
Due to a bug in Pig,
LoadFunctions are not aware of any schema associated with them. This means
EsStorage is forced to fully parse the documents
from Elasticsearch before passing the data to Pig for projection. In practice, this has little impact as long as a document top-level fields are used; for nested fields consider extracting the values
yourself in Pig.
Reading data from Elasticsearch as JSONedit
In the case where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can instruct elasticsearch-hadoop to return the data as is. By setting
true, the connector will parse the response from Elasticsearch, identify the documents and, without converting them, return their content to the user as
If automatic index creation is used, please review this section for more information.
Pig internally uses native java types for most of its types and elasticsearch-hadoop abides to that convention.
|Pig type||Elasticsearch type|
Available in Pig 0.10 or higher
Available in Pig 0.11 or higher
Available in Pig 0.12 or higher
While Elasticsearch understands the Pig types up to version 0.12.1, it is backwards compatible with Pig 0.9
It is worth mentioning that rich data types available only in Elasticsearch, such as
GeoShape are supported by converting their structure into the primitives available in the table above. For example, based on its storage a
geo_point might be
returned as a
chararray or a