
Integrating Elasticsearch with ArcSight SIEM - Part 5


Editor's Note: Be sure to check out the other posts in this 6-part blog series. Part 1 kicks off the series with getting started content. Part 2 continues the story with how to proactively monitor security data in Elasticsearch using X-Pack. Part 3 walks you through how to scale the architecture. Part 4 and Part 5 (this post) provide examples of setting up alerts for common security threats using the alerting features in X-Pack. Part 6 extends the alerting story with automated anomaly detection using machine learning.

Following on from our most recent security alerting post, where we attempted to identify a successful brute force login attack, we now push the limits of rule-based alerting by creating a watch to detect suspicious process execution on machines in your infrastructure. As a teaser, in our next post, we'll compare and contrast this rule-based approach with a new technique - using X-Pack machine learning capabilities to detect the same suspicious process activity.

For this (our last) Watcher example, we utilise a dataset which contains process start events for a specific server, as produced by auditd. Whilst we distribute this dataset in CEF format, this data could easily be captured using the Auditd Filebeat module for those not using ArcSight. Our sample event data looks like:

CEF:0|Unix|auditd||EXECVE|EXECVE|Low| eventId=30275 externalId=1737 categorySignificance=/Informational categoryBehavior=/Execute/Response categoryDeviceGroup=/Operating System catdt=Operating System categoryOutcome=/Success categoryObject=/Host/Application/Service art=1495990987103 cat=EXECVE rt=1495907409681 c6a4=fe80:0:0:0:5604:a6ff:fe32:b64 cs1Label=dev cs2Label=key cs3Label=success/res cs4Label=syscall cs5Label=subj cs6Label=terminal/tty cn2Label=ses cn3Label=uid c6a4Label=Agent IPv6 Address ahost=test-server-1 agt=192.168.0.12 av=7.3.0.7886.0 atz=Europe/London at=linux_auditd dtz=Europe/London deviceProcessName=auditd ad.argc=3 ad.a1=vim ad.a2=/etc/filebeat/filebeat.yml ad.a0=sudo aid=3xrP+T1wBABCAA5ZTdRz+fA\=\=

Auditd generates many event types and is hugely configurable. Our sample dataset includes a wide range of server activity. However, for the purposes of our watch and this blog, we are interested only in events which represent a process starting. With respect to the CEF format, these can be identified by the cat field containing the value EXECVE. The field ad.a0 represents the command issued, whilst ahost indicates the source host from which the event originated. rt provides the start time of the process in epoch milliseconds.

The CEF codec used to process this data with Logstash maps the above fields to their standardised CEF form. The cat field is mapped to deviceEventCategory, rt to deviceReceiptTime, ahost to agentHost, and the ad.* fields to ad.argc as a concatenated string. The latter is subsequently processed with a filter to produce the field ad.a0. The full dataset, Logstash configuration and accompanying watch described below can be found here.

Problem: "Alert when an unusual process starts on a server in the last N minutes"

Given that Watcher is limited to alerting using a static rule, as defined by a query, we consider an unusual process to be one which is started on a server for the first time. This relies on the user constructing a field which acts as a signature and unique identifier. Whilst this makes for an interesting example, which can be adapted to other similar use cases (e.g., "Alert me when a user logs into a server for the first time"), in practice it would result in a large number of false positives in large infrastructures. Analysts would be encouraged to either limit the watch to specific servers of interest or add additional rules to define "unusual". Another teaser: We'll describe how an X-Pack machine learning recipe can be used to extend the usefulness of this approach in our next post - for now, if you want to know what we mean by an ML recipe, check out this recent post.

Design

When considering the design of any watch, security analysts should consider scalability and performance. A naive solution to the above problem might use the following approach:

1. A query consisting of:

      a. An aggregation to identify all of the servers and all their respective processes that have ever been started (i.e., two nested terms aggregations for server name and process name).

      b. A second aggregation (sibling of a) which identifies the servers and their respective processes that have been started in the last N minutes. This aggregation would be identical to a, but include a filter to restrict the results to the required period.

2. A Painless script condition and transformation which perform the set complement logic on the results of 1a and 1b, i.e., to identify processes started in the last N minutes (1b) which are not present in 1a. The condition would return true if any of the complements result in a set size > 0. The transformation would, however, need to identify all complete complements for alerting. With Painless lambda functions these can be achieved efficiently in a single line.

3. The results of 2 are used in an action to report the servers which have new processes.

Whilst this approach only utilises a single query, and may work across several servers, consider how this scales when needing to identify new processes across potentially thousands of auditd instances each reporting process start events. The Painless script would also need to perform the set comparisons in memory, potentially leading to memory issues and poor performance. This performance would only degrade over time as the number of unique processes continues to naturally increase and add to the historical set identified in 1a.

With any watch, we aim to ensure Elasticsearch does as much of the "heavy lifting" as possible, through an optimised set of queries which minimise the work required in Painless scripts and maximise the inherent capabilities of Elasticsearch. This also requires us to ensure that we prepare our data to allow efficient querying. We therefore propose the following optimised approach:

1. Ensure a field process_signature is created at index time. This will be a concatenation of both the hostname and process name with a denoted separator. Whilst not essential, this simplifies the following queries and improves query performance.

2. An aggregation to identify all of the servers and their respective processes that have been started in the last N minutes. This would consist of a normal date filter with a terms aggregation on the field process_signature created in step 1 to produce a set of values A. This dataset should be quite small depending on the frequency of the watch execution, the number of servers, and the number of processes being started*.

3. A second query which utilises the results of step 2. This query would utilise a terms filter using the values of set A identified above, restricting to the period earlier than now-N minutes (i.e., processes started prior to the last N minutes). An aggregation would collect the values of the process_signature field similar to step 2, producing set B. Note: Rather than "now" we utilise the watch's scheduled time in case there are delays in execution.

4. A Painless condition to determine whether the watch fires by checking if set A and set B are of equal size. If not, we return true, indicating a new process has started.

5. In our transform step, we identify the specific difference between sets A and B using a Painless script (i.e., what is in A, but not present in B). Since each signature encodes both host and process name, splitting it recovers the specific details of each process. This final payload is used in our alert.

* The exact value of N can impact the performance profile of this approach. Too small, and Elasticsearch would be subject to many queries, potentially impacting the environment - although each query would return a small set of process_signature values. Too large, and the size of set A in step 2 would increase, decreasing the efficiency and performance of the second query. The value here is thus environment specific and depends on the number of servers, the frequency of processes being started, and how soon an alert is required. Typically a value of around 5 minutes has proven effective.

We can now employ the same process for constructing the alert as described in previous posts. Here we write a little more succinctly than in previous posts, assuming the reader has an understanding of the Alerting architecture and the execution process of a watch.

Step 0 - Creating our process_signature field

Creating a process signature field relies on the user selecting fields which, when combined, best represent a unique identifier across server instances. Too coarse an identifier, failing to encode sufficient properties of the process's characteristics, will result in the alert failing to detect new processes effectively. Too granular, and all processes will incorrectly be identified as new.

For auditd CEF encoded data we utilise the following fields, concatenating them to form a single process_signature field with the delimiter "|" using a Logstash filter as shown here:

  • agentHost - the host from which the event originated
  • ad.a0 - first argument of the command used to execute the process

The above concatenation could also be easily achieved with an ingest pipeline - example here. In the event you have data already indexed, Elasticsearch provides several mechanisms to assist. The update_by_query API can be used to generate the required field using an ingest processor - an example request, which assumes the same field names as this blog, is provided here. Alternatively, the watch below could be modified to use a script-enabled terms aggregation. This approach is not as efficient with respect to query performance but may act as a sufficient solution until the field can be generated at index time.
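
As a rough sketch, such a pipeline might look like the following. The pipeline name process_signature is our choice here, and the field names agentHost and ad.a0 are assumptions based on the mappings described above - adjust them to however your events are actually indexed:

PUT _ingest/pipeline/process_signature
{
  "description": "Concatenate host and first command argument into a process signature",
  "processors": [
    {
      "set": {
        "field": "process_signature",
        "value": "{{agentHost}}|{{ad.a0}}"
      }
    }
  ]
}

Data already indexed could then be backfilled by pointing update_by_query at the pipeline:

POST cef-auditd-*/_update_by_query?pipeline=process_signature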

Step 1 - Set Up Watch Trigger and Watch Input

Our first query identifies those processes which have started in the last N minutes. Here we assume N is 5 minutes. We restrict the results to those documents with a deviceEventCategory (the CEF mapping of cat) value of EXECVE - the auditd event type for a process starting.

{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "deviceReceiptTime": {
              "gte": "{{ctx.trigger.scheduled_time}}||-5m",
        "lt": "{{ctx.trigger.scheduled_time}}"
            }
          }
        },
        {
          "term": {
            "deviceEventCategory": {
              "value": "EXECVE"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "process_hosts": {
      "terms": {
        "field": "process_signature",
        "size": 1000
      }
    }
  },
  "size": 0
}

Within the chained input we place the above query under the key "started_processes".

The aggregation process_hosts delivers us any processes started in the last 5 minutes. Here we limit to 1000 results - this value would typically be tuned depending on the number of servers being monitored. The values here need to be passed to our second terms query, responsible for identifying which of the process_signature values have occurred previously. To achieve this in a watch we use a chained input and a mustache template to extract the values of the first query and inject them into the second. This query will be placed under the key "history_started_processes".

{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "process_signature": [
              "{{#ctx.payload.started_processes.aggregations.process_hosts.buckets}}{{key}}",
              "{{/ctx.payload.started_processes.aggregations.process_hosts.buckets}}"
            ]
          }
        },
        {
          "range": {
            "deviceReceiptTime": {
              "lt": "{{ctx.trigger.scheduled_time}}||-5m"
            }
          }
        },
        {
          "term": {
            "deviceEventCategory": {
              "value": "EXECVE"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "process_hosts": {
      "terms": {
        "field": "process_signature",
        "size": 1000
      }
    }
  },
  "size": 0
}

Notice how the above query contains a time period which looks at historical data older than the last 5 mins with a < scheduled_time-5m date range filter. We again capture the top 1000 processes - this value is again environment dependent, but should match the first query.
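
To make the mustache expansion concrete: if the first query returned two buckets with the keys test-server-1|sudo and test-server-2|/usr/bin/perl (hypothetical values based on our sample event), the terms filter above would render as below. Note the trailing empty string produced by closing the mustache section - it is harmless within a terms filter.

"terms": {
  "process_signature": [
    "test-server-1|sudo",
    "test-server-2|/usr/bin/perl",
    ""
  ]
}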

The chained input exposes the results of both queries in namespaces under the keys "started_processes" and "history_started_processes" within the watch payload as described here, thus allowing us to compare the results in our subsequent condition.

Note: In order to test this watch against the provided test dataset, we provide a script which performs a "sliding window" execution of the watch. This repeatedly executes the watch, each time adjusting the date filters to target the next 5 minute time range, thus simulating execution against a live stream. More specifically, rather than using scheduled_time we utilise an explicit timestamp T with the date math T||-5m, where T begins at the earliest time in the data and is incremented by 5 minutes for each execution. Instructions for running this script are provided here.

Combining the above, our chained input looks like:

"input": {
 "chain": {
   "inputs": [
     {
       "started_processes": {
         "search": {
           "request": {
             "indices": [
               "<cef-auditd-{now/d}>",
               "<cef-auditd-{now/d-1d}>"
             ],
             "body": {
               "query": {
                 "bool": {
                   "filter": [
                     {
                       "range": {
                         "deviceReceiptTime": {
                           "gte": "{{ctx.trigger.scheduled_time}}||-{{ctx.metadata.time_period}}",
        "lt": "{{ctx.trigger.scheduled_time}}"
                         }
                       }
                     },
                     {
                       "term": {
                         "deviceEventCategory": {
                           "value": "EXECVE"
                         }
                       }
                     }
                   ]
                 }
               },
               "aggs": {
                 "process_hosts": {
                   "terms": {
                     "field": "process_signature",
                     "size": 1000
                   }
                 }
               },
               "size": 0
             }
           }
         }
       }
     },
     {
       "history_started_processes": {
         "search": {
           "request": {
             "indices": [
               "cef-auditd-*"
             ],
             "body": {
               "query": {
                 "bool": {
                   "filter": [
                     {
                       "terms": {
                         "process_signature": [
                           "{{#ctx.payload.started_processes.aggregations.process_hosts.buckets}}{{key}}",
                           "{{/ctx.payload.started_processes.aggregations.process_hosts.buckets}}"
                         ]
                       }
                     },
                     {
                       "range": {
                         "deviceReceiptTime": {
                           "lt": "{{ctx.trigger.scheduled_time}}||-{{ctx.metadata.time_period}}"
                         }
                       }
                     },
                     {
                       "term": {
                         "deviceEventCategory": {
                           "value": "EXECVE"
                         }
                       }
                     }
                   ]
                 }
               },
               "aggs": {
                 "process_hosts": {
                   "terms": {
                     "field": "process_signature",
                     "size": 1000
                   }
                 }
               },
               "size": 0
             }
           }
         }
       }
     }
   ]
 }
}

Step 2 - Set Up Watch Condition

On execution of our watch condition, we have the following items in our Watch payload:

1. A list of process signatures that have occurred in the last 5 minutes within the payload namespace "started_processes".

2. Those signatures from (1) which have also occurred prior to the last 5 minutes, under the payload namespace "history_started_processes".

If new processes have started, the sizes of our lists from (1) and (2) will therefore not be equal. This is easily tested with a Painless condition:

{
  "script": "return ctx.payload.started_processes.aggregations.process_hosts.buckets.size() != ctx.payload.history_started_processes.aggregations.process_hosts.buckets.size();"
}

Step 3 - Transform Watch Input into Watch Payload

Assuming the above condition evaluates to true, the watch fires, passing the same payloads to our transformation. Using the lambda and stream features of Painless, we are able to efficiently identify the difference between our two lists (i.e., what values are present in the list "started_processes" which are not present in "history_started_processes").

{
  "script": """
    def history = ctx.payload.history_started_processes.aggregations.process_hosts.buckets.stream().map(p -> p.key).collect(Collectors.toList());
    def new_starts = ctx.payload.started_processes.aggregations.process_hosts.buckets.stream().map(e -> e.key).filter(p -> !history.contains(p));
    return new_starts.map(p -> ['process_name': /\|/.split(p)[1], 'host_name': /\|/.split(p)[0], '@timestamp': ctx.trigger.scheduled_time]).collect(Collectors.toList());
  """
}

The first line above collects those process signatures which have occurred both in the last N minutes and also prior to the last N minutes into a variable "history". The second line iterates over those processes started in the last N minutes, retaining through a filter only those not present in the variable "history", and storing the result in a variable "new_starts". Finally, this list of identified process signatures is restructured into a list of maps by splitting on the signature delimiter "|". Each resulting map represents a started process, with the fields host_name, process_name and @timestamp (the scheduled time of the watch). This list of maps is returned from the transform step and implicitly assigned to the payload. An example resulting payload is shown below:

[{ "@timestamp": "2017-05-28T16:31:48.533Z", "process_name": "/usr/bin/python", "host_name": "test-server-1"}
,{ "@timestamp": "2017-05-28T16:31:48.533Z", "process_name": "/usr/bin/perl", "host_name": "test-server-2"}]

Step 4 - Set Up Watch Actions

On detecting new processes being started, we fire two actions: a log action, to assist with debugging, and an index action to index one document per unusual (new) process that has started. The latter action provides the additional benefit of being able to visualise started processes in Kibana.

The log action is simple and renders the list of maps constructed in Step 3 using a Mustache template, printing a line per process.

"logging": {
"text": """
Processes started:
{{#ctx.payload._value}}{{process_name}} on server {{host_name}}
{{/ctx.payload._value}}
"""
}
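
Given the example payload shown in Step 3, this template would render as:

Processes started:
/usr/bin/python on server test-server-1
/usr/bin/perl on server test-server-2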

The index action aims to index a document for each new process, with a host name, process name, and the time at which we detected the process as being started (time here is an estimate based on when the watch triggered). To achieve this we place our list of maps passed from the previous step under the appropriate "_doc" key. This can again be achieved through a simple Painless script.

"index_payload": {
        "transform": {
          "script": "return ['_doc':ctx.payload._value];"
        },
        "index": {
          "index": "cef-auditd-watch-results",
          "doc_type": "doc"
        }
}

Step 5 - Putting it all together

Assembling the above produces the following. Notice how we parameterise our time period N into the metadata variable "time_period". We also assume our target indices, prefixed with "cef-auditd", utilise a time-based structure with day granularity. Our first query therefore explicitly targets the current and previous day's indices (to detect process starts around midnight), whilst the historical query targets all cef-auditd-* indices.

{
  "watch": {
    "metadata": {
      "time_period": "5m"
    },
    "trigger": {
      "schedule": {
        "interval": "5m"
      }
    },
    "input": {
      "chain": {
        "inputs": [
          {
            "started_processes": {
              "search": {
                "request": {
                  "indices": [
                    "<cef-auditd-{now/d}>","<cef-auditd-{now/d-1d}>"
                  ],
                  "body": {
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "range": {
                              "deviceReceiptTime": {
                                "gte": "{{ctx.trigger.scheduled_time}}||-{{ctx.metadata.time_period}}",
"lt": "{{ctx.trigger.scheduled_time}}"
                              }
                            }
                          },
                          {
                            "term": {
                              "deviceEventCategory": {
                                "value": "EXECVE"
                              }
                            }
                          }
                        ]
                      }
                    },
                    "aggs": {
                      "process_hosts": {
                        "terms": {
                          "field": "process_signature",
                          "size": 1000
                        }
                      }
                    },
                    "size": 0
                  }
                }
              }
            }
          },
          {
            "history_started_processes": {
              "search": {
                "request": {
                  "indices": [
                    "cef-auditd-*"
                  ],
                  "body": {
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "terms": {
                              "process_signature": [
                             "{{#ctx.payload.started_processes.aggregations.process_hosts.buckets}}{{key}}",
                                "{{/ctx.payload.started_processes.aggregations.process_hosts.buckets}}"
                              ]
                            }
                          },
                          {
                            "range": {
                              "deviceReceiptTime": {
                                "lt": "{{ctx.trigger.scheduled_time}}||-{{ctx.metadata.time_period}}"
                              }
                            }
                          },
                          {
                            "term": {
                              "deviceEventCategory": {
                                "value": "EXECVE"
                              }
                            }
                          }
                        ]
                      }
                    },
                    "aggs": {
                      "process_hosts": {
                        "terms": {
                          "field": "process_signature",
                          "size": 1000
                        }
                      }
                    },
                    "size": 0
                  }
                }
              }
            }
          }
        ]
      }
    },
    "condition": {
      "script": "return ctx.payload.started_processes.aggregations.process_hosts.buckets.size() != ctx.payload.history_started_processes.aggregations.process_hosts.buckets.size();"
    },
    "transform": {
      "script": "def history=ctx.payload.history_started_processes.aggregations.process_hosts.buckets.stream().map(p -> p.key).collect(Collectors.toList()); def new_starts = ctx.payload.started_processes.aggregations.process_hosts.buckets.stream().map(e -> e.key).filter(p -> !history.contains(p)); return new_starts.map(p -> ['process_name':/\\|/.split(p)[1],'host_name':/\\|/.split(p)[0],'@timestamp':ctx.trigger.scheduled_time]).collect(Collectors.toList());"
    },
    "actions": {
      "log": {
        "logging": {
          "text": """
Processes started:
{{#ctx.payload._value}}{{process_name}} on server {{host_name}}
{{/ctx.payload._value}}
"""
        }
      },
      "index_payload": {
        "transform": {
          "script": "return ['_doc':ctx.payload._value];"
        },
        "index": {
          "index": "cef-auditd-watch-results",
          "doc_type": "doc"
        }
      }
    }
  }
}
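
For a one-off dry run before scheduling, the inline Execute Watch API accepts the JSON document above verbatim as its request body (endpoint shown as per X-Pack 5.x):

POST _xpack/watcher/watch/_execute

To register the watch for scheduled execution instead, PUT the object under the top-level "watch" key to _xpack/watcher/watch/ followed by a watch ID of your choosing.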

Using a heatmap and saved search in Kibana we are able to visualise the documents generated by the above watch indicating when new processes are started on a server. Notice how we frequently detect new processes at the start of the data, before the frequency of such events reduces as expected.
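
Before building the visualisation, a quick search of the results index acts as a sanity check that documents are being produced (a minimal example; the fields are those set by our transform, and we assume dynamic mapping has detected @timestamp as a date):

GET cef-auditd-watch-results/_search
{
  "size": 10,
  "sort": [
    { "@timestamp": { "order": "desc" } }
  ]
}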

Final Thoughts

The above watch concludes our alerting series on ArcSight data. Whilst this represents our most complex problem, the watch illustrates the importance of preparing data at index time for efficient querying. Furthermore, we have demonstrated how query design can ensure Elasticsearch performs the computationally expensive component of the problem - ideal, given this is what it was designed for - thus improving the performance and scalability of our alert.

As usual for our examples, the watch would need enhancing to use one of the alternative actions, such as email or Slack, in order to effectively notify an administrator that investigation is required. The choice of action will depend on whether the alert is used simply to visualise new process starts, as shown above, or to proactively report in time-sensitive environments.

The user may also wish to consider increasing the granularity of the process_signature field. The above example considers only the first argument of the command issued and thus may not report on executions of the same process with different parameters. Increasing granularity, by adding additional parameters to the signature, will however result in more events.
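
For example, a more granular signature might also include the second command argument. A sketch of a modified ingest pipeline, assuming (as in our sample event) that ad.a1 holds the second argument:

PUT _ingest/pipeline/process_signature
{
  "processors": [
    {
      "set": {
        "field": "process_signature",
        "value": "{{agentHost}}|{{ad.a0}}|{{ad.a1}}"
      }
    }
  ]
}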

Despite using an efficient query, the above watch will degrade in performance over time as more data is added to the system. Furthermore, it requires the user to keep a complete history of all processes and servers in order to avoid processes being incorrectly flagged as new. To address these challenges, the user may wish to consider rolling up documents when discovery and visualisations on the data are no longer required. This would involve periodically producing a document per host, with a process_signature field holding the complete list of processes executed historically. These "host" documents would need to be periodically updated as data is expired.
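
Such a rolled-up "host" document might look like the following sketch (the process-history index name and document structure here are hypothetical, not part of the accompanying examples):

PUT process-history/doc/test-server-1
{
  "host_name": "test-server-1",
  "process_signature": [
    "test-server-1|sudo",
    "test-server-1|/usr/bin/python"
  ]
}

Because the field name matches that of the raw events, the historical query could then target these rollup documents in place of expired raw indices.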

Despite the above providing a useful and reusable example which could be applied to similar problems assuming availability of data (e.g., "Alert me when a user logs into a server for the first time"), our definition of "unusual" here is simply "new". This is likely to flag many processes and quickly result in alert fatigue - the condition whereby we are faced with an overwhelmingly large number of alerts. To address this we can propose several solutions:

  • Refine our definition of unusual and introduce further query signatures (e.g. only processes with root privileges that perform X actions)
  • Restrict our query to a subset of servers of interest
  • Modify the queries to look for new processes that have never started in the same time period. This would require additional filter aggregations looking for the same processes at the current time of day (possibly expanded to a 2 hour window) over each of the previous 7 days.

Although effective at identifying new process starts, even with the above refinements, these watches could have a low signal to noise ratio. In our next post we begin to explore X-Pack Machine Learning capabilities to automatically identify unusual processes more easily, and with a higher signal to noise ratio!

Interested in learning more? Check out the other posts in this ArcSight + Elasticsearch series.