Configure the Kafka output
The Kafka output sends events to Apache Kafka.
To use this output, edit the APM Server configuration file to disable the Elasticsearch output by commenting it out, and enable the Kafka output by uncommenting the Kafka section.
Example configuration:
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Events bigger than max_message_bytes will be dropped. To avoid this problem, make sure APM Server does not generate events bigger than max_message_bytes.
Kibana configuration
APM Server uses the APM integration to set up and manage APM templates, policies, and pipelines. To confirm the integration is installed, APM Server polls either Elasticsearch or Kibana on startup. When using a non-Elasticsearch output, APM Server requires access to Kibana via the Kibana endpoint.
Example configuration:
apm-server:
  kibana:
    enabled: true
    host: "https://..."
    username: "elastic"
    password: "xxx"
Compatibility
This output works with all Kafka versions between 0.11 and 2.2.2. Older versions might work as well, but are not supported.
Configuration options
You can specify the following options in the kafka section of the apm-server.yml config file:
enabled
The enabled config is a boolean setting to enable or disable the output. If set to false, the output is disabled.
The default value is false.
hosts
The list of Kafka broker addresses from which to fetch the cluster metadata. The cluster metadata contains the actual Kafka brokers that events are published to.
version
The Kafka version that APM Server is assumed to run against. Defaults to 1.0.0.
Event timestamps are added if version 0.10.0.0 or later is enabled.
Valid values are all Kafka releases between 0.8.2.0 and 2.0.0.
See Compatibility for information on supported versions.
username
The username for connecting to Kafka. If username is configured, the password must be configured as well.
password
The password for connecting to Kafka.
sasl.mechanism
This functionality is in beta and is subject to change. The design and code is less mature than official GA features and is being provided as-is with no warranties. Beta features are not subject to the support SLA of official GA features.
The SASL mechanism to use when connecting to Kafka. It can be one of:
- PLAIN for SASL/PLAIN.
- SCRAM-SHA-256 for SCRAM-SHA-256.
- SCRAM-SHA-512 for SCRAM-SHA-512.
If sasl.mechanism is not set, PLAIN is used if username and password are provided. Otherwise, SASL authentication is disabled.
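As an illustration of the authentication settings above, the following sketch selects SCRAM-SHA-512 explicitly instead of relying on the PLAIN fallback. The broker address, topic name, and credentials are placeholders, not values from this guide.

```yaml
output.kafka:
  # Placeholder broker address and topic; replace with your own.
  hosts: ["kafka1:9092"]
  topic: "apm-events"

  # Placeholder credentials. If username is set, password must be set too.
  username: "apm-writer"
  password: "changeme"

  # Beta setting. Omit it to fall back to PLAIN when credentials are present.
  sasl.mechanism: SCRAM-SHA-512
```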
topic
The Kafka topic used for produced events.
You can set the topic dynamically by using a format string to access any event field. For example, this configuration uses a custom field, fields.log_topic, to set the topic for each event:
topic: '%{[fields.log_topic]}'
See the topics setting for other ways to set the topic dynamically.
topics
An array of topic selector rules. Each rule specifies the topic to use for events that match the rule. During publishing, APM Server sets the topic for each event based on the first matching rule in the array. Rules can contain conditionals, format string-based fields, and name mappings. If the topics setting is missing or no rule matches, the topic field is used.
Rule settings:
- topic: The topic format string to use. If this string contains field references, such as %{[fields.name]}, the fields must exist, or the rule fails.
- mappings: A dictionary that takes the value returned by topic and maps it to a new name.
- default: The default string value to use if mappings does not find a match.
- when: A condition that must succeed in order to execute the current rule.
The following example sets the topic based on whether the message field contains the specified string:
output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  topics:
    - topic: "critical-%{[agent.version]}"
      when.contains:
        message: "CRITICAL"
    - topic: "error-%{[agent.version]}"
      when.contains:
        message: "ERR"
This configuration results in topics named critical-8.15.2, error-8.15.2, and logs-8.15.2.
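The mappings and default rule settings can be combined in the same way. The sketch below is illustrative only: it assumes events carry a custom field, fields.log_topic, and the mapped values and topic names are hypothetical.

```yaml
output.kafka:
  hosts: ["localhost:9092"]
  # Used when the topics setting has no matching rule.
  topic: "apm-events"
  topics:
    # The rule fails if fields.log_topic does not exist on the event.
    - topic: "%{[fields.log_topic]}"
      # Hypothetical mapping from field values to topic names.
      mappings:
        critical: "apm-alerts"
        error: "apm-errors"
      # Used when the field value has no entry in mappings.
      default: "apm-events"
```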
key
Optional formatted string specifying the Kafka event key. If configured, the event key can be extracted from the event using a format string.
See the Kafka documentation for the implications of a particular choice of key; by default, the key is chosen by the Kafka cluster.
partition
Kafka output broker event partitioning strategy. Must be one of random, round_robin, or hash. By default the hash partitioner is used.
random.group_events: Sets the number of events to be published to the same partition before the partitioner selects a new partition at random. The default value is 1, meaning a new partition is picked randomly after each event.
round_robin.group_events: Sets the number of events to be published to the same partition before the partitioner selects the next partition. The default value is 1, meaning the next partition is selected after each event.
hash.hash: List of fields used to compute the partitioning hash value. If no field is configured, the event's key value is used.
hash.random: Randomly distribute events if no hash or key value can be computed.
All partitioners try to publish events to all partitions by default. If a partition's leader becomes unreachable for APM Server, the output might block. All partitioners support setting reachable_only to override this behavior. If reachable_only is set to true, events are published to available partitions only.
Publishing to a subset of available partitions potentially increases resource usage because events may become unevenly distributed.
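A minimal sketch of a hash partitioner configuration, using only the options described above. The field name service.name and the topic name are assumptions chosen for illustration; use whichever event field should keep related events on the same partition.

```yaml
output.kafka:
  hosts: ["localhost:9092"]
  topic: "apm-events"
  partition.hash:
    # Assumed field name; events with the same value hash to the same partition.
    hash: ["service.name"]
    # Distribute randomly when no hash or key value can be computed.
    random: true
    # Publish only to partitions whose leader is reachable.
    reachable_only: true
```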
client_id
The configurable client ID used for logging, debugging, and auditing purposes. The default is "beats".
worker
The number of concurrent load-balanced Kafka output workers.
codec
Output codec configuration. If the codec section is missing, events will be JSON encoded.
See Change the output codec for more information.
metadata
Kafka metadata update settings. The metadata contains information about brokers, topics, partitions, and active leaders to use for publishing.
- refresh_frequency: Metadata refresh interval. Defaults to 10 minutes.
- full: Strategy to use when fetching metadata. When this option is true, the client maintains a full set of metadata for all available topics. When it is set to false, the client only refreshes the metadata for the configured topics. The default is false.
- retry.max: Total number of metadata update retries when the cluster is in the middle of a leader election. The default is 3.
- retry.backoff: Waiting time between retries during leader elections. The default is 250ms.
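For reference, the metadata settings above could be written out as follows. This sketch simply spells out the documented defaults, with the 10-minute refresh interval expressed as the duration string 10m; adjust values only if your cluster needs different behavior.

```yaml
output.kafka:
  metadata:
    # Refresh cluster metadata every 10 minutes (the documented default).
    refresh_frequency: 10m
    # Only track metadata for the configured topics.
    full: false
    # Retry metadata updates up to 3 times during leader elections,
    # waiting 250ms between attempts.
    retry.max: 3
    retry.backoff: 250ms
```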
max_retries
The number of times to retry publishing an event after a publishing failure. After the specified number of retries, the events are typically dropped.
Set max_retries to a value less than 0 to retry until all events are published.
The default is 3.
backoff.init
The number of seconds to wait before trying to republish to Kafka after a network error. After waiting backoff.init seconds, APM Server tries to republish. If the attempt fails, the backoff timer is increased exponentially up to backoff.max. After a successful publish, the backoff timer is reset. The default is 1s.
backoff.max
The maximum number of seconds to wait before attempting to republish to Kafka after a network error. The default is 60s.
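The retry and backoff settings work together. The following sketch shows them side by side using the documented defaults, so it changes no behavior as written; it is meant as a starting point for tuning rather than a recommendation.

```yaml
output.kafka:
  # Retry a failed publish up to 3 times; a negative value retries
  # until all events are published.
  max_retries: 3
  # First wait after a network error.
  backoff.init: 1s
  # Upper bound for the exponentially growing wait.
  backoff.max: 60s
```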
bulk_max_size
The maximum number of events to bulk in a single Kafka request. The default is 2048.
bulk_flush_frequency
The duration to wait before sending a bulk Kafka request. A value of 0 means no delay. The default is 0.
timeout
The number of seconds to wait for responses from the Kafka brokers before timing out. The default is 30 (seconds).
broker_timeout
The maximum duration a broker will wait for the number of required ACKs. The default is 10s.
channel_buffer_size
The number of messages buffered in the output pipeline per Kafka broker. The default is 256.
keep_alive
The keep-alive period for an active network connection. If 0s, keep-alives are disabled. The default is 0s.
compression
Sets the output compression codec. Must be one of none, snappy, lz4, or gzip. The default is gzip.
Known issue with Azure Event Hub for Kafka
When targeting Azure Event Hub for Kafka, set compression to none as the provided codecs are not supported.
compression_level
Sets the compression level used by gzip. Setting this value to 0 disables compression. The compression level must be in the range of 1 (best speed) to 9 (best compression).
Increasing the compression level will reduce the network usage but will increase the CPU usage.
The default value is 4.
max_message_bytes
The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. The default value is 1000000 (bytes). This value should be equal to or less than the broker's message.max.bytes.
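As a sketch, the compression and message size settings might be combined like this. The values shown are the documented defaults; the broker-side message.max.bytes limit is configured on Kafka itself and is not part of this file.

```yaml
output.kafka:
  # One of none, snappy, lz4, or gzip.
  # Use none when targeting Azure Event Hub for Kafka.
  compression: gzip
  # Applies to gzip only: 1 is fastest, 9 compresses best.
  compression_level: 4
  # Keep this at or below the broker's message.max.bytes.
  max_message_bytes: 1000000
```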
required_acks
The ACK reliability level required from the broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1.
Note: If set to 0, no ACKs are returned by Kafka. Messages might be lost silently on error.
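If durability matters more than latency, one possible configuration waits for all replicas to commit. This is a sketch, not a recommendation: the broker addresses and topic name are placeholders, and broker_timeout is shown at its documented default.

```yaml
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: "apm-events"
  # -1 waits for all replicas to commit before acknowledging.
  required_acks: -1
  # Maximum time a broker waits for the required ACKs.
  broker_timeout: 10s
```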
enable_krb5_fast
This functionality is in beta and is subject to change. The design and code is less mature than official GA features and is being provided as-is with no warranties. Beta features are not subject to the support SLA of official GA features.
Enable Kerberos FAST authentication. This may conflict with some Active Directory installations. It is separate from the standard Kerberos settings because this flag only applies to the Kafka output. The default is false.
ssl
Configuration options for SSL parameters like the root CA for Kafka connections.
The Kafka host keystore should be created with the -keyalg RSA argument to ensure it uses a cipher supported by APM Server's Kafka library.
See SSL/TLS output settings for more information.