
Writing Your Own Ingest Processor for Elasticsearch

With Elasticsearch 5.0, a new feature called the ingest node was introduced. To quote the official documentation:


You can use ingest node to pre-process documents before the actual indexing takes place. This pre-processing happens by an ingest node that intercepts bulk and index requests, applies the transformations, and then passes the documents back to the index or bulk APIs.

So, by defining a pipeline, you can configure the way a document should be transformed before it is indexed. A fair number of processors already ship with Elasticsearch, but it is also very easy to roll your own.

This blog post will show how to write an ingest processor that extracts URLs from a field and stores them in an array. This array could be used to pre-fetch the linked pages, spider them, or simply display the URLs associated with a text field in your application.

We will also show how easy it is to test your plugin, including a real integration test, when using Gradle.

Creating a plugin skeleton

In order to get started, you could simply check out the Elasticsearch source code and copy another ingest processor plugin over. For example, the ingest-attachment, ingest-user-agent, and ingest-geoip plugins are existing ingest plugins that can be found in the Elasticsearch repository. However, this means you would need to remove a lot of code before you could start coding.

The alternative is to use a tool called cookiecutter. Cookiecutter is a command line tool that creates projects from templates. I created such a template to make it easier for you to write your own ingest processor. So, let's get started and create a plugin from the cookiecutter template. If you have not installed cookiecutter yet, run

pip install cookiecutter

Now we can start creating our plugin skeleton by running

cookiecutter gh:spinscale/cookiecutter-elasticsearch-ingest-processor

You will be asked a few questions: the name of the plugin, a package directory (don't change it), a description, your name (needed to fill in the license), and the Elasticsearch version to run against. If you pick url-extract as the name of your plugin, all the naming conventions in this document will apply to your local setup as well.

[Figure: cookiecutter run]

If you look into the newly created directory, you will find a fully fledged plugin that even allows you to run a few tests out of the box.

You can run gradle clean check and see the tests pass. Our next step is to add a dependency and write a test.

Note: You are required to use Gradle 2.13 for plugin development. If you are running macOS, you can ensure this by running brew switch gradle 2.13.

So, let’s add the following dependency to build.gradle

dependencies {
  compile 'org.nibor.autolink:autolink:0.6.0'
  ...
}

Autolink is a small helper library to easily extract URLs from text.
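
To get a feeling for the library before wiring it into the processor, a minimal standalone usage could look like the following (just a sketch to illustrate the Autolink API, not part of the plugin code):

import org.nibor.autolink.LinkExtractor;
import org.nibor.autolink.LinkSpan;
import org.nibor.autolink.LinkType;

import java.util.EnumSet;

public class AutolinkExample {
    public static void main(String[] args) {
        String text = "Check out http://elastic.co and http://example.org/foo";
        // only extract URLs, ignore e-mail addresses and www-links
        LinkExtractor extractor = LinkExtractor.builder().linkTypes(EnumSet.of(LinkType.URL)).build();
        for (LinkSpan link : extractor.extractLinks(text)) {
            // a LinkSpan only carries offsets into the original text
            System.out.println(text.substring(link.getBeginIndex(), link.getEndIndex()));
        }
    }
}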

If you use IntelliJ, you might want to run gradle idea now and then import the project into IntelliJ.

Tests first

Before writing any code, we should not only make up our mind about what the processor is supposed to do, but also write a test reflecting this behaviour, and only then work on the implementation. To do this, we open up UrlExtractProcessorTests and write a useful test, in which an input field contains several URLs and we expect an output field to be a list of those URLs.

public void testThatProcessorWorks() throws Exception {
    Map<String, Object> document = new HashMap<>();
    document.put("source_field", "this is a test field pointing to http://elastic.co and http://example.org/foo");
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
    UrlExtractProcessor processor = new UrlExtractProcessor(randomAsciiOfLength(10), "source_field", "target_field");
    processor.execute(ingestDocument);
    Map<String, Object> data = ingestDocument.getSourceAndMetadata();
    assertThat(data, hasKey("target_field"));
    assertThat(data.get("target_field"), is(instanceOf(List.class)));
    @SuppressWarnings("unchecked")
    List<String> urls = (List<String>) data.get("target_field");
    assertThat(urls, containsInAnyOrder("http://elastic.co", "http://example.org/foo"));
}

You can now run gradle check to see the test fail, or just start UrlExtractProcessorTests in your IDE of choice (minor note: you might need to add the JVM argument -ea to your test runs to make them work in your IDE). Now, let's make the implementation work.

Before we start fixing the implementation, we can also quickly edit the REST test named 20_url-extract_processor.yaml in the src/test/resources/rest-api-spec/test/ingest-url-extract/ directory. Let's make this test useful as well.

In case you are wondering what a REST test is: these are tests defined in YAML that are also used by all the official clients to ensure they provide the same functionality. On top of that, they run against an externally started cluster when you run gradle integTest.

---
"Test url-extract processor with defaults":
  - do:
      ingest.put_pipeline:
        id: "my_pipeline"
        body:  >
          {
            "description": "_description",
            "processors": [
              {
                "url_extract" : {
                  "field" : "field1",
                  "target_field" : "field2"
                }
              }
            ]
          }
  - match: { acknowledged: true }
  - do:
      index:
        index: test
        type: test
        id: 1
        pipeline: "my_pipeline"
        body: {field1: "My homepage is at https://github.com/spinscale"}
  - do:
      get:
        index: test
        type: test
        id: 1
  - match: { _source.field1: "My homepage is at https://github.com/spinscale" }
  - match: { _source.field2: [ "https://github.com/spinscale" ] }

The great thing about this so-called REST test is the fact that it will run against an external Elasticsearch node with the plugin loaded - just as you would run this plugin in your own cluster.

You might have noticed that UrlExtractProcessorTests is actually a unit test and does not require an Elasticsearch node to be up and running. We spent a lot of time making sure that processors are easily unit testable. This ensures that you have fast test cycles.

Writing the implementation

Alright, now you are ready to write the implementation. If you look at IngestUrlExtractPlugin, you will see that this is the plugin class loaded by Elasticsearch. This class implements IngestPlugin, which requires you to implement a method that specifies which processors should be loaded - so you could also add more than one processor per plugin. The getProcessors method requires you to return a map with a factory class that creates new processor instances on demand.

You can also remove the YOUR_SETTING variable and the getSettings() method. Both were automatically added by the cookiecutter template, but are not needed for this example. So we basically end up with this class

public class IngestUrlExtractPlugin extends Plugin implements IngestPlugin {
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return MapBuilder.<String, Processor.Factory>newMapBuilder()
                .put(UrlExtractProcessor.TYPE, new UrlExtractProcessor.Factory())
                .immutableMap();
    }
}

On to the UrlExtractProcessor class!
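
The generated processor class roughly looks like the following (a sketch based on what the cookiecutter template creates - your generated code may differ slightly):

public final class UrlExtractProcessor extends AbstractProcessor {

    public static final String TYPE = "url_extract";

    private final String field;
    private final String targetField;

    public UrlExtractProcessor(String tag, String field, String targetField) {
        super(tag);
        this.field = field;
        this.targetField = targetField;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    // the execute() method and the inline Factory class are discussed below
}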

You can see the inline Factory class that reads the configuration for this processor. Using the readStringProperty method for the field variable makes it mandatory to configure the field to read the data from. We should also change the targetField assignment to not allow a default field, but force the user to specify which field to write the data to. The factory now looks like this

public static final class Factory implements Processor.Factory {
    @Override
    public UrlExtractProcessor create(Map<String, Processor.Factory> factories, String tag, Map<String, Object> config)
        throws Exception {
        String field = readStringProperty(TYPE, tag, config, "field");
        String targetField = readStringProperty(TYPE, tag, config, "target_field");
        return new UrlExtractProcessor(tag, field, targetField);
    }
}
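
Since target_field is now mandatory as well, it is worth covering the factory with a quick unit test, too. A minimal sketch (the configuration keys mirror the factory above, everything else is plain test scaffolding) could look like this:

public void testFactory() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put("field", "source_field");
    config.put("target_field", "target_field");
    String tag = randomAsciiOfLength(10);
    // the factory removes the keys it has read from the config map
    UrlExtractProcessor processor = new UrlExtractProcessor.Factory().create(Collections.emptyMap(), tag, config);
    assertThat(processor.getTag(), is(tag));
}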

OK, now the only code left is the concrete implementation in UrlExtractProcessor.execute(). Let's come up with a simple implementation to satisfy the test.

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
    String content = ingestDocument.getFieldValue(field, String.class);
    LinkExtractor linkExtractor = LinkExtractor.builder().linkTypes(EnumSet.of(LinkType.URL)).build();
    List<String> links = StreamSupport.stream(linkExtractor.extractLinks(content).spliterator(), false)
            .map(link -> content.substring(link.getBeginIndex(), link.getEndIndex()))
            .collect(Collectors.toList());
    ingestDocument.setFieldValue(targetField, links);
}

If you run the UrlExtractProcessorTests tests now, they should pass. Running gradle clean check should also succeed now.

If you want to try the plugin manually, you can start a local Elasticsearch node with the plugin installed by running

gradle run

Congratulations, you just built your first ingest processor!

Add more tests

Of course this is only a barebones example, but you should always add more tests that define the behavior of your processor. For example, what happens if the field you are going to read data from is not a string, but a map or a number? What happens with email addresses? Are URLs detected that use schemes other than http and https? You should check the autolink documentation for more information about that. Also, what happens if the target_field already exists - should the values be merged or should an error be thrown?
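
As a starting point for such tests, here is a sketch for the case of a numeric field (the expected exception type assumes the behaviour of IngestDocument.getFieldValue, which refuses to cast a number to a String):

public void testNonStringFieldThrowsException() throws Exception {
    Map<String, Object> document = new HashMap<>();
    document.put("source_field", 42);
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
    UrlExtractProcessor processor = new UrlExtractProcessor(randomAsciiOfLength(10), "source_field", "target_field");
    // getFieldValue(field, String.class) throws when the value cannot be cast to a String
    expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
}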

Create the plugin distribution

If you want to use the plugin, you need to create an installable zip archive out of your code. You can simply run gradle assemble (better: run gradle clean check assemble so you do not accidentally package a plugin with failing tests), and then you will have a zip file in build/distributions/, which you can install using

elasticsearch-plugin install file:///$path/$to/build/distributions/ingest-url-extract-0.0.1-SNAPSHOT.zip

Of course you should not put snapshot distributions into production.

Keep the security manager in mind

Before we are done with this blog post, there is one last thing to keep in mind, which we did not need to touch due to the simplicity of the plugin. The Java Security Manager is always enabled in Elasticsearch. This means you cannot easily read arbitrary files or open network sockets. If you want to do that, you can add exceptions to the security rules. Please be very careful, as with great power comes great responsibility. You should try to follow a least-privilege approach. Many Java libraries and dependencies use features like reflection to solve certain problems, and you should try to work around those if possible instead of blindly granting permissions to a plugin.
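
To give a rough idea of what that looks like in practice: permissions are typically declared in the plugin's plugin-security.policy file (usually located in src/main/plugin-metadata), and the code that needs them is wrapped in a privileged block. The snippet below is only a hedged illustration of that pattern - this plugin does not need it, and fetchUrl() is a purely hypothetical helper:

// hypothetical example: a processor that opens a network connection would need
// java.net.SocketPermission granted in plugin-security.policy
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
    // make sure only trusted (plugin) code can enter the privileged block
    sm.checkPermission(new SpecialPermission());
}
String response = AccessController.doPrivileged((PrivilegedAction<String>) () -> fetchUrl("http://example.org"));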

You will be warned early, however, because even the unit tests already run with the security manager enabled.

Come up with your own processor code

The possibilities are limited only by your imagination. You can write your own geo preprocessors, extract content from binary files, add calculations from machine learning apps to speed up your queries and still have awesome recommendations, detect languages, or extract named entities. Also, you don't have to worry that costly calculations in ingest processors will drain your indexing or search performance, because you can offload them to dedicated ingest nodes - which also means that you can scale them independently of your data and search needs.

We are looking forward to the next wave of ingest processors! Ping us if you have questions or need any help - we're all hanging out on IRC or in the discussion forums.

Further Resources