
Calculating ingest lag and storing ingest time in Elasticsearch to improve observability

When viewing and analyzing data with Elasticsearch, it is common to see visualizations, monitoring, and alerting solutions that rely on timestamps generated on remote/monitored systems. However, relying on remotely generated timestamps can be risky.

If there is a delay between the occurrence of a remote event and its arrival in Elasticsearch, or if the clock on a remote system is set incorrectly, then important events could fly under the radar. Therefore, when ingesting documents into Elasticsearch it is often helpful to store the ingest time of each document and to monitor how long each event takes to arrive at the Elasticsearch cluster. A larger-than-normal ingest lag may indicate a problem with the ingest process or with the time setting on a remote system.

In this blog post we will show how to use an ingest node with the set processor to add an ingest timestamp to documents when they arrive at an Elasticsearch cluster. This timestamp can be used in visualizations, monitoring, and alerting.

Additionally, we will show how to use the script processor to calculate the ingest lag. The ingest lag is the difference between the time an event occurred on a remote/monitored system and the time the corresponding document arrives at an Elasticsearch cluster. The lag can be used to ensure that the ingest process is not taking too long and to detect remote timestamps that are set incorrectly.

Adding an ingest timestamp and calculating ingest lag

Below we give an example of an ingest pipeline that adds an ingest timestamp called "ingest_time". It also calculates the lag between the remote event timestamp and the time that the event arrives at Elasticsearch and stores this in a field called "lag_in_seconds".

The “ingest_time” field serves two purposes: (1) it can be used as the time field in Kibana visualizations as well as for monitoring and alerting, and (2) it is used in the lag calculation. 

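Note that we assume each document has a field called “event_timestamp” that records when the event occurred on the remote/monitored system. Your event timestamp field will likely have a different name, so modify the pipeline accordingly. The script also assumes that this field is an ISO 8601 string that includes a time zone offset (for example, 2019-11-07T20:39:00.000Z), because ZonedDateTime.parse cannot parse values such as epoch milliseconds or timestamps without an offset.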

We write our pipeline to Elasticsearch as follows:

PUT _ingest/pipeline/calculate_lag 
{ 
  "description": "Add an ingest timestamp and calculate ingest lag", 
  "processors": [ 
    { 
      "set": { 
        "field": "_source.ingest_time", 
        "value": "{{_ingest.timestamp}}" 
      } 
    }, 
    { 
      "script": { 
        "lang": "painless", 
        "source": """ 
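            // Only compute the lag when both timestamps are present
            if (ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) {
              // Whole milliseconds from the remote event to ingest, truncated to whole seconds
              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
            }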
        """ 
      } 
    } 
  ] 
}
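The request above uses Kibana Dev Tools (Console) syntax. The triple-quoted """ string is a Console convenience and is not valid JSON. If you create the pipeline from a script over plain HTTP instead, the Painless source has to be sent as an ordinary JSON string. The following is a minimal Python sketch that builds the same request using only the standard library. It assumes an unsecured cluster at http://localhost:9200, so adjust the host and authentication for your deployment. The sketch only constructs the request and does not send it.

```python
import json
import urllib.request

# Assumption: an unsecured cluster on localhost; change this (and add auth) for real use.
ES_URL = "http://localhost:9200"

# The Painless source as a normal string; json.dumps escapes the quotes and newlines.
LAG_SCRIPT = (
    'if (ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) {\n'
    "  ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between("
    "ZonedDateTime.parse(ctx['event_timestamp']), "
    "ZonedDateTime.parse(ctx['ingest_time']))/1000;\n"
    "}"
)


def build_pipeline_body() -> dict:
    """Return the same pipeline definition as the Console request above."""
    return {
        "description": "Add an ingest timestamp and calculate ingest lag",
        "processors": [
            {"set": {"field": "_source.ingest_time", "value": "{{_ingest.timestamp}}"}},
            {"script": {"lang": "painless", "source": LAG_SCRIPT}},
        ],
    }


def build_put_pipeline_request(name: str = "calculate_lag") -> urllib.request.Request:
    """Build (but do not send) PUT _ingest/pipeline/<name> with a JSON body."""
    data = json.dumps(build_pipeline_body()).encode("utf-8")
    return urllib.request.Request(
        f"{ES_URL}/_ingest/pipeline/{name}",
        data=data,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Passing the result to urllib.request.urlopen would send it to the cluster.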

We can then test the pipeline with the simulate API:

POST _ingest/pipeline/calculate_lag/_simulate 
{ 
  "docs": [ 
    { 
      "_source": { 
        "event_timestamp": "2019-11-07T20:39:00.000Z" 
      } 
    } 
  ] 
}

This should return a response similar to the following, which includes the "lag_in_seconds" and "ingest_time" fields:

{ 
  "docs" : [ 
    { 
      "doc" : { 
        "_index" : "_index", 
        "_type" : "_doc", 
        "_id" : "_id", 
        "_source" : { 
          "lag_in_seconds" : 17950223, 
          "ingest_time" : "2020-06-02T14:49:23.236Z", 
          "event_timestamp" : "2019-11-07T20:39:00.000Z" 
        }, 
        "_ingest" : { 
          "timestamp" : "2020-06-02T14:49:23.236Z" 
        } 
      } 
    } 
  ] 
}
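To double-check the arithmetic in this response, the Painless calculation can be mirrored outside Elasticsearch. The Python sketch below is illustrative only, and its function names are hypothetical rather than part of any Elasticsearch API. It counts whole milliseconds between the two ISO 8601 timestamps, then divides by 1000 with truncation toward zero, which matches Java integer division on a long in the script.

```python
from datetime import datetime, timedelta


def _parse_iso(ts: str) -> datetime:
    # Older fromisoformat() versions reject a trailing "Z", so map it to an explicit UTC offset.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def lag_in_seconds(event_timestamp: str, ingest_time: str) -> int:
    """Mirror ChronoUnit.MILLIS.between(event, ingest) / 1000 from the Painless script."""
    delta = _parse_iso(ingest_time) - _parse_iso(event_timestamp)
    millis = delta // timedelta(milliseconds=1)  # exact for millisecond-precision inputs
    # Java's long division truncates toward zero, whereas Python's // floors,
    # so negative lags (remote clock ahead of the cluster) are handled explicitly.
    seconds = abs(millis) // 1000
    return seconds if millis >= 0 else -seconds


print(lag_in_seconds("2019-11-07T20:39:00.000Z", "2020-06-02T14:49:23.236Z"))  # 17950223
```

The printed value matches the lag_in_seconds returned by the simulate API above. The span is 208 days minus roughly 5 h 50 min.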

Finally, we can write a real document to Elasticsearch with the pipeline: 

PUT test_index/_doc/1?pipeline=calculate_lag 
{ 
  "event_timestamp": "2019-11-07T20:39:00.000Z", 
  "other_field": "whatever" 
}

We can then retrieve the document:

GET test_index/_doc/1

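This should return a response similar to the following (the exact ingest_time and lag_in_seconds values depend on when the document is indexed):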

{ 
  "_index" : "test_index", 
  "_type" : "_doc", 
  "_id" : "1", 
  "_version" : 1, 
  "_seq_no" : 0, 
  "_primary_term" : 1, 
  "found" : true, 
  "_source" : { 
    "lag_in_seconds" : 17950358, 
    "ingest_time" : "2020-06-02T14:51:38.068Z", 
    "event_timestamp" : "2019-11-07T20:39:00.000Z", 
    "other_field" : "whatever" 
  } 
}

Specifying the pipeline in index settings

When using an ingest pipeline in a production deployment, it may be preferable to specify the pipeline in the index settings rather than in the URL of each request. This can be done by adding index.default_pipeline to the index settings as follows:

PUT test_index/_settings 
{ 
  "index.default_pipeline": "calculate_lag" 
}

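Now any document sent to test_index will pass through the calculate_lag pipeline without the need for ?pipeline=calculate_lag in the URL. Note that a pipeline parameter in a request still overrides the default pipeline, and the special value ?pipeline=_none disables it for that request. We can verify that this works with the following request: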

PUT test_index/_doc/2 
{ 
  "event_timestamp": "2019-11-07T20:39:00.000Z", 
  "other_field": "This is a new doc" 
}

Execute the following command to see the document that we have just ingested.

GET test_index/_doc/2

This should return an enriched document similar to the following:

{ 
  "_index" : "test_index", 
  "_type" : "_doc", 
  "_id" : "2", 
  "_version" : 1, 
  "_seq_no" : 1, 
  "_primary_term" : 1, 
  "found" : true, 
  "_source" : { 
    "lag_in_seconds" : 17951461, 
    "ingest_time" : "2020-06-02T15:10:01.670Z", 
    "event_timestamp" : "2019-11-07T20:39:00.000Z", 
    "other_field" : "This is a new doc" 
  } 
}
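The settings change can also be scripted. The following sketch assumes an unsecured cluster at http://localhost:9200 and uses Python's standard library to build the PUT test_index/_settings request without sending it. The helper name is hypothetical. Because index.default_pipeline is an ordinary index setting, it can also be included in the settings when an index is created.

```python
import json
import urllib.request

# Assumption: an unsecured cluster on localhost; adjust host, port, and auth as needed.
ES_URL = "http://localhost:9200"


def build_default_pipeline_request(index: str, pipeline: str) -> urllib.request.Request:
    """Build (but do not send) a request that sets index.default_pipeline on an existing index."""
    body = {"index.default_pipeline": pipeline}
    return urllib.request.Request(
        f"{ES_URL}/{index}/_settings",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = build_default_pipeline_request("test_index", "calculate_lag")
print(req.get_method(), req.full_url)  # PUT http://localhost:9200/test_index/_settings
```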

How to use ingest lag

A larger-than-expected ingest lag could indicate a problem with the ingest process. The lag can therefore be used to trigger alerts when it exceeds a certain threshold. It can also be fed into a machine learning job to detect unexpected deviations in ingest processing time.
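As a starting point for a threshold alert, a watch or alerting rule could periodically search for recently ingested documents whose lag exceeds a limit. The Python sketch below only builds the Query DSL body. The 15-minute window and 300-second threshold are arbitrary placeholders. The sketch also assumes that ingest_time is mapped as a date and lag_in_seconds as a numeric field, which dynamic mapping would normally produce for values like those shown above. The window filters on ingest_time so that late-arriving events are still evaluated.

```python
import json


def build_lag_alert_query(threshold_seconds: int = 300, window: str = "15m") -> dict:
    """Search body: documents ingested within `window` whose lag exceeds the threshold.

    Both defaults are illustrative placeholders, not recommended values.
    """
    return {
        "size": 0,  # only the hit count and the aggregation are needed to decide on an alert
        "query": {
            "bool": {
                "filter": [
                    # Filter on ingest_time, not event_timestamp, so delayed events are included.
                    {"range": {"ingest_time": {"gte": f"now-{window}"}}},
                    {"range": {"lag_in_seconds": {"gt": threshold_seconds}}},
                ]
            }
        },
        "aggs": {"max_lag": {"max": {"field": "lag_in_seconds"}}},
    }


print(json.dumps(build_lag_alert_query(), indent=2))
```

This body could be sent to GET test_index/_search. A non-zero hit count, or the max_lag value, would then be the alert condition.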

Alternatively, the lag can be analyzed to detect whether any host has a much larger lag than other hosts, which could indicate a problem with that host's clock setting. A clock that runs behind inflates the lag, while a clock that runs ahead can even make it negative. Furthermore, a Kibana dashboard can be created to display historical versus current lag values.
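One simple way to compare hosts is to compare each host's median lag with the median across all hosts. The Python sketch below assumes you have already collected lag_in_seconds values per host, for example from a terms aggregation on a host field whose name depends on your data. The host names and the one-hour tolerance are hypothetical. The sketch uses absolute differences, so it flags both clocks that run behind (large positive lag) and clocks that run ahead (negative lag).

```python
from statistics import median
from typing import Dict, List


def suspicious_hosts(lags_by_host: Dict[str, List[int]], tolerance_seconds: int = 3600) -> List[str]:
    """Return hosts whose median lag differs from the fleet-wide median by more than the tolerance."""
    host_medians = {host: median(lags) for host, lags in lags_by_host.items() if lags}
    if not host_medians:
        return []
    fleet_median = median(host_medians.values())
    return sorted(
        host
        for host, host_median in host_medians.items()
        if abs(host_median - fleet_median) > tolerance_seconds
    )


# Hypothetical sample: web-3's clock appears to be about a day behind, db-1's about an hour ahead.
sample = {
    "web-1": [2, 3, 5],
    "web-2": [1, 4, 4],
    "web-3": [86402, 86405],
    "db-1": [-3700, -3650],
}
print(suspicious_hosts(sample))  # ['db-1', 'web-3']
```

The median is used instead of the mean so that a single badly skewed host does not shift the baseline that the other hosts are compared against.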

If the lag is larger than expected, the cause of the delay should be investigated. If it is caused by an incorrectly set clock on a remote system, then the remote clock should be corrected. If it is caused by a slow ingest process, then the ingest process should be investigated and tuned so that it performs as expected. How you use the lag is limited only by your needs.

Using the ingest timestamp

When looking at visualizations in Kibana or watching for anomalies, we often consider events that occurred in the last day or the last week. However, if we depend on the remotely generated event timestamp rather than the ingest timestamp, then any lag in the ingest process may cause some documents never to be viewed or monitored. For example, if an event occurred yesterday but only arrived at the cluster today, it would not show up in the dashboard for today’s events because its remotely generated timestamp is from yesterday. Nor would it have been visible when we looked at the dashboards yesterday, because it had not yet been stored in Elasticsearch.

On the other hand, if we visualize data and set up alerts using the ingest timestamp, we are guaranteed that we are considering the most recent events to arrive at Elasticsearch regardless of when the events occurred. This will ensure that events are not missed even if the ingest process gets temporarily backed up. 
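The difference between the two choices can be illustrated with a small simulation. In the Python sketch below, the documents, timestamps, and dashboard windows are hypothetical. One event occurs late yesterday but is only ingested this morning. A document counts as visible to a dashboard only if it had already been ingested when the dashboard was viewed. Filtering on event_timestamp never surfaces the delayed event, whereas filtering on ingest_time does.

```python
from datetime import datetime, timezone

UTC = timezone.utc

# Hypothetical documents: "b" was delayed overnight before reaching the cluster.
docs = [
    {"id": "a", "event_timestamp": datetime(2020, 6, 2, 9, 0, tzinfo=UTC),
     "ingest_time": datetime(2020, 6, 2, 9, 0, 2, tzinfo=UTC)},
    {"id": "b", "event_timestamp": datetime(2020, 6, 1, 23, 50, tzinfo=UTC),
     "ingest_time": datetime(2020, 6, 2, 8, 15, tzinfo=UTC)},
]


def in_window(field: str, start: datetime, end: datetime, as_of: datetime) -> list:
    """Ids of docs already ingested at `as_of` whose `field` falls in [start, end)."""
    return [
        d["id"] for d in docs
        if d["ingest_time"] <= as_of and start <= d[field] < end
    ]


yesterday_start = datetime(2020, 6, 1, tzinfo=UTC)
today_start = datetime(2020, 6, 2, tzinfo=UTC)
today_end = datetime(2020, 6, 3, tzinfo=UTC)
end_of_yesterday = datetime(2020, 6, 1, 23, 59, 59, tzinfo=UTC)
now = datetime(2020, 6, 2, 12, 0, tzinfo=UTC)

# Yesterday's dashboard, viewed late yesterday: "b" had not been ingested yet.
print(in_window("event_timestamp", yesterday_start, today_start, end_of_yesterday))  # []
# Today's dashboard by event time: "b" is excluded because it occurred yesterday.
print(in_window("event_timestamp", today_start, today_end, now))  # ['a']
# Today's dashboard by ingest time: both events appear.
print(in_window("ingest_time", today_start, today_end, now))  # ['a', 'b']
```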

Another advantage of using the ingest timestamp is that the event timestamp could be set incorrectly, whether maliciously or inadvertently. For example, imagine that we are monitoring a remote system and a hacker sets its clock to a date in the 1980s. If we depend on the remotely generated event timestamp, we may miss all of the activity that the malicious user performs on that system, unless we specifically look for events stored as occurring in the 1980s. If we instead depend on the ingest timestamp, we are guaranteed to consider all events that have recently arrived at the Elasticsearch cluster, regardless of the timestamp the remote system assigned to each event.

Conclusion

In this blog post we have covered how to use ingest processors to store the ingest time and calculate ingest lag, and why you might want to do this. We also outlined the advantages of using the ingest time for monitoring and visualizations, as well as the risks of relying on remote event timestamps.

If you don't yet have your own Elasticsearch cluster, you can spin up a free trial of Elasticsearch Service in a few minutes, or download the Elastic Stack and run it locally. Also, have a look at Elastic Observability for more information on how to bring your logs, metrics, and APM traces together at scale in a single stack so you can monitor and react to events happening anywhere in your environment.
