Filter and aggregate logs

Filter and aggregate your log data to find specific information, gain insight, and monitor your systems more efficiently. You can filter and aggregate based on structured fields like timestamps, log levels, and IP addresses that you’ve extracted from your log data.

This guide shows you how to:

  • Filter logs — Narrow down your log data by applying specific criteria.
  • Aggregate logs — Analyze and summarize data to find patterns and gain insight.

Before you get started

The examples on this page use the following ingest pipeline and index template, which you can set in Dev Tools. If you haven’t used ingest pipelines and index templates to parse your log data and extract structured fields yet, start with the Parse and organize logs documentation.

Set the ingest pipeline with the following command:

PUT _ingest/pipeline/logs-example-default
{
  "description": "Extracts the timestamp, log level, and host IP",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{@timestamp} %{log.level} %{host.ip} %{message}"
      }
    }
  ]
}
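
To see what the dissect pattern pulls out of each log line, the following Python sketch mimics it offline. It is only an illustration, not how Elasticsearch implements the dissect processor, and the helper name dissect_log_line is hypothetical. It assumes every line has the space-delimited shape used in this guide's examples.

```python
def dissect_log_line(line: str) -> dict:
    """Mimic the pattern %{@timestamp} %{log.level} %{host.ip} %{message}."""
    # The first three keys each take one space-delimited token;
    # the last key takes the rest of the line, spaces included.
    timestamp, level, host_ip, message = line.split(" ", 3)
    return {
        "@timestamp": timestamp,
        "log": {"level": level},
        "host": {"ip": host_ip},
        "message": message,
    }


doc = dissect_log_line(
    "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%."
)
print(doc["log"]["level"], doc["host"]["ip"], doc["message"])
```

Note that the message field is overwritten with the remainder of the line, so the timestamp, log level, and IP address no longer appear in it after parsing.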

Set the index template with the following command:

PUT _index_template/logs-example-default-template
{
  "index_patterns": [ "logs-example-*" ],
  "data_stream": { },
  "priority": 500,
  "template": {
    "settings": {
      "index.default_pipeline":"logs-example-default"
    }
  },
  "composed_of": [
    "logs-mappings",
    "logs-settings",
    "logs@custom",
    "ecs@dynamic_templates"
  ],
  "ignore_missing_component_templates": ["logs@custom"]
}
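
The index template applies to any data stream whose name matches the logs-example-* index pattern. As a rough offline check, this Python sketch approximates that wildcard match with the standard library's fnmatchcase. Elasticsearch does its own matching, so treat this as an approximation; the names other than logs-example-default are hypothetical.

```python
from fnmatch import fnmatchcase

index_pattern = "logs-example-*"

# logs-example-default is the data stream used in this guide;
# the other two names are made up for illustration.
names = ["logs-example-default", "logs-example-nginx", "logs-other-default"]

matching = [name for name in names if fnmatchcase(name, index_pattern)]
print(matching)
```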

Filter logs

Filter your data using the fields you’ve extracted so you can focus on log data with specific log levels, timestamp ranges, or host IPs. You can filter your log data in Log Explorer or with Query DSL.

Filter logs in Log Explorer

Log Explorer is a Kibana tool that automatically provides views of your log data based on integrations and data streams. You can find Log Explorer in the Observability menu under Logs.

In Log Explorer, you can enter Kibana Query Language (KQL) queries in the search bar to narrow down the log data displayed. For example, you might want to look into an event that occurred within a specific time range.

Add some logs with varying timestamps and log levels to your data stream:

  1. In Kibana, go to Management → Dev Tools.
  2. In the Console tab, run the following command:
POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-09-14T10:30:45.789Z ERROR 192.168.1.102 Critical system failure detected." }
{ "create": {} }
{ "message": "2023-09-10T14:20:45.789Z ERROR 192.168.1.105 Database connection lost." }
{ "create": {} }
{ "message": "2023-09-20T09:40:32.345Z INFO 192.168.1.106 User logout initiated." }

For this example, let’s look for logs with a WARN or ERROR log level that occurred on September 14th or 15th. From Log Explorer:

  1. Add the following KQL query in the search bar to filter for logs with log levels of WARN or ERROR:

    log.level: ("ERROR" or "WARN")
  2. Click the current time range, select Absolute, and set the Start date to Sep 14, 2023 @ 00:00:00.000.
  3. Click the end of the current time range, select Absolute, and set the End date to Sep 15, 2023 @ 23:59:59.999.

Under the Documents tab, you’ll see the filtered log data matching your query.

For more on using Log Explorer, refer to the Discover documentation.

Filter logs with Query DSL

Query DSL is a JSON-based language for defining queries that retrieve data from indices and data streams. You can filter your log data using Query DSL from Dev Tools.

For example, you might want to troubleshoot an issue that happened on a specific date or at a specific time. To do this, use a boolean query with a range query to filter for the specific timestamp range and a terms query to filter for WARN and ERROR log levels.

First, from Dev Tools, add some logs with varying timestamps and log levels to your data stream with the following command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-09-14T10:30:45.789Z ERROR 192.168.1.102 Critical system failure detected." }
{ "create": {} }
{ "message": "2023-09-10T14:20:45.789Z ERROR 192.168.1.105 Database connection lost." }
{ "create": {} }
{ "message": "2023-09-20T09:40:32.345Z INFO 192.168.1.106 User logout initiated." }

Let’s say you want to look into an event that occurred between September 14th and 15th. The following boolean query filters for logs with timestamps during those days that also have a log level of ERROR or WARN.

POST /logs-example-default/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "2023-09-14T00:00:00",
              "lte": "2023-09-15T23:59:59"
            }
          }
        },
        {
          "terms": {
            "log.level": ["WARN", "ERROR"]
          }
        }
      ]
    }
  }
}

The filtered results should show WARN and ERROR logs that occurred within the timestamp range:

{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": ".ds-logs-example-default-2023.09.25-000001",
        "_id": "JkwPzooBTddK4OtTQToP",
        "_score": 0,
        "_source": {
          "message": "Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          },
          "host": {
            "ip": "192.168.1.101"
          },
          "@timestamp": "2023-09-15T08:15:20.234Z"
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.09.25-000001",
        "_id": "A5YSzooBMYFrNGNwH75O",
        "_score": 0,
        "_source": {
          "message": "Critical system failure detected.",
          "log": {
            "level": "ERROR"
          },
          "host": {
            "ip": "192.168.1.102"
          },
          "@timestamp": "2023-09-14T10:30:45.789Z"
        }
      }
    ]
  }
}
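
The logic of the boolean query above can be sketched offline in Python: a document is kept only if it satisfies every clause in the filter array. This is an illustration of the semantics under simplifying assumptions, not a client for Elasticsearch. The documents are flattened for brevity, and the helper names parse_ts and matches are hypothetical.

```python
from datetime import datetime

# The four sample documents, reduced to the two fields the query inspects.
docs = [
    {"@timestamp": "2023-09-15T08:15:20.234Z", "log.level": "WARN"},
    {"@timestamp": "2023-09-14T10:30:45.789Z", "log.level": "ERROR"},
    {"@timestamp": "2023-09-10T14:20:45.789Z", "log.level": "ERROR"},
    {"@timestamp": "2023-09-20T09:40:32.345Z", "log.level": "INFO"},
]


def parse_ts(value: str) -> datetime:
    # Parse the UTC timestamps used in the sample logs.
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


start = datetime(2023, 9, 14, 0, 0, 0)    # "gte" bound of the range clause
end = datetime(2023, 9, 15, 23, 59, 59)   # "lte" bound of the range clause
levels = {"WARN", "ERROR"}                # values of the terms clause


def matches(doc: dict) -> bool:
    # Every clause must hold, as with the clauses in a bool filter array.
    in_range = start <= parse_ts(doc["@timestamp"]) <= end
    return in_range and doc["log.level"] in levels


matched = [d for d in docs if matches(d)]
for d in matched:
    print(d["@timestamp"], d["log.level"])
```

The ERROR log from September 10th fails the range clause and the INFO log fails both clauses, so only the two documents shown in the results remain.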

Aggregate logs

Use aggregations to analyze and summarize your log data to find patterns and gain insight. Bucket aggregations organize log data into meaningful groups, making it easier to identify patterns, trends, and anomalies within your logs.

For example, you might want to understand error distribution by analyzing the count of logs per log level.

First, from Dev Tools, add some logs with varying log levels to your data stream using the following command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-09-14T10:30:45.789Z ERROR 192.168.1.102 Critical system failure detected." }
{ "create": {} }
{ "message": "2023-09-15T12:45:55.123Z INFO 192.168.1.103 Application successfully started." }
{ "create": {} }
{ "message": "2023-09-14T15:20:10.789Z WARN 192.168.1.104 Network latency exceeding threshold." }
{ "create": {} }
{ "message": "2023-09-10T14:20:45.789Z ERROR 192.168.1.105 Database connection lost." }
{ "create": {} }
{ "message": "2023-09-20T09:40:32.345Z INFO 192.168.1.106 User logout initiated." }
{ "create": {} }
{ "message": "2023-09-21T15:20:55.678Z DEBUG 192.168.1.102 Database connection established." }

Next, run this command to aggregate your log data using the log.level field:

POST logs-example-default/_search?size=0&filter_path=aggregations
{
  "aggs": {
    "log_level_distribution": {
      "terms": {
        "field": "log.level"
      }
    }
  }
}

By default, a search that includes an aggregation returns both the matching documents and the aggregation results. Setting size to 0 omits the documents, and filter_path=aggregations trims the response to the aggregations object.

The results should show the number of logs in each log level:

{
  "aggregations": {
    "log_level_distribution": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "ERROR",
          "doc_count": 2
        },
        {
          "key": "INFO",
          "doc_count": 2
        },
        {
          "key": "WARN",
          "doc_count": 2
        },
        {
          "key": "DEBUG",
          "doc_count": 1
        }
      ]
    }
  }
}
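
Conceptually, the terms aggregation groups documents by the value of log.level and counts each group. The Python sketch below reproduces that count for the seven sample logs. It is a simplified stand-in for illustration, not the Elasticsearch implementation, and it assumes buckets are sorted by descending count with ties ordered by key, which matches the sample output above.

```python
from collections import Counter

# Log levels of the seven sample documents, in the order they were indexed.
levels = ["WARN", "ERROR", "INFO", "WARN", "ERROR", "INFO", "DEBUG"]

counts = Counter(levels)

# Sort by descending count, then by key, to mirror the bucket order
# shown in the sample response.
buckets = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))

for key, doc_count in buckets:
    print(key, doc_count)
```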

You can also combine aggregations and queries. For example, you might want to limit the scope of the previous aggregation by adding a range query:

GET /logs-example-default/_search
{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2023-09-14T00:00:00",
        "lte": "2023-09-15T23:59:59"
      }
    }
  },
  "aggs": {
    "my-agg-name": {
      "terms": {
        "field": "log.level"
      }
    }
  }
}

The results should show an aggregate of logs that occurred within your timestamp range:

{
  ...
  "hits": {
    ...
    "hits": []
  },
  "aggregations": {
    "my-agg-name": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "WARN",
          "doc_count": 2
        },
        {
          "key": "ERROR",
          "doc_count": 1
        },
        {
          "key": "INFO",
          "doc_count": 1
        }
      ]
    }
  }
}

For more on aggregation types and available aggregations, refer to the Aggregations documentation.