Watching event data

If you are indexing event data, such as log messages, network traffic, or a web feed, you can create a watch to email notifications when certain events occur. For example, if you index a feed of RSVPs for meetup events happening around the world, you can create a watch that alerts you to interesting events.

To index the meetup data, you can use Logstash to ingest live data from the Meetup.com streaming API, http://stream.meetup.com/2/rsvps.

To ingest this data with Logstash:

  1. Download Logstash and unpack the archive file.
  2. Create a Logstash configuration file that uses the Logstash standard input and the Logstash standard output, and save it in the logstash-{version} directory as livestream.conf:

    input {
      stdin {
        codec => json 
      }
    }
    filter {
      date {
        match => [ "event.time", "UNIX_MS" ]
        target => "event_time"
      }
    }
    output { 
      stdout {
        codec => rubydebug
      }
      elasticsearch {
        hosts => "http://localhost:9200"
        user  => "elastic"
        password  => "x-pack-test-password"
      }
    }

    The json codec on the stdin input parses the meetup data, which is formatted in JSON.

    The output section indexes the meetup data into Elasticsearch.

  3. To start indexing the meetup data, pipe the RSVP stream into Logstash and specify your livestream.conf configuration file.

    curl http://stream.meetup.com/2/rsvps | bin/logstash -f livestream.conf
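To make the date filter in livestream.conf more concrete, here is a minimal Python sketch of the conversion it is configured to perform: read a millisecond Unix timestamp (UNIX_MS) from the event's time field and store the parsed result in event_time. The sample RSVP is hypothetical and trimmed to the fields used here, the function name is illustrative, and the sketch assumes the time value is nested under event, as the field names used elsewhere on this page suggest. It is not how Logstash itself is implemented.

```python
from datetime import datetime, timezone


def add_event_time(rsvp: dict) -> dict:
    """Return a copy of the RSVP with an ISO 8601 event_time field added."""
    millis = rsvp["event"]["time"]  # UNIX_MS: milliseconds since the epoch
    parsed = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return {**rsvp, "event_time": parsed.isoformat()}


# Hypothetical RSVP, trimmed to the fields this sketch needs.
sample = {"event": {"event_name": "Open Source Night", "time": 1500000000000}}
enriched = add_event_time(sample)
print(enriched["event_time"])  # 2017-07-14T02:40:00+00:00
```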

Now that you’re indexing the meetup RSVPs, you can set up a watch that lets you know about events you might be interested in. For example, let’s create a watch that runs every hour, looks for events that talk about Open Source, and sends an email with information about the events.

To set up the watch:

  1. Specify how often you want to run the watch by adding a schedule trigger to the watch:

    {
      "trigger": {
        "schedule": {
          "interval": "1h"
        }
      },
  2. Load data into the watch payload by creating an input that searches the meetup data for events that have Open Source as a topic. You can use aggregations to group the data by city, consolidate references to the same events, and sort the events by date.

    "input": {
        "search": {
          "request": {
            "indices": [
              "logstash" 
            ],
            "body": {
              "size": 0,
              "query": {
                "bool": {
                  "filter": [
                    {
                      "range": {
                        "@timestamp": {
                          "gte": "now-3h"
                        }
                      }
                    },
                    {
                      "match": {
                        "group.group_topics.topic_name": "Open Source" 
                      }
                    }
                  ]
                }
              },
              "aggs": {
                "group_by_city": {
                  "terms": {
                    "field": "group.group_city.keyword", 
                    "size": 5
                  },
                  "aggs": {
                    "group_by_event": {
                      "terms": {
                        "field": "event.event_url.keyword", 
                        "size": 5
                      },
                      "aggs": {
                        "get_latest": {
                          "terms": {
                            "field": "@timestamp", 
                            "size": 1,
                            "order": {
                              "_key": "desc"
                            }
                          },
                          "aggs": {
                            "group_by_event_name": {
                              "terms": {
                                "field": "event.event_name.keyword" 
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      },

    logstash is the default index alias for the Logstash indices containing the meetup data. By default, the Logstash index lifecycle management (ILM) policy rolls this alias over to a new index when the index reaches 50GB in size or becomes 30 days old. For more information, see ILM defaults in Logstash.

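    The match query finds all of the RSVPs with Open Source as a topic.

    The group_by_city aggregation groups the RSVPs by city.

    The group_by_event aggregation consolidates multiple RSVPs for the same event.

    The get_latest aggregation sorts the events so the latest events are listed first.

    The group_by_event_name aggregation groups the events by name.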

  3. To determine whether or not there are any Open Source events, add a compare condition that checks the watch payload to see if there were any search hits.

    "condition": {
      "compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}
    },
  4. To send an email when Open Source events are found, add an email action:

    "actions": {
        "email_me": {
          "throttle_period": "10m",
          "email": {
            "from": "<from:email address>",
            "to": "<to:email address>",
            "subject": "Open Source Events",
            "body": {
              "html": "Found events matching Open Source: <ul>{{#ctx.payload.aggregations.group_by_city.buckets}}<li>{{key}} ({{doc_count}})<ul>{{#group_by_event.buckets}}<li><a href=\"{{key}}\">{{get_latest.buckets.0.group_by_event_name.buckets.0.key}}</a> ({{doc_count}})</li>{{/group_by_event.buckets}}</ul></li>{{/ctx.payload.aggregations.group_by_city.buckets}}</ul>"
            }
          }
        }
      }

To enable Watcher to send emails, you must configure an email account in elasticsearch.yml. For more information, see Configuring email accounts.
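If it helps to picture what the compare condition from step 3 checks, the following Python sketch resolves a dotted path such as ctx.payload.hits.total against a nested dictionary and applies a greater-than test. The execution contexts are hypothetical and the helper names are illustrative; this only mimics the idea and is not Watcher's implementation.

```python
from functools import reduce


def resolve(ctx: dict, path: str):
    """Follow a dotted path such as 'ctx.payload.hits.total' through nested dicts."""
    keys = path.split(".")[1:]  # drop the leading 'ctx'
    return reduce(lambda node, key: node[key], keys, ctx)


def compare_gt(ctx: dict, path: str, threshold: int) -> bool:
    """Mimic a compare condition that uses the 'gt' operator."""
    return resolve(ctx, path) > threshold


# Hypothetical execution contexts: one with search hits, one without.
with_hits = {"payload": {"hits": {"total": 3}}}
no_hits = {"payload": {"hits": {"total": 0}}}

print(compare_gt(with_hits, "ctx.payload.hits.total", 0))  # True
print(compare_gt(no_hits, "ctx.payload.hits.total", 0))    # False
```

In this picture, the email action would only be considered for the first context, because the second one has no hits.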

The complete watch looks like this:

PUT _watcher/watch/meetup
{
  "trigger": {
    "schedule": {
      "interval": "1h"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          "logstash"
        ],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "filter": [
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-3h"
                    }
                  }
                },
                {
                  "match": {
                    "group.group_topics.topic_name": "Open Source"
                  }
                }
              ]
            }
          },
          "aggs": {
            "group_by_city": {
              "terms": {
                "field": "group.group_city.keyword",
                "size": 5
              },
              "aggs": {
                "group_by_event": {
                  "terms": {
                    "field": "event.event_url.keyword",
                    "size": 5
                  },
                  "aggs": {
                    "get_latest": {
                      "terms": {
                        "field": "@timestamp",
                        "size": 1,
                        "order": {
                          "_key": "desc"
                        }
                      },
                      "aggs": {
                        "group_by_event_name": {
                          "terms": {
                            "field": "event.event_name.keyword"
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "actions": {  
    "email_me": {
      "throttle_period": "10m",
      "email": {
        "from": "username@example.org",  
        "to": "recipient@example.org",   
        "subject": "Open Source events",
        "body": {
          "html": "Found events matching Open Source: <ul>{{#ctx.payload.aggregations.group_by_city.buckets}}<li>{{key}} ({{doc_count}})<ul>{{#group_by_event.buckets}}<li><a href=\"{{key}}\">{{get_latest.buckets.0.group_by_event_name.buckets.0.key}}</a> ({{doc_count}})</li>{{/group_by_event.buckets}}</ul></li>{{/ctx.payload.aggregations.group_by_city.buckets}}</ul>"
        }
      }
    }
  }
}

The email body can include Mustache templates to reference data in the watch payload. By default, the body is sanitized to block dangerous content.

Replace the from address with the email address you configured in elasticsearch.yml.

Replace the to address with your email address to receive notifications.
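The Mustache template in the email body walks the nested aggregation buckets: one list item per city, and inside it one linked item per event. As a rough illustration of that traversal, here is a plain Python sketch that builds the same HTML from a hypothetical aggregation result. It is not a Mustache engine, it performs no escaping or sanitization, and the sample values are invented for the example.

```python
def render_body(aggregations: dict) -> str:
    """Build the nested HTML list that the email template describes."""
    parts = ["Found events matching Open Source: <ul>"]
    for city in aggregations["group_by_city"]["buckets"]:
        parts.append(f"<li>{city['key']} ({city['doc_count']})<ul>")
        for event in city["group_by_event"]["buckets"]:
            # 'buckets.0' in the template selects the first bucket at each level.
            latest = event["get_latest"]["buckets"][0]
            name = latest["group_by_event_name"]["buckets"][0]["key"]
            parts.append(
                f'<li><a href="{event["key"]}">{name}</a> ({event["doc_count"]})</li>'
            )
        parts.append("</ul></li>")
    parts.append("</ul>")
    return "".join(parts)


# Hypothetical aggregation result with one city and one event.
sample_aggs = {
    "group_by_city": {"buckets": [{
        "key": "Berlin", "doc_count": 2,
        "group_by_event": {"buckets": [{
            "key": "https://example.org/events/1", "doc_count": 2,
            "get_latest": {"buckets": [{
                "group_by_event_name": {"buckets": [{"key": "Open Source Night"}]}
            }]},
        }]},
    }]}
}

html = render_body(sample_aggs)
print(html)
```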

Now that you’ve created your watch, you can use the _execute API to run it without waiting for the schedule to trigger execution:

POST _watcher/watch/meetup/_execute
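If you want to issue the same request from a script instead of the console, one possible approach is sketched below using only the Python standard library. It assumes the cluster address and credentials from the Logstash configuration earlier on this page (http://localhost:9200, elastic, x-pack-test-password) and basic authentication; the function name is illustrative. The sketch only prepares the request, and the lines that would send it are left commented out because they need a running cluster.

```python
import base64
import urllib.request


def build_execute_request(base_url: str, watch_id: str, user: str, password: str):
    """Prepare (but do not send) a POST to the watch's _execute endpoint."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/_watcher/watch/{watch_id}/_execute",
        method="POST",
        headers={"Authorization": f"Basic {token}"},
    )


# Values assumed from the Logstash configuration shown earlier.
request = build_execute_request(
    "http://localhost:9200", "meetup", "elastic", "x-pack-test-password"
)
print(request.get_method(), request.full_url)

# To send it against a running cluster:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode())
```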