Parse and organize logsedit

If your log data is unstructured or semi-structured, you can parse it and break it into meaningful fields. You can use those fields to explore and analyze your data. For example, you can find logs within a specific timestamp range or filter logs by log level to focus on potential issues.

After parsing, you can use the structured fields to further organize your logs by configuring a reroute processor to send specific logs to different target data streams.

Refer to the following sections for more on parsing and organizing your log data:

  • Extract structured fields: Extract structured fields like timestamps, log levels, or IP addresses to make querying and filtering your data easier.
  • Reroute log data to specific data streams: Route data from the generic data stream to a target data stream for more granular control over data retention, permissions, and processing.

Extract structured fieldsedit

Make your logs more useful by extracting structured fields from your unstructured log data. Extracting structured fields makes it easier to search, analyze, and filter your log data.

Follow the steps below to see how the following unstructured log data is indexed by default:

2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.

Start by storing the document in the logs-example-default data stream:

  1. In Kibana, go to ManagementDev Tools.
  2. In the Console tab, add the example log to Elasticsearch using the following command:

    POST logs-example-default/_doc
    {
      "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%."
    }
  3. Then, you can retrieve the document with the following search:

    GET /logs-example-default/_search

The results should look like this:

{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": ".ds-logs-example-default-2023.08.09-000001",
        ...
        "_source": {
          "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.",
          "@timestamp": "2023-08-09T17:19:27.73312243Z"
        }
      }
    ]
  }
}

Elasticsearch indexes the message field by default and adds a @timestamp field. Since there was no timestamp set, it’s set to now. At this point, you can search for phrases in the message field like WARN or Disk usage exceeds. For example, use the following command to search for the phrase WARN in the log’s message field:

GET logs-example-default/_search
{
  "query": {
    "match": {
      "message": {
        "query": "WARN"
      }
    }
  }
}

While you can search for phrases in the message field, you can’t use this field to filter log data. Your message, however, contains all of the following potential fields you can extract and use to filter and aggregate your log data:

  • @timestamp (2023-08-08T13:45:12.123Z): Extracting this field lets you sort logs by date and time. This is helpful when you want to view your logs in the order that they occurred or identify when issues happened.
  • log.level (WARN): Extracting this field lets you filter logs by severity. This is helpful if you want to focus on high-severity WARN or ERROR-level logs, and reduce noise by filtering out low-severity INFO-level logs.
  • host.ip (192.168.1.101): Extracting this field lets you filter logs by the host IP addresses. This is helpful if you want to focus on specific hosts that you’re having issues with or if you want to find disparities between hosts.
  • message (Disk usage exceeds 90%.): You can search for phrases or words in the message field.

These fields are part of the Elastic Common Schema (ECS). The ECS defines a common set of fields that you can use across Elasticsearch when storing data, including log and metric data.

Extract the @timestamp fieldedit

When you added the log to Elasticsearch in the previous section, the @timestamp field showed when the log was added. The timestamp showing when the log actually occurred was in the unstructured message field:

        ...
        "_source": {
          "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.",
          "@timestamp": "2023-08-09T17:19:27.73312243Z"
        }
        ...

The timestamp in the message field shows when the log occurred.

The timestamp in the @timestamp field shows when the log was added to Elasticsearch.

When looking into issues, you want to filter for logs by when the issue occurred not when the log was added to your project. To do this, extract the timestamp from the unstructured message field to the structured @timestamp field by completing the following:

Use an ingest pipeline to extract the @timestamp fieldedit

Ingest pipelines consist of a series of processors that perform common transformations on incoming documents before they are indexed. To extract the @timestamp field from the example log, use an ingest pipeline with a dissect processor. The dissect processor extracts structured fields from unstructured log messages based on a pattern you set.

Elasticsearch can parse string timestamps that are in yyyy-MM-dd'T'HH:mm:ss.SSSZ and yyyy-MM-dd formats into date fields. Since the log example’s timestamp is in one of these formats, you don’t need additional processors. More complex or nonstandard timestamps require a date processor to parse the timestamp into a date field.

Use the following command to extract the timestamp from the message field into the @timestamp field:

PUT _ingest/pipeline/logs-example-default
{
  "description": "Extracts the timestamp",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{@timestamp} %{message}"
      }
    }
  ]
}

The name of the pipeline,logs-example-default, needs to match the name of your data stream. You’ll set up your data stream in the next section. For more information, refer to the data stream naming scheme.

The field you’re extracting data from, message in this case.

The pattern of the elements in your log data. The %{@timestamp} %{message} pattern extracts the timestamp, 2023-08-08T13:45:12.123Z, to the @timestamp field, while the rest of the message, WARN 192.168.1.101 Disk usage exceeds 90%., stays in the message field. The dissect processor looks for the space as a separator defined by the pattern.

Test the pipeline with the simulate pipeline APIedit

The simulate pipeline API runs the ingest pipeline without storing any documents. This lets you verify your pipeline works using multiple documents. Run the following command to test your ingest pipeline with the simulate pipeline API.

POST _ingest/pipeline/logs-example-default/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%."
      }
    }
  ]
}

The results should show the @timestamp field extracted from the message field:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "message": "WARN 192.168.1.101 Disk usage exceeds 90%.",
          "@timestamp": "2023-08-08T13:45:12.123Z"
        },
        ...
      }
    }
  ]
}

Make sure you’ve created the ingest pipeline using the PUT command in the previous section before using the simulate pipeline API.

Configure a data stream with an index templateedit

After creating your ingest pipeline, run the following command to create an index template to configure your data stream’s backing indices:

PUT _index_template/logs-example-default-template
{
  "index_patterns": [ "logs-example-*" ],
  "data_stream": { },
  "priority": 500,
  "template": {
    "settings": {
      "index.default_pipeline":"logs-example-default"
    }
  },
  "composed_of": [
    "logs@mappings",
    "logs@settings",
    "logs@custom",
    "ecs@mappings"
  ],
  "ignore_missing_component_templates": ["logs@custom"]
}

index_pattern: Needs to match your log data stream. Naming conventions for data streams are <type>-<dataset>-<namespace>. In this example, your logs data stream is named logs-example-*. Data that matches this pattern will go through your pipeline.

data_stream: Enables data streams.

priority: Sets the priority of you Index Template. Index templates with higher priority take precedence over lower priority. If a data stream matches multiple index templates, Elasticsearch uses the template with the higher priority. Built-in templates have a priority of 200, so use a priority higher than 200 for custom templates.

index.default_pipeline: The name of your ingest pipeline. logs-example-default in this case.

composed_of: Here you can set component templates. Component templates are building blocks for constructing index templates that specify index mappings, settings, and aliases. Elastic has several built-in templates to help when ingesting your log data.

The example index template above sets the following component templates:

  • logs@mappings: general mappings for log data streams that include disabling automatic date detection from string fields and specifying mappings for data_stream ECS fields.
  • logs@settings: general settings for log data streams including the following:

    • The default lifecycle policy that rolls over when the primary shard reaches 50 GB or after 30 days.
    • The default pipeline uses the ingest timestamp if there is no specified @timestamp and places a hook for the logs@custom pipeline. If a logs@custom pipeline is installed, it’s applied to logs ingested into this data stream.
    • Sets the ignore_malformed flag to true. When ingesting a large batch of log data, a single malformed field like an IP address can cause the entire batch to fail. When set to true, malformed fields with a mapping type that supports this flag are still processed.
  • logs@custom: a predefined component template that is not installed by default. Use this name to install a custom component template to override or extend any of the default mappings or settings.
  • ecs@mappings: dynamic templates that automatically ensure your data stream mappings comply with the Elastic Common Schema (ECS).
Create a data streamedit

Create your data stream using the data stream naming scheme. Name your data stream to match the name of your ingest pipeline, logs-example-default in this case. Post the example log to your data stream with this command:

POST logs-example-default/_doc
{
  "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%."
}

View your documents using this command:

GET /logs-example-default/_search

You should see the pipeline has extracted the @timestamp field:

{
...
{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": ".ds-logs-example-default-2023.08.09-000001",
        "_id": "RsWy3IkB8yCtA5VGOKLf",
        "_score": 1,
        "_source": {
          "message": "WARN 192.168.1.101 Disk usage exceeds 90%.",
          "@timestamp": "2023-08-08T13:45:12.123Z"
        }
      }
    ]
  }
}

The extracted @timestamp field.

You can now use the @timestamp field to sort your logs by the date and time they happened.

Troubleshoot the @timestamp fieldedit

Check the following common issues and solutions with timestamps:

  • Timestamp failure: If your data has inconsistent date formats, set ignore_failure to true for your date processor. This processes logs with correctly formatted dates and ignores those with issues.
  • Incorrect timezone: Set your timezone using the timezone option on the date processor.
  • Incorrect timestamp format: Your timestamp can be a Java time pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, or TAI64N. For more information on timestamp formats, refer to the mapping date format.

Extract the log.level fieldedit

Extracting the log.level field lets you filter by severity and focus on critical issues. This section shows you how to extract the log.level field from this example log:

2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.

To extract and use the log.level field:

Add log.level to your ingest pipelineedit

Add the %{log.level} option to the dissect processor pattern in the ingest pipeline you created in the Extract the @timestamp field section with this command:

PUT _ingest/pipeline/logs-example-default
{
  "description": "Extracts the timestamp and log level",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{@timestamp} %{log.level} %{message}"
      }
    }
  ]
}

Now your pipeline will extract these fields:

  • The @timestamp field: 2023-08-08T13:45:12.123Z
  • The log.level field: WARN
  • The message field: 192.168.1.101 Disk usage exceeds 90%.

In addition to setting an ingest pipeline, you need to set an index template. You can use the index template created in the Extract the @timestamp field section.

Test the pipeline with the simulate APIedit

Test that your ingest pipeline works as expected with the simulate pipeline API:

POST _ingest/pipeline/logs-example-default/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%."
      }
    }
  ]
}

The results should show the @timestamp and the log.level fields extracted from the message field:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "message": "192.168.1.101 Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          },
          "@timestamp": "2023-8-08T13:45:12.123Z",
        },
        ...
      }
    }
  ]
}
Query logs based on log.leveledit

Once you’ve extracted the log.level field, you can query for high-severity logs like WARN and ERROR, which may need immediate attention, and filter out less critical INFO and DEBUG logs.

Let’s say you have the following logs with varying severities:

2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.
2023-08-08T13:45:14.003Z ERROR 192.168.1.103 Database connection failed.
2023-08-08T13:45:15.004Z DEBUG 192.168.1.104 Debugging connection issue.
2023-08-08T13:45:16.005Z INFO 192.168.1.102 User changed profile picture.

Add them to your data stream using this command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-08-08T13:45:14.003Z ERROR 192.168.1.103 Database connection failed." }
{ "create": {} }
{ "message": "2023-08-08T13:45:15.004Z DEBUG 192.168.1.104 Debugging connection issue." }
{ "create": {} }
{ "message": "2023-08-08T13:45:16.005Z INFO 192.168.1.102 User changed profile picture." }

Then, query for documents with a log level of WARN or ERROR with this command:

GET logs-example-default/_search
{
  "query": {
    "terms": {
      "log.level": ["WARN", "ERROR"]
    }
  }
}

The results should show only the high-severity logs:

{
...
  },
  "hits": {
  ...
    "hits": [
      {
        "_index": ".ds-logs-example-default-2023.08.14-000001",
        "_id": "3TcZ-4kB3FafvEVY4yKx",
        "_score": 1,
        "_source": {
          "message": "192.168.1.101 Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          },
          "@timestamp": "2023-08-08T13:45:12.123Z"
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.08.14-000001",
        "_id": "3jcZ-4kB3FafvEVY4yKx",
        "_score": 1,
        "_source": {
          "message": "192.168.1.103 Database connection failed.",
          "log": {
            "level": "ERROR"
          },
          "@timestamp": "2023-08-08T13:45:14.003Z"
        }
      }
    ]
  }
}

Extract the host.ip fieldedit

Extracting the host.ip field lets you filter logs by host IP addresses allowing you to focus on specific hosts that you’re having issues with or find disparities between hosts.

The host.ip field is part of the Elastic Common Schema (ECS). Through the ECS, the host.ip field is mapped as an ip field type. ip field types allow range queries so you can find logs with IP addresses in a specific range. You can also query ip field types using Classless Inter-Domain Routing (CIDR) notation to find logs from a particular network or subnet.

This section shows you how to extract the host.ip field from the following example logs and query based on the extracted fields:

2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.
2023-08-08T13:45:14.003Z ERROR 192.168.1.103 Database connection failed.
2023-08-08T13:45:15.004Z DEBUG 192.168.1.104 Debugging connection issue.
2023-08-08T13:45:16.005Z INFO 192.168.1.102 User changed profile picture.

To extract and use the host.ip field:

Add host.ip to your ingest pipelineedit

Add the %{host.ip} option to the dissect processor pattern in the ingest pipeline you created in the Extract the @timestamp field section:

PUT _ingest/pipeline/logs-example-default
{
  "description": "Extracts the timestamp log level and host ip",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{@timestamp} %{log.level} %{host.ip} %{message}"
      }
    }
  ]
}

Your pipeline will extract these fields:

  • The @timestamp field: 2023-08-08T13:45:12.123Z
  • The log.level field: WARN
  • The host.ip field: 192.168.1.101
  • The message field: Disk usage exceeds 90%.

In addition to setting an ingest pipeline, you need to set an index template. You can use the index template created in the Extract the @timestamp field section.

Test the pipeline with the simulate APIedit

Test that your ingest pipeline works as expected with the simulate pipeline API:

POST _ingest/pipeline/logs-example-default/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%."
      }
    }
  ]
}

The results should show the host.ip, @timestamp, and log.level fields extracted from the message field:

{
  "docs": [
    {
      "doc": {
        ...
        "_source": {
          "host": {
            "ip": "192.168.1.101"
          },
          "@timestamp": "2023-08-08T13:45:12.123Z",
          "message": "Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          }
        },
        ...
      }
    }
  ]
}
Query logs based on host.ipedit

You can query your logs based on the host.ip field in different ways, including using CIDR notation and range queries.

Before querying your logs, add them to your data stream using this command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-08-08T13:45:14.003Z ERROR 192.168.1.103 Database connection failed." }
{ "create": {} }
{ "message": "2023-08-08T13:45:15.004Z DEBUG 192.168.1.104 Debugging connection issue." }
{ "create": {} }
{ "message": "2023-08-08T13:45:16.005Z INFO 192.168.1.102 User changed profile picture." }
CIDR notationedit

You can use CIDR notation to query your log data using a block of IP addresses that fall within a certain network segment. CIDR notations uses the format of [IP address]/[prefix length]. The following command queries IP addresses in the 192.168.1.0/24 subnet meaning IP addresses from 192.168.1.0 to 192.168.1.255.

GET logs-example-default/_search
{
  "query": {
    "term": {
      "host.ip": "192.168.1.0/24"
    }
  }
}

Because all of the example logs are in this range, you’ll get the following results:

{
  ...
  },
  "hits": {
    ...
      {
        "_index": ".ds-logs-example-default-2023.08.16-000001",
        "_id": "ak4oAIoBl7fe5ItIixuB",
        "_score": 1,
        "_source": {
          "host": {
            "ip": "192.168.1.101"
          },
          "@timestamp": "2023-08-08T13:45:12.123Z",
          "message": "Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          }
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.08.16-000001",
        "_id": "a04oAIoBl7fe5ItIixuC",
        "_score": 1,
        "_source": {
          "host": {
            "ip": "192.168.1.103"
          },
          "@timestamp": "2023-08-08T13:45:14.003Z",
          "message": "Database connection failed.",
          "log": {
            "level": "ERROR"
          }
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.08.16-000001",
        "_id": "bE4oAIoBl7fe5ItIixuC",
        "_score": 1,
        "_source": {
          "host": {
            "ip": "192.168.1.104"
          },
          "@timestamp": "2023-08-08T13:45:15.004Z",
          "message": "Debugging connection issue.",
          "log": {
            "level": "DEBUG"
          }
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.08.16-000001",
        "_id": "bU4oAIoBl7fe5ItIixuC",
        "_score": 1,
        "_source": {
          "host": {
            "ip": "192.168.1.102"
          },
          "@timestamp": "2023-08-08T13:45:16.005Z",
          "message": "User changed profile picture.",
          "log": {
            "level": "INFO"
          }
        }
      }
    ]
  }
}
Range queriesedit

Use range queries to query logs in a specific range.

The following command searches for IP addresses greater than or equal to 192.168.1.100 and less than or equal to 192.168.1.102.

GET logs-example-default/_search
{
  "query": {
    "range": {
      "host.ip": {
        "gte": "192.168.1.100",
        "lte": "192.168.1.102"
      }
    }
  }
}

Greater than or equal to 192.168.1.100.

Less than or equal to 192.168.1.102.

You’ll get the following results only showing logs in the range you’ve set:

{
  ...
  },
  "hits": {
    ...
      {
        "_index": ".ds-logs-example-default-2023.08.16-000001",
        "_id": "ak4oAIoBl7fe5ItIixuB",
        "_score": 1,
        "_source": {
          "host": {
            "ip": "192.168.1.101"
          },
          "@timestamp": "2023-08-08T13:45:12.123Z",
          "message": "Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          }
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.08.16-000001",
        "_id": "bU4oAIoBl7fe5ItIixuC",
        "_score": 1,
        "_source": {
          "host": {
            "ip": "192.168.1.102"
          },
          "@timestamp": "2023-08-08T13:45:16.005Z",
          "message": "User changed profile picture.",
          "log": {
            "level": "INFO"
          }
        }
      }
    ]
  }
}

Reroute log data to specific data streamsedit

By default, an ingest pipeline sends your log data to a single data stream. To simplify log data management, use a reroute processor to route data from the generic data stream to a target data stream. For example, you might want to send high-severity logs to a specific data stream to help with categorization.

This section shows you how to use a reroute processor to send the high-severity logs (WARN or ERROR) from the following example logs to a specific data stream and keep the regular logs (DEBUG and INFO) in the default data stream:

2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%.
2023-08-08T13:45:14.003Z ERROR 192.168.1.103 Database connection failed.
2023-08-08T13:45:15.004Z DEBUG 192.168.1.104 Debugging connection issue.
2023-08-08T13:45:16.005Z INFO 192.168.1.102 User changed profile picture.

When routing data to different data streams, we recommend picking a field with a limited number of distinct values to prevent an excessive increase in the number of data streams. For more details, refer to the Size your shards documentation.

To use a reroute processor:

Add a reroute processor to the ingest pipelineedit

Add a reroute processor to your ingest pipeline with the following command:

PUT _ingest/pipeline/logs-example-default
{
  "description": "Extracts fields and reroutes WARN",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{@timestamp} %{log.level} %{host.ip} %{message}"
      },
      "reroute": {
        "tag": "high_severity_logs",
        "if" : "ctx.log?.level == 'WARN' || ctx.log?.level == 'ERROR'",
        "dataset": "critical"
      }
    }
  ]
}

tag: Identifier for the processor that you can use for debugging and metrics. In the example, the tag is set to high_severity_logs.

if: Conditionally runs the processor. In the example, "ctx.log?.level == 'WARN' || ctx.log?.level == 'ERROR'", means the processor runs when the log.level field is WARN or ERROR.

dataset: the data stream dataset to route your document to if the previous condition is true. In the example, logs with a log.level of WARN or ERROR are routed to the logs-critical-default data stream.

In addition to setting an ingest pipeline, you need to set an index template. You can use the index template created in the Extract the @timestamp field section.

Add logs to a data streamedit

Add the example logs to your data stream with this command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-08-08T13:45:12.123Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-08-08T13:45:14.003Z ERROR 192.168.1.103 Database connection failed." }
{ "create": {} }
{ "message": "2023-08-08T13:45:15.004Z DEBUG 192.168.1.104 Debugging connection issue." }
{ "create": {} }
{ "message": "2023-08-08T13:45:16.005Z INFO 192.168.1.102 User changed profile picture." }

Verify the reroute processor workededit

The reroute processor should route any logs with a log.level of WARN or ERROR to the logs-critical-default data stream. Query the the data stream using the following command to verify the log data was routed as intended:

GET logs-critical-default/_search

Your should see similar results to the following showing that the high-severity logs are now in the critical dataset:

{
  ...
  "hits": {
    ...
    "hits": [
        ...
        "_source": {
          "host": {
            "ip": "192.168.1.101"
          },
          "@timestamp": "2023-08-08T13:45:12.123Z",
          "message": "Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          },
          "data_stream": {
            "namespace": "default",
            "type": "logs",
            "dataset": "critical"
          },
          {
        ...
        "_source": {
          "host": {
            "ip": "192.168.1.103"
           },
          "@timestamp": "2023-08-08T13:45:14.003Z",
          "message": "Database connection failed.",
          "log": {
            "level": "ERROR"
          },
          "data_stream": {
            "namespace": "default",
            "type": "logs",
            "dataset": "critical"
          }
        }
      }
    ]
  }
}