Streamlang
Streamlang is a YAML domain-specific language (DSL) for defining stream processing and routing logic. Streamlang provides a consistent processing interface that can be converted to multiple execution targets, including Elasticsearch ingest pipelines and ES|QL. This allows processing to run at ingest time or query time without rewriting rules.
You can write Streamlang directly using the YAML editing mode in the Processing tab, or use the interactive mode, which generates Streamlang behind the scenes.
A Streamlang configuration is a YAML document with a single top-level steps array. Each step is either an action block (processor) or a condition block:
steps:
  - action: <processor_type>
    # processor-specific parameters

  - action: <processor_type>
    # processor-specific parameters
    where:
      # optional condition

  - condition:
      field: <field_path>
      eq: <value>
      steps:
        - action: <processor_type>
          # nested processor
Steps run in order. Each processor transforms its input document and passes the result to the next step.
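As a minimal sketch of that ordering, the second step below can match only because the first step has already written the field it checks. The field names and values are illustrative assumptions; the set parameters (to, value) and the where clause follow the examples elsewhere on this page.

```yaml
steps:
  # Step 1: write a field on the document
  - action: set
    to: attributes.env
    value: production

  # Step 2: sees the output of step 1, so this condition can match
  - action: set
    to: attributes.is_prod
    value: true
    where:
      field: attributes.env
      eq: production
```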
Processors are the building blocks of a Streamlang configuration. Each processor has an action field that specifies an operation to perform.
All processors support the following common options:
| Option | Type | Description |
|---|---|---|
| description | string | A human-readable description of the processor. |
| ignore_failure | boolean | When true, document processing continues even if this processor fails. |
| where | condition | A condition that must be met for the processor to run. |
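The following sketch shows the three common options together on a single set processor. The description text and field names are illustrative assumptions, and the order of keys within the step is arbitrary.

```yaml
steps:
  - action: set
    description: Flag documents that come from production   # free-text note
    ignore_failure: true      # keep processing the document if this step fails
    where:                    # run this processor only when the condition holds
      field: attributes.env
      eq: production
    to: attributes.is_prod
    value: true
```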
The following table lists all available processors. Refer to the individual processor pages for YAML parameters and examples.
| Action | Description |
|---|---|
| append | Adds values to an array field, or creates the field as an array if it doesn't exist. |
| concat | Concatenates a mix of field values and literal strings into a single field. |
| convert | Converts a field value to a different data type. |
| date | Parses date strings into timestamps. |
| dissect | Parses structured text using delimiter-based patterns. |
| drop_document | Prevents a document from being indexed, based on a condition. |
| grok | Parses unstructured text using predefined or custom patterns. |
| join | Concatenates the values of multiple fields with a delimiter. |
| lowercase | Converts a string field to lowercase. |
| math | Evaluates an arithmetic expression and stores the result. |
| network_direction | Determines network traffic direction based on source and destination IP addresses. |
| redact | Redacts sensitive data in a string field by matching patterns. |
| remove | Removes a field from the document. |
| remove_by_prefix | Removes a field and all nested fields matching a prefix. |
| rename | Moves a field's value to a new field name and removes the original. |
| replace | Replaces portions of a string field that match a regular expression. |
| set | Assigns a value to a field, creating the field if it doesn't exist. |
| trim | Removes leading and trailing whitespace from a string field. |
| uppercase | Converts a string field to uppercase. |
Conditions are Boolean expressions that control when processors run and how wired streams route data into partitions. They appear in where clauses on processors, in condition blocks, and in stream partitioning.
Each comparison condition specifies a field and an operator with a value:
| Operator | Description | Example value |
|---|---|---|
| eq | Equals | "active", 200 |
| neq | Not equals | "error" |
| lt | Less than | 100 |
| lte | Less than or equal to | 100 |
| gt | Greater than | 0 |
| gte | Greater than or equal to | 1 |
| contains | Field value contains the substring | "error" |
| startsWith | Field value starts with the string | "/api" |
| endsWith | Field value ends with the string | ".log" |
| includes | Multivalue field includes the value | "admin" |
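As a rough illustration of how these operators read in practice, the sketch below attaches one comparison condition to each of three set processors. The target fields, the values, and attributes.roles are hypothetical names chosen for the example.

```yaml
steps:
  # String operator: message text contains the substring "error"
  - action: set
    to: attributes.has_error
    value: true
    where:
      field: body.message
      contains: "error"

  # Numeric operator: status code is 500 or higher
  - action: set
    to: attributes.alert_level
    value: critical
    where:
      field: attributes.status_code
      gte: 500

  # Multivalue operator: the field's values include "admin"
  - action: set
    to: attributes.is_admin
    value: true
    where:
      field: attributes.roles   # hypothetical multivalue field
      includes: "admin"
```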
Use range to match values within a numeric range. You can combine any of gt, gte, lt, and lte:
where:
  field: attributes.status_code
  range:
    gte: 200
    lt: 300
Use exists to check whether a field is present:
# Field must exist
where:
  field: attributes.user_id
  exists: true

# Field must not exist
where:
  field: attributes.temp
  exists: false
Combine conditions using and, or, and not:
# All conditions must be true
where:
  and:
    - field: attributes.env
      eq: production
    - field: attributes.level
      eq: error

# At least one condition must be true
where:
  or:
    - field: attributes.level
      eq: error
    - field: attributes.level
      eq: warn

# Negate a condition
where:
  not:
    field: attributes.path
    startsWith: "/internal"
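The logical operators can presumably be nested inside one another. The sketch below assumes that and, or, and not accept other logical conditions as operands, which this page does not state explicitly; it reuses the fields from the examples above.

```yaml
# Production documents at error or warn level, excluding internal paths
where:
  and:
    - field: attributes.env
      eq: production
    - or:
        - field: attributes.level
          eq: error
        - field: attributes.level
          eq: warn
    - not:
        field: attributes.path
        startsWith: "/internal"
```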
| Condition | Description |
|---|---|
| always: {} | Always evaluates to true. |
| never: {} | Always evaluates to false. |
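One plausible use of these constant conditions, offered here as an assumption rather than documented behavior, is to switch a processor off temporarily without deleting it. Because never: {} always evaluates to false, the step below would not run for any document.

```yaml
steps:
  - action: remove
    from: attributes.debug_info
    where:
      never: {}   # always false, so this processor is skipped
```

Replacing never: {} with always: {} would make the condition true for every document.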
When partitioning data into child streams, conditions use the operators described above to define which documents are routed to a child stream.
For example, the following routes documents to a child stream when attributes.filepath equals Linux.log:
field: attributes.filepath
eq: Linux.log
To enter conditions in YAML format when configuring a partition, turn on the Syntax editor under Condition in the Partitioning tab.
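Because routing conditions use the same operators, they can presumably be combined in the same way. The following sketch assumes that the partition condition accepts logical operators at the top level, with no where wrapper, as in the single-field example above; attributes.level is an illustrative field name.

```yaml
# Route only error-level documents from .log files to the child stream
and:
  - field: attributes.filepath
    endsWith: ".log"
  - field: attributes.level
    eq: error
```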
Condition blocks group processors so that they run only when a condition is met. Use a condition step with nested steps:
steps:
  - condition:
      field: attributes.env
      eq: production
      steps:
        - action: set
          to: attributes.is_prod
          value: true
        - action: remove
          from: attributes.debug_info
You can nest condition blocks for complex logic:
steps:
  - condition:
      field: attributes.source
      eq: webserver
      steps:
        - action: grok
          from: body.message
          patterns:
            - "%{IP:attributes.client_ip} %{WORD:attributes.method} %{URIPATHPARAM:attributes.path} %{NUMBER:attributes.status}"
        - condition:
            field: attributes.status
            gte: 500
            steps:
              - action: set
                to: attributes.alert_level
                value: critical