Elasticsearch’s new range enrich policy enables a new level of contextual data analysis


Elasticsearch 7.16 introduced a new enrich policy type: range. The range policy lets you match a number, date, or IP address in incoming documents against a range of the same type in the enrich index. Matching against IP ranges is particularly useful in security use cases, where the added metadata can help refine detection rules. Since our documentation already includes an example that uses IP ranges, we’ll walk through an example here that uses the date_range type.
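Conceptually, the range policy asks one question of each enrich document: does the incoming value fall between the range’s lower and upper bound? The pure-Python snippet below is a sketch of that rule only, not of how Elasticsearch implements it. The helper in_range and the sample values (including the 10.0.0.0/8 block) are hypothetical, and the sketch assumes inclusive bounds, as with the gte/lte ranges used throughout this post. Because Python’s datetime and ipaddress objects are comparable, one function covers numbers, dates, and IP addresses.

```python
import ipaddress
from datetime import datetime


def in_range(value, gte, lte):
    """Hypothetical helper: True if gte <= value <= lte, both bounds inclusive.

    Works for any mutually comparable type: ints, datetimes, or IP addresses.
    """
    return gte <= value <= lte


# A number against a numeric range.
print(in_range(42, 0, 100))  # True

# A date against a date range.
print(in_range(datetime(2021, 11, 29, 6, 12),
               datetime(2021, 11, 29, 3, 0),
               datetime(2021, 11, 29, 8, 0)))  # True

# An IP address against a (hypothetical) CIDR block, via its first and last address.
net = ipaddress.ip_network("10.0.0.0/8")
print(in_range(ipaddress.ip_address("10.1.2.3"),
               net.network_address, net.broadcast_address))  # True
print(in_range(ipaddress.ip_address("192.168.0.1"),
               net.network_address, net.broadcast_address))  # False
```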

Our fictional example: incidents and on-call schedules

Say we have a number of on-call schedules that we want to store in Elasticsearch, with each continuous shift as its own document. Let’s introduce our fictional engineers: Bob, Alice, Dan, Matt, and Lizzie.

Bob likes to work from 8:00 to 17:00, taking an hour-long lunch break at noon. We can add his schedule for Monday, November 29 like this:

PUT /on_call_schedules
{
  "mappings": {
    "properties": {
      "shift": { 
        "type": "date_range", 
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}
POST on_call_schedules/_doc
{
  "engineer" : { 
    "name" : "Bob"
  },
  "shift" : {
    "gte" : "2021-11-29 08:00:00", 
    "lte" : "2021-11-29 12:00:00"
  }
}
POST on_call_schedules/_doc
{
  "engineer" : { 
    "name" : "Bob"
  },
  "shift" : {
    "gte" : "2021-11-29 13:00:00", 
    "lte" : "2021-11-29 17:00:00"
  }
}
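The format on the shift mapping lists three alternatives separated by ||, which are tried in order. As a rough Python sketch of how a value is read under that mapping (parse_es_date is a hypothetical helper; it assumes, as Elasticsearch does by default, that dates without a time zone are in UTC):

```python
from datetime import datetime, timezone


def parse_es_date(text):
    """Hypothetical helper mirroring the mapping's format list, in order:
    yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || epoch_millis.

    Dates without an explicit time zone are treated as UTC.
    """
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            pass  # fall through to the next format
    # Last alternative: milliseconds since the Unix epoch.
    return datetime.fromtimestamp(int(text) / 1000, tz=timezone.utc)


# Bob's morning shift, as indexed above.
start = parse_es_date("2021-11-29 08:00:00")
end = parse_es_date("2021-11-29 12:00:00")
print(end - start)  # 4:00:00
```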

For the other engineers, the stories are as follows. Alice’s schedule is similar to Bob’s, shifted one hour later, so she starts lunch at 13:00. Dan and Matt are in different time zones: Matt works half a day, from 0:00 to 4:00, while Dan works from 3:00 to 8:00, takes a lunch break until 9:00, and ends work at 12:00. Lizzie works evenings from 16:00 until midnight, taking a break at 20:00.

The remaining requests to fill the index look like this (putting objects on the same line for conciseness):

POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Alice"  },
  "shift" : { "gte" : "2021-11-29 09:00:00", "lte" : "2021-11-29 13:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Alice"  }, 
  "shift" : {  "gte" : "2021-11-29 14:00:00", "lte" : "2021-11-29 18:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Dan"  },
  "shift" : { "gte" : "2021-11-29 03:00:00", "lte" : "2021-11-29 08:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Dan"  }, 
  "shift" : {  "gte" : "2021-11-29 09:00:00", "lte" : "2021-11-29 12:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Matt"  }, 
  "shift" : {  "gte" : "2021-11-29 00:00:00", "lte" : "2021-11-29 04:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Lizzie"  }, 
  "shift" : {  "gte" : "2021-11-29 16:00:00", "lte" : "2021-11-29 20:00:00"  }
}
POST on_call_schedules/_doc
{
  "engineer" : {  "name" : "Lizzie"  }, 
  "shift" : {  "gte" : "2021-11-29 21:00:00", "lte" : "2021-11-30 00:00:00"  }
}

Now that we have an index with all the schedules, we can create an enrich policy that finds the on-call engineers for a given date by matching it against the shift field, which contains a date_range:

PUT /_enrich/policy/add-oncall-engineers-policy
{
  "range": {
    "indices": "on_call_schedules",
    "match_field": "shift",
    "enrich_fields": ["engineer.name"]
  }
}
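When the policy executes, each enrich document keeps the match_field plus the listed enrich_fields, which is why the enriched incidents later contain both shift and engineer.name. A minimal Python sketch of that projection follows; the helper names are hypothetical, as is the extra phone field, which is only there to show what gets dropped.

```python
def get_path(doc, dotted):
    """Hypothetical helper: follow a dotted field name such as "engineer.name"."""
    for key in dotted.split("."):
        doc = doc[key]
    return doc


def set_path(doc, dotted, value):
    """Hypothetical helper: set a value at a dotted path, creating parent dicts."""
    *parents, leaf = dotted.split(".")
    for key in parents:
        doc = doc.setdefault(key, {})
    doc[leaf] = value


def project(source_doc, match_field, enrich_fields):
    """Keep only the match field and the enrich fields of a source document."""
    entry = {}
    for field in [match_field, *enrich_fields]:
        set_path(entry, field, get_path(source_doc, field))
    return entry


source = {
    # "phone" is a hypothetical extra field that the policy does not list.
    "engineer": {"name": "Bob", "phone": "555-0100"},
    "shift": {"gte": "2021-11-29 08:00:00", "lte": "2021-11-29 12:00:00"},
}
print(project(source, "shift", ["engineer.name"]))
# {'shift': {'gte': '2021-11-29 08:00:00', 'lte': '2021-11-29 12:00:00'}, 'engineer': {'name': 'Bob'}}
```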

With the policy in place, we execute it, which builds the enrich index from the data in the source index:

POST /_enrich/policy/add-oncall-engineers-policy/_execute?wait_for_completion=true
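Executing the policy takes a snapshot: the enrich index is built from on_call_schedules as it is at that moment, and later changes to the source index are not picked up until the policy is executed again. The toy Python model below (execute_policy is a hypothetical name) illustrates that behavior.

```python
import copy


def execute_policy(source_docs):
    """Hypothetical model of _execute: copy the current source docs into a new index.

    A tuple is returned so the snapshot cannot be appended to afterwards.
    """
    return tuple(copy.deepcopy(doc) for doc in source_docs)


on_call_schedules = [
    {"engineer": {"name": "Bob"},
     "shift": {"gte": "2021-11-29 08:00:00", "lte": "2021-11-29 12:00:00"}},
]
enrich_index = execute_policy(on_call_schedules)

# Suppose Matt's shift is only added to the source index after execution.
on_call_schedules.append(
    {"engineer": {"name": "Matt"},
     "shift": {"gte": "2021-11-29 00:00:00", "lte": "2021-11-29 04:00:00"}})
print(len(enrich_index))  # 1: lookups do not see Matt's shift yet

enrich_index = execute_policy(on_call_schedules)  # re-execute to pick it up
print(len(enrich_index))  # 2
```

In practice, this means re-running the _execute request whenever the schedules change.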

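Now we’ll create an ingest pipeline whose enrich processor looks up each incoming document’s @timestamp using the policy and stores up to 25 matching shifts in the oncall_engineers field: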

PUT /_ingest/pipeline/engineer_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add on-call engineers based on '@timestamp'",
        "policy_name": "add-oncall-engineers-policy",
        "field" : "@timestamp",
        "target_field": "oncall_engineers",
        "max_matches": 25
      }
    }
  ]
}
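The processor reads the value of field from each incoming document, finds up to max_matches enrich documents whose range contains it, and writes them to target_field. Per the Elasticsearch documentation, target_field becomes an array when max_matches is greater than 1 and a single object otherwise. The simplified Python model below uses the schedules indexed above and, like the real processor, leaves a document without a match unchanged. The names enrich, SHIFTS, and FMT are hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (name, gte, lte) for every shift indexed into on_call_schedules above.
SHIFTS = [
    ("Bob", "2021-11-29 08:00:00", "2021-11-29 12:00:00"),
    ("Bob", "2021-11-29 13:00:00", "2021-11-29 17:00:00"),
    ("Alice", "2021-11-29 09:00:00", "2021-11-29 13:00:00"),
    ("Alice", "2021-11-29 14:00:00", "2021-11-29 18:00:00"),
    ("Dan", "2021-11-29 03:00:00", "2021-11-29 08:00:00"),
    ("Dan", "2021-11-29 09:00:00", "2021-11-29 12:00:00"),
    ("Matt", "2021-11-29 00:00:00", "2021-11-29 04:00:00"),
    ("Lizzie", "2021-11-29 16:00:00", "2021-11-29 20:00:00"),
    ("Lizzie", "2021-11-29 21:00:00", "2021-11-30 00:00:00"),
]


def enrich(doc, field="@timestamp", target_field="oncall_engineers",
           max_matches=25):
    """Hypothetical model: add the shifts that contain doc[field] to doc[target_field]."""
    value = datetime.strptime(doc[field], FMT)
    matches = []
    for name, gte, lte in SHIFTS:
        if datetime.strptime(gte, FMT) <= value <= datetime.strptime(lte, FMT):
            matches.append({"shift": {"gte": gte, "lte": lte},
                            "engineer": {"name": name}})
            if len(matches) == max_matches:
                break
    if matches:
        # An array when more than one match is allowed, else a single object.
        doc[target_field] = matches if max_matches > 1 else matches[0]
    return doc


incident = enrich({"@timestamp": "2021-11-29 06:12:33",
                   "severity": "high", "handled_by": "Dan"})
print([m["engineer"]["name"] for m in incident["oncall_engineers"]])  # ['Dan']
```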

At this point, we’re all set to log some incidents and have them enriched with the engineers who were scheduled.

Giving our setup a go with incident 1

Let’s log incident one, which Dan handled at 6:12 in the morning:

PUT /incidents/_doc/incident1?pipeline=engineer_lookup
{
  "@timestamp": "2021-11-29 06:12:33",
  "severity": "high",
  "handled_by": "Dan"
}

When we retrieve the document, we can see that Dan was the only engineer on call:

GET /incidents/_doc/incident1

Response:

{
  "_index" : "incidents",
  "_type" : "_doc",
  "_id" : "incident1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "severity" : "high",
    "@timestamp" : "2021-11-29 06:12:33",
    "handled_by" : "Dan",
    "oncall_engineers" : [
      {
        "shift" : {
          "gte" : "2021-11-29 03:00:00",
          "lte" : "2021-11-29 08:00:00"
        },
        "engineer" : {
          "name" : "Dan"
        }
      }
    ]
  }
}

More incidents

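Let’s log three more incidents: two handled by Dan, at 11:12 and 14:08, and one handled by Alice at 16:12. The last request uses refresh=wait_for so that the new incidents are visible to the searches that follow: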

PUT /incidents/_doc/incident2?pipeline=engineer_lookup
{
  "@timestamp": "2021-11-29 11:12:52",  "severity": "high",  "handled_by": "Dan"
}

PUT /incidents/_doc/incident3?pipeline=engineer_lookup
{
  "@timestamp": "2021-11-29 14:08:06",   "severity": "high",  "handled_by": "Dan"
}

PUT /incidents/_doc/incident4?pipeline=engineer_lookup&refresh=wait_for
{
  "@timestamp": "2021-11-29 16:12:16",  "severity": "high",  "handled_by": "Alice"
}

Based on our schedule, three engineers should have been on call when incident two happened. Let’s verify:

GET /incidents/_doc/incident2

Response:

{
  "_index" : "incidents",
  "_type" : "_doc",
  "_id" : "incident2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "severity" : "high",
    "@timestamp" : "2021-11-29 11:12:52",
    "handled_by" : "Dan",
    "oncall_engineers" : [
      {
        "shift" : {
          "gte" : "2021-11-29 08:00:00",
          "lte" : "2021-11-29 12:00:00"
        },
        "engineer" : {
          "name" : "Bob"
        }
      },
      {
        "shift" : {
          "gte" : "2021-11-29 09:00:00",
          "lte" : "2021-11-29 13:00:00"
        },
        "engineer" : {
          "name" : "Alice"
        }
      },
      {
        "shift" : {
          "gte" : "2021-11-29 09:00:00",
          "lte" : "2021-11-29 12:00:00"
        },
        "engineer" : {
          "name" : "Dan"
        }
      }
    ]
  }
}

As we can see, the shifts were matched correctly.
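One detail worth keeping in mind: gte and lte are both inclusive, so an incident logged exactly at a shift change matches both the shift that ends and the one that starts. A small Python check against the same schedule (the on_call helper is hypothetical) shows what to expect at 13:00:00, when Alice goes to lunch and Bob comes back.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (name, gte, lte) for every shift in on_call_schedules.
SHIFTS = [
    ("Bob", "2021-11-29 08:00:00", "2021-11-29 12:00:00"),
    ("Bob", "2021-11-29 13:00:00", "2021-11-29 17:00:00"),
    ("Alice", "2021-11-29 09:00:00", "2021-11-29 13:00:00"),
    ("Alice", "2021-11-29 14:00:00", "2021-11-29 18:00:00"),
    ("Dan", "2021-11-29 03:00:00", "2021-11-29 08:00:00"),
    ("Dan", "2021-11-29 09:00:00", "2021-11-29 12:00:00"),
    ("Matt", "2021-11-29 00:00:00", "2021-11-29 04:00:00"),
    ("Lizzie", "2021-11-29 16:00:00", "2021-11-29 20:00:00"),
    ("Lizzie", "2021-11-29 21:00:00", "2021-11-30 00:00:00"),
]


def on_call(timestamp):
    """Hypothetical helper: engineers whose shift contains timestamp (inclusive)."""
    t = datetime.strptime(timestamp, FMT)
    return [name for name, gte, lte in SHIFTS
            if datetime.strptime(gte, FMT) <= t <= datetime.strptime(lte, FMT)]


print(on_call("2021-11-29 13:00:00"))  # ['Bob', 'Alice']
print(on_call("2021-11-29 12:30:00"))  # ['Alice']
```

If shift ends should be exclusive instead, the schedule documents can use lt rather than lte for the upper bound.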

Incident three is a bit odd: Dan handled it but wasn’t scheduled to! Dan works too hard. Instead of retrieving the incident directly, let’s search for all incidents that Dan handled while he wasn’t on call:

GET incidents/_search
{
  "query": {
    "bool": {
      "must_not": [
        {
          "term": {
            "oncall_engineers.engineer.name.keyword": "Dan"
          }
        }
      ], 
      "filter": [
        {
          "term": {
            "handled_by.keyword": "Dan"
          }
        }
      ]
    }
  }
}

And indeed, we get incident three as a hit:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [
      {
        "_index" : "incidents",
        "_type" : "_doc",
        "_id" : "incident3",
        "_score" : 0.0,
        "_source" : {
          "severity" : "high",
          "@timestamp" : "2021-11-29 14:08:06",
          "handled_by" : "Dan",
          "oncall_engineers" : [
            {
              "shift" : {
                "gte" : "2021-11-29 13:00:00",
                "lte" : "2021-11-29 17:00:00"
              },
              "engineer" : {
                "name" : "Bob"
              }
            },
            {
              "shift" : {
                "gte" : "2021-11-29 14:00:00",
                "lte" : "2021-11-29 18:00:00"
              },
              "engineer" : {
                "name" : "Alice"
              }
            }
          ]
        }
      }
    ]
  }
}
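The bool query keeps incidents whose handled_by is Dan (the filter clause) and drops any whose on-call list contains Dan (the must_not clause). The same logic in plain Python follows; it recomputes each incident’s on-call list from the schedule rather than reading it from Elasticsearch, and the helper names are hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (name, gte, lte) for every shift in on_call_schedules.
SHIFTS = [
    ("Bob", "2021-11-29 08:00:00", "2021-11-29 12:00:00"),
    ("Bob", "2021-11-29 13:00:00", "2021-11-29 17:00:00"),
    ("Alice", "2021-11-29 09:00:00", "2021-11-29 13:00:00"),
    ("Alice", "2021-11-29 14:00:00", "2021-11-29 18:00:00"),
    ("Dan", "2021-11-29 03:00:00", "2021-11-29 08:00:00"),
    ("Dan", "2021-11-29 09:00:00", "2021-11-29 12:00:00"),
    ("Matt", "2021-11-29 00:00:00", "2021-11-29 04:00:00"),
    ("Lizzie", "2021-11-29 16:00:00", "2021-11-29 20:00:00"),
    ("Lizzie", "2021-11-29 21:00:00", "2021-11-30 00:00:00"),
]

# @timestamp and handled_by of the four incidents logged above.
INCIDENTS = {
    "incident1": ("2021-11-29 06:12:33", "Dan"),
    "incident2": ("2021-11-29 11:12:52", "Dan"),
    "incident3": ("2021-11-29 14:08:06", "Dan"),
    "incident4": ("2021-11-29 16:12:16", "Alice"),
}


def on_call(timestamp):
    """Hypothetical helper: engineers whose shift contains timestamp (inclusive)."""
    t = datetime.strptime(timestamp, FMT)
    return [name for name, gte, lte in SHIFTS
            if datetime.strptime(gte, FMT) <= t <= datetime.strptime(lte, FMT)]


def handled_while_off_call(engineer):
    """filter: handled_by == engineer; must_not: engineer in the on-call list."""
    return [incident_id for incident_id, (ts, handler) in INCIDENTS.items()
            if handler == engineer and engineer not in on_call(ts)]


print(handled_while_off_call("Dan"))  # ['incident3']
```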

In addition to searches, we can also run aggregations. Let's break things down by how many incidents each engineer handled and for how many incidents each engineer was on call:

GET incidents/_search
{
  "size": 0,
  "aggs": {
    "on_call_per_incident": {
      "terms": {
        "field": "oncall_engineers.engineer.name.keyword",
        "size": 10
      }
    },
    "handled_incidents": {
      "terms": {
        "field": "handled_by.keyword",
        "size": 10
      }
    }
  }
}
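To know what to expect from these two terms aggregations, the Python model below counts documents per name over the four incidents, with each incident counting at most once per engineer, as a bucket’s doc_count does. The buckets helper orders results by count descending, then name ascending, which is our assumption about the terms aggregation’s default ordering; all names here are hypothetical. The aggregation’s doc counts should agree with these numbers.

```python
from collections import Counter
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (name, gte, lte) for every shift in on_call_schedules.
SHIFTS = [
    ("Bob", "2021-11-29 08:00:00", "2021-11-29 12:00:00"),
    ("Bob", "2021-11-29 13:00:00", "2021-11-29 17:00:00"),
    ("Alice", "2021-11-29 09:00:00", "2021-11-29 13:00:00"),
    ("Alice", "2021-11-29 14:00:00", "2021-11-29 18:00:00"),
    ("Dan", "2021-11-29 03:00:00", "2021-11-29 08:00:00"),
    ("Dan", "2021-11-29 09:00:00", "2021-11-29 12:00:00"),
    ("Matt", "2021-11-29 00:00:00", "2021-11-29 04:00:00"),
    ("Lizzie", "2021-11-29 16:00:00", "2021-11-29 20:00:00"),
    ("Lizzie", "2021-11-29 21:00:00", "2021-11-30 00:00:00"),
]

# @timestamp and handled_by of the four incidents logged above.
INCIDENTS = {
    "incident1": ("2021-11-29 06:12:33", "Dan"),
    "incident2": ("2021-11-29 11:12:52", "Dan"),
    "incident3": ("2021-11-29 14:08:06", "Dan"),
    "incident4": ("2021-11-29 16:12:16", "Alice"),
}


def on_call(timestamp):
    """Hypothetical helper: engineers whose shift contains timestamp (inclusive)."""
    t = datetime.strptime(timestamp, FMT)
    return [name for name, gte, lte in SHIFTS
            if datetime.strptime(gte, FMT) <= t <= datetime.strptime(lte, FMT)]


def buckets(counter):
    """Order like a terms aggregation: count descending, then key ascending."""
    return sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))


on_call_per_incident = Counter()
handled_incidents = Counter()
for ts, handler in INCIDENTS.values():
    # dict.fromkeys drops duplicate names so each incident counts once per engineer.
    on_call_per_incident.update(dict.fromkeys(on_call(ts), 1))
    handled_incidents[handler] += 1

print(buckets(on_call_per_incident))  # [('Alice', 3), ('Bob', 3), ('Dan', 2), ('Lizzie', 1)]
print(buckets(handled_incidents))     # [('Dan', 3), ('Alice', 1)]
```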

As a final example, let's break incidents down by the engineer who handled them and, for each of those engineers, show who was on call at the time:

GET incidents/_search
{
  "size": 0,
  "aggs": {
    "incidents_handled_by": {
      "terms": {
        "field": "handled_by.keyword",
        "size": 10
      },
      "aggs": {
        "supporting": {
          "terms": {
            "field": "oncall_engineers.engineer.name.keyword",
            "size": 10
          }
        }
      }
    }
  }
}
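The nested aggregation groups incidents by handler first and only then counts on-call names within each group. A Python model of that two-level grouping follows, with hypothetical helper names; handlers are printed alphabetically for readability, while the inner counts use the same count-then-name ordering as before.

```python
from collections import Counter, defaultdict
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (name, gte, lte) for every shift in on_call_schedules.
SHIFTS = [
    ("Bob", "2021-11-29 08:00:00", "2021-11-29 12:00:00"),
    ("Bob", "2021-11-29 13:00:00", "2021-11-29 17:00:00"),
    ("Alice", "2021-11-29 09:00:00", "2021-11-29 13:00:00"),
    ("Alice", "2021-11-29 14:00:00", "2021-11-29 18:00:00"),
    ("Dan", "2021-11-29 03:00:00", "2021-11-29 08:00:00"),
    ("Dan", "2021-11-29 09:00:00", "2021-11-29 12:00:00"),
    ("Matt", "2021-11-29 00:00:00", "2021-11-29 04:00:00"),
    ("Lizzie", "2021-11-29 16:00:00", "2021-11-29 20:00:00"),
    ("Lizzie", "2021-11-29 21:00:00", "2021-11-30 00:00:00"),
]

# @timestamp and handled_by of the four incidents logged above.
INCIDENTS = {
    "incident1": ("2021-11-29 06:12:33", "Dan"),
    "incident2": ("2021-11-29 11:12:52", "Dan"),
    "incident3": ("2021-11-29 14:08:06", "Dan"),
    "incident4": ("2021-11-29 16:12:16", "Alice"),
}


def on_call(timestamp):
    """Hypothetical helper: engineers whose shift contains timestamp (inclusive)."""
    t = datetime.strptime(timestamp, FMT)
    return [name for name, gte, lte in SHIFTS
            if datetime.strptime(gte, FMT) <= t <= datetime.strptime(lte, FMT)]


# Outer bucket: handled_by; inner bucket: names of on-call engineers.
supporting = defaultdict(Counter)
for ts, handler in INCIDENTS.values():
    supporting[handler].update(dict.fromkeys(on_call(ts), 1))

for handler, counts in sorted(supporting.items()):
    print(handler, sorted(counts.items(), key=lambda kv: (-kv[1], kv[0])))
# Alice [('Alice', 1), ('Bob', 1), ('Lizzie', 1)]
# Dan [('Alice', 2), ('Bob', 2), ('Dan', 2)]
```

Note that a handler also appears in their own supporting bucket whenever they were on call, which is why Dan’s bucket counts Dan twice (incidents one and two).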

The benefits of the range enrich policy

The range enrich policy opens up new ways to match and enrich documents. In this blog post, we walked through a fictional company with its on-call engineers and logged incidents. Using Elasticsearch, we logged incidents, enriched them with the engineers who were scheduled at the time, and analyzed the results with searches and aggregations.

Happy enriching! Please also check out our blog post about the enrich cache, which was introduced in 7.16 as well and allows for higher ingest rates.