18 February 2016 Engineering

Elasticsearch and SIEM: implementing host portscan detection

By Antonio Bonuccelli

Intro: using a SIEM approach

Effectively monitoring security across a large organization is a non-trivial task faced every day by all sorts of organizations. The speed, scalability and flexibility of the Elastic stack can be a great asset when trying to gain visibility into, and proactively monitor, large amounts of data.

The traditional SIEM approach relies on normalizing raw data according to a schema.

For example, a failed login, be it from a Linux host

Nov 26 12:15:04 zeus sshd[19571]: Failed password for ciro from 10.0.4.23 port 57961 ssh2

or a Windows host,

Log Name:      Security
Source:        Microsoft-Windows-Security-Auditing
Date:          27/11/2015 2:07:33 PM
Event ID:      4625
Task Category: Logon
Level:         Information
Keywords:      Audit Failure
User:          N/A
Computer:      minerva
Description:
An account failed to log on.
Subject:
   Security ID:  NULL SID
   Account Name:  -
   Account Domain:  -
   Logon ID:  0x0
Logon Type:  3
Account For Which Logon Failed:
   Account Name:  gennaro
<.....>

will be indexed observing a common structured format:

Linux

"src_user": "ciro"
"src_ip": "10.0.4.23"
"auth_type":  "ssh2"


Windows

"src_user": "gennaro"
"src_ip": "10.0.0.118"
"auth_type": "3"

Using a field naming convention lets us build correlation logic that is independent of the source an event came from, be it a Windows or a Linux failed login.
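To make the idea concrete, here is a minimal Python sketch of the normalization step. It is only an illustration, not how Logstash does it: the regular expression covers just the sshd line shown above, and the input keys of the Windows event (`account_name`, `source_address`, `logon_type`) are assumptions about how that event was parsed upstream.

```python
import re

# Hypothetical pattern covering only the "Failed password" form shown above.
SSHD_FAILED = re.compile(
    r"Failed password for (?P<src_user>\S+) from (?P<src_ip>\S+) "
    r"port \d+ (?P<auth_type>\S+)"
)


def normalize_sshd(line):
    """Map a raw sshd failure line to the common field names, or None."""
    m = SSHD_FAILED.search(line)
    return m.groupdict() if m else None


def normalize_windows(event):
    """Map an already-parsed Windows 4625 event (a dict) to the same fields.

    The input key names are assumptions made for this sketch.
    """
    return {
        "src_user": event["account_name"],
        "src_ip": event["source_address"],
        "auth_type": str(event["logon_type"]),
    }


linux = normalize_sshd(
    "Nov 26 12:15:04 zeus sshd[19571]: Failed password for ciro "
    "from 10.0.4.23 port 57961 ssh2"
)
windows = normalize_windows(
    {"account_name": "gennaro", "source_address": "10.0.0.118", "logon_type": 3}
)
print(linux)
print(windows)
```

Both results carry exactly the same keys, which is what lets later logic ignore where the event came from.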

Some tagging or categorization of the data can also be performed:

grok {
    match => { "message" => ["%{SSH_AUTH_1}","%{SSH_AUTH_2}"] }
    add_tag => [ "auth_success" ]
}
grok {
    match => { "message" => ["%{SSH_AUTH_3}","%{SSH_AUTH_4}"] }
    add_tag => [ "auth_failure" ]
}

where SSH_AUTH_X are our custom-defined grok patterns that match success/failure events.

Using this approach, correlation logic can be applied to all events, regardless of the data source from which they originated.
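As a rough illustration of source-agnostic correlation, the Python sketch below counts failed logins per source IP using only the common field names and tags. The sample events are made up for the example, and the function name is hypothetical.

```python
from collections import Counter


def failed_logins_by_source(events):
    """Count auth_failure events per src_ip, ignoring which OS produced them."""
    return Counter(
        e["src_ip"] for e in events if "auth_failure" in e.get("tags", [])
    )


# Illustrative events: one from sshd, two from Windows, all in the common format.
events = [
    {"src_user": "ciro", "src_ip": "10.0.0.111", "auth_type": "ssh2",
     "tags": ["auth_failure"]},
    {"src_user": "gennaro", "src_ip": "10.0.0.111", "auth_type": "3",
     "tags": ["auth_failure"]},
    {"src_user": "gennaro", "src_ip": "10.0.0.118", "auth_type": "3",
     "tags": ["auth_success"]},
]
counts = failed_logins_by_source(events)
print(counts)
```

The function never inspects the original log format, only `src_ip` and `tags`, so adding a new data source requires no change to the correlation logic.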

Following the same approach, we will show how to use the Elastic stack to cover a basic network security use case, TCP host portscan detection, for which we'll implement alerting via email.

Implementation I: datasource

When trying to detect whether a portscan was carried out against a given host on your premises, network traffic data becomes relevant.

For this use case we will want to monitor all events indicating a new TCP connection being initiated from source to target host, in short all TCP packets with SYN=1, ACK=0.


While we impatiently wait for Packetbeat Flows to be released and bring more out-of-the-box network protocol capture capabilities, for the purpose of this blog we'll capture traffic with tcpdump using the command below:

sudo tcpdump -i eth0 -n -tttt 'tcp[13] == 2' | nc localhost 5001

The above command listens on the eth0 network interface of the monitored host and captures only the TCP packets indicating that a new connection handshake was initiated. The -n option avoids resolving IP addresses to hostnames for faster execution, and -tttt prints a full date and time on each line. We then pipe the results to netcat, which sends them to our Logstash instance for event processing; here we assume Logstash is running locally. The filter expression is quoted so that the shell passes it to tcpdump unchanged.

For convenience, we can launch the above command using an all-time favourite Linux CLI utility, screen.
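The filter `tcp[13] == 2` reads the byte at offset 13 of the TCP header, which holds the control flags, and requires it to equal exactly 0x02, that is SYN set and every other flag clear. The Python sketch below illustrates that test on a hand-built 20-byte header; the helper names and the sample port and sequence numbers are made up for the example.

```python
import struct

# Flag bit values within byte 13 of the TCP header.
SYN = 0x02
ACK = 0x10


def build_header(src_port, dst_port, flags):
    """Pack a minimal 20-byte TCP header (no options) with the given flags."""
    data_offset = 5 << 4  # header length of 5 32-bit words, stored in byte 12
    return struct.pack(
        "!HHIIBBHHH", src_port, dst_port, 0, 0, data_offset, flags, 29200, 0, 0
    )


def is_connection_attempt(header):
    """Equivalent of the capture filter tcp[13] == 2: only SYN is set."""
    return header[13] == SYN


syn_packet = build_header(60805, 80, SYN)
syn_ack_packet = build_header(80, 60805, SYN | ACK)
print(is_connection_attempt(syn_packet))
print(is_connection_attempt(syn_ack_packet))
```

Because the comparison is an exact match rather than a bit mask, the SYN-ACK reply from the server is not captured, which is the SYN=1, ACK=0 behaviour we want.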

#!/bin/bash
screen -d -m /bin/bash -c 'sudo tcpdump -i eth0 -n -tttt "tcp[13] == 2" | nc localhost 5001'

This is what the captured raw data looks like:

2016-02-09 13:51:09.625253 IP 192.168.1.105.60805 > 192.168.1.1.80: Flags [S], seq 2127832187, win 29200, options [mss 1460,sackOK,TS val 259965981 ecr 0,nop,wscale 7], length 0

Implementation II: event processing

We'll use Logstash to mangle the data and extract the information relevant to this use case, namely timestamp, src_ip and dst_port.

grok{
            match => {"message" => "%{TCPD_TIMESTAMP:timestamp} IP %{IP:src_ip}\.%{INT:src_port} > %{IP:dst_ip}\.%{INT:dst_port}(?<payload>[^$]+)"}
            add_tag => ["network","tcp_connection_started"]
        }

where TCPD_TIMESTAMP is a custom defined grok pattern to match 2016-02-09 13:51:09.625253.
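For readers less familiar with grok, the following Python sketch performs roughly the same extraction with a plain regular expression. It is an approximation under stated assumptions: the pattern handles IPv4 addresses only, and the timestamp sub-pattern is our guess at what a TCPD_TIMESTAMP definition would need to match.

```python
import re

# Rough equivalent of the grok expression above, IPv4 only.
TCPDUMP_LINE = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) IP "
    r"(?P<src_ip>\d{1,3}(?:\.\d{1,3}){3})\.(?P<src_port>\d+) > "
    r"(?P<dst_ip>\d{1,3}(?:\.\d{1,3}){3})\.(?P<dst_port>\d+)"
)


def parse_tcpdump_line(line):
    """Return the extracted fields as a dict of strings, or None if no match."""
    m = TCPDUMP_LINE.match(line)
    return m.groupdict() if m else None


sample = (
    "2016-02-09 13:51:09.625253 IP 192.168.1.105.60805 > 192.168.1.1.80: "
    "Flags [S], seq 2127832187, win 29200, options [mss 1460,sackOK,"
    "TS val 259965981 ecr 0,nop,wscale 7], length 0"
)
fields = parse_tcpdump_line(sample)
print(fields)
```

Note that tcpdump appends the port to the address with a dot, so the pattern has to treat the fifth dot-separated number as the port, exactly as the grok expression does with `%{IP:src_ip}\.%{INT:src_port}`.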

As we have extracted the information we were after (timestamp, src_ip, dst_ip, dst_port), we can decide to trash the message and payload fields:


mutate{
        remove_field => ["message","payload"]
    }

Next we send these events to the Elasticsearch index logstash-tcpdump-%{+YYYY.MM.dd}:

elasticsearch {
    hosts => "es-server:9200"
    index => "logstash-tcpdump-%{+YYYY.MM.dd}"
    user => "logstash"
    password => "verysecretpassword"
    ssl => true
    cacert => "/path/to/cacert.pem"
}
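The `%{+YYYY.MM.dd}` suffix produces one index per day, derived from the event's @timestamp in UTC. As a small sketch of which index a given event lands in, assuming a hypothetical helper `daily_index`:

```python
from datetime import datetime, timedelta, timezone


def daily_index(ts, prefix="logstash-tcpdump-"):
    """Return the daily index name for a timezone-aware event timestamp."""
    return prefix + ts.astimezone(timezone.utc).strftime("%Y.%m.%d")


utc_event = datetime(2016, 2, 8, 0, 56, 58, tzinfo=timezone.utc)
# Half past midnight on 9 February at UTC+1 is still 8 February in UTC.
cet_event = datetime(2016, 2, 9, 0, 30, 0, tzinfo=timezone(timedelta(hours=1)))
print(daily_index(utc_event))
print(daily_index(cet_event))
```

This is why searches in the rest of the post target the wildcard `logstash-tcpdump-*` rather than a single day's index: a 30-second window around midnight UTC can span two indices.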

Implementation III: searching for a portscan

We're now at the stage where events are coming into Elasticsearch and we want to be automatically alerted when our monitored host receives (or launches!) a portscan.

This is what our indexed event looks like:

{
    "@version": "1",
    "@timestamp": "2016-02-08T00:56:58.407Z",
    "host": "127.0.0.1",
    "port": 41433,
    "type": "tcpdump",
    "timestamp": "2016-02-08 01:56:58.407625",
    "src_ip": "192.168.1.105",
    "src_port": "55203",
    "dst_ip": "192.168.1.1",
    "dst_port": "80"
}

We can define a TCP host portscan as a large number of connections attempted within a short amount of time between a source and a target host, where the target port changes from connection to connection. How would this translate to an Elasticsearch query?

GET logstash-tcpdump-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "tags": "tcp_connection_started"
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": "now-30s"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "by_src_ip": {
      "terms": {
        "field": "src_ip"
      },
      "aggs": {
        "by_target_ip": {
          "terms": {
            "field": "dst_ip",
            "order": {
              "unique_port_count": "desc"
            }
          },
          "aggs": {
            "unique_port_count": {
              "cardinality": {
                "field": "dst_port"
              }
            }
          }
        }
      }
    }
  }
}

We leverage here a killer feature of Elasticsearch: aggregations, specifically the terms and cardinality aggregations. Note that we're only interested in the aggregated results, hence size: 0. The response we receive looks like:

{
  "took": 9,
  "timed_out": false,
  "_shards": {
    "total": 24,
    "successful": 24,
    "failed": 0
  },
  "hits": {
    "total": 46,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "by_src_ip": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "192.168.1.17",
          "doc_count": 44,
          "by_target_ip": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "192.168.1.105",
                "doc_count": 44,
                "unique_port_count": {
                  "value": 41
                }
              }
            ]
          }
        },
        {
          "key": "192.168.1.105",
          "doc_count": 2,
          "by_target_ip": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "192.168.1.10",
                "doc_count": 1,
                "unique_port_count": {
                  "value": 1
                }
              },
              {
                "key": "192.168.1.32",
                "doc_count": 1,
                "unique_port_count": {
                  "value": 1
                }
              }
            ]
          }
        }
      ]
    }
  }
}

From the above we can infer that host 192.168.1.17 has initiated 44 TCP connections to 41 different ports on host 192.168.1.105, which seems suspicious: 192.168.1.17 is our attacker.

Host 192.168.1.105 has also initiated 2 TCP connections, one each to hosts 192.168.1.10 and 192.168.1.32, which seems legitimate.
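To make the semantics of the nested terms and cardinality aggregations concrete, here is a pure-Python analogue that groups events by source and target and counts distinct destination ports. It is only a sketch: it ignores the 30-second time filter, it counts exactly whereas the Elasticsearch cardinality aggregation is approximate, and the sample events are fabricated to mirror the numbers in the response above.

```python
from collections import defaultdict


def unique_ports_per_pair(events):
    """Map (src_ip, dst_ip) to the number of distinct dst_port values seen."""
    ports = defaultdict(set)
    for e in events:
        ports[(e["src_ip"], e["dst_ip"])].add(e["dst_port"])
    return {pair: len(seen) for pair, seen in ports.items()}


# 41 distinct ports plus 3 repeated connections to port 22: 44 events in total.
events = [
    {"src_ip": "192.168.1.17", "dst_ip": "192.168.1.105", "dst_port": str(p)}
    for p in range(1, 42)
]
events += [
    {"src_ip": "192.168.1.17", "dst_ip": "192.168.1.105", "dst_port": "22"}
] * 3
events += [
    {"src_ip": "192.168.1.105", "dst_ip": "192.168.1.10", "dst_port": "443"},
    {"src_ip": "192.168.1.105", "dst_ip": "192.168.1.32", "dst_port": "443"},
]
result = unique_ports_per_pair(events)
print(result)
```

The repeated connections raise the document count but not the unique port count, which is the distinction between doc_count and unique_port_count in the response.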

Next we'll see how we can use Watcher to automatically receive an email when an event like this happens.

Implementation IV: alert me!

Watcher is our friend here: all we need to do is configure a service email account, then define a new Watch and how it should act when a portscan is detected.

First we define a schedule, that is, how often the Watch should be executed:

"trigger": {
    "schedule": {
      "interval": "10s"
    }
  }

Next, define which query and search_type to run, and on which indices and document types:

"input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "logstash-tcpdump-*"
        ],
        "types": [
          "tcpdump"
        ],
        "body": { <insert the query discussed in the previous section here> }
      }
    }
  }

Now specify what condition would trigger the watch:

"condition": {
    "script": {
      "inline": "for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) return true;};};return false;",
      "params": {
        "threshold": 50
      }
    }
  }

The above Groovy script scans the aggregated results for a unique_port_count whose cardinality is greater than 50. Put in context: if, within a 30-second time range, a host has attempted connections to more than 50 different ports on another host, we call this a portscan.
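Since the condition is written as a single line, its logic can be hard to follow. The Python sketch below expresses the same nested loop over a search response shaped like the one shown earlier. It is an illustration for reading purposes, not something Watcher runs, and the function name and trimmed sample payload are ours.

```python
def portscan_detected(payload, threshold=50):
    """True if any (source, target) bucket exceeds the unique port threshold."""
    for src in payload["aggregations"]["by_src_ip"]["buckets"]:
        for dst in src["by_target_ip"]["buckets"]:
            if dst["unique_port_count"]["value"] > threshold:
                return True
    return False


# Trimmed version of the response shown earlier in the post.
payload = {
    "aggregations": {
        "by_src_ip": {
            "buckets": [
                {
                    "key": "192.168.1.17",
                    "by_target_ip": {
                        "buckets": [
                            {"key": "192.168.1.105",
                             "unique_port_count": {"value": 41}}
                        ]
                    },
                }
            ]
        }
    }
}
print(portscan_detected(payload))
print(portscan_detected(payload, threshold=40))
```

With the threshold of 50 used in the Watch, the 41 unique ports from the earlier response would not raise an alert; lowering the threshold makes the detection more sensitive at the cost of more false positives.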

Last, what action should our Watch perform once its conditions are met? Send a nice email to warn us!

"actions": {
    "email_administrator": {
      "transform": {
        "script": {
          "inline": "def target='';def attacker='';def body='';for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) {target=ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].key;attacker=ctx.payload.aggregations.by_src_ip.buckets[i].key;body='Detected portscan from ['+attacker+'] to ['+target+']. '+ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value+ ' unique ports scanned.'; return [ body : body ];};};};",
          "params": {
            "threshold": 50
          }
        }
      },
      "email": {
        "profile": "standard",
        "attach_data": true,
        "priority": "high",
        "to": [
          "info@elastic.co"
        ],
        "subject": "[Security Alert] - Port scan detected",
        "body": "{{ctx.payload.body}}"
      }
    }
  }

Here we scan through the results again to pick out the attacker and target hosts, plus the count of unique ports that were scanned.

The resulting watch then becomes:

PUT _watcher/watch/port_scan_watch
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "logstash-tcpdump-*"
        ],
        "types": [
          "tcpdump"
        ],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "tags": "tcp_connection_started"
                  }
                },
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-30s"
                    }
                  }
                }
              ]
            }
          },
          "aggs": {
            "by_src_ip": {
              "terms": {
                "field": "src_ip"
              },
              "aggs": {
                "by_target_ip": {
                  "terms": {
                    "field": "dst_ip",
                    "order": {
                      "unique_port_count": "desc"
                    }
                  },
                  "aggs": {
                    "unique_port_count": {
                      "cardinality": {
                        "field": "dst_port"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "inline": "for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) return true;};};return false;",
      "params": {
        "threshold": 50
      }
    }
  },
  "throttle_period": "30s",
  "actions": {
    "email_administrator": {
      "transform": {
        "script": {
          "inline": "def target='';def attacker='';def body='';for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) {target=ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].key;attacker=ctx.payload.aggregations.by_src_ip.buckets[i].key;body='Detected portscan from ['+attacker+'] to ['+target+']. '+ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value+ ' unique ports scanned.'; return [ body : body ];};};};",
          "params": {
            "threshold": 50
          }
        }
      },
      "email": {
        "profile": "standard",
        "attach_data": true,
        "priority": "high",
        "to": [
          "antonio@elastic.co"
        ],
        "subject": "[Security Alert] - Port scan detected",
        "body": "{{ctx.payload.body}}"
      }
    }
  }
}

Testing our setup: you got mail!

Now on to seeing some action: let's log in to a host that has connectivity to our monitored host (in this example 192.168.1.105) and launch a port scan against it:

Elastic-MacBook-Air:~ user$ nmap 192.168.1.105 -p1-500
Starting Nmap 6.47 ( http://nmap.org ) at 2016-02-09 15:38 CET
Nmap scan report for w530 (192.168.1.105)
Host is up (0.0078s latency).
Not shown: 495 closed ports
PORT    STATE SERVICE
22/tcp  open  ssh
80/tcp  open  http
139/tcp open  netbios-ssn
389/tcp open  ldap
445/tcp open  microsoft-ds

Here we explicitly probe the privileged ports from 1 to 500. A few seconds later, we receive an email:

[Screenshot: security alert email reporting the detected port scan]

Et voilà! The alert was triggered and the intended watch action was performed.

Note that we could have multiple detections from different hosts, however for the purpose of this blog post we limit ourselves to detecting and reporting only the first one in the list.
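If you wanted to report every detection rather than only the first, the transform logic could collect one message per offending pair. The Python sketch below shows that variation on a response-shaped dictionary. It is an illustration of the idea, not a drop-in Watcher script, and the function name and the second attacker in the sample data are hypothetical.

```python
def describe_portscans(payload, threshold=50):
    """Return one message per (attacker, target) pair above the threshold."""
    messages = []
    for src in payload["aggregations"]["by_src_ip"]["buckets"]:
        for dst in src["by_target_ip"]["buckets"]:
            count = dst["unique_port_count"]["value"]
            if count > threshold:
                messages.append(
                    "Detected portscan from [%s] to [%s]. %d unique ports scanned."
                    % (src["key"], dst["key"], count)
                )
    return messages


def bucket(src, dst, ports):
    """Build a minimal by_src_ip bucket for the sample payload."""
    return {
        "key": src,
        "by_target_ip": {
            "buckets": [{"key": dst, "unique_port_count": {"value": ports}}]
        },
    }


payload = {
    "aggregations": {
        "by_src_ip": {
            "buckets": [
                bucket("192.168.1.17", "192.168.1.105", 500),
                bucket("192.168.1.23", "192.168.1.105", 120),
                bucket("192.168.1.105", "192.168.1.10", 1),
            ]
        }
    }
}
messages = describe_portscans(payload)
print("\n".join(messages))
```

Joining the messages with newlines would give a single email body listing all attackers seen in the window.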

As a side note, if you like Nmap, take a look at this blog post to see all the awesome things you can do using logstash-codec-nmap.

This is just one example of how to leverage the Elastic stack for security monitoring; creativity is the only limit.

Happy alerting!