# Elasticsearch Data Enrichment with Logstash: A Few Security Examples

Data enrichment provides additional insights into your data, and is frequently used in security use cases. Some common security-related questions include:

• Are there any botnet IPs visiting my web server?
• Do these IP addresses exist in an external DNSBL (DNS Blocklist)?
• Is there anyone in my organization visiting a known malware URL/domain?

There are three common ways to perform data enrichment in Logstash: the Elasticsearch, DNS, and translate filters. Which one you choose depends largely on the data feed used for enrichment.

If performance is a concern, the Logstash memcached filter plugin can help.

## Logstash Elasticsearch Filter

Firstly, let's examine the Logstash Elasticsearch filter. It enables lookups to be performed against Elasticsearch indexes. Fields of interest in the results can then be incorporated into the current event.

For instance, if your threat data feed is already stored in an Elasticsearch index (e.g. via logstash-input-blueliv), this is likely to be the easiest method of integration.

For example, assume your bot IP list index has a structure like the following:

    PUT botlist/ip/1
    {
      "ip" : "1.1.1.1"
    }

Then, in the Logstash Elasticsearch filter, set the query and the fields that you want to populate from your botlist index. In this case, the temporary "botip" field indicates that the address was found in the botlist index. The mutate section then adds a "botnet" tag to the final event and removes the temporary field.

    filter {
      elasticsearch {
        hosts => "your_es_host"
        index => "botlist"
        query => "ip:%{message}"
        fields => { "ip" => "botip" }
        result_size => 1
        enable_sort => false
      }

      if [botip] {
        mutate {
          add_tag => [ "botnet" ]
          remove_field => "botip"
        }
      }
    }
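The botlist index queried by the filter has to be loaded first. As a minimal sketch, assuming the feed is available as a plain list of IP strings, the payload for the Elasticsearch bulk API could be generated like this. The helper name `bulk_body` is hypothetical, and using the IP itself as the document ID is a choice made here so that reloading the same feed overwrites entries instead of duplicating them.

```python
import json


def bulk_body(ips):
    """Build a newline-delimited bulk payload: one action line and one
    source line per IP, each terminated by a newline."""
    lines = []
    for ip in ips:
        # Action line: index the document under the IP as its ID (assumption).
        lines.append(json.dumps({"index": {"_id": ip}}))
        # Source line: same document shape as the PUT example.
        lines.append(json.dumps({"ip": ip}))
    return "".join(line + "\n" for line in lines)


if __name__ == "__main__":
    print(bulk_body(["1.1.1.1", "2.2.2.2"]), end="")
```

The result would be sent with the `application/x-ndjson` content type to the bulk endpoint of the index, for example `POST botlist/ip/_bulk` on versions that still use mapping types as in the example above.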

If your system requires immediate security action during data ingestion, consider adding a percolate query in Elasticsearch. Please refer to the official documentation on the percolate query.

## Logstash DNS Filter

Up next, the Logstash DNS filter can be used to perform DNS lookups to resolve a domain to an IP address, or a reverse lookup to map an IP to a domain.

A DNSBL service sometimes requires a special format for lookups. For example, spamhaus.org expects the IP address with its octets reversed and prepended to the blocklist zone, so 1.2.3.4 is queried as 4.3.2.1.zen.spamhaus.org.

The example below demonstrates how to re-format an IP address using a grok filter, then use the DNS filter to check whether the address is listed in zen.spamhaus.org:

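    filter {
      grok {
        # capture the parts of the IP address so that they can be re-ordered
      }
      mutate {
        # store the reversed lookup name in the spamhaus_reverse_lookup field
      }
      dns {
        # perform the lookup using the reverse address format
        resolve => [ "spamhaus_reverse_lookup" ]
        nameserver => [ "10.0.1.1" ]
        # tag events whose lookup succeeded; the conditional below relies on this tag
        add_tag => [ "dns_successful_lookup" ]
        action => "replace"
      }
      if "dns_successful_lookup" in [tags] {
        # 127.0.0.2 is a special return address from spamhaus indicating that
        # the looked-up address is listed in the spamhaus spam database
        if [spamhaus_reverse_lookup] == "127.0.0.2" {
          mutate {
            # mark the event as listed, e.g. by adding a tag
          }
        }
      }
    }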
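To make the reverse address format concrete, here is a small sketch outside of Logstash that builds the lookup name and interprets the answer the same way the conditional above does. The function names are hypothetical, and `resolve` uses the system resolver rather than the nameserver from the filter configuration.

```python
import ipaddress
import socket
from typing import Optional

DNSBL_ZONE = "zen.spamhaus.org"  # zone named in the text
LISTED_ANSWER = "127.0.0.2"      # return address the text describes as "listed"


def dnsbl_query_name(ip: str, zone: str = DNSBL_ZONE) -> str:
    """Build the lookup name: reversed IPv4 octets followed by the zone."""
    octets = str(ipaddress.IPv4Address(ip)).split(".")  # ValueError if not IPv4
    return ".".join(reversed(octets)) + "." + zone


def resolve(name: str) -> Optional[str]:
    """Resolve a name with the system resolver; None when there is no record."""
    try:
        return socket.gethostbyname(name)
    except socket.gaierror:
        return None


def is_listed(answer: Optional[str]) -> bool:
    """Mirror the conditional in the filter: only 127.0.0.2 counts as listed."""
    return answer == LISTED_ANSWER


if __name__ == "__main__":
    name = dnsbl_query_name("1.2.3.4")
    print(name, is_listed(resolve(name)))
```

A failed lookup simply yields `None`, which corresponds to an event that never receives the dns_successful_lookup tag.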

## Logstash Translate Filter

Last but not least, the Logstash translate filter can also achieve the lookup goal simply by using a dictionary to map matching values between columns/key-value pairs.

For example, Malware Domain List provides a free feed in CSV format. It can be converted as an input for the translate filter as follows:

    $ more malware.yaml
    "213.155.12.XXX/sec/bin/upload/v1crypted.exe": "true"    # "true" indicates a known malware URL
    "128.134.30.XXX/w.exe": "true"
    "114.203.87.XXX/help.asp": "true"

With this, the translate filter example below can be used to detect whether a message/URL is listed in the malware feed:

    $ more malware.cfg
    input {
      stdin {
        codec => json
      }
    }
    filter {
      translate {
        field => "url"
        # If a match is found, the translate filter assigns the mapped value to
        # the destination field. Here, if the value of url matches an entry in
        # the dictionary, the malware field is set to "true".
        destination => "malware"
        dictionary_path => "malware.yaml"
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
    }

Input

    { "url" : "128.134.30.XXX/w.exe" }

Output

    {
      "@timestamp" => 2018-01-15T09:53:10.829Z,
      "malware" => "true",
      "url" => "128.134.30.XXX/w.exe",
      "@version" => "1",
      "host" => "localhost"
    }
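The conversion from the CSV feed to the YAML dictionary is not shown above, so here is one way it might be scripted. This is a sketch under assumptions: the column position of the URL (`URL_COLUMN`) is hypothetical and has to be adjusted to the actual layout of the feed, and the function name is made up for illustration.

```python
import csv
import io
import json

URL_COLUMN = 1  # hypothetical: index of the URL column in the CSV feed


def csv_to_translate_yaml(csv_text: str, url_column: int = URL_COLUMN) -> str:
    """Return dictionary lines of the form '"<url>": "true"', one per unique URL."""
    lines = []
    seen = set()
    for row in csv.reader(io.StringIO(csv_text)):
        if len(row) <= url_column:
            continue  # skip blank or short rows
        url = row[url_column].strip()
        if not url or url in seen:
            continue  # skip empty values and duplicates
        seen.add(url)
        # json.dumps produces a double-quoted string, which is also valid YAML.
        lines.append(f'{json.dumps(url)}: "true"')
    return "".join(line + "\n" for line in lines)


if __name__ == "__main__":
    sample = "2018/01/15,128.134.30.XXX/w.exe\n2018/01/15,114.203.87.XXX/help.asp\n"
    print(csv_to_translate_yaml(sample), end="")
```

Writing the returned text to malware.yaml gives a file in the same shape as the dictionary used by the translate filter example.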

## Enrichment at Scale

While the Elasticsearch and translate filters are good for smaller workloads, we can improve on them with a scalable enrichment layer that doesn't hold state on individual Logstash nodes.

For example, with a recently updated memcache plugin prototype we can do extremely fast, non-blocking lookups on whatever we want to match. Examples include malware requests, threat data (known bad IPs), or even assets (server IP to host lookups). The lookups to memcache are simple key/value lookups. Another benefit of memcache is that it does not block on updates: since we have a very read-heavy workload, we don't want lookups to stall while we update our enrichment keys. Because memcache is volatile and does not persist across reboots, keep the enrichment data in a persistent store as well, so that you can re-populate memcache as needed.

Using memcache and memcache storage pools, we can scale and push as much traffic as we want. With proper configuration, a single memcache instance can conservatively handle 100k+ lookups per second. The memcached documentation suggests 200k+, and we've seen even higher numbers with proper tuning. In a pool configuration, this scales linearly.

The new logstash-filter-memcached plugin is a complete rewrite of the versions that have been published previously. This version supports the following:

• Memcache Pools via Consistent Hashing
• Namespaces
• Multi-Get / Multi-Set
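To illustrate what consistent hashing buys you in a pool, here is a generic hash ring sketch. It is not the plugin's implementation, whose hashing scheme is not described here; the class name, the use of SHA-256, and the number of virtual points per host are all assumptions chosen for illustration.

```python
import bisect
import hashlib


class HashRing:
    """Map keys to hosts so that adding a host only moves keys onto that host."""

    def __init__(self, hosts, replicas=100):
        # Each host gets several virtual points on the ring to even out the load.
        ring = []
        for host in hosts:
            for i in range(replicas):
                ring.append((self._hash(f"{host}#{i}"), host))
        ring.sort()
        self._points = [point for point, _ in ring]
        self._hosts = [host for _, host in ring]

    @staticmethod
    def _hash(value: str) -> int:
        digest = hashlib.sha256(value.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big")

    def host_for(self, key: str) -> str:
        # First point clockwise from the key's hash, wrapping around at the end.
        idx = bisect.bisect_right(self._points, self._hash(key)) % len(self._points)
        return self._hosts[idx]


if __name__ == "__main__":
    ring = HashRing(["10.0.0.1:11211", "10.0.0.2:11211", "10.0.0.3:11211"])
    print(ring.host_for("1.1.1.1"))
```

Two processes that build the ring from the same host list agree on where every key lives, which is why the writer populating the pool and the Logstash nodes reading from it need the same pool configuration.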

The Logstash team plans to release a supported memcached plugin with equivalent functionality in the future, but for now let's use the prototype and start with a sample common use case.

## Example: Botip Lookup

A common use case is looking up IPs from a spam/bot feed:

    filter {
      memcached {
        hosts => ["127.0.0.1:11211"]
        get => {
          "%{ip}" => "threat_src"
        }
      }
    }

Note: This uses a single memcache host for demonstration purposes.

In this example, we need to populate the IPs into memcache, with the IP as the key. The actual memcache session would look like this:

    $ telnet localhost 11211
    set 1.1.1.1 0 900 5      <= set 1.1.1.1 as a botip address; 900 is the entry expiration time, 5 is the length of the data in bytes
    botip                    <= this is the threat source name
    STORED
    get 1.1.1.1              <= double check
    VALUE 1.1.1.1 0 5
    botip
    END

Below is a complete example with the Logstash configuration file, input, and output:

    $ more memcache.cfg
    input {
      stdin {
        codec => json
      }
    }
    filter {
      memcached {
        hosts => ["localhost:11211"]
        get => {
          "%{ip}" => "threat_src"
        }
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
    }

Input

    { "ip" : "1.1.1.1" }

Output

    {
      "@timestamp" => 2018-01-17T03:54:36.642Z,
      "host" => "localhost",
      "ip" => "1.1.1.1",
      "threat_src" => "botip",
      "@version" => "1"
    }

The value returned by the memcached filter is assigned to the field threat_src and shows up in your log as "threat_src" : "botip". If there is no match, threat_src is not added to the final record.

With any of these examples, it's up to you to maintain and populate memcache with the information you want to enrich your events with. When using pools, ensure that the configuration is the same on the nodes populating memcache and on the Logstash nodes reading from the memcache nodes.
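As a rough sketch of such a population step, the script below speaks the same text protocol as the telnet session shown earlier, using only the Python standard library. The function names are hypothetical, the entries are assumed to come from your own persistent store, and it targets a single memcache host, not a pool.

```python
import socket


def build_set_command(key: str, value: str, ttl: int) -> bytes:
    """Encode one 'set <key> 0 <ttl> <bytes>' command followed by its data block."""
    if not key or len(key.encode("utf-8")) > 250 or any(c.isspace() or ord(c) < 32 for c in key):
        raise ValueError(f"invalid memcache key: {key!r}")
    data = value.encode("utf-8")
    header = f"set {key} 0 {ttl} {len(data)}\r\n".encode("utf-8")
    return header + data + b"\r\n"


def populate(entries, host="localhost", port=11211, ttl=900):
    """Send every key/value pair and return how many were acknowledged with STORED."""
    stored = 0
    with socket.create_connection((host, port), timeout=5) as conn:
        with conn.makefile("rb") as reader:
            for key, value in entries.items():
                conn.sendall(build_set_command(key, value, ttl))
                if reader.readline().strip() == b"STORED":
                    stored += 1
    return stored


if __name__ == "__main__":
    # Same entry as the telnet example: IP as key, threat source name as value.
    print(populate({"1.1.1.1": "botip"}))
```

Running this on a schedule shorter than the entry expiration time keeps the keys from aging out between refreshes.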

Generally, ingestion-time enrichment of data using Logstash provides analysts with easy and fast access to contextual information that helps them identify issues and other interesting artifacts in their data quickly. When the Elastic Stack is used for security analytics use cases, enrichment techniques, such as those shown in this post, can be extremely valuable for quickly identifying threat-related activity present in security-related event messages, enabling analysts to further investigate and begin their incident response processes.