Finding suspicious client IPs by using scripted metrics

With data frame transforms, you can use scripted metric aggregations on your data. These aggregations are flexible and make it possible to perform very complex processing. Let’s use scripted metrics to identify suspicious client IPs in the web log sample dataset.

We transform the data such that the new index contains, for each client IP, the sum of bytes transferred and the distinct counts of URLs, agents, incoming request locations, and geographic destinations. We also use a scripted metric aggregation to count the specific types of HTTP responses that each client IP receives. Ultimately, the example below transforms web log data into an entity-centric index where the entity is clientip.

POST _data_frame/transforms/_preview
{
  "source": {
    "index": "kibana_sample_data_logs",
    "query": { 
      "range" : {
        "timestamp" : {
          "gte" : "now-30d/d"
        }
      }
    }
  },
  "dest" : { 
    "index" : "sample_weblogs_by_clientip"
  },
  "pivot": {
    "group_by": {
      "clientip": { "terms": { "field": "clientip" } }
    },
    "aggregations": {
      "url_dc": { "cardinality": { "field": "url.keyword" }},
      "bytes_sum": { "sum": { "field": "bytes" }},
      "geo.src_dc": { "cardinality": { "field": "geo.src" }},
      "agent_dc": { "cardinality": { "field": "agent.keyword" }},
      "geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
      "responses.total": { "value_count": { "field": "timestamp" }},
      "responses.counts": { 
        "scripted_metric": {
          "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]",
          "map_script": """
            def code = doc['response.keyword'].value;
            if (code.startsWith('5') || code.startsWith('4')) {
              state.responses.error += 1 ;
            } else if(code.startsWith('2')) {
              state.responses.success += 1;
            } else {
              state.responses.other += 1;
            }
            """,
          "combine_script": "state.responses",
          "reduce_script": """
            def counts = ['error': 0L, 'success': 0L, 'other': 0L];
            for (responses in states) {
              counts.error += responses['error'];
              counts.success += responses['success'];
              counts.other += responses['other'];
            }
            return counts;
            """
          }
        },
      "timestamp.min": { "min": { "field": "timestamp" }},
      "timestamp.max": { "max": { "field": "timestamp" }},
      "timestamp.duration_ms": { 
        "bucket_script": {
          "buckets_path": {
            "min_time": "timestamp.min.value",
            "max_time": "timestamp.max.value"
          },
          "script": "(params.max_time - params.min_time)"
        }
      }
    }
  }
}

This range query limits the transform to documents that are within the last 30 days at the point in time the data frame transform checkpoint is processed. For batch data frames this occurs once.

This is the destination index for the data frame. It is ignored by _preview.

The data is grouped by the clientip field.

This scripted_metric performs a distributed operation on the web log data to count specific types of HTTP responses (error, success, and other): the init_script and map_script run per shard, combine_script returns each shard's state, and reduce_script merges the shard-level counts.
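To make the four phases of the scripted metric concrete, here is a minimal Python sketch (not Painless) of the same counting logic. The shard split and response codes below are hypothetical; in Elasticsearch, each shard runs init + map over its documents, combine returns the shard-local state, and reduce merges all shard states on the coordinating node.

```python
def init_script():
    # Mirrors: state.responses = ['error':0L,'success':0L,'other':0L]
    return {"error": 0, "success": 0, "other": 0}

def map_script(state, code):
    # Mirrors the Painless map_script: classify each response code
    if code.startswith("5") or code.startswith("4"):
        state["error"] += 1
    elif code.startswith("2"):
        state["success"] += 1
    else:
        state["other"] += 1

def combine_script(state):
    # Mirrors: "state.responses" -- just hand the shard state back
    return state

def reduce_script(states):
    # Mirrors the Painless reduce_script: merge all shard states
    counts = {"error": 0, "success": 0, "other": 0}
    for responses in states:
        for key in counts:
            counts[key] += responses[key]
    return counts

# Two hypothetical shards, each holding a few response codes
shard_codes = [["200", "404", "200"], ["503", "301"]]
states = []
for codes in shard_codes:
    state = init_script()
    for code in codes:
        map_script(state, code)
    states.append(combine_script(state))

print(reduce_script(states))  # {'error': 2, 'success': 2, 'other': 1}
```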

This bucket_script calculates the duration of each client IP's access, in milliseconds, based on the results of the min and max timestamp aggregations.
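The bucket_script arithmetic is a plain subtraction: the min and max timestamps arrive via buckets_path as epoch milliseconds, and the script computes max_time - min_time. The following Python sketch reproduces that calculation using the timestamps from the preview output shown further down:

```python
from datetime import datetime

def epoch_delta_ms(min_iso, max_iso):
    """Millisecond difference, mirroring params.max_time - params.min_time."""
    delta = datetime.fromisoformat(max_iso) - datetime.fromisoformat(min_iso)
    # timedelta stores days/seconds/microseconds exactly, so this is lossless
    return delta.days * 86_400_000 + delta.seconds * 1_000 + delta.microseconds // 1_000

# Timestamps taken from the preview for client IP 0.72.176.46
duration_ms = epoch_delta_ms("2019-06-17T07:51:57.333", "2019-08-13T06:31:00.572")
print(duration_ms)  # 4919943239, i.e. the 4.919943239E9 shown in the preview
```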

The preview shows you that the new index would contain data like this for each client IP:

{
  "preview" : [
    {
      "geo" : {
        "src_dc" : 12.0,
        "dest_dc" : 9.0
      },
      "clientip" : "0.72.176.46",
      "agent_dc" : 3.0,
      "responses" : {
        "total" : 14.0,
        "counts" : {
          "other" : 0,
          "success" : 14,
          "error" : 0
        }
      },
      "bytes_sum" : 74808.0,
      "timestamp" : {
        "duration_ms" : 4.919943239E9,
        "min" : "2019-06-17T07:51:57.333Z",
        "max" : "2019-08-13T06:31:00.572Z"
      },
      "url_dc" : 11.0
    },
    ...
  ]
}

This data frame makes it easier to answer questions such as:

  • Which client IPs are transferring the most data?
  • Which client IPs are interacting with a high number of different URLs?
  • Which client IPs have high error rates?
  • Which client IPs are interacting with a high number of destination countries?
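Because each document in the destination index summarizes one client IP, these questions reduce to sorting on the pre-aggregated fields. As a hypothetical sketch, this is how "which client IPs have high error rates?" could be answered from documents shaped like the preview above (the sample documents here are made up):

```python
# Hypothetical documents shaped like the sample_weblogs_by_clientip preview
docs = [
    {"clientip": "0.72.176.46", "responses": {"total": 14, "counts": {"error": 0}}},
    {"clientip": "10.0.0.1", "responses": {"total": 20, "counts": {"error": 15}}},
    {"clientip": "172.16.0.9", "responses": {"total": 8, "counts": {"error": 2}}},
]

def error_rate(doc):
    # Error responses as a fraction of all responses for this client IP
    r = doc["responses"]
    return r["counts"]["error"] / r["total"]

# Rank client IPs from highest to lowest error rate
for doc in sorted(docs, key=error_rate, reverse=True):
    print(doc["clientip"], round(error_rate(doc), 2))
```

In practice the same ranking could be done server-side with a search on the destination index, sorting on a runtime or stored error-rate field.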