Enriquecimiento de datos de Elasticsearch con Logstash: Algunos ejemplos de seguridad

El enriquecimiento de datos proporciona conocimientos adicionales sobre sus datos y, por lo general, se usa en casos de seguridad. Estas son algunas preguntas comunes relacionadas con la seguridad:

  • ¿Hay alguna IP de red de robots visitando mi servidor web?
  • ¿Estas direcciones IP existen en una DNSBL externa (lista de bloqueo DNS)?
  • ¿Hay alguien en mi organización que visita una URL/un dominio de malware conocido?

Existen tres formas comunes de realizar el enriquecimiento de datos en Logstash: Elasticsearch, DNS y filtros de traducción (translate filters). Cualquiera que elija depende en gran parte de la fuente de datos para el enriquecimiento.

Y, si el rendimiento es una preocupación, el complemento de caché en memoria de Logstash puede ayudar.

Filtro de Logstash: Elasticsearch

Primero, examinemos el filtro de Logstash Elasticsearch. Permite realizar búsquedas contra índices de Elasticsearch. Los campos de interés en los resultados pueden incorporarse en el evento actual.

Por ejemplo, si su fuente de datos de amenazas está almacenada en un índice de Elasticsearch (por ejemplo, logstash-input-blueliv), es probable que este sea el método más fácil para la integración.

Por ejemplo, suponiendo que su índice de lista de botip tiene una estructura como la siguiente:

PUT botlist/ip/1
{
   "ip" : "1.1.1.1"
}

Entonces, en el filtro logstash elasticsearch, necesitará establecer la consulta y los campos que quiere llenar desde su índice de botlist. En este caso, está usando el campo "botip" para indicar que se encontró en el índice botlist. Luego, la sección "mutate" (mutación) agregará una etiqueta "botnet" en el evento final.

filter {
  elasticsearch {
    hosts => "your_es_host"
    index => "botlist"
    query => "ip:%{message}"    
    fields => { "ip" => "botip" }
    result_size => 1
    enable_sort => false
  }

  if [botip] {
    mutate {
      add_tag => ["botnet"]
      remove_field => "botip"
    }
  }
}

Si su sistema requiere acción de seguridad inmediata durante la introducción de datos, puede considerar agregar una consulta filtrada en elasticsearch. Consulte nuestra documentación oficial en consulta filtrada.

Filtro DNS Logstash

A continuación, el filtro DNS Logstash se puede usar para realizar búsquedas DNS para resolver un dominio a una dirección IP o una búsqueda inversa para asignar una IP a un dominio.

Un servicio DNSBL a veces requiere un formato DNS especial para búsquedas de resultados. Por ejemplo, spamhaus.org usa un formato de dirección IP inversa para verificar si una dirección está listada en su servicio.

El ejemplo siguiente demuestra cómo reformar una dirección IP usando un filtro grok, luego utilice el filtro DNS para verificar si dicho dominio está listado en zen.spamhaus.org:

filter {
   grok {
     match => { "message" => "%{WORD:addr1}.%{WORD:addr2}.%{WORD:addr3}.%{WORD:addr4}" }     <= extract different part of the IP address 
   }
   mutate {
     add_field => {
     "spamhaus_reverse_lookup" => "%{addr4}.%{addr3}.%{addr2}.%{addr1}.zen.spamhaus.org" . <= reformat the address for spamhaus query
     }
   }
   dns {
     resolve => [ "spamhaus_reverse_lookup" ] <= perform lookup using the reverse address format.
     nameserver => [ "10.0.1.1" ]
     add_tag => [ "dns_successful_lookup" ]
     action => replace
   }
   if "dns_successful_lookup" in [tags] {
     if [spamhaus_reverse_lookup] == "127.0.0.2" { <= 127.0.0.2 is a special return address from spamhaus that indicate the lookup address is listed in spamhaus spam database
       mutate {
          add_tag => [ "spam_address" ]
       }
     }
   }
}

Filtro Logstash Translate

Por último, pero no por ello menos importante, el filtro Logstash también puede alcanzar el objetivo de la búsqueda usando simplemente un diccionario para asignar valores coincidentes entre columnas/pares clave-valor.

Por ejemplo, Malware Domain List proporciona una fuente gratuita en formato CSV. Se puede convertir en una entrada para el filtro translate de la siguiente manera:

$ more malware.yaml
"213.155.12.XXX/sec/bin/upload/v1crypted.exe": "true" <= true indicate it's a known malware URL
"128.134.30.XXX/w.exe" : "true"
"114.203.87.XXX/help.asp" : "true"

Con esto, el ejemplo siguiente de filtro translate se puede usar para detectar si un mensaje o URL están listados en la fuente de malware:

$ more malware.cfg
input {
  stdin { 
    codec => json 
  }
}
filter {
  translate {
     field => "url"
     destination => "malware" <= if a match is found, the translate filter will assign the mapped value to the destination field. In this example, if the value in url match an entry in the input CSV, malware field will assign a 'true' value.
     dictionary_path => "malware.yaml"
  }
}
output {
  stdout {
     codec => rubydebug 
  }
}

Entrada

{ "url" : "128.134.30.XXX/w.exe" }

Salida

{
    "@timestamp" => 2018-01-15T09:53:10.829Z,
       <b>"malware" => "true",</b>
           "url" => "128.134.30.XXX/w.exe",
      "@version" => "1",
          "host" => "localhost"
}

Enriquecimiento a escala

Mientras Elasticsearch y los filtros para traducir son buenos para pequeñas cargas de trabajo, podemos mejorar esto proporcionando una capa de enriquecimiento escalable que no mantiene el estado en los nodos de Logstash.

Por ejemplo, con un prototipo de complemento de caché basado en memoria actualizado recientemente, podemos hacer búsquedas sin bloqueos extremadamente rápido en lo que sea que queramos coincidir. Algunos ejemplos podrían ser solicitudes de malware, datos de amenazas (IP incorrectas conocidas) o incluso capitales (IP del servidor para búsquedas de host). Las búsquedas en caché basada en memoria son búsquedas simples de clave/valor. El otro beneficio para la caché basada en memoria es que no se bloqueará en las actualizaciones. Como tenemos una carga de trabajo de lectura muy alta, no queremos bloquear las búsquedas cuando actualizamos nuestras claves de enriquecimiento. Como la caché basada en memoria es volátil y no persistirá después de los reinicios, debe tener esto en cuenta al actualizar los datos y asegurarse de almacenar esta información en un almacenamiento persistente de modo que pueda volver a llenar el caché basada en memoria si es necesario.

Al usar memcache y grupos de almacenamiento de memcache, podemos escalar y empujar todo el tráfico que queremos. Con la configuración adecuada, una sola instancia de caché basada en memoria puede manejar de manera moderada más de cien mil búsquedas por segundo. Por documento basado en memoria caché, sugieren más de 200 mil, pero hemos visto cantidades mayores con el ajuste apropiado. En una configuración de grupo, esto escalará linealmente.

El nuevo complemento basado en memoria caché de filtro de logstash es una reescritura de otras versiones que se han publicado. Esta versión admite lo siguiente

  • Grupos basados en memoria caché a través de hash constante
  • Espacios de nombres
  • Obtención múltiple/ajuste múltiple

El equipo de Logstash planea lanzar un complemento admitido basado en memoria caché con funcionalidad equivalente en el futuro pero por ahora, usemos el prototipo y comencemos con un caso de uso común de muestra.

Ejemplo: Búsqueda de Botip

Un caso de uso común es buscar IP de una fuente de spam/bot:

filter {
        memcached {
            hosts => ["127.0.0.1:11211"]
            get => {
                    "%{ip}" => "threat_src"
            }
        }
}

Nota: Este es simplemente un host basado en memoria caché para fines de demostración

En este ejemplo, necesitaríamos llenar las IP en caché basado en memoria. La clave sería la IP. Entonces, la solicitud real de caché basada en memoria sería de la siguiente manera.

$ telnet localhost 11211
set 1.1.1.1 0 900 5 data   <= set 1.1.1.1 as a botip address, 900 is the entry exipration time
botip                      <= this is the threat source name
STORED
get 1.1.1.1                <= double check
VALUE 1.1.1.1 0 5
botip
END

A continuación figura un ejemplo completo con archivo de configuración logstash, entrada y salida:

$ more memcache.cfg
input {
  stdin { 
    codec => json 
  }
}
filter {
  memcached {
     hosts => ["localhost:11211"]
     get => {
        "%{ip}" => "threat_src"
     }
  }
}
output {
  stdout {
     codec => rubydebug 
  }
}

Entrada

{ "ip" : "1.1.1.1" }

Salida

{
    "@timestamp" => 2018-01-17T03:54:36.642Z,
          "host" => "localhost",
            "ip" => "1.1.1.1",
    <b>"threat_src" => "botip",</b>
      "@version" => "1"
}

El valor devuelto por el filtro de caché basada en memoria se asignaría a la variable threat_src y aparecería en su registro como "threat_src" : "botip". Si no hay coincidencia, threat_src no se agregará en el registro final.

Con cualquiera de estos ejemplos, depende de usted mantener y llenar la caché basada en memoria con la información que desea enriquecer. Al usar grupos, asegúrese de que la configuración sea la misma en los dos nodos que llenan la caché basada en memoria, además de los nodos de Logstash que están suscribiendo a los nodos de caché basada en memoria.

En general, el enriquecimiento de datos al momento de la ingestión usando Logstash brinda a los analistas un acceso fácil y rápido a información contextual que les ayuda a identificar rápidamente problemas y otros asuntos interesantes en sus datos. Cuando se usa Elastic Stack para casos prácticos de análisis de seguridad, las técnicas de enriquecimiento, como aquellas que aparecen en este artículo, pueden ser muy valiosas para identificar rápidamente actividad relacionada con amenazas presentes en mensajes de eventos relacionados con la seguridad, lo que permite a los analistas investigar aún más y comenzar sus procesos de respuesta ante incidentes.