Kafka output settings

Specify these settings to send data over a secure connection to Kafka. In the Fleet Output settings, make sure that the Kafka output type is selected.

This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.

Kafka output is currently not supported on Elastic Agents using the Elastic Defend integration.

General settings

Kafka version

The Kafka protocol version that Elastic Agent will request when connecting. Defaults to 1.0.0. Kafka versions from 0.8.2.0 to 2.6.0 are currently supported; however, the latest Kafka versions (3.x.x) are expected to be compatible when version 2.6.0 is selected.

Hosts

The addresses your Elastic Agents will use to connect to one or more Kafka brokers. Use the format host:port (without a protocol prefix such as http://). Click Add row to specify additional addresses.

Examples:

  • localhost:9092
  • mykafkahost:9092

Refer to the Fleet Server documentation for default ports and other configuration details.
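
For reference, here is a minimal sketch of how these settings map to the Kafka output section of a standalone Elastic Agent policy. Fleet generates this section automatically for managed agents, and the output name default is an assumption:

outputs:
  default:
    type: kafka
    hosts:
      - 'mykafkahost:9092'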

Authentication settings

Select the mechanism that Elastic Agent uses to authenticate with Kafka.

None

No authentication is used between Elastic Agent and Kafka. This is the default option. In production, selecting an authentication method is recommended.

Plaintext

Select this option to send traffic between Elastic Agent and Kafka as plaintext, without any transport layer security.

This is the default option when no authentication is set.

Encryption

Select this option to secure traffic between Elastic Agent and Kafka with transport layer security.

When Encryption is selected, the Server SSL certificate authorities and Verification mode options become available.

Username / Password

Connect to Kafka with a username and password.

Provide your username and password, and select a SASL (Simple Authentication and Security Layer) mechanism for your login credentials.

When SCRAM is enabled, Elastic Agent uses the SCRAM mechanism to authenticate user credentials. SCRAM is based on the IETF RFC 5802 standard, which describes a challenge-response mechanism for authenticating users.

  • Plain - SCRAM is not used to authenticate
  • SCRAM-SHA-256 - uses the SHA-256 hashing function
  • SCRAM-SHA-512 - uses the SHA-512 hashing function

To prevent unauthorized access, your Kafka password is stored as a secret value. While secret storage is recommended, you can choose to override this setting and store the password as plain text in the agent policy definition. Secret storage requires Fleet Server version 8.12 or higher.

Note that this setting can also be stored as a secret value or as plain text for preconfigured outputs. See Preconfiguration settings in the Kibana Guide to learn more.
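
As a rough illustration, in a standalone agent policy these choices correspond to the following Beats-style settings. The username is a placeholder, and referencing the password through an environment variable is one hedged way to keep it out of the policy file:

outputs:
  default:
    type: kafka
    hosts:
      - 'mykafkahost:9092'
    username: kafka-user            # placeholder
    password: ${KAFKA_PASSWORD}     # avoids storing the password in plain text
    sasl.mechanism: SCRAM-SHA-256   # or PLAIN, SCRAM-SHA-512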

SSL

Authenticate using the Secure Sockets Layer (SSL) protocol. Provide the following details for your SSL certificate:

Client SSL certificate

The certificate generated for the client. Copy and paste in the full contents of the certificate. This is the certificate that all the agents will use to connect to Kafka.

In cases where each client has a unique certificate, the local path to that certificate can be placed here. The agents will pick the certificate in that location when establishing a connection to Kafka.

Client SSL certificate key

The private key generated for the client. The key must be in PKCS#8 format. Copy and paste in the full contents of the certificate key. This is the certificate key that all the agents will use to connect to Kafka.

In cases where each client has a unique certificate key, the local path to that certificate key can be placed here. The agents will pick the certificate key in that location when establishing a connection to Kafka.

To prevent unauthorized access, the certificate key is stored as a secret value. While secret storage is recommended, you can choose to override this setting and store the key as plain text in the agent policy definition. Secret storage requires Fleet Server version 8.12 or higher.

Note that this setting can also be stored as a secret value or as plain text for preconfigured outputs. See Preconfiguration settings in the Kibana Guide to learn more.

Server SSL certificate authorities

The CA certificate to use to connect to Kafka. This is the CA used to generate the certificate and key for Kafka. Copy and paste in the full contents for the CA certificate.

This setting is optional. It is not available when the None or Plaintext authentication option is selected.

Click Add row to specify additional certificate authorities.

Verification mode

Controls the verification of server certificates. Valid values are:

Full
Verifies that the provided certificate is signed by a trusted authority (CA) and also verifies that the server’s hostname (or IP address) matches the names identified within the certificate.
None
Performs no verification of the server’s certificate. This mode disables many of the security benefits of SSL/TLS and should only be used after cautious consideration. It is primarily intended as a temporary diagnostic mechanism when attempting to resolve TLS errors; its use in production environments is strongly discouraged.
Strict
Verifies that the provided certificate is signed by a trusted authority (CA) and also verifies that the server’s hostname (or IP address) matches the names identified within the certificate. If the Subject Alternative Name is empty, it returns an error.
Certificate
Verifies that the provided certificate is signed by a trusted authority (CA), but does not perform any hostname verification.

The default value is Full. This setting is not available when the None or Plaintext authentication option is selected.
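
As a sketch, the SSL authentication fields above map to ssl.* settings in a standalone agent policy roughly as follows. The file paths are illustrative:

outputs:
  default:
    type: kafka
    hosts:
      - 'mykafkahost:9092'
    ssl.certificate: /path/to/client.crt   # client SSL certificate
    ssl.key: /path/to/client.key           # PKCS#8 private key
    ssl.certificate_authorities:
      - /path/to/ca.crt                    # CA used for the Kafka certificates
    ssl.verification_mode: full            # full | none | strict | certificate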

Partitioning settings

The number of partitions created is set automatically by the Kafka broker based on the list of topics. Records are then published to partitions either randomly, in round-robin order, or according to a calculated hash.

Random

Publish records to Kafka partitions at random. Specify the number of events to publish to the same partition before the partitioner selects a new partition.

Round robin

Publish records to Kafka partitions in round-robin order. Specify the number of events to publish to the same partition before the partitioner moves to the next partition.

Hash

Publish records to Kafka partitions based on a hash computed from the specified list of fields. If no field is specified, the Kafka event key value is used.
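
The three strategies are expressed in Beats-style Kafka output YAML roughly as in the sketch below; group_events is the "number of events to publish to the same partition" value described above, and the hash field list is illustrative:

# Random: publish group_events events to one partition, then pick a new partition at random
partition.random:
  group_events: 1

# Round robin (alternative): cycle through partitions, group_events events at a time
# partition.round_robin:
#   group_events: 1

# Hash (alternative): route events by a hash computed from the listed fields
# partition.hash:
#   hash: ['host.name']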

Topics settings

Use these options to dynamically set the Kafka topic for each Elastic Agent event.

Default topic

Set the default topic to use. Click Add topic processor to specify additional processors to set topics based on event contents.

Processors

For each processor provide a condition, the event value to check against, and the resulting Kafka topic.

Refer to conditions in the Elastic Agent processor syntax for condition descriptions. Currently the equals, contains, and regexp conditions are available.

Events that don’t match any defined processor are routed to the default topic.

Processors are applied in the order that they appear, from top to bottom.

The value field must be specified in a key: value format, with both the key and the value being strings. For example, host.port: 2000 or message: Test.

Quotation marks are included in the match string, so specify them in the key or value only if you expect them to appear in the events being matched. For example, message: "error" matches the literal string "error", including the quotation marks, and not the unquoted error. This applies to both single (') and double (") quotation marks.

As an example for setting up your processors, you might want to route log events based on severity. To do so, you can specify a default topic for all events not matched by other processors:

  • %{[fields.log_topic]}

Then, create a processor to route critical events:

  • Condition: Contains
  • Value: message: "CRITICAL"
  • Topic: critical-%{[agent.version]}

And create another processor to route error events:

  • Condition: Contains
  • Value: message: "ERR"
  • Topic: error-%{[agent.version]}

All non-critical and non-error events will then route to the default %{[fields.log_topic]} topic.
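
Expressed in Beats-style Kafka output YAML, this example routing looks roughly like the following sketch (Fleet generates the equivalent configuration from the UI settings above, so you don't enter this directly):

topic: '%{[fields.log_topic]}'
topics:
  - topic: 'critical-%{[agent.version]}'
    when.contains:
      message: 'CRITICAL'
  - topic: 'error-%{[agent.version]}'
    when.contains:
      message: 'ERR'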

Header settings

A header is a key-value pair, and multiple headers can be included with the same key. Only string values are supported. These headers will be included in each produced Kafka message.

Key

The key to set in the Kafka header.

Value

The value to set in the Kafka header.

Click Add header to configure additional headers to be included in each Kafka message.
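
A short sketch of how configured headers appear in Beats-style Kafka output YAML; the key and value strings are illustrative:

headers:
  - key: 'environment'
    value: 'production'
  - key: 'team'
    value: 'observability'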

Client ID

The configurable ClientID used for logging, debugging, and auditing purposes. The default is Elastic. The Client ID is part of the protocol to identify where the messages are coming from.

Compression settings

You can enable compression to reduce the volume of Kafka output.

Codec

Select a compression codec to use. Supported codecs are snappy, lz4, and gzip.

Level

For the gzip codec you can choose a compression level. The level must be in the range of 1 (best speed) to 9 (best compression).

Increasing the compression level reduces the network usage but increases the CPU usage. The default value is 4.
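
In Beats-style YAML, these two settings correspond to the following sketch:

compression: gzip      # snappy, lz4, or gzip
compression_level: 4   # gzip only: 1 (best speed) to 9 (best compression)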

Broker settings

Configure timeout and buffer size values for the Kafka brokers.

Broker timeout

The maximum length of time a Kafka broker waits for the required number of ACKs before timing out (see the ACK reliability setting below). The default is 30 seconds.

Broker reachability timeout

The maximum length of time that an Elastic Agent waits for a response from a Kafka broker before timing out. The default is 30 seconds.

ACK reliability

The ACK reliability level required from the broker. Options are:

  • Wait for local commit
  • Wait for all replicas to commit
  • Do not wait

The default is Wait for local commit.

Note that if ACK reliability is set to Do not wait, no ACKs are returned by Kafka. Messages might be lost silently in the event of an error.
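
In Beats-style Kafka output YAML, these options correspond to the required_acks setting, sketched below:

required_acks: 1    # Wait for local commit (default)
# required_acks: -1 # Wait for all replicas to commit
# required_acks: 0  # Do not wait; messages may be lost silently on error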

Other settings

Key

An optional formatted string specifying the Kafka event key. If configured, the event key can be extracted from the event using a format string.

See the Kafka documentation for the implications of a particular choice of key; by default, the key is chosen by the Kafka cluster.
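
For example, a format string that derives the key from an event field might look like the following sketch; the field name is illustrative:

key: '%{[fields.event_key]}'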

Proxy

Select a proxy URL for Elastic Agent to connect to Kafka. To learn about proxy configuration, refer to Using a proxy server with Elastic Agent and Fleet.

Advanced YAML configuration

YAML settings that will be added to the Kafka output section of each policy that uses this output. Make sure you specify valid YAML. The UI does not currently provide validation.

See Advanced YAML configuration for descriptions of the available settings.
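
For example, you might paste a few of the settings described in the reference below into the Advanced YAML configuration box; the values shown here are the documented defaults:

backoff.init: 1s
backoff.max: 60s
bulk_max_size: 2048
keep_alive: 0s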

Make this output the default for agent integrations

When this setting is on, Elastic Agents use this output to send data if no other output is set in the agent policy.

Make this output the default for agent monitoring

When this setting is on, Elastic Agents use this output to send agent monitoring data if no other output is set in the agent policy.

Advanced YAML configuration

backoff.init

(string) The number of seconds to wait before trying to reconnect to Kafka after a network error. After waiting backoff.init seconds, Elastic Agent tries to reconnect. If the attempt fails, the backoff timer is increased exponentially up to backoff.max. After a successful connection, the backoff timer is reset.

Default: 1s

backoff.max

(string) The maximum number of seconds to wait before attempting to connect to Kafka after a network error.

Default: 60s

bulk_max_size

(int) The maximum number of events to bulk in a single Kafka request.

Default: 2048

bulk_flush_frequency

(int) Duration to wait before sending a bulk Kafka request. 0 means no delay.

Default: 0

channel_buffer_size

(int) The number of messages buffered in the output pipeline, per Kafka broker.

Default: 256

client_id

(string) The configurable ClientID used for logging, debugging, and auditing purposes.

Default: Elastic Agent

codec

Output codec configuration. You can specify either the json or format codec. By default the json codec is used.

json.pretty: If pretty is set to true, events will be nicely formatted. The default is false.

json.escape_html: If escape_html is set to true, HTML symbols will be escaped in strings. The default is false.

Example configuration that uses the json codec with pretty printing enabled to write events to the console:

output.console:
  codec.json:
    pretty: true
    escape_html: false

format.string: Configurable format string used to create a custom formatted message.

Example configuration that uses the format codec to print the event timestamp and message field to the console:

output.console:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'

Default: json

keep_alive

(string) The keep-alive period for an active network connection. If 0s, keep-alives are disabled.

Default: 0s

max_message_bytes

(int) The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. This value should be equal to or less than the broker’s message.max.bytes.

Default: 1000000 (bytes)

metadata

Kafka metadata update settings. The metadata contains information about brokers, topics, partitions, and active leaders to use for publishing.

refresh_frequency
Metadata refresh interval. Defaults to 10 minutes.
full
Strategy to use when fetching metadata. When this option is true, the client maintains a full set of metadata for all available topics. When set to false, it refreshes only the metadata for the configured topics. The default is false.
retry.max
Total number of metadata update retries. The default is 3.
retry.backoff
Waiting time between retries. The default is 250ms.
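
A sketch of the metadata block with the defaults above spelled out:

metadata:
  refresh_frequency: 10m
  full: false
  retry.max: 3
  retry.backoff: 250ms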

queue.mem.events

The number of events the queue can store. This value should be evenly divisible by queue.mem.flush.min_events to avoid sending partial batches to the output.

Default: 3200 events

queue.mem.flush.min_events

The minimum number of events required for publishing. If this value is set to 0 or 1, events are available to the output immediately. If this value is greater than 1 the output must wait for the queue to accumulate this minimum number of events or for queue.mem.flush.timeout to expire before publishing. When greater than 1 this value also defines the maximum possible batch that can be sent by the output.

Default: 1600 events

queue.mem.flush.timeout

(string) The maximum wait time for queue.mem.flush.min_events to be fulfilled. If set to 0s, events are available to the output immediately.

Default: 10s
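
Taken together, the queue settings above might be tuned in the Advanced YAML configuration box as in this sketch; note that queue.mem.events (3200) remains evenly divisible by queue.mem.flush.min_events (1600):

queue.mem.events: 3200
queue.mem.flush.min_events: 1600
queue.mem.flush.timeout: 10s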