Aggregating data for faster performance

edit

By default, datafeeds fetch data from Elasticsearch using search and scroll requests. It can be significantly more efficient, however, to aggregate data in Elasticsearch and to configure your anomaly detection jobs to analyze aggregated data.

One of the benefits of aggregating data this way is that Elasticsearch automatically distributes these calculations across your cluster. You can then feed this aggregated data into the machine learning features instead of raw results, which reduces the volume of data that must be considered while detecting anomalies.

If you use a terms aggregation and the cardinality of a term is high, the aggregation might not be effective and you might want to just use the default search and scroll behavior.

Requirements and limitations

edit

There are some limitations to using aggregations in datafeeds. Your aggregation must include a date_histogram aggregation, which in turn must contain a max aggregation on the time field. This requirement ensures that the aggregated data is a time series and the timestamp of each bucket is the time of the last record in the bucket.

The name of the aggregation and the name of the field that the agg operates on need to match, otherwise the aggregation doesn’t work. For example, if you use a max aggregation on a time field called responsetime, the name of the aggregation must be also responsetime.

You must also consider the interval of the date histogram aggregation carefully. The bucket span of your anomaly detection job must be divisible by the value of the calendar_interval or fixed_interval in your aggregation (with no remainder). If you specify a frequency for your datafeed, it must also be divisible by this interval. Anomaly detection jobs cannot use date histograms with an interval measured in months because the length of the month is not fixed. Datafeeds tolerate weeks or smaller units.

As a rule of thumb, if your detectors use metric or sum analytical functions, set the date histogram aggregation interval to a tenth of the bucket span. This suggestion creates finer, more granular time buckets, which are ideal for this type of analysis. If your detectors use count or rare functions, set the interval to the same value as the bucket span.

If your datafeed uses aggregations with nested terms aggs and model plot is not enabled for the anomaly detection job, neither the Single Metric Viewer nor the Anomaly Explorer can plot and display an anomaly chart for the job. In these cases, the charts are not visible and an explanatory message is shown.

When the aggregation interval of the datafeed and the bucket span of the job don’t match, the values of the chart plotted in both the Single Metric Viewer and the Anomaly Explorer differ from the actual values of the job. To avoid this behavior, make sure that the aggregation interval in the datafeed configuration and the bucket span in the anomaly detection job configuration have the same values.

Including aggregations in anomaly detection jobs

edit

When you create or update an anomaly detection job, you can include the names of aggregations, for example:

PUT _ml/anomaly_detectors/farequote
{
  "analysis_config": {
    "bucket_span": "60m",
    "detectors": [{
      "function": "mean",
      "field_name": "responsetime",  
      "by_field_name": "airline"  
    }],
    "summary_count_field_name": "doc_count"
  },
  "data_description": {
    "time_field":"time"  
  }
}

The airline, responsetime, and time fields are aggregations. Only the aggregated fields defined in the analysis_config object are analyzed by the anomaly detection job.

When the summary_count_field_name property is set to a non-null value, the job expects to receive aggregated input. The property must be set to the name of the field that contains the count of raw data points that have been aggregated. It applies to all detectors in the job.

The aggregations are defined in the datafeed as follows:

PUT _ml/datafeeds/datafeed-farequote
{
  "job_id":"farequote",
  "indices": ["farequote"],
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "time",
        "fixed_interval": "360s",
        "time_zone": "UTC"
      },
      "aggregations": {
        "time": {  
          "max": {"field": "time"}
        },
        "airline": {  
          "terms": {
            "field": "airline",
            "size": 100
          },
          "aggregations": {
            "responsetime": {  
              "avg": {
                "field": "responsetime"
              }
            }
          }
        }
      }
    }
  }
}

The aggregations have names that match the fields that they operate on. The max aggregation is named time and its field also needs to be time.

The term aggregation is named airline and its field is also named airline.

The avg aggregation is named responsetime and its field is also named responsetime.

Your datafeed can contain multiple aggregations, but only the ones with names that match values in the job configuration are fed to the job.

Nested aggregations in datafeeds

edit

Datafeeds support complex nested aggregations. This example uses the derivative pipeline aggregation to find the first order derivative of the counter system.network.out.bytes for each value of the field beat.name.

"aggregations": {
  "beat.name": {
    "terms": {
      "field": "beat.name"
    },
    "aggregations": {
      "buckets": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5m"
        },
        "aggregations": {
          "@timestamp": {
            "max": {
              "field": "@timestamp"
            }
          },
          "bytes_out_average": {
            "avg": {
              "field": "system.network.out.bytes"
            }
          },
          "bytes_out_derivative": {
            "derivative": {
              "buckets_path": "bytes_out_average"
            }
          }
        }
      }
    }
  }
}

Single bucket aggregations in datafeeds

edit

Datafeeds not only supports multi-bucket aggregations, but also single bucket aggregations. The following shows two filter aggregations, each gathering the number of unique entries for the error field.

{
  "job_id":"servers-unique-errors",
  "indices": ["logs-*"],
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "time",
        "interval": "360s",
        "time_zone": "UTC"
      },
      "aggregations": {
        "time": {
          "max": {"field": "time"}
        }
        "server1": {
          "filter": {"term": {"source": "server-name-1"}},
          "aggregations": {
            "server1_error_count": {
              "value_count": {
                "field": "error"
              }
            }
          }
        },
        "server2": {
          "filter": {"term": {"source": "server-name-2"}},
          "aggregations": {
            "server2_error_count": {
              "value_count": {
                "field": "error"
              }
            }
          }
        }
      }
    }
  }
}

Defining aggregations in datafeeds

edit

When you define an aggregation in a datafeed, it must have the following form:

"aggregations": {
  ["bucketing_aggregation": {
    "bucket_agg": {
      ...
    },
    "aggregations": {]
      "data_histogram_aggregation": {
        "date_histogram": {
          "field": "time",
        },
        "aggregations": {
          "timestamp": {
            "max": {
              "field": "time"
            }
          },
          [,"<first_term>": {
            "terms":{...
            }
            [,"aggregations" : {
              [<sub_aggregation>]+
            } ]
          }]
        }
      }
    }
  }
}

The top level aggregation must be either a bucket aggregation containing as single sub-aggregation that is a date_histogram or the top level aggregation is the required date_histogram. There must be exactly one date_histogram aggregation. For more information, see Date histogram aggregation.

The time_zone parameter in the date histogram aggregation must be set to UTC, which is the default value.

Each histogram bucket has a key, which is the bucket start time. This key cannot be used for aggregations in datafeeds, however, because they need to know the time of the latest record within a bucket. Otherwise, when you restart a datafeed, it continues from the start time of the histogram bucket and possibly fetches the same data twice. The max aggregation for the time field is therefore necessary to provide the time of the latest record within a bucket.

You can optionally specify a terms aggregation, which creates buckets for different values of a field.

If you use a terms aggregation, by default it returns buckets for the top ten terms. Thus if the cardinality of the term is greater than 10, not all terms are analyzed.

You can change this behavior by setting the size parameter. To determine the cardinality of your data, you can run searches such as:

GET .../_search
{
  "aggs": {
    "service_cardinality": {
      "cardinality": {
        "field": "service"
      }
    }
  }
}

By default, Elasticsearch limits the maximum number of terms returned to 10000. For high cardinality fields, the query might not run. It might return errors related to circuit breaking exceptions that indicate that the data is too large. In such cases, do not use aggregations in your datafeed. For more information, see Terms aggregation.

You can also optionally specify multiple sub-aggregations. The sub-aggregations are aggregated for the buckets that were created by their parent aggregation. For more information, see Aggregations.