rabbitmqedit

Pull events from a RabbitMQ queue.

The default settings will create an entirely transient queue and listen for all messages by default. If you need durability or any other advanced settings, please set the appropriate options

This plugin uses the March Hare library for interacting with the RabbitMQ server. Most configuration options map directly to standard RabbitMQ and AMQP concepts. The AMQP 0-9-1 reference guide and other parts of the RabbitMQ documentation are useful for deeper understanding.

The properties of messages received will be stored in the [@metadata][rabbitmq_properties] field if the @metadata_enabled setting is checked. Note that storing metadata may degrade performance. The following properties may be available (in most cases dependent on whether they were set by the sender):

  • app-id
  • cluster-id
  • consumer-tag
  • content-encoding
  • content-type
  • correlation-id
  • delivery-mode
  • exchange
  • expiration
  • message-id
  • priority
  • redeliver
  • reply-to
  • routing-key
  • timestamp
  • type
  • user-id

For example, to get the RabbitMQ message’s timestamp property into the Logstash event’s @timestamp field, use the date filter to parse the [@metadata][rabbitmq_properties][timestamp] field:

   filter {
     if [@metadata][rabbitmq_properties][timestamp] {
       date {
         match => ["[@metadata][rabbitmq_properties][timestamp]", "UNIX"]
       }
     }
   }

Additionally, any message headers will be saved in the [@metadata][rabbitmq_headers] field.

 

Synopsisedit

This plugin supports the following configuration options:

Required configuration options:

rabbitmq {
    host => ...
    subscription_retry_interval_seconds => ...
}

Available configuration options:

Setting Input type Required Default value

ack

boolean

No

true

add_field

hash

No

{}

arguments

array

No

{}

auto_delete

boolean

No

false

automatic_recovery

boolean

No

true

codec

codec

No

"json"

connect_retry_interval

number

No

1

connection_timeout

number

No

durable

boolean

No

false

exchange

string

No

exclusive

boolean

No

false

heartbeat

number

No

host

string

Yes

key

string

No

"logstash"

metadata_enabled

boolean

No

false

passive

boolean

No

false

password

password

No

"guest"

port

number

No

5672

prefetch_count

number

No

256

queue

string

No

""

ssl

boolean

No

false

subscription_retry_interval_seconds

number

Yes

5

tags

array

No

threads

number

No

1

type

string

No

user

string

No

"guest"

verify_ssl

boolean

No

false

vhost

string

No

"/"

Detailsedit

 

ackedit

  • Value type is boolean
  • Default value is true

Enable message acknowledgements. With acknowledgements messages fetched by Logstash but not yet sent into the Logstash pipeline will be requeued by the server if Logstash shuts down. Acknowledgements will however hurt the message throughput.

This will only send an ack back every prefetch_count messages. Working in batches provides a performance boost here.

add_fieldedit

  • Value type is hash
  • Default value is {}

Add a field to an event

argumentsedit

  • Value type is array
  • Default value is {}

Extra queue arguments as an array. To make a RabbitMQ queue mirrored, use: {"x-ha-policy" => "all"}

auto_deleteedit

  • Value type is boolean
  • Default value is false

Should the queue be deleted on the broker when the last consumer disconnects? Set this option to false if you want the queue to remain on the broker, queueing up messages until a consumer comes along to consume them.

automatic_recoveryedit

  • Value type is boolean
  • Default value is true

Set this to automatically recover from a broken connection. You almost certainly don’t want to override this!!!

codecedit

  • Value type is codec
  • Default value is "json"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.

connect_retry_intervaledit

  • Value type is number
  • Default value is 1

Time in seconds to wait before retrying a connection

connection_timeoutedit

  • Value type is number
  • There is no default value for this setting.

The default connection timeout in milliseconds. If not specified the timeout is infinite.

debug (DEPRECATED)edit

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value type is boolean
  • Default value is false

Enable or disable logging

durableedit

  • Value type is boolean
  • Default value is false

Is this queue durable? (aka; Should it survive a broker restart?)

exchangeedit

  • Value type is string
  • There is no default value for this setting.

The name of the exchange to bind the queue to.

exclusiveedit

  • Value type is boolean
  • Default value is false

Is the queue exclusive? Exclusive queues can only be used by the connection that declared them and will be deleted when it is closed (e.g. due to a Logstash restart).

heartbeatedit

  • Value type is number
  • There is no default value for this setting.

Heartbeat delay in seconds. If unspecified no heartbeats will be sent

hostedit

  • This is a required setting.
  • Value type is string
  • There is no default value for this setting.

RabbitMQ server address

keyedit

  • Value type is string
  • Default value is "logstash"

The routing key to use when binding a queue to the exchange. This is only relevant for direct or topic exchanges.

  • Routing keys are ignored on fanout exchanges.
  • Wildcards are not valid on direct exchanges.

metadata_enablededit

  • Value type is boolean
  • Default value is false

Enable the storage of message headers and properties in @metadata. This may impact performance

passiveedit

  • Value type is boolean
  • Default value is false

If true the queue will be passively declared, meaning it must already exist on the server. To have Logstash create the queue if necessary leave this option as false. If actively declaring a queue that already exists, the queue options for this plugin (durable etc) must match those of the existing queue.

passwordedit

  • Value type is password
  • Default value is "guest"

RabbitMQ password

portedit

  • Value type is number
  • Default value is 5672

RabbitMQ port to connect on

prefetch_countedit

  • Value type is number
  • Default value is 256

Prefetch count. If acknowledgements are enabled with the ack option, specifies the number of outstanding unacknowledged

queueedit

  • Value type is string
  • Default value is ""

The name of the queue Logstash will consume events from. If left empty, a transient queue with an randomly chosen name will be created.

ssledit

  • Value type is boolean
  • Default value is false

Enable or disable SSL

subscription_retry_interval_secondsedit

  • This is a required setting.
  • Value type is number
  • Default value is 5

Amount of time in seconds to wait after a failed subscription request before retrying. Subscribes can fail if the server goes away and then comes back

tagsedit

  • Value type is array
  • There is no default value for this setting.

Add any number of arbitrary tags to your event.

This can help with processing later.

threadsedit

  • Value type is number
  • Default value is 1

typeedit

  • Value type is string
  • There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.

If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.

useredit

  • Value type is string
  • Default value is "guest"

RabbitMQ username

verify_ssledit

  • Value type is boolean
  • Default value is false

Validate SSL certificate

vhostedit

  • Value type is string
  • Default value is "/"

The vhost to use. If you don’t know what this is, leave the default.