Protocol Modules

Packetbeat’s source code is split into Go packages, also called modules. Each protocol module lives in its own folder in the beats/packetbeat/protos directory in the Beats GitHub repository.

Before starting, we recommend reading through the source code of some of the existing modules. For TCP-based protocols, the MySQL and HTTP modules are good models to follow. For UDP-based protocols, you can look at the DNS module.

All protocol modules implement the TcpProtocolPlugin interface, the UdpProtocolPlugin interface, or both. The interfaces are shown in the following listing (found in beats/packetbeat/protos/protos.go).

// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
	// Called to initialize the Plugin
	Init(test_mode bool, results publish.Transactions) error

	// Called to return the configured ports
	GetPorts() []int
}

type TcpProtocolPlugin interface {
	ProtocolPlugin

	// Called when TCP payload data is available for parsing.
	Parse(pkt *Packet, tcptuple *common.TcpTuple,
		dir uint8, private ProtocolData) ProtocolData

	// Called when the FIN flag is seen in the TCP stream.
	ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
		private ProtocolData) ProtocolData

	// Called when packets are missing from the TCP
	// stream.
	GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
		private ProtocolData) (priv ProtocolData, drop bool)

	// ConnectionTimeout returns the per stream connection timeout.
	// Return <=0 to set default tcp module transaction timeout.
	ConnectionTimeout() time.Duration
}

type UdpProtocolPlugin interface {
	ProtocolPlugin

	// ParseUdp is invoked when UDP payload data is available for parsing.
	ParseUdp(pkt *Packet)
}

At a high level, the protocol plugins receive raw packet data via the Parse() or ParseUdp() methods and produce objects that will be indexed into Elasticsearch.

The Parse() and ParseUdp() methods are called for every packet that is sniffed and is using one of the ports of the protocol as defined by the GetPorts() function. They receive the packet in a Packet object, which looks like this:

type Packet struct {
	// The timestamp of the packet.
	Ts time.Time
	// The source IP address, source port, destination IP address,
	// and destination port combination.
	Tuple common.IpPortTuple
	// The application layer payload (that is, without the IP and
	// TCP/UDP headers) as a byte slice.
	Payload []byte
}

The objects are sent using the publisher.Client interface defined in the beats/libbeat/publisher directory in the Beats GitHub repository.

Besides the Parse() function, the TCP layer also calls the ReceivedFin() function when a TCP stream is closed, and it calls the GapInStream() function when packet loss is detected in a TCP stream. The protocol module can use these callbacks to make decisions about what to do with partial data that it receives. For example, for the HTTP/1.0 protocol, the end of connection is used to know when the message is finished.
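To illustrate how these callbacks can work together, here is a minimal sketch for a message whose end is marked only by the connection closing, as in the HTTP/1.0 case. It is not taken from any Packetbeat module: the bodyState type and the onData, onFin, and onGap functions are hypothetical stand-ins for the state handling you would do inside Parse(), ReceivedFin(), and GapInStream().

```go
package main

import "fmt"

// bodyState is hypothetical per-stream state for a message whose
// length is unknown until the connection closes.
type bodyState struct {
	body []byte
}

// onData plays the role of Parse(): it only accumulates payload bytes.
func onData(st *bodyState, payload []byte) *bodyState {
	if st == nil {
		st = &bodyState{}
	}
	st.body = append(st.body, payload...)
	return st
}

// onFin plays the role of ReceivedFin(): the FIN marks the end of the
// message, so the buffered body is returned and the state is cleared.
func onFin(st *bodyState) (msg []byte, next *bodyState) {
	if st == nil {
		return nil, nil
	}
	return st.body, nil
}

// onGap plays the role of GapInStream(): after losing nbytes the
// partial message cannot be trusted, so the state is dropped.
func onGap(st *bodyState, nbytes int) (next *bodyState, drop bool) {
	return nil, true
}

func main() {
	var st *bodyState
	st = onData(st, []byte("<html>"))
	st = onData(st, []byte("</html>"))
	msg, st := onFin(st)
	fmt.Printf("%s %v\n", msg, st == nil)
}
```

A real module might choose to publish a partial message with a note instead of dropping it when a gap is seen; that decision depends on the protocol.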

Registering Your Plugin

To configure your plugin, you need to add a configuration struct to the Protocols struct in config/config.go. This struct will be filled by goyaml on startup.

type Protocols struct {
	Icmp     Icmp
	Dns      Dns
	Http     Http
	Memcache Memcache
	Mysql    Mysql
	Mongodb  Mongodb
	Pgsql    Pgsql
	Redis    Redis
	Thrift   Thrift
}

Next, create an ID for the new plugin in protos/protos.go:

// Protocol constants.
const (
	UnknownProtocol Protocol = iota
	HttpProtocol
	MysqlProtocol
	RedisProtocol
	PgsqlProtocol
	ThriftProtocol
	MongodbProtocol
	DnsProtocol
	MemcacheProtocol
)

// Protocol names
var ProtocolNames = []string{
	"unknown",
	"http",
	"mysql",
	"redis",
	"pgsql",
	"thrift",
	"mongodb",
	"dns",
	"memcache",
}

The protocol names must be in the same order as their corresponding protocol IDs. Additionally, the protocol name must match the configuration name.
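The ordering requirement exists because the ID is used as an index into the names slice. The following self-contained sketch shows the idea with a shortened list. EchoProtocol and its "echo" name are hypothetical, the underlying type of Protocol is an assumption, and the String() method is written here for illustration only, so check protos.go for the actual lookup code.

```go
package main

import "fmt"

// Protocol is the ID type; uint16 is an assumption for this sketch.
type Protocol uint16

const (
	UnknownProtocol Protocol = iota
	HttpProtocol
	MysqlProtocol
	EchoProtocol // hypothetical new protocol, appended at the end
)

// ProtocolNames lists the names in the same order as the constants.
var ProtocolNames = []string{
	"unknown",
	"http",
	"mysql",
	"echo", // must sit at index EchoProtocol
}

// String looks the name up by index and falls back to "unknown"
// for IDs that are out of range.
func (p Protocol) String() string {
	if int(p) >= len(ProtocolNames) {
		return ProtocolNames[UnknownProtocol]
	}
	return ProtocolNames[p]
}

func main() {
	fmt.Println(EchoProtocol) // prints "echo"
}
```

If a name is inserted in the middle of one list but appended to the other, every later protocol silently resolves to the wrong name, which is why new entries are easiest to add at the end of both lists.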

Finally, register your new protocol plugin in the EnabledProtocolPlugins map in packetbeat.go:

var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[protos.Protocol]protos.ProtocolPlugin{
	protos.HttpProtocol:     new(http.Http),
	protos.MemcacheProtocol: new(memcache.Memcache),
	protos.MysqlProtocol:    new(mysql.Mysql),
	protos.PgsqlProtocol:    new(pgsql.Pgsql),
	protos.RedisProtocol:    new(redis.Redis),
	protos.ThriftProtocol:   new(thrift.Thrift),
	protos.MongodbProtocol:  new(mongodb.Mongodb),
	protos.DnsProtocol:      new(dns.Dns),
}

Once the module is registered, it can be configured, and packets will be processed.

Before implementing all the logic for your new protocol module, it can be helpful to first register the module and implement the minimal plugin interface for printing a debug message on received packets. This way you can test the plugin registration to ensure that it’s working correctly.
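A minimal plugin of this kind might look like the sketch below. To keep it compilable on its own, it declares local stand-ins for the Packetbeat types (Packet, TcpTuple, ProtocolData, Transactions) and repeats the interface from the listing earlier in this guide. The Echo plugin, its port, and the StreamID field are hypothetical. In a real module you would import the Packetbeat packages instead and log through logp.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins for the Packetbeat types, so the sketch compiles alone.
type Packet struct {
	Ts      time.Time
	Payload []byte
}
type TcpTuple struct{ StreamID uint32 } // hypothetical field
type ProtocolData interface{}
type Transactions interface{} // stand-in for publish.Transactions

// TcpProtocolPlugin repeats the method set from the listing above.
type TcpProtocolPlugin interface {
	Init(testMode bool, results Transactions) error
	GetPorts() []int
	Parse(pkt *Packet, tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData
	ReceivedFin(tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData
	GapInStream(tcptuple *TcpTuple, dir uint8, nbytes int, private ProtocolData) (ProtocolData, bool)
	ConnectionTimeout() time.Duration
}

// Echo is a hypothetical plugin that only reports what it receives.
type Echo struct {
	ports []int
	seen  int
}

func (e *Echo) Init(testMode bool, results Transactions) error {
	e.ports = []int{7} // hypothetical port
	return nil
}

func (e *Echo) GetPorts() []int { return e.ports }

func (e *Echo) Parse(pkt *Packet, tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData {
	e.seen++
	fmt.Printf("echo: %d payload bytes, dir=%d\n", len(pkt.Payload), dir)
	return private
}

func (e *Echo) ReceivedFin(tcptuple *TcpTuple, dir uint8, private ProtocolData) ProtocolData {
	return private
}

func (e *Echo) GapInStream(tcptuple *TcpTuple, dir uint8, nbytes int, private ProtocolData) (ProtocolData, bool) {
	return private, true // drop the stream state on packet loss
}

// Returning 0 asks for the default timeout, per the interface comment.
func (e *Echo) ConnectionTimeout() time.Duration { return 0 }

// Compile-time check that Echo satisfies the interface.
var _ TcpProtocolPlugin = (*Echo)(nil)

func main() {
	var plugin TcpProtocolPlugin = &Echo{}
	if err := plugin.Init(true, nil); err != nil {
		panic(err)
	}
	pkt := &Packet{Ts: time.Now(), Payload: []byte("hello")}
	plugin.Parse(pkt, &TcpTuple{StreamID: 1}, 0, nil)
}
```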

The TCP Parse Function

For TCP protocols, the Parse() function is the heart of the module. As mentioned earlier, this function is called for every TCP packet that contains data on the configured ports.

It is important to understand that because TCP is a stream-based protocol, the packet boundaries don’t necessarily match the application layer message boundaries. For example, a packet can contain only a part of the message, it can contain a complete message, or it can contain multiple messages.

If you see a packet in the middle of the stream, you have no guarantee that its first byte is the beginning of a message. However, if the packet is the first one seen in a given TCP stream, then you can assume it is the beginning of a message.

The Parse() function needs to deal with these facts, which generally means that it needs to keep state across multiple packets.

Let’s have a look again at its signature:

func Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8,
	private protos.ProtocolData) protos.ProtocolData

We’ve already talked about the first parameter, which contains the packet data. The rest of the parameters and the return value are used for maintaining state inside the TCP stream.

The tcptuple is a unique identifier for the TCP stream that the packet is part of. You can use the tcptuple.Hashable() function to get a value that you can store in a map. The dir flag gives you the direction in which the packet is flowing inside the TCP stream. The two possible values are TcpDirectionOriginal if the packet goes in the same direction as the first packet from the stream and TcpDirectionReverse if the packet goes in the other direction.

The private parameter can be used by the module to store state in the TCP stream. The module would typically cast this at run time to a type of its choice, modify it as needed, and then return the modified value. The next time the TCP layer calls Parse() or another function from the TcpProtocolPlugin interface, it will call the function with the modified private value.

Here is an example of how the MySQL module handles the private data:

	priv := mysqlPrivateData{}
	if private != nil {
		var ok bool
		priv, ok = private.(mysqlPrivateData)
		if !ok {
			priv = mysqlPrivateData{}
		}
	}

	[ ... ]

	return priv

Most modules then use logic similar to the following to deal with incomplete data (this example is also from MySQL):

		ok, complete := mysqlMessageParser(priv.Data[dir])
		if !ok {
			// drop this tcp stream. Will retry parsing with the next
			// segment in it
			priv.Data[dir] = nil
			logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
			return priv
		}

		if complete {
			mysql.messageComplete(tcptuple, dir, stream)
		} else {
			// wait for more data
			break
		}

The mysqlMessageParser() is the function that tries to parse a single MySQL message. Its implementation is MySQL-specific, so it’s not interesting to us for this guide. It returns two values: ok, which is false if there was a parsing error from which we cannot recover, and complete, which indicates whether a complete and valid message was separated from the stream. These two values are used for deciding what to do next. In case of errors, we drop the stream. If there are no errors, but the message is not yet complete, we do nothing and wait for more data. Finally, if the message is complete, we go to the next level.

This block of code is called in a loop so that it can separate multiple messages found in the same packet.
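The buffering and looping described above can be sketched for a made-up newline-delimited protocol. The streamState type, parseLine, and feed are hypothetical names, not Packetbeat APIs. The sketch also leaves out the unrecoverable-error branch, because a newline-delimited format has no invalid framing to detect.

```go
package main

import (
	"bytes"
	"fmt"
)

// streamState is hypothetical private data: one buffer per direction.
type streamState struct {
	data [2][]byte
}

// parseLine tries to split one newline-terminated message off buf.
// It returns the message, the unparsed remainder, and whether a
// complete message was found.
func parseLine(buf []byte) (msg, rest []byte, complete bool) {
	i := bytes.IndexByte(buf, '\n')
	if i < 0 {
		return nil, buf, false
	}
	return buf[:i], buf[i+1:], true
}

// feed appends a packet payload to the buffer for its direction and
// returns every message that is now complete.
func (s *streamState) feed(dir uint8, payload []byte) []string {
	s.data[dir] = append(s.data[dir], payload...)
	var msgs []string
	for len(s.data[dir]) > 0 {
		msg, rest, complete := parseLine(s.data[dir])
		if !complete {
			break // wait for more data
		}
		msgs = append(msgs, string(msg))
		s.data[dir] = rest
	}
	return msgs
}

func main() {
	var s streamState
	// The first packet ends in the middle of the second message.
	fmt.Println(s.feed(0, []byte("PING\nGE"))) // prints [PING]
	// The second packet completes it and carries one more message.
	fmt.Println(s.feed(0, []byte("T a\nGET b\n"))) // prints [GET a GET b]
}
```

Keeping one buffer per direction matters because requests and responses are interleaved on the wire, and mixing their bytes in a single buffer would corrupt both.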

The UDP ParseUdp Function

If the protocol you are working on is running on top of UDP, then all the complexities that TCP parser/decoders need to deal with around extracting messages from packets are no longer relevant.

For an example, see the ParseUdp() function from the DNS module.

Correlation

Most protocols that Packetbeat supports today are request-response oriented. Packetbeat indexes into Elasticsearch a document for each request-response pair (called a transaction). This way we can have data from the request and the response in the same document and measure the response time.

But this can be different for your protocol. For example, for an asynchronous protocol like AMQP, it makes more sense to index a document for every message, and then no correlation is necessary. On the other hand, for a session-based protocol like SIP, it might make sense to index a document for a SIP transaction or for a full SIP dialog, which can have more than two messages.

The TCP stream or UDP ports are usually good indicators that two messages belong to the same transaction. Therefore, most protocol implementations in Packetbeat use a map keyed by the tcptuple to correlate requests with responses. Be careful to expire and remove incomplete transactions from this map. For example, we might see a request that creates an entry in the map, but if we never see the reply, we need to remove the request from memory on a timer; otherwise we risk leaking memory.

Sending the Result

After the correlation step, you should have a JSON-like object that can be sent to Elasticsearch for indexing. You send the object by publishing it through the publisher client interface, which the module receives in its Init function. The publisher client accepts structures of type common.MapStr, which is essentially a map[string]interface{} with a few more convenience methods added (see the beats/libbeat/common package in the Beats GitHub repository).

As an example, here is the relevant code from the Redis module:

	event := common.MapStr{
		"@timestamp":   common.Time(requ.Ts),
		"type":         "redis",
		"status":       error,
		"responsetime": responseTime,
		"redis":        returnValue,
		"method":       common.NetString(bytes.ToUpper(requ.Method)),
		"resource":     requ.Path,
		"query":        requ.Message,
		"bytes_in":     uint64(requ.Size),
		"bytes_out":    uint64(resp.Size),
		"src":          src,
		"dst":          dst,
	}
	if redis.SendRequest {
		event["request"] = requ.Message
	}
	if redis.SendResponse {
		event["response"] = resp.Message
	}

	return event

The following fields are required and their presence will be checked by system tests:

  • @timestamp. Set this to the timestamp of the first packet from the message and cast it to common.Time like in the example.
  • type. Set this to the protocol name.
  • status. The status of the transaction. Use either common.OK_STATUS or common.ERROR_STATUS. If the protocol doesn’t have responses or a meaningful status code, use OK.
  • resource. This should represent what is requested, with the exact meaning depending on the protocol. For HTTP, this is the URL. For SQL databases, this is the table name. For key-value stores, this is the key. If nothing seems to make sense to put in this field, use the empty string.
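While developing, it can help to check for these fields yourself before the system tests do. The sketch below is only an illustration: MapStr is a local stand-in for common.MapStr, and requiredFields and missingFields are hypothetical helpers, not part of libbeat.

```go
package main

import "fmt"

// MapStr is a local stand-in for common.MapStr.
type MapStr map[string]interface{}

// requiredFields lists the fields named in the list above.
var requiredFields = []string{"@timestamp", "type", "status", "resource"}

// missingFields returns the required fields that are absent from the
// event, in the order they are listed in requiredFields.
func missingFields(event MapStr) []string {
	var missing []string
	for _, field := range requiredFields {
		if _, ok := event[field]; !ok {
			missing = append(missing, field)
		}
	}
	return missing
}

func main() {
	event := MapStr{
		"type":   "echo", // hypothetical protocol name
		"status": "OK",
	}
	fmt.Println(missingFields(event)) // prints [@timestamp resource]
}
```

An empty string counts as present here, which matches the advice to set resource to the empty string when nothing meaningful fits.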

Helpers

Parsing Helpers

libbeat also provides helpers for implementing parsers for binary and text-based protocols. The Bytes_* functions are the lowest-level helpers for binary protocols that use network byte order; you can find them in the beats/libbeat/common module in the Beats GitHub repository. In addition to these low-level helpers, beats/libbeat/common/streambuf provides a stream buffer with integrated error handling for parsing TCP-based streams or individual UDP packets. The following example uses the stream buffer to parse the Memcache protocol UDP header:

func parseUdpHeader(buf *streambuf.Buffer) (mcUdpHeader, error) {
    var h mcUdpHeader
    h.requestId, _ = buf.ReadNetUint16()
    h.seqNumber, _ = buf.ReadNetUint16()
    h.numDatagrams, _ = buf.ReadNetUint16()
    buf.Advance(2) // ignore reserved
    return h, buf.Err()
}
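The example can ignore the error from each read because the buffer remembers the first failure and reports it once at the end through buf.Err(). The sketch below reproduces that pattern with only the standard library, to show why it keeps parsing code compact. It does not use or mirror the streambuf implementation: the reader type, its methods, and errShort are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errShort = errors.New("unexpected end of data")

// reader is a hypothetical cursor that remembers the first error, so
// callers can check once after a series of reads.
type reader struct {
	buf []byte
	err error
}

// readUint16 reads a 16-bit value in network (big-endian) byte order.
// After an error it does nothing and returns 0.
func (r *reader) readUint16() uint16 {
	if r.err != nil {
		return 0
	}
	if len(r.buf) < 2 {
		r.err = errShort
		return 0
	}
	v := binary.BigEndian.Uint16(r.buf)
	r.buf = r.buf[2:]
	return v
}

// skip advances past n bytes, recording an error if too few remain.
func (r *reader) skip(n int) {
	if r.err != nil {
		return
	}
	if len(r.buf) < n {
		r.err = errShort
		return
	}
	r.buf = r.buf[n:]
}

type udpHeader struct {
	requestID, seqNumber, numDatagrams uint16
}

// parseHeader reads three 16-bit fields plus two reserved bytes and
// reports any failure with a single error check.
func parseHeader(data []byte) (udpHeader, error) {
	r := &reader{buf: data}
	var h udpHeader
	h.requestID = r.readUint16()
	h.seqNumber = r.readUint16()
	h.numDatagrams = r.readUint16()
	r.skip(2) // ignore reserved bytes
	return h, r.err
}

func main() {
	h, err := parseHeader([]byte{0x12, 0x34, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00})
	fmt.Println(h.requestID, h.seqNumber, h.numDatagrams, err) // prints 4660 0 1 <nil>
}
```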

The stream buffer is also used to implement the binary and text-based protocol parsers for memcache, as in the following excerpt from the binary protocol parser:

	header := buf.Snapshot()
	buf.Advance(memcacheHeaderSize)

	msg := parser.message
	if msg.IsRequest {
		msg.vbucket, _ = header.ReadNetUint16At(6)
	} else {
		msg.status, _ = header.ReadNetUint16At(6)
	}

	cas, _ := header.ReadNetUint64At(16)
	if cas != 0 {
		setCasUnique(msg, cas)
	}
	msg.opaque, _ = header.ReadNetUint32At(12)

	// check message length

	extraLen, _ := header.ReadNetUint8At(4)
	keyLen, _ := header.ReadNetUint16At(2)
	totalLen, _ := header.ReadNetUint32At(8)

    [...]

	if extraLen > 0 {
		tmp, _ := buf.Collect(int(extraLen))
		extras := streambuf.NewFixed(tmp)
		var err error
		if msg.IsRequest && requestArgs != nil {
			err = parseBinaryArgs(parser, requestArgs, header, extras)
		} else if responseArgs != nil {
			err = parseBinaryArgs(parser, responseArgs, header, extras)
		}
		if err != nil {
			msg.AddNotes(err.Error())
		}
	}

	if keyLen > 0 {
		key, _ := buf.Collect(int(keyLen))
		keys := []memcacheString{memcacheString{key}}
		msg.keys = keys
	}

	if valueLen == 0 {
		return parser.yield(buf.BufferConsumed())
	}

The stream buffer also implements a number of interfaces defined in the standard "io" package and can easily be used to serialize some packets for testing parsers (see beats/packetbeat/protos/memcache/binary_test.go).

Module Helpers

Packetbeat provides the module beats/packetbeat/protos/applayer, which contains definitions common to all application layer protocols. For example, using the Transaction type from applayer guarantees that the final document will have all common required fields defined. To make use of it, embed applayer.Transaction in your own application layer transaction type. Here is an example from the memcache protocol:

	type transaction struct {
		applayer.Transaction

		command *commandType

		request  *message
		response *message
	}

	// use applayer.Transaction to write common required fields
	func (t *transaction) Event(event common.MapStr) error {
		if err := t.Transaction.Event(event); err != nil {
			logp.Warn("error filling generic transaction fields: %v", err)
			return err
		}

		mc := common.MapStr{}
		event["memcache"] = mc

        [...]

		return nil
	}

Use applayer.Message in conjunction with applayer.Transaction for creating the transaction and applayer.Stream to manage your stream buffers for parsing.
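The embedding pattern itself is plain Go and can be tried without Packetbeat. In the sketch below, baseTransaction is a hypothetical stand-in for applayer.Transaction with made-up fields, MapStr stands in for common.MapStr, and echoTransaction is an invented protocol type. Only the delegation from the outer Event method to the embedded one reflects the memcache example above.

```go
package main

import "fmt"

// MapStr is a local stand-in for common.MapStr.
type MapStr map[string]interface{}

// baseTransaction is a hypothetical stand-in for applayer.Transaction.
// Its fields are invented for this sketch.
type baseTransaction struct {
	Type   string
	Status string
}

// Event writes the fields shared by all protocols.
func (t *baseTransaction) Event(event MapStr) error {
	event["type"] = t.Type
	event["status"] = t.Status
	return nil
}

// echoTransaction embeds the base type and adds protocol-specific data.
type echoTransaction struct {
	baseTransaction
	message string
}

// Event first delegates to the embedded type for the common fields,
// then adds the protocol-specific ones under their own key.
func (t *echoTransaction) Event(event MapStr) error {
	if err := t.baseTransaction.Event(event); err != nil {
		return err
	}
	event["echo"] = MapStr{"message": t.message}
	return nil
}

func main() {
	t := &echoTransaction{
		baseTransaction: baseTransaction{Type: "echo", Status: "OK"},
		message:         "hello",
	}
	event := MapStr{}
	if err := t.Event(event); err != nil {
		panic(err)
	}
	fmt.Println(event["type"], event["status"], event["echo"])
}
```

Because the outer type defines its own Event method, it shadows the embedded one, so the explicit call to t.baseTransaction.Event is what keeps the common fields in the document.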