Streamlang

Streamlang is a YAML domain-specific language (DSL) for defining stream processing and routing logic. Streamlang provides a consistent processing interface that can be converted to multiple execution targets, including Elasticsearch ingest pipelines and ES|QL. This allows processing to run at ingest time or query time without rewriting rules.

You can write Streamlang directly in the YAML editing mode of the Processing tab, or use the interactive mode, which generates Streamlang behind the scenes.

A Streamlang configuration is a YAML document with a single top-level steps array. Each step is either an action block (processor) or a condition block:

steps:
  - action: <processor_type>
    # processor-specific parameters
  - action: <processor_type>
    # processor-specific parameters
    where:
      # optional condition
  - condition:
      field: <field_path>
      eq: <value>
      steps:
        - action: <processor_type>
          # nested processor

Steps run in order. Each processor transforms the input document and passes the result to the next step.
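
As a minimal sketch of why step order matters, the second step below depends on the field written by the first. The field names (attributes.env, attributes.is_prod) are illustrative assumptions, not part of any schema:

```yaml
steps:
  # Step 1: write a value to attributes.env (hypothetical field)
  - action: set
    to: attributes.env
    value: production
  # Step 2: runs after step 1, so its condition sees the value set above
  - action: set
    to: attributes.is_prod
    value: true
    where:
      field: attributes.env
      eq: production
```

If the two steps were swapped, the condition would be evaluated before attributes.env was set by this configuration.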

Processors are the building blocks of a Streamlang configuration. Each processor has an action field that specifies an operation to perform.

All processors support the following common options:

Option          Type       Description
description     string     A human-readable description of the processor.
ignore_failure  boolean    When true, document processing continues even if this processor fails.
where           condition  A condition that must be met for the processor to run.
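
The following sketch shows all three common options on a single set processor. The field names and the description text are assumptions chosen for illustration:

```yaml
steps:
  - action: set
    # Common options (available on every processor)
    description: Flag documents that come from production
    ignore_failure: true
    where:
      field: attributes.env
      eq: production
    # Parameters specific to the set processor
    to: attributes.is_prod
    value: true
```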

The following table lists all available processors. Refer to the individual processor pages for YAML parameters and examples.

Action             Description
append             Adds values to an array field, or creates the field as an array if it doesn't exist.
concat             Concatenates a mix of field values and literal strings into a single field.
convert            Converts a field value to a different data type.
date               Parses date strings into timestamps.
dissect            Parses structured text using delimiter-based patterns.
drop_document      Prevents a document from being indexed, based on a condition.
grok               Parses unstructured text using predefined or custom patterns.
join               Concatenates the values of multiple fields with a delimiter.
lowercase          Converts a string field to lowercase.
math               Evaluates an arithmetic expression and stores the result.
network_direction  Determines network traffic direction based on source and destination IP addresses.
redact             Redacts sensitive data in a string field by matching patterns.
remove             Removes a field from the document.
remove_by_prefix   Removes a field and all nested fields matching a prefix.
rename             Moves a field's value to a new field name and removes the original.
replace            Replaces portions of a string field that match a regular expression.
set                Assigns a value to a field, creating the field if it doesn't exist.
trim               Removes leading and trailing whitespace from a string field.
uppercase          Converts a string field to uppercase.

Conditions are Boolean expressions that control when processors run and how wired streams route data into partitions. They appear in where clauses on processors, in condition blocks, and in stream partitioning.

Each comparison condition specifies a field and an operator with a value:

Operator    Description                          Example value
eq          Equals                               "active", 200
neq         Not equals                           "error"
lt          Less than                            100
lte         Less than or equal to                100
gt          Greater than                         0
gte         Greater than or equal to             1
contains    Field value contains the substring   "error"
startsWith  Field value starts with the string   "/api"
endsWith    Field value ends with the string     ".log"
includes    Multivalue field includes the value  "admin"
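
A few of these operators could be written as where clauses as follows. Each clause is a separate fragment, and the field names (attributes.path, attributes.duration_ms, attributes.roles) are hypothetical:

```yaml
# String match: field value starts with "/api"
where:
  field: attributes.path
  startsWith: "/api"

# Numeric comparison: field value is greater than 1000
where:
  field: attributes.duration_ms
  gt: 1000

# Multivalue match: field includes the value "admin"
where:
  field: attributes.roles
  includes: admin
```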

Use range to match values within a numeric range. You can combine any of gt, gte, lt, and lte:

where:
  field: attributes.status_code
  range:
    gte: 200
    lt: 300

Use exists to check whether a field is present:

# Field must exist
where:
  field: attributes.user_id
  exists: true

# Field must not exist
where:
  field: attributes.temp
  exists: false

Combine conditions using and, or, and not:

# All conditions must be true
where:
  and:
    - field: attributes.env
      eq: production
    - field: attributes.level
      eq: error

# At least one condition must be true
where:
  or:
    - field: attributes.level
      eq: error
    - field: attributes.level
      eq: warn

# Negate a condition
where:
  not:
    field: attributes.path
    startsWith: "/internal"

Two special conditions always evaluate to a fixed result:

Condition   Description
always: {}  Always evaluates to true.
never: {}   Always evaluates to false.
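
One possible use, sketched here on the assumption that these conditions are accepted in a where clause like any other condition, is to switch a step off without deleting it. The field name is illustrative:

```yaml
steps:
  - action: remove
    from: attributes.debug_info
    where:
      # never: {} always evaluates to false, so this step is skipped
      never: {}
```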

When you partition data into child streams, conditions use the operators described above to define which documents are routed to each child stream.

For example, the following routes documents to a child stream when attributes.filepath equals Linux.log:

field: attributes.filepath
eq: Linux.log

To enter conditions in YAML format when configuring a partition, turn on the Syntax editor under Condition in the Partitioning tab.
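
As a sketch, a partition condition could also combine several checks with and. This assumes that logical operators are accepted in partition conditions the same way as in where clauses, and the attributes.level field is hypothetical:

```yaml
# Route only error-level documents from files ending in .log
and:
  - field: attributes.filepath
    endsWith: ".log"
  - field: attributes.level
    eq: error
```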

Condition blocks group processors so that they run only when a condition is met. Use a condition step with nested steps:

steps:
  - condition:
      field: attributes.env
      eq: production
      steps:
        - action: set
          to: attributes.is_prod
          value: true
        - action: remove
          from: attributes.debug_info

You can nest condition blocks for complex logic:

steps:
  - condition:
      field: attributes.source
      eq: webserver
      steps:
        - action: grok
          from: body.message
          patterns:
            - "%{IP:attributes.client_ip} %{WORD:attributes.method} %{URIPATHPARAM:attributes.path} %{NUMBER:attributes.status}"
        - condition:
            field: attributes.status
            gte: 500
            steps:
              - action: set
                to: attributes.alert_level
                value: critical
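
For comparison, the innermost set step alone could be sketched as a single processor with a combined where clause. This assumes that nested condition blocks behave like a logical and of their conditions, and that attributes.status has already been extracted by an earlier step:

```yaml
steps:
  - action: set
    to: attributes.alert_level
    value: critical
    where:
      # Both checks must pass, as with the two nested condition blocks
      and:
        - field: attributes.source
          eq: webserver
        - field: attributes.status
          gte: 500
```

Nested condition blocks are the better fit when several processors share the same outer condition. A where clause keeps a single conditional step compact.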