# Integrating Elasticsearch with ArcSight SIEM - Part 4

Editor's Note (August 3, 2021): This post uses deprecated features. Please reference the map custom regions with reverse geocoding documentation for current instructions.

Get your ArcSight security data into Elasticsearch and visualized in Kibana in literally minutes with the Logstash ArcSight module.

Editor's Note: Be sure to check out the other posts in this 6-part blog series. Part 1 kicks off the series with getting started content. Part 2 continues the story with how to proactively monitor security data in Elasticsearch using X-Pack. Part 3 walks you through how to scale the architecture. Part 4 (this post) and Part 5 provide examples of setting up alerts for common security threats using the alerting features in X-Pack. Part 6 extends the alerting story with automated anomaly detection using machine learning.

Following on from our last security alerting post, where we attempted to identify a successful login from a remote IP address, we introduce a more complex requirement to identify a sequence of events on the same authentication logs.

As you will see below, the watch can get quite complex very quickly. It is for this reason, in part, that we are so excited about the arrival of machine learning features in X-Pack. Whilst threshold-based event notification is powerful, such as triggering a notification after a successful login if there were failed logins previously, the ability to identify anomalous behaviour automatically, without having to define specific conditions, simplifies the user experience and reduces the need to create complex rules. As we roll out machine learning broadly, we will provide similar examples to this post.

### Detecting a Successful Brute Force Login

Problem: “Alert when a user attempts N failed logins, followed by a successful login, within a window of X minutes”

This example uses the same dataset and fields as the previous post. To recap:

CEF:0|Unix|Unix|5.0|cowrie.login.success|login attempt [root/!root] succeeded|Unknown|destinationUserName=root externalId=15 startTime=Nov 15 2016 19:22:29 destinationHostName=elastic_honeypot destinationAddress=192.168.20.2 deviceReceiptTime=Nov 15 2016 19:22:29 deviceTimeZone=Z transportProtocol=TCP applicationProtocol=SSHv2 destinationServiceName=sshd devicePayloadId=15 message=login attempt [root/!root] succeeded categoryOutcome=success categoryBehaviour=cowrie.login.success sourceTranslatedAddress=192.168.1.105 sourceAddress=192.168.1.105 deviceDirection=1 cs1=0 cs1Label=isError cs2=SSHService ssh-userauth on HoneyPotSSHTransport,2,192.168.1.105 cs2Label=system cs3=!root cs3Label=password cs4=111f70f0 cs4Label=session


The values of cowrie.login.failed and cowrie.login.success for our field categoryBehaviour represent a failed and successful login respectively. The fields destinationAddress and destinationUserName indicate the target server and logged in user respectively. Note that while our example data is from a single server, the provided watch could be applied to a larger infrastructure with potentially hundreds of servers. Finally, the startTime indicates the time at which the event occurred. This field will be parsed by our Logstash config file into the @timestamp field.
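For illustration only, the field layout described above can be pulled out of the sample event with a few lines of Python. This is a simplified sketch and not how Logstash parses CEF: the function name `parse_cef` is hypothetical, the sample is a shortened copy of the event above, and the parser assumes values contain no escaped pipes or equals signs.

```python
import re

# Shortened copy of the sample event above (some extension fields omitted).
SAMPLE = (
    "CEF:0|Unix|Unix|5.0|cowrie.login.success|"
    "login attempt [root/!root] succeeded|Unknown|"
    "destinationUserName=root externalId=15 "
    "startTime=Nov 15 2016 19:22:29 "
    "destinationAddress=192.168.20.2 "
    "categoryOutcome=success categoryBehaviour=cowrie.login.success "
    "sourceAddress=192.168.1.105"
)

# key=value pairs; a value runs until the next " key=" or the end of the string.
EXTENSION_PAIR = re.compile(r"(\w+)=(.*?)(?=\s+\w+=|$)")


def parse_cef(line):
    """Split a CEF line into its 7 header fields and a dict of extension fields.

    Assumption: values contain no escaped '|' or '=' characters.
    """
    parts = line.split("|", 7)
    header, extension = parts[:7], parts[7]
    return header, dict(EXTENSION_PAIR.findall(extension))


header, fields = parse_cef(SAMPLE)
print(fields["categoryBehaviour"])
print(fields["destinationUserName"])
print(fields["startTime"])
```

The three printed fields are the ones the watch relies on: the login outcome, the targeted user, and the event time that becomes @timestamp.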

Last month’s watch, which alerted on successful logins, would likely generate a significant number of false alarms if we relied on it alone to identify potential risks for investigation. We therefore refine our alert to look for a specific suspicious sequence where “N failed login attempts are followed by a successful login within an X minute window”. This should give us a reasonable indication that a brute force attack has occurred and been successful. For the purposes of this example, we assume X to be 5 minutes.

To identify this discrete set of events with X-Pack (using the watcher API) we’ll need to use a combination of Elasticsearch aggregations and Painless scripting.

Using the same iterative process as described previously, we first identify the input and query to capture the relevant data before evaluating the response with a condition to determine whether the watch fires an action. Next we extract the data points of interest through a transformation before finally taking action.

The supporting files for this post are provided within the same repository as the previous watcher example.

#### Step 1 - Set Up Watch Trigger and Watch Input

For a user-based analysis of events, we’ll need a summary of the failed and successful logins per user. The following Elasticsearch query provides the appropriate summary:

GET cef-*/syslog/_search
{
"query": {
"bool": {
"filter": [
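{
"terms": {
"categoryBehaviour": [
"cowrie.login.success",
"cowrie.login.failed"
]
}
},
{
"exists": {
"field": "destinationUserName"
}
}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "destinationUserName",
"size": 1500,
"min_doc_count": 4
},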
"aggs": {
"times": {
"terms": {
"field": "@timestamp",
"size": 15000,
"order": {
"_term": "asc"
}
},
"aggs": {
"access": {
"terms": {
"field": "categoryBehaviour",
"size": 1
}
}
}
}
}
}
},
"size": 0
}
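To make the shape of this summary concrete, the following Python sketch rebuilds a comparable per-user summary from a small in-memory list of events. The event values and the helper name `summarise_logins` are invented for illustration; only the field names and the two categoryBehaviour values come from the dataset. It also ignores the case where two events share a timestamp, which the real aggregation collapses into a single bucket.

```python
from collections import defaultdict

FAILED = "cowrie.login.failed"
SUCCESS = "cowrie.login.success"

# Hypothetical events: (destinationUserName, @timestamp, categoryBehaviour).
events = [
    ("root", "2016-11-16T14:07:10.000Z", FAILED),
    ("root", "2016-11-16T14:07:05.000Z", FAILED),
    ("root", "2016-11-16T14:07:12.000Z", FAILED),
    ("root", "2016-11-16T14:07:14.000Z", SUCCESS),
    ("admin", "2016-11-16T14:08:00.000Z", FAILED),
]


def summarise_logins(events, min_doc_count=4):
    """Group events by user, keep users with enough events, sort by time."""
    per_user = defaultdict(list)
    for user, timestamp, behaviour in events:
        per_user[user].append((timestamp, behaviour))
    return {
        user: sorted(entries)  # ISO timestamps sort chronologically as text
        for user, entries in per_user.items()
        if len(entries) >= min_doc_count
    }


summary = summarise_logins(events)
for user, entries in summary.items():
    print(user, [behaviour for _, behaviour in entries])
```

As with `min_doc_count` in the query, the user with a single event is dropped, leaving only users with enough activity to contain a candidate sequence.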

##### Some Considerations

The above query may take a few seconds to execute as we analyse the whole month in one query, with most of the time spent returning the large JSON response. This analysis of the complete dataset, however, does not reflect a realistic production execution. Rather than examining all attempted logins, a production watch would only ever be responsible for the last X minutes. In practice the above query would therefore be modified to include a date filter looking at all logins since now-(X+Y), where Y is the interval between watch executions. (We check the period X+Y to ensure we don’t miss any sequences that straddle two executions.)

We must also add a further query to the final watch to ensure duplicate alerts are not generated; otherwise the same sequence would be alerted on by multiple executions. To achieve this de-duplication we utilise the output of each watch execution. On detecting a valid unique sequence, our watch will index a document for each suspected successful brute force attack, with the @timestamp field set to the time of the successful login, e.g.

{
"destinationUserName": "root",
"@timestamp": "2016-11-16T14:07:14.000Z",
"alert": true
}


We can collect the above events prior to executing the earlier query. These can in turn be used to ensure only new sequences are alerted on each time the watch executes (every Y seconds). The following query captures previous alerts:

POST cef-ssh-watch-results/_search
{
"size": 0,
"aggs": {
"users": {
"terms": {
"field": "destinationUserName",
"size": 100
},
"aggs": {
"times": {
"terms": {
"field": "@timestamp",
"size": 100
}
}
}
}
}
}


This query is chained prior to the earlier query using a chain input.

The number of points assessed by a production watch will therefore be significantly lower than for the above query. For historical analysis, we assume a maximum of 1500 users and up to 15k attempted logins per user. These are appropriate values for a historical analysis of this dataset, but would need to be adjusted for any production deployment. With no time restriction, the above query does not require the logins to occur within a specific time range, only that they occurred in sequence.
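As a rough illustration of the window arithmetic in this section, the sketch below derives the look-back range from X and Y and shows how previously indexed alerts suppress duplicates. The variable names and sample values (X = 5 minutes, Y = 5 seconds) are assumptions chosen for the example, and the de-duplication keys use timestamp strings for readability.

```python
from datetime import timedelta

window = timedelta(minutes=5)    # X: maximum span of the login sequence
interval = timedelta(seconds=5)  # Y: how often the watch executes

# Look back X + Y so a sequence straddling two executions is not missed.
lookback_seconds = int((window + interval).total_seconds())
range_filter = {"range": {"@timestamp": {"gte": f"now-{lookback_seconds}s"}}}
print(range_filter)

# Keys of alerts indexed by earlier executions, in the form "<user>-<time>".
previous_alerts = {"root-2016-11-16T14:07:14.000Z"}

# Candidate sequences found by the current execution.
candidates = [
    ("root", "2016-11-16T14:07:14.000Z"),  # already alerted on
    ("root", "2016-11-16T14:09:30.000Z"),  # new
]
new_alerts = [
    (user, ts) for user, ts in candidates
    if f"{user}-{ts}" not in previous_alerts
]
print(new_alerts)
```

Because consecutive executions overlap by almost the whole window, the first candidate is seen repeatedly; only the check against earlier alerts keeps it from being reported again.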

#### Step 2 - Set Up Watch Condition

For our condition we are looking for a consecutive series of failed attempts followed by a success. Since this watch runs every Y seconds, but looks back over a longer period of X minutes + Y seconds, the time overlap across each execution could result in duplicate alerts. To avoid this, we'll check the index of previous alerts, and not generate a new one if one already exists for this sequence. Utilising the new Painless scripting language with a script condition, we evaluate each user's bucket of logins, returning true as soon as we identify the required pattern and confirm it represents a new event, or false otherwise.

if (ctx.payload.events.hits.total == 0 || ctx.payload.events.aggregations.users.buckets.size() == 0) { return false; }
// collect previous alerts to avoid duplication as we execute multiple times over the same window - we use a stream for example purposes
def historical_events = [];
if (ctx.payload.previous.hits.total > 0) {
  historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList());
}
for (user in ctx.payload.events.aggregations.users.buckets) {
  def failed = 0;
  for (time in user.times.buckets) {
    if (time.access.buckets[0].key == 'cowrie.login.failed') {
      failed += 1;
    } else {
      // a successful login. Check against the metadata required_failures and confirm we haven't seen it before
      if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) {
        return true;
      } else {
        failed = 0;
      }
    }
  }
}
return false;


Note the above uses the variable ctx.metadata.required_failures to represent the value of N i.e. the number of required failures before a successful login. This value will be set in the broader watch as a configuration parameter allowing simple tuning - see Step 5 - Putting it all together.

#### Step 3 - Transform Watch Input into Watch Payload

To ensure our actions receive a summary of the matching events, we utilise a transform operation to map the current response document payload to an appropriate summary data structure. Here we ideally need to know which usernames have been compromised at which times. A simple script transform, again using Painless, can collect these points as a map returning them as the payload. Again we ensure our sequences are unique to avoid duplicate events.

// collect previous alerts to avoid duplication as we execute multiple times over the same window
def historical_events = [];
if (ctx.payload.previous.hits.total > 0) {
  historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList());
}
def users = [:];
for (user in ctx.payload.events.aggregations.users.buckets) {
  def times = [];
  def failed = 0;
  for (time in user.times.buckets) {
    if (time.access.buckets[0].key == 'cowrie.login.failed') {
      failed += 1;
    } else {
      // a successful login. Check against the metadata required_failures and confirm we haven't seen it before. If we haven't, record it.
      if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) {
        times.add(time.key_as_string);
      }
      // reset either way
      failed = 0;
    }
  }
  if (times.length > 0) {
    users[user.key] = times;
  }
}
return users;


The above script iterates through the aggregation buckets for each user, each of which contains a sub-bucket for every time at which an event has occurred - effectively one per document due to the terms aggregation. These time buckets in turn contain a value indicating whether the event was a success or failure. The inner loop iterates over the event times for each user, maintaining a counter ‘failed’ for the number of consecutive failed logins. On finding a successful login, the current counter value is checked against the watch-level parameter ctx.metadata.required_failures. If the counter is greater than or equal to this value the sequence is satisfied, and the time is recorded against the user in the map users, provided it has not been seen before. The counter is then reset in either case. Finally the users map is returned as the watch payload, capturing all the detected successful brute force attacks.
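The same counting logic can be exercised outside Elasticsearch. The Python sketch below mirrors the transform script on a hand-built response; the helper names `time_bucket` and `find_brute_force` and the sample buckets are hypothetical. For simplicity each bucket uses the timestamp string for both `key` and `key_as_string`, whereas a real response carries epoch milliseconds in `key`.

```python
FAILED = "cowrie.login.failed"
SUCCESS = "cowrie.login.success"


def time_bucket(timestamp, behaviour):
    """Build one 'times' bucket in roughly the shape the aggregation returns."""
    return {
        "key": timestamp,
        "key_as_string": timestamp,
        "access": {"buckets": [{"key": behaviour}]},
    }


def find_brute_force(user_buckets, required_failures, historical_events=()):
    """Return {user: [success times]} for each new failed-then-success run."""
    seen = set(historical_events)
    users = {}
    for user in user_buckets:
        times, failed = [], 0
        for time in user["times"]["buckets"]:
            if time["access"]["buckets"][0]["key"] == FAILED:
                failed += 1
                continue
            # A successful login: record it if enough failures came first
            # and it has not been alerted on before, then reset the counter.
            key = f"{user['key']}-{time['key']}"
            if failed >= required_failures and key not in seen:
                times.append(time["key_as_string"])
            failed = 0
        if times:
            users[user["key"]] = times
    return users


# Hypothetical response for one user: three failures, a success,
# then one more failure followed by a second success.
buckets = [{
    "key": "root",
    "times": {"buckets": [
        time_bucket("2016-11-16T14:07:05.000Z", FAILED),
        time_bucket("2016-11-16T14:07:10.000Z", FAILED),
        time_bucket("2016-11-16T14:07:12.000Z", FAILED),
        time_bucket("2016-11-16T14:07:14.000Z", SUCCESS),
        time_bucket("2016-11-16T14:08:00.000Z", FAILED),
        time_bucket("2016-11-16T14:08:02.000Z", SUCCESS),
    ]},
}]

print(find_brute_force(buckets, required_failures=3))
```

Only the first success is reported, since the second is preceded by a single failure. Passing `historical_events=["root-2016-11-16T14:07:14.000Z"]` returns an empty result, which is the de-duplication behaviour the watch depends on.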

The result of the above might look like this:

["root": ["2016-11-16T14:07:14.000Z", "2016-11-16T14:47:01.000Z", "2016-11-16T17:35:47.000Z", "2016-11-29T11:57:52.000Z", "2016-11-29T16:02:00.000Z"]]


The above shows that the “root” user has potentially been exposed to a successful brute force attack 5 times.

#### Step 4 - Set Up Watch Actions

On detecting a successful brute force attack we fire two actions: a log action, to assist with debugging, and an index action to index one document per detected threat. The indexed documents are required in future watch executions to avoid duplicates.

The log action is simple and renders the map constructed in Step 3 - Transform Watch Input into Watch Payload using a Mustache template.

The index action aims to index a document for each successful brute force attack, with a username and the time at which the successful login occurred. To achieve this we need to flatten the map and create a separate doc for each entry, placing the resultant list under the appropriate “_doc” key. This can be achieved through a simple Painless script. Fortunately, Painless supports lambda functions, allowing us to achieve this in a single line.

return ['_doc':ctx.payload.entrySet().stream().flatMap(value -> value.getValue().stream().map(timestamp -> ['alert':true,'@timestamp':timestamp,'destinationUserName':value.getKey()])).collect(Collectors.toList())];
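For readers less familiar with Java streams, the flattening performed by the one-liner above can be sketched in Python. The function name `flatten_alerts` and the sample payload are hypothetical; the document fields match those produced by the Painless script.

```python
def flatten_alerts(payload):
    """Turn {user: [timestamps]} into one document per detected attack."""
    return {
        "_doc": [
            {"alert": True, "@timestamp": timestamp, "destinationUserName": user}
            for user, timestamps in payload.items()
            for timestamp in timestamps
        ]
    }


payload = {"root": ["2016-11-16T14:07:14.000Z", "2016-11-16T14:47:01.000Z"]}
for doc in flatten_alerts(payload)["_doc"]:
    print(doc)
```

Each user and timestamp pair becomes its own document, so a later execution can look up exactly which successful logins have already been alerted on.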


#### Step 5 - Putting it all together

Assembling the above produces the following. Notice how we add a date filter to the query to examine only the window period. The watch executes every 5 seconds, using a value of 3 for the number of sequential required failures. All of these are environment specific and would need to be adjusted appropriately.

POST _xpack/watcher/watch/_execute
{
"watch": {
"metadata": {
"window_period": "5m",
"required_failures": 3
},
"trigger": {
"schedule": {
"interval": "5s"
}
},
"input": {
"chain": {
"inputs": [
{
"previous": {
"search": {
"request": {
"indices": ["cef-ssh-watch-results"],
"types": "brute_force",
"body": {
"size": 0,
"query":{
"bool":{
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-305s"
}
}
}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "destinationUserName",
"size": 100
},
"aggs": {
"times": {
"terms": {
"field": "@timestamp",
"size": 100
}
}
}
}
}
}
}
}
}
},
{
"events": {
"search": {
"request": {
"indices": ["<cef-ssh-{now/d}>","<cef-ssh-{now/d-1d}>"],
"types": "syslog",
"body": {
"query": {
"bool": {
"filter": [
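{
"terms": {
"categoryBehaviour": [
"cowrie.login.success",
"cowrie.login.failed"
]
}
},
{
"exists": {
"field": "destinationUserName"
}
},
{
"range": {
"@timestamp": {
"gte": "now-305s"
}
}
}
]
}
},
"aggregations": {
"users": {
"terms": {
"field": "destinationUserName",
"size": 1500,
"min_doc_count": 4
},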
"aggs": {
"times": {
"terms": {
"field": "@timestamp",
"size": 15000,
"order": {
"_term": "asc"
}
},
"aggs": {
"access": {
"terms": {
"field": "categoryBehaviour",
"size": 1
}
}
}
}
}
}
},
"size": 0
}
}
}
}
}
]
}
},
"condition": {
"script": {
"inline": "if (ctx.payload.events.hits.total == 0 || ctx.payload.events.aggregations.users.buckets.size() == 0) { return false; } def historical_events = []; if (ctx.payload.previous.hits.total > 0) { historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList()); } for (user in ctx.payload.events.aggregations.users.buckets) {  def failed = 0; for (time in user.times.buckets) { if (time.access.buckets[0].key == 'cowrie.login.failed') { failed += 1; } else { if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) { return true; } else { failed=0; } } } } return false;"
}
},
"transform": {
"script": "def historical_events = []; if (ctx.payload.previous.hits.total > 0) { historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList()); } def users=[:]; for (user in ctx.payload.events.aggregations.users.buckets) { def times = []; def failed = 0; for (time in user.times.buckets) { if (time.access.buckets[0].key == 'cowrie.login.failed') { failed += 1; } else { if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) { times.add(time.key_as_string); } failed = 0; } } if (times.length > 0) { users[user.key] = times; } } return users;"
},
"actions": {
"log": {
"logging": {
}
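},
"transform": {
"script": "return ['_doc':ctx.payload.entrySet().stream().flatMap(value -> value.getValue().stream().map(timestamp -> ['alert':true,'@timestamp':timestamp,'destinationUserName':value.getKey()])).collect(Collectors.toList())];"
},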
"index": {
"index": "cef-ssh-watch-results",
"doc_type": "brute_force"
}
}
}
}


By removing the date filter and modifying the index pattern to cef-*, we can execute the watch across the entire historical dataset. In order to visualize the detected brute force attempts against the successful logins we turn to Timelion.

In a multi-server environment we would need to adjust the above to make the analysis user-server specific, thus adjusting our problem statement slightly to "A user attempts N failed logins, followed by a successful login, on a specific server in any X minute window". Rather than complicating our watch, we can simply configure Logstash to concatenate a host and username field using the mutate filter. Our watch remains structurally the same, with some small modifications to use the new user_host field rather than destinationUserName. Furthermore, the server name could be extracted in the transform script to provide details of the target server on which the attack was successful.
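A minimal sketch of the composite key idea follows, written in Python rather than as a Logstash configuration. The separator character and the helper names are assumptions made for the example; in practice the concatenation would be done by the mutate filter and the split by the transform script.

```python
SEPARATOR = "@"  # hypothetical choice; must never appear in a host name


def make_user_host(user, host):
    """Combine user and host into a single value for the user_host field."""
    return f"{user}{SEPARATOR}{host}"


def split_user_host(user_host):
    """Recover (user, host), splitting on the last separator."""
    user, _, host = user_host.rpartition(SEPARATOR)
    return user, host


key = make_user_host("root", "elastic_honeypot")
print(key)
print(split_user_host(key))
```

Splitting on the last separator means a username that happens to contain the separator is still recovered correctly, provided host names never contain it.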