Watcher condition contextedit

Use a Painless script as a watch condition that determines whether to execute a watch or a particular action within a watch. Condition scripts return a Boolean value to indicate the status of the condition.

The following variables are available in all watcher contexts.

Variables

params (Map, read-only)
User-defined parameters passed in as part of the query.
ctx['watch_id'] (String, read-only)
The id of the watch.
ctx['id'] (String, read-only)
The server generated unique identifer for the run watch.
ctx['metadata'] (Map, read-only)
Metadata can be added to the top level of the watch definition. This is user defined and is typically used to consolidate duplicate values in a watch.
ctx['execution_time'] (ZonedDateTime, read-only)
The time the watch began execution.
ctx['trigger']['scheduled_time'] (ZonedDateTime, read-only)
The scheduled trigger time for the watch. This is the time the watch should be executed.
ctx['trigger']['triggered_time'] (ZonedDateTime, read-only)
The actual trigger time for the watch. This is the time the watch was triggered for execution.
ctx['payload'] (Map, read-only)
The accessible watch data based upon the watch input.

API

The standard Painless API is available.

To run this example, first follow the steps in context examples.

Return

boolean
Expects true if the condition is met, and false if it is not.

API

The standard Painless API is available.

Example

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()       
          .filter(theatre -> theatre.money.value < 15000 ||
                             theatre.money.value > 50000)               
          .count() > 0                                                  
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The output of the search was : {{ctx.payload.aggregations.theatres.buckets}}"
        }
      }
    }
  }
}

The Java Stream API is used in the condition. This API allows manipulation of the elements of the list in a pipeline.

The stream filter removes items that do not meet the filter criteria.

If there is at least one item in the list, the condition evaluates to true and the watch is executed.

The following action condition script controls execution of the my_log action based on the value of the seats sold for the plays in the data set. The script aggregates the total sold seats for each play and returns true if there is at least one play that has sold over $50,000.

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "actions" : {
      "my_log" : {
        "condition": {                                                
          "script" :
          """
            return ctx.payload.aggregations.theatres.buckets.stream()
              .anyMatch(theatre -> theatre.money.value > 50000)       
          """
        },
        "logging" : {
          "text" : "At least one play has grossed over $50,000: {{ctx.payload.aggregations.theatres.buckets}}"
        }
      }
    }
  }
}

This example uses a nearly identical condition as the previous example. The differences below are subtle and are worth calling out.

The location of the condition is no longer at the top level, but is within an individual action.

Instead of a filter, anyMatch is used to return a boolean value

The following example shows scripted watch and action conditions within the context of a complete watch. This watch also uses a scripted transform.

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "high_threshold": 50000, "low_threshold": 15000 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()
          .anyMatch(theatre -> theatre.money.value < ctx.metadata.low_threshold ||
                               theatre.money.value > ctx.metadata.high_threshold)
      """
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value > ctx.metadata.high_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value < ctx.metadata.low_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "log_money_makers" : {
        "condition": {
          "script" : "return ctx.payload.money_makers.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.money_makers.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays contain the highest grossing total income: {{ctx.payload.plays_value}}"
        }
      },
      "log_duds" : {
        "condition": {
          "script" : "return ctx.payload.duds.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.duds.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays need more advertising due to their low total income: {{ctx.payload.plays_value}}"
        }
      }
    }
  }
}

The following example shows the use of metadata and transforming dates into a readable format.

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "min_hits": 10000 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.hits.total > ctx.metadata.min_hits
      """
    },
    "transform" : {
      "script" :
      """
        def theDate = ZonedDateTime.ofInstant(ctx.execution_time.toInstant(), ctx.execution_time.getZone());
        return ['human_date': DateTimeFormatter.RFC_1123_DATE_TIME.format(theDate),
                'aggregations': ctx.payload.aggregations]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The watch was successfully executed on {{ctx.payload.human_date}} and contained {{ctx.payload.aggregations.theatres.buckets.size}} buckets"
        }
      }
    }
  }
}