Just click the Suggest pipeline button in Kibana's Processing tab and within a few seconds you're looking at a complete pipeline (Grok pattern, date normalization, type conversions) with a preview of how your actual log documents parse through it.
The alternative is doing this by hand: write a Grok pattern, testing it, fixing the edge cases, realizing the field names don't match ECS, renaming them, adding a date processor. And all that is just the work for a single service.
The three jobs every log pipeline has
Every log processing pipeline does the same three things: Things usually start with extracting fields from raw log messages, normalizing them to a consistent schema, and cleaning up whatever you don't need. Most teams would build and maintain these by hand, which can be challenging as log formats change and you realize that the person who wrote the Grok pattern moved teams, and nothing about the pipeline is documented except the pattern itself.
Every new service now means doing it again from scratch, with a different format, different edge cases, and eventually a different person maintaining a pattern they didn't write.
For the initial pipeline, Streams handles all three jobs automatically and validates the result before anything touches your production data.
What happens when you click "Suggest pipeline"
Open the Processing tab for a stream in Kibana. Click the button. Within seconds, the panel populates with a proposed pipeline (typically a parsing step, date normalization, type conversions, and field cleanup) along with a live preview showing what your most recent documents look like after the pipeline runs.
n this view you can see the exact fields that will be extracted, their types, and how many of your sample documents parsed successfully. If a field name is off, you can also edit it inline; if a step is adding noise, just remove it. And if the parse rate needs work, you can easily adjust and re-run generation. Nothing is written to the stream until you explicitly confirm. For now at least this is an important step for the human to be in the loop with these changes. As systems like these mature more, this may not be necessary in the future.
Let's walk through the steps in more detail.
Stage 1: Log grouping and pattern extraction
The first stage of our process doesn't involve a reasoning model. It's actually deterministic: the same input always produces the same output, with no variance from a model. It also scopes down what Stage 2 has to figure out.
Before any extraction runs, Streams clusters the messages by log format fingerprint. The algorithm is really simple too: digits map to 0, letters map to a, and punctuation is preserved as-is. Two messages that produce the same fingerprint land in the same group.
# two entries from the same nginx stream
2026-03-30 14:22:31 192.168.1.100 - james "GET /api/v1/health" 200
2026-03-30 08:01:05 10.0.0.5 - alice "GET /api/v2/status" 404
# fingerprint
0-0-0 0:0:0 0.0.0.0 - a "a /a/a0/a" 0
0-0-0 0:0:0 0.0.0.0 - a "a /a/a0/a" 0
A stream with mixed log formats produces multiple groups, one per distinct format in the batch. This is a fairly simple but really effective way for us to cluster similar logs together and it makes all the other steps much more reliable.
Both Grok and Dissect run on the same input, though they work differently. Grok runs per group, as it supports multiple patterns and handles each distinct format independently. Dissect uses a single pattern, so it targets only the largest group in the batch.
For each candidate, a heuristic algorithm analyzes the messages and identifies field boundaries: what's fixed text and what varies. It generates a pattern with positional placeholder names. An LLM then reviews the extracted field positions against a sample of up to 10 messages and renames the placeholders to human-readable, schema-compliant names.
# grok heuristic output (positional placeholders)
%{IPV4:field_0} - %{USER:field_1} \[%{HTTPDATE:field_2}\] "%{WORD:field_3} %{URIPATHPARAM:field_4}..."
# after LLM field naming (ECS-aligned)
%{IPV4:source.ip} - %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{URIPATHPARAM:url.path}..."
# dissect heuristic output (positional placeholders)
%{field_0} - %{field_1} [%{field_2}] "%{field_3} %{field_4} %{?field_5}" %{field_6} %{field_7}
# after LLM field naming (ECS-aligned)
%{source.ip} - %{user.name} [%{@timestamp}] "%{http.request.method} %{url.path} %{?http_version}" %{http.response.status_code} %{http.response.body.bytes}
The resulting processor is simulated against your submitted documents to measure its parse rate. Grok is a little more expressive, with typed fields, named captures, multiple sub-patterns. The big downside is that it's also slower. Dissect on the other hand is faster but limited to fixed-position splits. Simple log formats tend to parse cleanly with dissect; complex ones need grok.
The candidate with the higher parse rate becomes that group's parsing processor. This runs for every group in the batch. Stage 1 hands Stage 2 one parsing processor per group found.
For a batch of nginx access logs, the extraction produces two candidates for the one format group present:
# input (sampled from 300 submitted documents)
192.168.1.100 - james [30/Mar/2026:14:22:31 +0000] "GET /api/v1/health HTTP/1.1" 200 1234
# grok candidate → parse rate 94% (282/300)
%{IPV4:source.ip} - %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{URIPATHPARAM:url.path} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} %{NUMBER:http.response.body.bytes:int}
# dissect candidate → parse rate 71% (213/300)
%{source.ip} - %{user.name} [%{@timestamp}] "%{http.request.method} %{url.path} %{?http_version}" %{http.response.status_code} %{http.response.body.bytes}
# winner: grok
Grok wins here because %{HTTPDATE} handles the bracketed timestamp format; Dissect tries to split on fixed positions and fails on the surrounding brackets. Both run in parallel; comparing their results adds negligible time since this intial simulation is only done on a sample of documents.
Stage 2: The reasoning agent
Stage 1 produces a parsing processor; Stage 2 turns it into a complete, validated pipeline.
This stage uses a reasoning agent that iterates through a loop with two tools, running up to six iterations.
The loop:
- The agent takes the Stage 1 parsing processor and proposes additional steps: date normalization, type conversions, field cleanup, and PII masking for fields it identifies as sensitive.
- It runs the complete proposed pipeline against your original documents (the raw data, not pre-processed) and returns validation results.
- If the simulation fails, the agent reads the error messages and adjusts. The failures are very specific, and we're making good use of the LLMs capabilities to understand them: which processor failed, on what percentage of documents, with what error type. When the parse rate drops below 80%, the tool returns:
Parse rate is too low: 67.00% (minimum required: 80%). The pipeline is not
extracting fields from enough documents. Review the processors and ensure
they handle the document structure correctly.
Processor "grok[0]" has a failure rate of 33.00% (maximum allowed: 20%).
This processor is failing on too many documents.
The agent now reads the processor name, the failure rate, and the threshold, then adjusts the pattern on the next iteration. It can't commit until the errors resolve.
- This repeats until the pipeline passes, then commits and sends for user approval in the UI.
To ensure quality we enforce two hard thresholds at the tool level, not by the agent's judgment:
- If fewer than 80% of documents parse successfully, the simulation returns an error. The agent must fix this before proceeding.
- If any individual processor fails on more than 20% of documents, the simulation is invalid.
Validation is also embedded in the tool: the model sees an error message and must resolve it before proceeding. It can't commit a pipeline that fails these checks.
Under the hood we're steering the agent in a spefific direction. The system prompt here includes: "Simplify first. Remove problematic processors rather than adding workarounds. A pipeline that handles 95% of documents perfectly is better than one that attempts 100% but fails unpredictably."
If your data is already well-structured (proper @timestamp, correct field types, no raw text that needs parsing), the agent detects this and commits an empty pipeline. It doesn't add processors for the sake of it.
The output is Streamlang
The agent writes Streamlang DSL, Elastic's processing language for streams, which compiles to ingest pipelines behind the scenes.
The field schema, the processor types, the step format: all expressed in Streamlang. Here's what the user-approved pipeline looks like for the nginx example above, targeting an ECS stream:
steps:
- action: grok
from: message
patterns:
- "%{IPV4:source.ip} - %{USER:user.name} \\[%{HTTPDATE:@timestamp}\\] \"%{WORD:http.request.method} %{URIPATHPARAM:url.path} HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:int} %{NUMBER:http.response.body.bytes:int}"
- action: date
from: "@timestamp"
formats:
- "dd/MMM/yyyy:HH:mm:ss Z"
- action: convert
from: http.response.status_code
type: integer
- action: remove
from: message
Two schemas, one generator
Not everyone lands logs in the same shape, and Elastic needs to support a variety of formats. Teams running OpenTelemetry collectors want their data in OTel-native fields. Teams on Elastic's traditional stack expect ECS. Both are valid, and forcing everyone onto one schema would mean asking half our users to restructure their pipelines before they can even get started.
So Streams supports both, and the generator handles both. We automatically detect if we should use OTel or ECS here. For this we mostly look at the name of the stream and check if it contains otel, as that's what the current naming in our stack defaults to.
The pipeline looks different for each because the canonical field names differ:
| OTel | ECS | |
|---|---|---|
| Log body | body.text | message |
| Log level | severity_text | log.level |
| Service name | resource.attributes.service.name | service.name |
| Host name | resource.attributes.host.name | host.name |
An OTel stream gets a grok processor that reads from body.text:
{ "action": "grok", "from": "body.text", "patterns": ["..."] }
An ECS stream reads from message:
{ "action": "grok", "from": "message", "patterns": ["..."] }
OTel streams alias the ECS field names to their OTel equivalents. log.level is an alias for severity_text. message is an alias for body.text. A query written for ECS works on an OTel stream without changes, since the alias layer handles the translation.
{
"message": { "path": "body.text", "type": "alias" },
"log.level": { "path": "severity_text", "type": "alias" }
}
The agent is aware of which side of this it's on. It doesn't add a rename step for severity_text → log.level on an OTel stream because the alias already provides that mapping. On an ECS stream, it adds the normalization explicitly.
Schema normalization
Field extraction is the most important and obvious part, but our fields also need to align.
If two services both log HTTP requests but call the status code field differently (response_status in one, http_code in another), a query for http.response.status_code: 5* returns nothing for either of them. Schema normalization maps both to the standard name:
# before: extracted field names from two different services
{ "response_status": 500 } # service A
{ "http_code": 500 } # service B
# after: ECS normalization
{ "http.response.status_code": 500 }
Now every service uses http.response.status_code, and the query works across all of them.
During simulation, the agent checks ECS and OTel metadata for every field it generates. Fields that already have standard names are left alone. Fields that map to a known ECS field get renamed. The simulation metrics surface this explicitly: each field in the results includes its ECS or OTel type indicator, so you can see at a glance what's been normalized.
The bar the agent must clear
The system prompt sets explicit acceptance criteria for a user-approved pipeline:
- 99% of documents must have a valid
@timestamp - All fields must have the correct types for the target schema
- The overall failure rate must be below 0.5%
If the agent can't satisfy all of these within six iterations, the generation fails.
To summarize
Pipeline generation takes seconds where the manual process takes hours. The time savings come from automating the validation loop you'd otherwise run by hand: write a pattern, test it against real documents, read the failures, adjust, and try again. The agent does this in up to six cycles against the last documents your stream actually received.
What's coming next in Streams and processing
The most user-facing change in progress is the refinement loop. Right now, if the suggestion is close but not exactly right, you edit steps manually and that's it. The next version lets you adjust the proposed pipeline and send it back through the agent with your changes as context, so it builds from where you left off rather than starting from scratch.
Two other things are in progress: generation going async (currently it blocks the UI for a few seconds; soon it runs in the background), and support for streams that already have a pipeline. For now, it only handles streams without existing processing steps.
The same capabilities are also being exposed as callable tools in the Streams agent builder and as APIs for third-party agent frameworks. An agent can run a full pipeline generation as part of a broader onboarding workflow, without the UI.