21 octobre 2014 Technique

Use ELK to Visualise Security Data: IPTables and KippoSSH Honeypot

Par Antonio Bonuccelli

Among the countless possible use cases where ELK can help save the day, displaying security-relevant data is certainly a very interesting one. In this blog post, using a virtual machine sitting on the cloud, we're going to show how to quickly set up a clustered instance of Elasticsearch to visualise firewall and honeypot datasources, namely IPtables and KippoSSH, focusing on the ELK-relevant configuration bits.

KippoSSH is a medium interaction honeypot capable of recording plenty of information about the attacker, including interactive TTY sessions recordings; for the purpose of this blog post, we'll leave that latter piece of info aside, and focus on making sense of some brute force data. Starting from the live raw data, we have logs containing:

  • Denied TCP and UDP connection;
  • Brute force SSH login attacks;

These look like:


elk@debianVM:~$ tail -3 /var/log/kern.log
Sep 24 13:52:06 debianVM kernel: iptables denied: IN=eth0 OUT= MAC=f2:3c:91:73:6c:71:84:78:ac:0d:8f:41:08:00 SRC= DST= LEN=48 TOS=0x00 PREC=0x00 TTL=110 ID=6886 PROTO=TCP SPT=37377 DPT=3306
Sep 24 13:52:44 debianVM kernel: iptables denied: IN=eth0 OUT= MAC=f2:3c:91:73:6c:71:84:78:ac:0d:8f:41:08:00 SRC= DST= LEN=40 TOS=0x00 PREC=0x00 TTL=107 ID=54661 PROTO=TCP SPT=6000 DPT=1433
Sep 24 13:55:22 debianVM kernel: iptables denied: IN=eth0 OUT= MAC=f2:3c:91:73:6c:71:84:78:ac:0d:8f:41:08:00 SRC= DST= LEN=44 TOS=0x00 PREC=0x00 TTL=57 ID=55765 PROTO=TCP SPT=80 DPT=5568

#kippo SSH

elk@debianVM:~$ egrep 'New connection' -A10 /opt/kippo/kippo-master/log/kippo.log | tail -11
2014-09-24 15:00:51+0100 [kippo.core.honeypot.HoneyPotSSHFactory] New connection: ( [session: 660]
2014-09-24 15:00:51+0100 [HoneyPotTransport,660,] Remote SSH version: SSH-2.0-libssh2_1.4.1
2014-09-24 15:00:51+0100 [HoneyPotTransport,660,] kex alg, key alg: diffie-hellman-group1-sha1 ssh-rsa
2014-09-24 15:00:51+0100 [HoneyPotTransport,660,] outgoing: aes128-ctr hmac-sha1 none
2014-09-24 15:00:51+0100 [HoneyPotTransport,660,] incoming: aes128-ctr hmac-sha1 none
2014-09-24 15:00:52+0100 [HoneyPotTransport,660,] NEW KEYS
2014-09-24 15:00:52+0100 [HoneyPotTransport,660,] starting service ssh-userauth
2014-09-24 15:00:53+0100 [SSHService ssh-userauth on HoneyPotTransport,660,] root trying auth password
2014-09-24 15:00:53+0100 [SSHService ssh-userauth on HoneyPotTransport,660,] login attempt [root/123456] failed
2014-09-24 15:00:54+0100 [-] root failed auth password
2014-09-24 15:00:54+0100 [-] unauthorized login:

Lots of interesting information that can be extracted from the above data.

Event collection and processing: Logstash

Logstash is an awesome piece of software and the first layer of the ELK stack, where the journey of an event begins.

Starting from the raw data, we want to be able to:

  1. identify relevant events by matching chosen patterns;
  2. extract relevant tokens to leverage them in our searches and dashboards later;
  3. send them to our chosen destination (elasticsearch cluster in this case) to perform analytics and investigations;

So we want to extract fields of interest like source IP, destination port, target usernames and passwords to name the obvious ones.

For sake of brevity, we will go specifically after only two log entries here, one from IPTables:

Sep 24 15:42:03 debianVM kernel: iptables denied: IN=eth0 OUT= MAC=f2:3c:91:73:6c:71:84:78:ac:0d:8f:41:08:00 SRC= DST= LEN=48 TOS=0x00 PREC=0x00 TTL=110 ID=6886 PROTO=TCP SPT=37377 DPT=3306

and one from KippoSSH:

2014-09-24 15:00:53+0100 [SSHService ssh-userauth on HoneyPotTransport,660,] login attempt [root/123456] failed

We'll need to tell Logstash where the logs are located and what to do with them, below is our logstash config file ($LOGSTASH_HOME/config/logstash.conf). Notice the three different sections Input, Filter, Output:

#input section, what data do we want to collect 
input { 
       file { 
              type => "linux-syslog" 
              path => "/var/log/kern.log" 
       file { 
              type => "honey-kippo" 
              path => "/opt/kippo/kippo-master/log/kippo.log" 
#filter section, what to do with the data, process it , enrich it... 
filter { 
        if [type] == "linux-syslog" {
                 grok { 
                        #for linux-syslog type events use Grok definition below
                        #to match the messages and extract fields of interest 
                        match => [ "message", "%{IPTABLES_DENIED}"] 
                 date { 
                        #use the field timestamp to match event time and               
                        #populate @timestamp field (used by Elasticsearch) 
                        match => [ "timestamp", "MMM dd HH:mm:ss"] 
                        timezone => "Europe/London" 
        } else if [type] == "honey-kippo" { 
                 grok { 
                        #like above, but using three separate 
                        #Grok definitions (see Grok definitions later) 
                        match => [ "message", "%{KIPPO_TIMESTAMP}\+\d+\s%{KIPPO_BODY_SSHSERVICE}\s%{KIPPO_MSG_LOGIN}" ] 
                 date { 
                       match => [ "timestamp", "YYYY-MM-dd HH:mm:ss" ] 
                       timezone => "Europe/London" } 
        geoip { 
                 #enrich both event types with geo fields based on the 
                 #src_ip field for analytics and drawing pretty maps 
                 source => "src_ip" 
#output section, where do we send the data 
output { 
         #events failing to match Grok definitions will be 
         #automatically tagged with '_grokparsefailure' 
         #in this case we want to send only events where 
         #field extraction will be happening correctly 
          if "_grokparsefailure" not in [tags] { 
             if [type] == "linux-syslog" { 
                 elasticsearch { 
                                 embedded => false 
                                 cluster => "joinus" 
                                 host => "" 
                                 bind_host => "" 
                                 index => "logstash-os" 
                                 index_type => "linux-syslog" 
             } else if [type] == "honey-kippo" { 
                 elasticsearch { 
                                 embedded => false 
                                 cluster => "joinus" 
                                 host => "" 
                                 bind_host => "" 
                                 index => "logstash-honey" 
                                 index_type => "kippo" 
          }  else { 
                    #let's print to logstash standard output 
                    #events not captured by our Grok definitions 
                    stdout { 
                             codec => rubydebug 

Notice in the above output section that the index names both start with logstash-*.

This is in order to leverage logstash-* index template mapping, which will allow us to make use of both an 'analyzed' and 'not_analyzed' versions of the fields, in order to be able to correctly draw our dashboards.See more on this aspect here

Now let's take a closer look at the Grok definitions we will be using.

We can get our raw data recognised and parsed by adding the 4 lines below to a pattern file in $LOGSTASH_HOME/patterns:

IPTABLES_DENIED %{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:_host} kernel: iptables denied: IN=(?<in>eth0) OUT= MAC=(?<mac_addr>\\S+) SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=\\d+ TOS=0x\\d+ PREC=0x\\d+ TTL=\\d+ ID=\\d+(?:\\sDF)? PROTO=(?<proto>\\S+) SPT=(?<src_port>\\d+) DPT=(?<dst_port>\\d+)(?:\\sWINDOW=\\d+)?(?:\\sRES=0x\\d+)?(?:\\s[ACKSYNFIRT]{3})+(?:\\sURGP=\\d)?
KIPPO_TIMESTAMP (?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})
KIPPO_BODY_SSHSERVICE \\[SSHService ssh-userauth on HoneyPotTransport,\\d+,%{IP:src_ip>}\\]
KIPPO_MSG_LOGIN login\\sattempt\\s\\[(?<src_user>\\S+)/(?<src_pwd>\\S+)\\]\\s(?<outcome>succeeded|failed)

Here we are just telling Grok to be aware of these new definitions that we have referenced in Logstash main config file earlier. Grok will match the event types we have configured and use the regular expression above to extract our fields of interest.

You can see that some definitions are based on already existing patterns - for e.g. IPTABLES_DENIED reuses Grok patterns SYSLOGTIMESTAMP, HOSTNAME and IP. This is one of the key strengths of Grok, aimed at making a better use of your time, other than writing and re-writing regex after regex.

Using good online regex tools can help speed up on this task. Grok Debugger is a great tool you can rely on to construct and debug Grok patterns. Some other resources worth mentioning for pure regex testing are regex101.com, regextester.com.

See Grok Debugger in action below:

Now we're good to index as much data as we like (and our hardware can handle), so we will move to the next layer on the ELK stack.

Store the data and make it searchable: Elasticsearch

Setting up an Elasticsearch cluster is straightforward. In this demo we setup 2 elasticsearch nodes on a single host. You could also do this in production, if you have enough capacity on a single machine in some specific scenarios. See here for more details.

We could leave default settings and get started by:

  1. extract the elasticsearch tar archive
  2. launch each instance as a daemon

and these instance would just talk to each other like good old friends using multicast zen discovery and form a cluster with no configuration needed!

As we'd like to have a bit more control over the cluster behaviour we proceed to amend some defaults for each of the 2 nodes.For example we set node 1 to use:

#Set a name for each node, for readability and manageability
node.name: "node-1"
#Disable multicast discovery, you never know what happens in a network you don't own
discovery.zen.ping.multicast.enabled: false
#Configure an initial list of master nodes in the cluster, we know who we are
discovery.zen.ping.unicast.hosts: ["localhost:9301"]
#Set a cluster name, elasticsearch is a nice name though, we like here to set our own
cluster.name: joinus

And that is it!

Cluster 'joinus' is ready to accept data and make it searchable, resilient and all the magic Elasticsearch will do for us.

Let's just say "hi,how are you?" to our cluster

elk@debianVM:~$ curl -XGET 'localhost:9200/_cluster/health?pretty'
"cluster_name" : "joinus",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 2,
"number_of_data_nodes" : 2,
"active_primary_shards" : 100,
"active_shards" : 200,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0

You know, looks good!

We can now create two indexes in our Elasticsearch cluster by issuing:

$ curl -XPUT 'localhost:9200/logstash-os?pretty'


$ curl -XPUT 'localhost:9200/logstash-honey?pretty'

For each of these requests, we will receive the below answer, saying index creation was successful:

"acknowledged" : true

We could also tell Elasticsearch explicitly how to interpret our fields (is this an integer or a date or a string), but for this simple demo we're happy to let Elasticsearch automatically determine the field types.

Open my eyes, show me what you've got: Kibana

While we impatiently await Kibana 4 to go GA let's use Kibana 3 to plot some data.

We have now left this running for a while to allow the bad guys to feed us with some events and make our dashboards nice and pretty.

Once Kibana is installed we are greeted with

If we go and have a look at the sample dashboard, we see

Notice our document types are showing up in 'Document Types' panel. Contextually on the left handside we also have a list of available searchable field:

Now that we have validated the data looks good, we can go ahead and start building our first dashboard from scratch!

Let's go one step back, and choose option 3, 'Blank Dashboard', and let's set a title for our new dashboard

And point it towards the index 'logstash-os' where we are storing IPTables events (by default '_all' indexes are set to be queried)

Then let's add a row to our empty 'Denied Connections' dashboard.

Finally let's add our first panel and choose panel type 'Terms' to show some nice aggregations

Notice that as you type your first char in Field form, Kibana will show you all the possible field matching the string as you type it. We have also here available fields named with extension .raw , this is because we are leveraging logstash index mapping template for having 'not_analyzed' fields.

We choose now 'src_ip' for this panel and let's set up some more options, we want to see the top 20 IPs, sorted by count, using bars as the visualization format.

Just hit 'Save' Et voilà !

You know, Kibana rocks!
Reiterate the above and have fun!

We have now plenty of data to leverage, manipulate and visualise from many different angles.

We can ask all the questions we want, just add a few queries to slice your data as you please, each panel can display all or a selection of the queries.

Then we can plot beautiful charts giving us lots of insight on the bad guys trying to access our host:

Likewise, for our KippoSSH honeypot data

You can click on any dashboard, drilldown and start your investigations: just ask the questions, ELK will give you the answers - easy peasy!"