Configuration

elasticsearch-hadoop behavior can be customized through the properties below, typically by setting them on the target job's Hadoop Configuration. However, some of them can be specified through other means depending on the library used (see the relevant section).

Note

All configuration properties start with the es prefix. The namespace es.internal is reserved by the library for its internal use and should not be used by the user at any point.

Required settings

es.resource
Elasticsearch resource location, where data is read from and written to. Requires the format <index>/<type> (relative to the Elasticsearch host/port, see below).
es.resource = twitter/tweet   # index 'twitter', type 'tweet'
es.resource.read (defaults to es.resource)
Elasticsearch resource used for reading (but not writing) data. Useful when reading and writing data to different Elasticsearch indices within the same job. Typically set automatically (except for the Map/Reduce module, which requires manual configuration).
es.resource.write (defaults to es.resource)
Elasticsearch resource used for writing (but not reading) data. Typically used for dynamic resource writes or when writing and reading data to different Elasticsearch indices within the same job. Typically set automatically (except for the Map/Reduce module, which requires manual configuration).

Note that [multiple](https://www.elastic.co/guide/en/elasticsearch/guide/current/multi-index-multi-type.html) indices and/or types are allowed only for reading. Use _all/types to search types in all indices or index/ to search all types within index. Do note that reading multiple indices/types typically works only when they have the same structure and only with some libraries. Integrations that require a strongly typed mapping (such as a table in Hive or SparkSQL) are likely to fail.

Dynamic/multi resource writes

For writing, elasticsearch-hadoop allows the target resource to be specified as a pattern (using the {<field-name>} format) that is resolved at runtime based on the data being streamed to Elasticsearch. That is, one can save documents to a certain index or type based on one or multiple fields resolved from the document about to be saved.

For example, assuming the following document set (described here in JSON for readability - feel free to translate this into the actual Java objects):

{
    "media_type":"game",
    "title":"Final Fantasy VI",
    "year":"1994"
},
{
    "media_type":"book",
    "title":"Harry Potter",
    "year":"2010"
},
{
    "media_type":"music",
    "title":"Surfing With The Alien",
    "year":"1987"
}

to index each of them based on their media_type one would use the following pattern:

# index the documents based on their type
es.resource.write = my-collection/{media_type}

which would result in Final Fantasy VI indexed under my-collection/game, Harry Potter under my-collection/book and Surfing With The Alien under my-collection/music. For more information, please refer to the dedicated integration section.
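The resolution described above can be pictured with a small Python sketch. It is only an illustration of the idea, not the connector's implementation: the resolve_resource helper is a hypothetical name, and the sketch assumes each document is a flat dictionary.

```python
import re

def resolve_resource(pattern, document):
    """Replace each {field} placeholder with that field's value from the document."""
    def substitute(match):
        field = match.group(1)
        if field not in document:
            raise KeyError(f"field '{field}' not found in document")
        return str(document[field])
    # Only plain {field} placeholders are handled here (no formatting suffix).
    return re.sub(r"\{([^{}:]+)\}", substitute, pattern)

docs = [
    {"media_type": "game", "title": "Final Fantasy VI", "year": "1994"},
    {"media_type": "book", "title": "Harry Potter", "year": "2010"},
    {"media_type": "music", "title": "Surfing With The Alien", "year": "1987"},
]

for doc in docs:
    print(doc["title"], "->", resolve_resource("my-collection/{media_type}", doc))
```

Each document ends up with its own target resource, which is why a single es.resource.write value can fan out to several types or indices.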

Important

Dynamic resources are supported only for writing; for multi-index/type reads, use an appropriate search query.

Formatting dynamic/multi resource writes

When using dynamic/multi writes, one can also specify a formatting of the value returned by the field. Out of the box, elasticsearch-hadoop provides formatting for date/timestamp fields, which is useful for automatically grouping time-based data (such as logs) within a certain time range under the same index. By using the Java SimpleDateFormat syntax, one can format and parse the date in a locale-sensitive manner.

For example assuming the data contains a @timestamp field, one can group the documents in daily indices using the following configuration:

# index the documents based on their date
es.resource.write = my-collection/{@timestamp:YYYY.MM.dd} 

@timestamp field formatting - in this case YYYY.MM.dd

The same configuration property (es.resource.write) is used; however, a formatting pattern is specified after the special : character. Please refer to the SimpleDateFormat javadocs for more information on the supported syntax. In this case YYYY.MM.dd translates the date into the year (four digits), followed by the month (two digits) and the day (two digits), such as 2015.01.28.

Logstash users will find this pattern quite familiar.
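As a rough illustration of the date formatting above, the following Python sketch resolves a {field:format} placeholder. It is not the connector's code: resolve_dated_resource and the _TOKENS table are hypothetical, only the three pattern letters from the example are translated, and the timestamp is assumed to be an ISO-8601 string.

```python
import re
from datetime import datetime

# Hypothetical translation table covering only the pattern letters used above.
_TOKENS = {"YYYY": "%Y", "MM": "%m", "dd": "%d"}

def resolve_dated_resource(pattern, document):
    """Resolve {field:format} placeholders using the document's date field."""
    def substitute(match):
        field, fmt = match.group(1), match.group(2)
        # Assumption: the field holds an ISO-8601 timestamp string.
        value = datetime.fromisoformat(document[field])
        for token, directive in _TOKENS.items():
            fmt = fmt.replace(token, directive)
        return value.strftime(fmt)
    return re.sub(r"\{([^{}:]+):([^{}]+)\}", substitute, pattern)

doc = {"@timestamp": "2015-01-28T10:15:30", "message": "user logged in"}
print(resolve_dated_resource("my-collection/{@timestamp:YYYY.MM.dd}", doc))
```

All documents whose @timestamp falls on the same day resolve to the same resource, which is what produces the daily grouping.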

Essential settings

Network

es.nodes (default localhost)
List of Elasticsearch nodes to connect to. When using Elasticsearch remotely, do set this option. Note that the list does not have to contain every node inside the Elasticsearch cluster; these are discovered automatically by elasticsearch-hadoop by default (see below). Each node can have its HTTP/REST port specified manually (e.g. mynode:9600).
es.port (default 9200)
Default HTTP/REST port used for connecting to Elasticsearch - this setting is applied to the nodes in es.nodes that do not have any port specified.
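The interplay between es.nodes and es.port can be sketched in Python as follows. This is a simplified illustration rather than the library's parsing logic: resolve_nodes is a hypothetical helper, the list is assumed to be comma-separated, and IPv6 literals are not handled.

```python
def resolve_nodes(es_nodes="localhost", es_port=9200):
    """Return (host, port) pairs, applying es_port to entries without a port."""
    resolved = []
    # Assumption: node entries are separated by commas.
    for entry in es_nodes.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, separator, port = entry.partition(":")
        resolved.append((host, int(port) if separator else es_port))
    return resolved

print(resolve_nodes("mynode:9600, othernode"))
```

Only the entry without an explicit port picks up the es.port value; an entry such as mynode:9600 keeps its own.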

Querying

es.query (default none)

Holds the query used for reading data from the specified es.resource. By default it is not set/empty, meaning the entire data under the specified index/type is returned. es.query can have three forms:

uri query
using the form ?uri_query, one can specify a query string. Notice the leading ?.
query dsl
using the form query_dsl - note the query dsl needs to start with { and end with } as mentioned here
external resource
if neither of the two forms above matches, elasticsearch-hadoop will try to interpret the parameter as a path within the HDFS file-system. If that is not the case, it will try to load the resource from the classpath or, if that fails, from the Hadoop DistributedCache. The resource should contain either a uri query or a query dsl.

To wit, here is an example:

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

In other words, es.query is flexible enough so that you can use whatever search api you prefer, either inline or by loading it from an external resource.
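The way the three forms are told apart can be summarized with a short Python sketch. The classify_query function is a hypothetical name used for illustration; it mirrors only the rules stated above (leading ?, surrounding braces, otherwise an external resource) and does not attempt to load anything.

```python
def classify_query(value):
    """Classify an es.query value as 'uri', 'dsl' or 'external'."""
    text = value.strip()
    if text.startswith("?"):
        return "uri"
    if text.startswith("{") and text.endswith("}"):
        return "dsl"
    # Anything else is treated as a resource location to be loaded.
    return "external"

for query in ("?q=costinl",
              '{ "query" : { "term" : { "user" : "costinl" } } }',
              "org/mypackage/myquery.json"):
    print(classify_query(query), "<-", query)
```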

Tip

We recommend using query dsl externalized in a file, included within the job jar (and thus available on its classpath). This makes it easy to identify, debug and organize your queries. Throughout the documentation we use the uri query to save text and increase readability - real-life queries quickly become unwieldy when used as uris.

Operation

es.input.json (default false)
Whether the input is already in JSON format or not (the default). Please see the appropriate section of each integration for more details about using JSON directly.
es.write.operation (default index)

The write operation elasticsearch-hadoop should perform - can be any of:

index (default)
new data is added while existing data (based on its id) is replaced (reindexed).
create
adds new data - if the data already exists (based on its id), an exception is thrown.
update
updates existing data (based on its id). If no data is found, an exception is thrown.
upsert
also known as merge: inserts the data if it does not exist, updates it if it exists (based on its id).
Note

Added in 2.1.

es.output.json (default false)
Whether the output from the connector should be in JSON format or not (the default). When enabled, the documents are returned in raw JSON format (as returned from Elasticsearch). Please see the appropriate section of each integration for more details about using JSON directly.
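To make the four es.write.operation values listed above easier to compare, here is a small Python sketch that applies them to an in-memory dictionary standing in for an index. It is purely illustrative: apply_write is a hypothetical function, and treating update and upsert as a field-by-field merge into the existing document is an assumption of this sketch.

```python
def apply_write(store, operation, doc_id, doc):
    """Apply one write to 'store' (a dict of id -> document)."""
    exists = doc_id in store
    if operation == "index":
        store[doc_id] = dict(doc)            # add, or replace existing data
    elif operation == "create":
        if exists:
            raise ValueError(f"document {doc_id} already exists")
        store[doc_id] = dict(doc)
    elif operation == "update":
        if not exists:
            raise KeyError(f"document {doc_id} not found")
        store[doc_id].update(doc)            # assumption: merge into existing
    elif operation == "upsert":
        if exists:
            store[doc_id].update(doc)
        else:
            store[doc_id] = dict(doc)
    else:
        raise ValueError(f"unknown operation: {operation}")

store = {}
apply_write(store, "index", "1", {"title": "first"})
apply_write(store, "upsert", "1", {"year": "1994"})
apply_write(store, "upsert", "2", {"title": "second"})
print(store)
```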

Mapping (when writing to Elasticsearch)

es.mapping.id (default none)
The document field/property name containing the document id.
es.mapping.parent (default none)
The document field/property name containing the document parent. To specify a constant, use the <CONSTANT> format.
es.mapping.version (default none)
The document field/property name containing the document version. To specify a constant, use the <CONSTANT> format.
es.mapping.version.type (default depends on es.mapping.version)
Indicates the type of versioning used. If es.mapping.version is undefined (default), its value is unspecified. If es.mapping.version is specified, its value becomes external.
es.mapping.routing (default none)
The document field/property name containing the document routing. To specify a constant, use the <CONSTANT> format.
es.mapping.ttl (default none)
The document field/property name containing the document time-to-live. To specify a constant, use the <CONSTANT> format.
es.mapping.timestamp (default none)
The document field/property name containing the document timestamp. To specify a constant, use the <CONSTANT> format.
Note

Added in 2.1.

es.mapping.date.rich (default true)
Whether to create a rich Date-like object for Date fields in Elasticsearch or return them as primitives (String or long). By default this is true. The actual object type is based on the library used; a notable exception is Map/Reduce, which provides no built-in Date object and as such LongWritable and Text are returned regardless of this setting.
Note

Added in 2.1.

es.mapping.include (default none)
Field/property to be included in the document sent to Elasticsearch. Useful for extracting the needed data from entities. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified meaning all properties/fields are included.
Note

Added in 2.1.

es.mapping.exclude (default none)
Field/property to be excluded from the document sent to Elasticsearch. Useful for eliminating unneeded data from entities. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified, meaning no properties/fields are excluded.

For example:

# extracting the id from the field called 'uuid'
es.mapping.id = uuid

# specifying a parent with id '123'
es.mapping.parent = <123>

# combine include / exclude for complete control
# include
es.mapping.include = u*, foo.*
# exclude
es.mapping.exclude = *.description

Using the configuration above, each entry will have only its top-level fields starting with u and the nested fields under foo included in the document, with the exception of any nested field named description. Additionally, the document parent will be 123 while the document id is extracted from the field uuid.
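The combined effect of es.mapping.include and es.mapping.exclude can be approximated with the Python sketch below. It is a simplification and not the connector's matching code: flatten and filter_fields are hypothetical helpers, fields are compared as dotted paths, and the standard fnmatch wildcard rules are used (so * also matches across dots, which may differ from the real behavior).

```python
from fnmatch import fnmatchcase

def flatten(doc, prefix=""):
    """Turn a nested dict into {dotted.path: value} pairs."""
    paths = {}
    for key, value in doc.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            paths.update(flatten(value, path + "."))
        else:
            paths[path] = value
    return paths

def filter_fields(doc, includes=(), excludes=()):
    """Keep paths matching an include (or all, if none) and no exclude."""
    kept = {}
    for path, value in flatten(doc).items():
        included = not includes or any(fnmatchcase(path, p) for p in includes)
        excluded = any(fnmatchcase(path, p) for p in excludes)
        if included and not excluded:
            kept[path] = value
    return kept

includes = [p.strip() for p in "u*, foo.*".split(",")]
excludes = [p.strip() for p in "*.description".split(",")]
doc = {"uuid": "abc", "user": "kimchy", "title": "ignored",
       "foo": {"bar": 1, "description": "dropped"}}
print(filter_fields(doc, includes, excludes))
```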

Metadata (when reading from Elasticsearch)

es.read.metadata (default false)
Whether to include the document metadata (such as id and version) in the results or not (default).
es.read.metadata.field (default _metadata)
The field under which the metadata information is placed. When es.read.metadata is set to true, the information is returned as a Map under the specified field.
es.read.metadata.version (default false)
Whether to include the document version in the returned metadata. Applicable only if es.read.metadata is enabled.

Update settings (when writing to Elasticsearch)

When using the update or upsert operation, additional settings (that mirror the update API) are available:

es.update.script (default none)
Script used for updating the document.
es.update.script.lang (default none)
Script language. By default, no value is specified applying the node configuration.
es.update.script.params (default none)
Script parameters (if any). Each parameter names the field/property of the document (currently being read) whose value is used. To specify a constant, use the <CONSTANT> format. Multiple values can be specified through commas (,).

For example:

# specifying 2 parameters, one extracting the value from field 'number', the other containing the value '123':
es.update.script.params = param1:number,param2:<123>
es.update.script.params.json
Script parameters specified in raw, JSON format. The specified value is passed as is, without any further processing or filtering. Typically used for migrating existing update scripts.

For example:

es.update.script.params.json = {"param1":1, "param2":2}
es.update.retry.on.conflict (default 0)
How many times an update to a document is retried in case of conflict. Useful in concurrent environments.
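The name:source format accepted by es.update.script.params can be illustrated with a short Python sketch. It is not the library's parser: resolve_script_params is a hypothetical helper, and keeping constants as strings is an assumption of this sketch.

```python
def resolve_script_params(spec, document):
    """Resolve 'name:field' and 'name:<CONSTANT>' pairs against a document."""
    params = {}
    for pair in spec.split(","):
        name, _, source = pair.strip().partition(":")
        if source.startswith("<") and source.endswith(">"):
            # Constant: strip the angle brackets (kept as a string here).
            params[name] = source[1:-1]
        else:
            # Otherwise the value is read from the named document field.
            params[name] = document[source]
    return params

document = {"number": 7, "title": "example"}
print(resolve_script_params("param1:number,param2:<123>", document))
```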

Advanced settings

Index

es.index.auto.create (default yes)
Whether elasticsearch-hadoop should create an index (if it is missing) when writing data to Elasticsearch or fail.
es.index.read.missing.as.empty (default no)
Whether elasticsearch-hadoop will allow reading of non-existing indices (and return an empty data set) or not (and throw an exception).
es.field.read.empty.as.null (default yes)
Whether elasticsearch-hadoop will treat empty fields as null. This setting is typically not needed (as elasticsearch-hadoop already handles the null case) but is enabled to make it easier to work with text fields that haven’t been sanitized yet.
es.field.read.validate.presence (default warn)

To help spot possible mistakes when querying data from Hadoop (which result in incorrect data being returned), elasticsearch-hadoop can perform validation, spotting missing fields and potential typos. Possible values are:

ignore
no validation is performed
warn
a warning message is logged in case the validation fails
strict
an exception is thrown, halting the job, if a field is missing

The default (warn) will log any typos to the console when the job starts:

WARN main mr.EsInputFormat - Field(s) [naem, adress] not found
   in the Elasticsearch mapping specified; did you mean [name, location.address]?
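The three validation modes can be sketched in Python as shown below. This only illustrates the ignore/warn/strict behavior described above: validate_fields is a hypothetical function, the mapping is reduced to a set of field names, and no "did you mean" suggestions are computed.

```python
import logging

def validate_fields(requested, mapped, mode="warn"):
    """Return the requested fields missing from the mapping, per the mode."""
    if mode == "ignore":
        return []                         # no validation is performed
    missing = [field for field in requested if field not in mapped]
    if missing:
        message = f"Field(s) {missing} not found in the Elasticsearch mapping specified"
        if mode == "strict":
            raise ValueError(message)     # halts the (simulated) job
        logging.getLogger("validation").warning(message)
    return missing

mapping_fields = {"name", "location.address"}
validate_fields(["naem", "name"], mapping_fields, mode="warn")
```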

Network

es.nodes.discovery (default true)
Whether to discover the nodes within the Elasticsearch cluster or only to use the ones given in es.nodes for metadata queries. Note that this setting only applies during start-up; afterwards when reading and writing, elasticsearch-hadoop uses the target index shards (and their hosting nodes) unless es.nodes.client.only is enabled.
es.nodes.client.only (default false)
Whether to use Elasticsearch client nodes (or load-balancers). When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the client nodes within the cluster. Note this typically significantly reduces the node parallelism and thus it is disabled by default. Enabling it also disables es.nodes.data.only (since a client node is a non-data node).
Note

Added in 2.1.2.

es.nodes.data.only (default true)
Whether to use Elasticsearch data nodes only. When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the data nodes within the cluster. The purpose of this configuration setting is to avoid overwhelming non-data nodes as these tend to be "smaller" nodes. This is enabled by default.
Note

Added in 2.2.

es.http.timeout (default 1m)
Timeout for HTTP/REST connections to Elasticsearch.
es.http.retries (default 3)
Number of retries for establishing a (broken) http connection. The retries are applied for each conversation with an Elasticsearch node. Once the retries are depleted, the connection will automatically be re-routed to the next available Elasticsearch node (based on the declaration of es.nodes, followed by the discovered nodes - if enabled).
es.scroll.keepalive (default 10m)
The maximum duration of result scrolls between query requests.
es.scroll.size (default 50)
Number of results/items returned by each individual scroll.
es.action.heart.beat.lead (default 15s)
How long before the task timeout elasticsearch-hadoop informs Hadoop that the task is still running, to prevent a task restart.
Note

Added in 2.1.

Basic Authentication

es.net.http.auth.user
Basic Authentication user name
es.net.http.auth.pass
Basic Authentication password
Note

Added in 2.1.

SSL

es.net.ssl (default false)
Enable SSL
es.net.ssl.keystore.location
key store (if used) location (typically a URL, without a prefix it is interpreted as a classpath entry)
es.net.ssl.keystore.pass
key store password
es.net.ssl.keystore.type (default JKS)
key store type. PKCS12 is a common, alternative format
es.net.ssl.truststore.location
trust store location (typically a URL, without a prefix it is interpreted as a classpath entry)
es.net.ssl.truststore.pass
trust store password
es.net.ssl.cert.allow.self.signed (default false)
Whether or not to allow self signed certificates
es.net.ssl.protocol (default TLS)
SSL protocol to be used

Proxy

es.net.proxy.http.host
Http proxy host name
es.net.proxy.http.port
Http proxy port
es.net.proxy.http.user
Http proxy user name
es.net.proxy.http.pass
Http proxy password
es.net.proxy.http.use.system.props (default yes)
Whether to use the system Http proxy properties (namely http.proxyHost and http.proxyPort) or not
es.net.proxy.socks.host
Socks proxy host name
es.net.proxy.socks.port
Socks proxy port
es.net.proxy.socks.user
Socks proxy user name
es.net.proxy.socks.pass
Socks proxy password
es.net.proxy.socks.use.system.props (default yes)
Whether to use the system Socks proxy properties (namely socksProxyHost and socksProxyPort) or not
Note

elasticsearch-hadoop allows proxy settings to be applied only to its own connection using the settings above. Take extra care when there is already a JVM-wide proxy setting (typically through system properties) to avoid unexpected behavior.

Serialization

es.batch.size.bytes (default 1mb)
Size (in bytes) for batch writes using Elasticsearch bulk API. Note the bulk size is allocated per task instance. Always multiply by the number of tasks within a Hadoop job to get the total bulk size at runtime hitting Elasticsearch.
es.batch.size.entries (default 1000)
Size (in entries) for batch writes using Elasticsearch bulk API - (0 disables it). Companion to es.batch.size.bytes, once one matches, the batch update is executed. Similar to the size, this setting is per task instance; it gets multiplied at runtime by the total number of Hadoop tasks running.
es.batch.write.refresh (default true)
Whether to invoke an index refresh or not after a bulk update has been completed. Note this is called only after the entire write (meaning multiple bulk updates) has been executed.
es.batch.write.retry.count (default 3)
Number of retries for a given batch in case Elasticsearch is overloaded and data is rejected. Note that only the rejected data is retried. If there is still data rejected after the retries have been performed, the Hadoop job is cancelled (and fails). A negative value indicates infinite retries; be careful in setting this value as it can have unwanted side effects.
es.batch.write.retry.wait (default 10s)
Time to wait between batch write retries.
es.ser.reader.value.class (default depends on the library used)
Name of the ValueReader implementation for converting JSON to objects. This is set by the framework depending on the library (Map/Reduce, Cascading, Hive, Pig, etc.) used.
es.ser.writer.value.class (default depends on the library used)
Name of the ValueWriter implementation for converting objects to JSON. This is set by the framework depending on the library (Map/Reduce, Cascading, Hive, Pig, etc.) used.
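The way es.batch.size.bytes and es.batch.size.entries work together (whichever limit is reached first triggers the bulk write) can be sketched in Python. This is an illustration under assumptions, not the connector's buffering code: BulkBuffer and its on_flush callback are hypothetical, and flushing right after the entry that reaches a limit is a simplification.

```python
class BulkBuffer:
    """Per-task buffer that flushes when either size limit is reached."""

    def __init__(self, max_bytes=1024 * 1024, max_entries=1000, on_flush=print):
        self.max_bytes = max_bytes
        self.max_entries = max_entries    # 0 disables the entry limit
        self.on_flush = on_flush
        self.entries = []
        self.size = 0

    def add(self, entry: bytes):
        self.entries.append(entry)
        self.size += len(entry)
        entries_full = self.max_entries > 0 and len(self.entries) >= self.max_entries
        if self.size >= self.max_bytes or entries_full:
            self.flush()

    def flush(self):
        if self.entries:
            self.on_flush(list(self.entries))
            self.entries.clear()
            self.size = 0

batches = []
buffer = BulkBuffer(max_bytes=100, max_entries=2, on_flush=batches.append)
for payload in (b"doc-1", b"doc-2", b"doc-3"):
    buffer.add(payload)
buffer.flush()  # write whatever is left at the end of the task
print(len(batches), "bulk requests")
```

Since each task holds its own buffer, a job running 10 tasks with the default 1mb limit can send up to roughly 10mb of bulk data to Elasticsearch at once.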