Kafka input

Use the kafka input to read from topics in a Kafka cluster.

To configure this input, specify a list of one or more hosts in the cluster to bootstrap the connection with, a list of topics to track, and a group_id for the connection.

Example configuration:

filebeat.inputs:
- type: kafka
  hosts:
    - kafka-broker-1:9092
    - kafka-broker-2:9092
  topics: ["my-topic"]
  group_id: "filebeat"

The following example shows how to use the kafka input to ingest data from Microsoft Azure Event Hubs that have Kafka compatibility enabled:

filebeat.inputs:
- type: kafka
  hosts: ["<your event hub namespace>.servicebus.windows.net:9093"]
  topics: ["<your event hub instance>"]
  group_id: "<your consumer group>"

  username: "$ConnectionString"
  password: "<your connection string>"
  ssl.enabled: true

For more details on the mapping between Kafka and Event Hubs configuration parameters, see the Azure documentation.

Compatibility

This input works with all Kafka versions between 0.11 and 2.1.0. Older versions might work as well, but are not supported.

Configuration options

The kafka input supports the following configuration options plus the Common options described later.

hosts

A list of Kafka bootstrapping hosts (brokers) for this cluster.

topics

A list of topics to read from.

group_id

The Kafka consumer group id.

client_id

The Kafka client id (optional).

version

The version of the Kafka protocol to use (defaults to "1.0.0").
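
As a minimal sketch, the optional client_id and version settings sit alongside the required hosts, topics, and group_id. The client ID and the chosen version below are illustrative assumptions, not recommendations:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Hypothetical client ID, chosen only for illustration.
  client_id: "filebeat-ingest-1"
  # Quoted so that YAML treats the version as a string.
  version: "2.0.0"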

initial_offset

The initial offset to start reading, either "oldest" or "newest". Defaults to "oldest".
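
For instance, an input that should skip the existing backlog and read only new messages might override the default like this (host, topic, and group names are reused from the example configuration above):

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Start from the newest offset instead of the default "oldest".
  initial_offset: "newest"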

connect_backoff

How long to wait before trying to reconnect to the Kafka cluster after a fatal error. Default is 30s.

consume_backoff

How long to wait before retrying a failed read. Default is 2s.

max_wait_time

How long to wait for the minimum number of input bytes while reading. Default is 250ms.

wait_close

When shutting down, how long to wait for in-flight messages to be delivered and acknowledged.
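
The timing options above can be set together, as in the following sketch. The first three values simply restate the documented defaults; the wait_close value is an assumption chosen for illustration, since no default is stated here:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Documented defaults, written out explicitly.
  connect_backoff: 30s
  consume_backoff: 2s
  max_wait_time: 250ms
  # Illustrative value only; not a documented default.
  wait_close: 5s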

isolation_level

This configures the Kafka group isolation level:

  • "read_uncommitted" returns all messages in the message channel.
  • "read_committed" hides messages that are part of an aborted transaction.

The default is "read_uncommitted".
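
A minimal sketch of an input that reads from topics written by transactional producers and should not see aborted messages:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Hide messages that belong to aborted transactions.
  isolation_level: "read_committed"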

fetch

Kafka fetch settings:

min
The minimum number of bytes to wait for. Defaults to 1.
default
The default number of bytes to read per request. Defaults to 1MB.
max
The maximum number of bytes to read per request. Defaults to 0 (no limit).
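
The fetch settings are nested under the fetch key. The sketch below restates the defaults, assuming the values are given as plain byte counts and that 1MB corresponds to 1048576 bytes:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  fetch:
    # Wait for at least this many bytes.
    min: 1
    # Bytes to read per request (assumed to be 1MB expressed in bytes).
    default: 1048576
    # 0 means no upper limit.
    max: 0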

expand_event_list_from_field

If the fileset using this input expects to receive multiple messages bundled under a specific field, set expand_event_list_from_field to the name of that field. For example, in the case of Azure filesets, the events are found under the JSON object "records":

{
"records": [ {event1}, {event2}]
}

With this setting, the messages under the group value (records) are split into separate events.
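
For the "records" layout shown above, the corresponding input configuration could look like the following sketch. The placeholders are the same ones used in the Azure Event Hubs example earlier on this page:

filebeat.inputs:
- type: kafka
  hosts: ["<your event hub namespace>.servicebus.windows.net:9093"]
  topics: ["<your event hub instance>"]
  group_id: "<your consumer group>"
  # Emit each element of the "records" array as its own event.
  expand_event_list_from_field: "records"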

rebalance

Kafka rebalance settings:

strategy
Either "range" or "roundrobin". Defaults to "range".
timeout
How long to wait for an attempted rebalance. Defaults to 60s.
max_retries
How many times to retry if rebalancing fails. Defaults to 4.
retry_backoff
How long to wait after an unsuccessful rebalance attempt. Defaults to 2s.
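
The rebalance settings are nested under the rebalance key. This sketch switches the strategy to "roundrobin" and writes out the remaining defaults explicitly:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  rebalance:
    # Override the default "range" strategy.
    strategy: "roundrobin"
    # The values below are the documented defaults.
    timeout: 60s
    max_retries: 4
    retry_backoff: 2s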

Common options

The following configuration options are supported by all inputs.

enabled

Use the enabled option to enable and disable inputs. By default, enabled is set to true.

tags

A list of tags that Filebeat includes in the tags field of each published event. Tags make it easy to select specific events in Kibana or apply conditional filtering in Logstash. These tags will be appended to the list of tags specified in the general configuration.

Example:

filebeat.inputs:
- type: kafka
  . . .
  tags: ["json"]

fields

Optional fields that you can specify to add additional information to the output. For example, you might add fields that you can use for filtering log data. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. By default, the fields that you specify here will be grouped under a fields sub-dictionary in the output document. To store the custom fields as top-level fields, set the fields_under_root option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here.

filebeat.inputs:
- type: kafka
  . . .
  fields:
    app_id: query_engine_12

fields_under_root

If this option is set to true, the custom fields are stored as top-level fields in the output document instead of being grouped under a fields sub-dictionary. If the custom field names conflict with other field names added by Filebeat, then the custom fields overwrite the other fields.
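
Building on the fields example above, the following sketch stores app_id as a top-level field rather than under fields.app_id:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  fields:
    app_id: query_engine_12
  # Publish app_id at the top level of the event.
  fields_under_root: true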

processors

A list of processors to apply to the input data.

See Processors for information about specifying processors in your config.
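
As a rough sketch, processors are listed under the input they apply to. The example assumes that the Kafka message payload is JSON and arrives in the message field; the target field name is hypothetical:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  processors:
    # Assumes the payload in "message" is a JSON document.
    - decode_json_fields:
        fields: ["message"]
        # Hypothetical target field for the decoded object.
        target: "kafka_payload"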

pipeline

The Ingest Node pipeline ID to set for the events generated by this input.

The pipeline ID can also be configured in the Elasticsearch output, but this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used.
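
A minimal sketch of setting the pipeline on the input. The pipeline ID is a hypothetical name and must refer to a pipeline that already exists in Elasticsearch:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Hypothetical pipeline ID, used only for illustration.
  pipeline: "kafka-events"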

keep_null

If this option is set to true, fields with null values will be published in the output document. By default, keep_null is set to false.

index

If present, this formatted string overrides the index for events from this input (for Elasticsearch outputs), or sets the raw_index field of the event’s metadata (for other outputs). This string can only refer to the agent name and version and the event timestamp; for access to dynamic fields, use output.elasticsearch.index or a processor.

Example value: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" might expand to "filebeat-myindex-2019.11.01".
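
In a configuration file, the example value above would be set on the input like this:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Uses only the agent name and the event timestamp.
  index: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}"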

publisher_pipeline.disable_host

By default, all events contain host.name. This option can be set to true to disable the addition of this field to all events. The default value is false.
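
For example, when the events describe remote systems rather than the machine running Filebeat, the host.name field can be suppressed for this input, as in this sketch:

filebeat.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  # Do not add host.name to events from this input.
  publisher_pipeline.disable_host: true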