25 April 2018 Engineering

Pizza Delivery Metrics with Composite Aggregations in Elasticsearch

By Jason Zucchetto

Composite aggregations are a powerful new feature in Elasticsearch 6.1. To show the full power of the feature, we’ll walk through creating an analytics service for Sliceline, a fictional pizza delivery company.

Composite aggregations allow us to:

  • Quickly paginate through aggregation results
  • Build new indices from aggregation results
  • Develop APIs backed by Elasticsearch aggregations with consistent response times for large result sets

Paginating aggregations for pizza deliveries

Sliceline delivers pizzas to millions of customers. As with any pizza delivery service, each pizza is unique and made to the customer’s specifications. Sliceline now wants to view and analyze customer buying habits, and many of those analyses need nicely formatted, paginated results such as this table of deliveries per address:

Address                             Deliveries
355 First St, San Francisco, CA     6
961 Union St, San Francisco, CA     8
123 Baker Way, San Francisco, CA    4
1700 Powell St, San Francisco, CA   10
900 Union St, San Francisco, CA     5

Results 101-105 of 10,000,000 results (Next page)

Building the table above has always been possible with an Elasticsearch query. Before Elasticsearch 6.1, however, every result prior to the page being displayed also had to be returned to the application. For instance, to create the table above (results 101-105), we need to query for the top 105 results and ignore the first 100.

// Standard terms aggregation to create the table above
GET /pizza/delivery/_search
{
  "size": 0,
  "aggs": {
    "group_by_deliveries": {
      "terms": {
        "field": "full_address",
        "size" : 105
      }
    }
  }
}
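To make that cost concrete, here is a minimal Python sketch (standard library only; terms_page_request and slice_page are hypothetical helper names, not part of any Elasticsearch client) of offset-style paging with a terms aggregation. The requested size grows with the page number, and the application discards every bucket that belongs to an earlier page.

```python
import json


def terms_page_request(page: int, page_size: int) -> dict:
    """Build a terms-aggregation request body for a 1-based page number.

    The terms aggregation has no offset parameter, so the request has to ask
    for every bucket up to the end of the wanted page.
    """
    return {
        "size": 0,
        "aggs": {
            "group_by_deliveries": {
                "terms": {"field": "full_address", "size": page * page_size}
            }
        },
    }


def slice_page(buckets: list, page: int, page_size: int) -> list:
    """Discard the buckets of earlier pages on the client side."""
    start = (page - 1) * page_size
    return buckets[start:start + page_size]


# Results 101-105 are page 21 when each page shows 5 addresses.
body = terms_page_request(page=21, page_size=5)
print(json.dumps(body, indent=2))
```

Page 21 with 5 rows per page needs a size of 105, the same request as above, and every later page asks for even more buckets.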

Introducing Composite Aggregations, a better way to paginate aggregations

Composite aggregations allow us to paginate our aggregation query above, and provide a method for users (and machines, in the case of APIs) to quickly move through a result set.

Insert sample dataset

Let’s start by inserting our sample data set: pizza deliveries! We suggest running these commands in Console, found under Dev Tools in Kibana.

// Create our pizza index
PUT /pizza
{
  "mappings" : {
    "delivery" : {
      "properties": {
        // We won’t separate the address out into street, city, state, etc. in an effort to keep the tutorial simple
        "full_address": {
          "type": "keyword"
        },
        "order" : {
          "type" : "text"
        },
        "num_pizzas" : {
          "type" : "integer"
        },
        "timestamp" : {
          "type" : "date"
        }
      }
    }
  }
}

// Insert sample pizza deliveries
POST /pizza/delivery/_bulk
{ "index": { "_id": 1 }}
{"full_address" : "355 First St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 2, "timestamp": "2018-04-10T12:25" }
{ "index": { "_id": 2 }}
{"full_address" : "961 Union St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 3, "timestamp": "2018-04-11T12:25" }
{ "index": { "_id": 3 }}
{"full_address" : "123 Baker St, San Francisco, CA", "order" : "vegan", "num_pizzas" : 1, "timestamp": "2018-04-18T12:25" }
{ "index": { "_id": 4 }}
{"full_address" : "1700 Powell St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 5, "timestamp": "2018-04-18T12:25" }
{ "index": { "_id": 5 }}
{"full_address" : "900 Union St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 4, "timestamp": "2018-04-18T12:25" }
{ "index": { "_id": 6 }}
{"full_address" : "355 First St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 3, "timestamp": "2018-04-10T12:25" }
{ "index": { "_id": 7 }}
{"full_address" : "961 Union St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 1, "timestamp": "2018-04-12T12:25" }
{ "index": { "_id": 8 }}
{"full_address" : "100 First St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 3, "timestamp": "2018-04-11T12:25" }
{ "index": { "_id": 9 }}
{"full_address" : "101 First St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 5, "timestamp": "2018-04-11T12:25" }
{ "index": { "_id": 10 }}
{"full_address" : "355 First St, San Francisco, CA", "order" : "cheese", "num_pizzas" : 10, "timestamp": "2018-04-10T12:25" }
{ "index": { "_id": 11 }}
{"full_address" : "100 First St, San Francisco, CA", "order" : "pepperoni", "num_pizzas" : 4, "timestamp": "2018-04-11T14:25" }
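If you would rather generate the bulk body than type it, the Python sketch below (standard library only; to_bulk_body is a hypothetical helper) builds the newline-delimited JSON that POST /pizza/delivery/_bulk expects: an action line carrying the _id, followed by the document itself, with every line ending in a newline. It only produces the text; sending it is left to whatever HTTP client you use, with the Content-Type header set to application/x-ndjson.

```python
import json

# The first three sample deliveries from the bulk request above.
DELIVERIES = [
    {"full_address": "355 First St, San Francisco, CA", "order": "cheese",
     "num_pizzas": 2, "timestamp": "2018-04-10T12:25"},
    {"full_address": "961 Union St, San Francisco, CA", "order": "cheese",
     "num_pizzas": 3, "timestamp": "2018-04-11T12:25"},
    {"full_address": "123 Baker St, San Francisco, CA", "order": "vegan",
     "num_pizzas": 1, "timestamp": "2018-04-18T12:25"},
]


def to_bulk_body(docs: list, first_id: int = 1) -> str:
    """Render documents as bulk NDJSON: an action line, then the source line.

    The bulk API expects every line, including the last one, to end in a newline.
    """
    lines = []
    for doc_id, doc in enumerate(docs, start=first_id):
        lines.append(json.dumps({"index": {"_id": doc_id}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"


print(to_bulk_body(DELIVERIES), end="")
```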
    

Using Composite Aggregations to Paginate Our Results

Let’s revisit our original terms aggregation and provide a more efficient mechanism for paginating results with composite aggregations.

GET /pizza/delivery/_search
{
  "size" : 0,
  "track_total_hits" : false,
  "aggs" : {
    "group_by_deliveries": {
      "composite" : {
        "size": 3,
        "sources" : [
          { "by_address": { "terms" : { "field": "full_address" } } }
        ]
      }
    } 
  } 
}

The aggregation above will give us the first page of results (notice the size parameter is set to 3):

Address                             Deliveries
100 First St, San Francisco, CA     2
101 First St, San Francisco, CA     1
123 Baker St, San Francisco, CA     1

Now, let’s provide a fast way to show the next page of results using composite aggregations. Since “123 Baker St, San Francisco, CA” was the last result from our previous query, we’ll need to specify this value in the “after” field within our composite aggregation.

GET /pizza/delivery/_search
{
  "size" : 0,
  "track_total_hits" : false,
  "aggs" : {
    "group_by_deliveries": {
      "composite" : {
        // Value from last result of previous query
        "after" : { "by_address" : "123 Baker St, San Francisco, CA" },
        "size": 3,
        "sources" : [
          { "by_address": { "terms" : { "field": "full_address" } } }
        ]
      }
    } 
  } 
}

We’ve also set the “track_total_hits” parameter to false. Since we don’t need the total number of matching documents, this allows Elasticsearch to terminate the query early once it has found enough buckets for the page.
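The following Python sketch is a simplified, in-memory model of the two requests above, not how Elasticsearch executes the aggregation: keys are sorted, “after” excludes every key up to and including itself, and “size” caps the page. The helper names (composite_page, all_pages) are hypothetical. Walking the sample addresses three at a time gives three pages, and the second one, which the “after” request above should return, is 1700 Powell St (1), 355 First St (3) and 900 Union St (1).

```python
from collections import Counter
from typing import List, Optional, Tuple

SF = ", San Francisco, CA"
# full_address of each of the 11 sample deliveries, in _id order.
ADDRESSES = [
    "355 First St" + SF, "961 Union St" + SF, "123 Baker St" + SF,
    "1700 Powell St" + SF, "900 Union St" + SF, "355 First St" + SF,
    "961 Union St" + SF, "100 First St" + SF, "101 First St" + SF,
    "355 First St" + SF, "100 First St" + SF,
]

Bucket = Tuple[str, int]  # (key, doc_count)


def composite_page(values: List[str], size: int, after: Optional[str] = None) -> List[Bucket]:
    """Model of a single-source composite page: sorted keys strictly after `after`."""
    counts = Counter(values)
    keys = sorted(k for k in counts if after is None or k > after)
    return [(k, counts[k]) for k in keys[:size]]


def all_pages(values: List[str], size: int) -> List[List[Bucket]]:
    """Walk the result set sequentially, passing the last key back in as `after`."""
    pages, after = [], None
    while True:
        page = composite_page(values, size, after)
        if not page:
            break
        pages.append(page)
        after = page[-1][0]  # last key of this page becomes the next "after"
    return pages


for number, page in enumerate(all_pages(ADDRESSES, size=3), start=1):
    print(number, page)
```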

Create a Date Histogram for Each Address

Let’s use multiple sources within our composite aggregation to group results by address, and create a date histogram for each address!

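GET /pizza/delivery/_search
{
  "size" : 0,
  "track_total_hits" : false,
  "aggs" : {
    "group_by_deliveries": {
      "composite" : {
        // Continue after the last bucket of the first page (100 First St, 101 First St, 123 Baker St)
        "after" : {
          "by_address": "123 Baker St, San Francisco, CA",
          "histogram": 1524009600000
        },
        "size": 3,
        "sources" : [
          { "by_address": { "terms" : { "field": "full_address" } } },
          { "histogram": { "date_histogram" : { "field": "timestamp", "interval": "1d" } } }
        ]
      }
    }
  }
}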

Our query above will return the following result:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": -1,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_deliveries": {
      "buckets": [
        {
          "key": {
            "by_address": "1700 Powell St, San Francisco, CA",
            "histogram": 1524009600000
          },
          "doc_count": 1
        },
        {
          "key": {
            "by_address": "355 First St, San Francisco, CA",
            "histogram": 1523318400000
          },
          "doc_count": 3
        },
        {
          "key": {
            "by_address": "900 Union St, San Francisco, CA",
            "histogram": 1524009600000
          },
          "doc_count": 1
        }
      ]
    }
  }
}

The aggregation above creates a histogram for each address, with a one-day interval. We can now use the “after” parameter, described in the previous example, to quickly move through the result set:

GET /pizza/delivery/_search
{
  "size" : 0,
  "track_total_hits" : false,
  "aggs" : {
    "group_by_deliveries": {
      "composite" : {
        // Last result from previous query
        "after" : {
          "by_address":  "900 Union St, San Francisco, CA",
          "histogram": 1524009600000
        },
        "size": 3,
        "sources" : [
          { "by_address": { "terms" : { "field": "full_address" } } },
          { "histogram": { "date_histogram" : { "field": "timestamp", "interval": "1d" } } }
        ]
      }
    } 
  } 
}

Yielding the following result:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": -1,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_deliveries": {
      "buckets": [
        {
          "key": {
            "by_address": "961 Union St, San Francisco, CA",
            "histogram": 1523404800000
          },
          "doc_count": 1
        },
        {
          "key": {
            "by_address": "961 Union St, San Francisco, CA",
            "histogram": 1523491200000
          },
          "doc_count": 1
        }
      ]
    }
  }
}
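With two sources, each bucket key is a pair, ordered by address first and then by day, so the “after” value has to name both parts. The Python sketch below is another simplified in-memory model (hypothetical helper names; days are bucketed in UTC, the date_histogram default time zone). Starting after (123 Baker St, 1524009600000) it yields the 1700 Powell St, 355 First St and 900 Union St buckets, and starting after (900 Union St, 1524009600000) it yields the two 961 Union St buckets shown above.

```python
from collections import Counter
from datetime import datetime, timezone

SF = ", San Francisco, CA"
# (full_address, timestamp) of each sample delivery, in _id order.
DELIVERIES = [
    ("355 First St" + SF, "2018-04-10T12:25"), ("961 Union St" + SF, "2018-04-11T12:25"),
    ("123 Baker St" + SF, "2018-04-18T12:25"), ("1700 Powell St" + SF, "2018-04-18T12:25"),
    ("900 Union St" + SF, "2018-04-18T12:25"), ("355 First St" + SF, "2018-04-10T12:25"),
    ("961 Union St" + SF, "2018-04-12T12:25"), ("100 First St" + SF, "2018-04-11T12:25"),
    ("101 First St" + SF, "2018-04-11T12:25"), ("355 First St" + SF, "2018-04-10T12:25"),
    ("100 First St" + SF, "2018-04-11T14:25"),
]
DAY_MS = 24 * 60 * 60 * 1000


def day_bucket(timestamp: str) -> int:
    """Round a timestamp down to the start of its UTC day, in epoch milliseconds."""
    parsed = datetime.fromisoformat(timestamp).replace(tzinfo=timezone.utc)
    millis = int(parsed.timestamp()) * 1000
    return millis - millis % DAY_MS


def composite_page(deliveries, size, after=None):
    """Model of a two-source page; tuples compare by address first, then by day."""
    counts = Counter((address, day_bucket(ts)) for address, ts in deliveries)
    keys = sorted(k for k in counts if after is None or k > after)
    return [(k, counts[k]) for k in keys[:size]]


page = composite_page(DELIVERIES, 3, after=("123 Baker St" + SF, 1524009600000))
next_page = composite_page(DELIVERIES, 3, after=page[-1][0])
print(page)
print(next_page)
```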

Average Number of Pizzas per Delivery, by Day

It would be nice to know the average number of pizzas per delivery for every address and day in our index. Composite aggregations allow us to specify sub-aggregations, such as an average:

GET /pizza/delivery/_search
{
  "size" : 0,
  "track_total_hits" : false,
  "aggs" : {
    "group_by_deliveries": {
      "composite" : {
        "size": 3,
        "sources" : [
          { "by_address": { "terms" : { "field": "full_address" } } },
          { "histogram": { "date_histogram" : { "field": "timestamp", "interval": "1d" } } }
        ]
      },
      "aggregations": {
        "avg_pizzas_per_day": {
           "avg": { "field": "num_pizzas" }
        }
      }
    } 
  } 
}

With the following result:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": -1,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_deliveries": {
      "buckets": [
        {
          "key": {
            "by_address": "100 First St, San Francisco, CA",
            "histogram": 1523404800000
          },
          "doc_count": 2,
          "avg_pizzas_per_day": {
            "value": 3.5
          }
        },
        {
          "key": {
            "by_address": "101 First St, San Francisco, CA",
            "histogram": 1523404800000
          },
          "doc_count": 1,
          "avg_pizzas_per_day": {
            "value": 5
          }
        },
        {
          "key": {
            "by_address": "123 Baker St, San Francisco, CA",
            "histogram": 1524009600000
          },
          "doc_count": 1,
          "avg_pizzas_per_day": {
            "value": 1
          }
        }
      ]
    }
  }
}
    

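You can see from the result above that deliveries to 100 First St on April 11 averaged 3.5 pizzas each (two deliveries, of 3 and 4 pizzas). Because avg is computed over the deliveries in each bucket, this is a per-delivery average; to get the total number of pizzas delivered to an address per day, use a sum sub-aggregation instead.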
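The avg sub-aggregation averages num_pizzas over the documents (deliveries) in each address-and-day bucket. This Python sketch, a plain standard-library model with a hypothetical bucket_stats helper, computes both the per-delivery average and the daily total for each bucket of the sample data: for 100 First St on 2018-04-11 the average is 3.5 and the total is 7.

```python
from collections import defaultdict
from statistics import mean

SF = ", San Francisco, CA"
# (full_address, day, num_pizzas) of each sample delivery, in _id order.
DELIVERIES = [
    ("355 First St" + SF, "2018-04-10", 2), ("961 Union St" + SF, "2018-04-11", 3),
    ("123 Baker St" + SF, "2018-04-18", 1), ("1700 Powell St" + SF, "2018-04-18", 5),
    ("900 Union St" + SF, "2018-04-18", 4), ("355 First St" + SF, "2018-04-10", 3),
    ("961 Union St" + SF, "2018-04-12", 1), ("100 First St" + SF, "2018-04-11", 3),
    ("101 First St" + SF, "2018-04-11", 5), ("355 First St" + SF, "2018-04-10", 10),
    ("100 First St" + SF, "2018-04-11", 4),
]


def bucket_stats(deliveries):
    """Group deliveries by (address, day) and compute count, average and total pizzas."""
    groups = defaultdict(list)
    for address, day, pizzas in deliveries:
        groups[(address, day)].append(pizzas)
    return {
        key: {"doc_count": len(values), "avg": mean(values), "sum": sum(values)}
        for key, values in groups.items()
    }


stats = bucket_stats(DELIVERIES)
print(stats[("100 First St" + SF, "2018-04-11")])
```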

Sequential Access

Composite aggregations provide no way to jump directly to an arbitrary page of results; pages must be accessed sequentially, with each request passing the last key of the previous page as “after”. You may have seen this technique referred to as “range-based pagination” in other systems. Even with an RDBMS, range-based pagination is usually more efficient than skip and limit, whose cost grows as users move farther through the result set.
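The same idea can be shown outside Elasticsearch. This Python sketch uses the standard library’s sqlite3 module with a hypothetical single-column delivery table holding the sample addresses, and fetches the same page two ways: with LIMIT/OFFSET, where the database still produces and steps over the skipped groups, and with a range condition on the last key seen, which plays the role of the composite aggregation’s “after” parameter and can start from the right place in the index.

```python
import sqlite3

SF = ", San Francisco, CA"
# full_address of each sample delivery.
ADDRESSES = [
    "355 First St" + SF, "961 Union St" + SF, "123 Baker St" + SF,
    "1700 Powell St" + SF, "900 Union St" + SF, "355 First St" + SF,
    "961 Union St" + SF, "100 First St" + SF, "101 First St" + SF,
    "355 First St" + SF, "100 First St" + SF,
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE delivery (full_address TEXT NOT NULL)")
conn.execute("CREATE INDEX delivery_address ON delivery (full_address)")
conn.executemany("INSERT INTO delivery VALUES (?)", [(a,) for a in ADDRESSES])


def offset_page(page: int, size: int):
    """Skip and limit: earlier groups are still produced, then thrown away."""
    return conn.execute(
        "SELECT full_address, COUNT(*) FROM delivery GROUP BY full_address "
        "ORDER BY full_address LIMIT ? OFFSET ?",
        (size, (page - 1) * size),
    ).fetchall()


def keyset_page(after: str, size: int):
    """Range-based: start strictly after the last key seen, like composite "after"."""
    return conn.execute(
        "SELECT full_address, COUNT(*) FROM delivery WHERE full_address > ? "
        "GROUP BY full_address ORDER BY full_address LIMIT ?",
        (after, size),
    ).fetchall()


first = keyset_page("", 3)  # every address sorts after the empty string
second = keyset_page(first[-1][0], 3)
print(second == offset_page(2, 3))
```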

Sort Order

With composite aggregations, it is not possible to sort by anything other than the keys used in the aggregation (such as the “by_address” value in the previous examples). Limiting the sort options in this way keeps composite aggregation responses fast and accurate.
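While the sort key is fixed, each source does accept an “order” of “asc” (the default) or “desc”, and “after” is interpreted in that order. The Python sketch below is a hypothetical in-memory model, not Elasticsearch code: it sorts the distinct (by_address, histogram) keys from the date histogram example with by_address descending and histogram ascending, then pages through them with an “after” key.

```python
from functools import cmp_to_key

SF = ", San Francisco, CA"
# Distinct (by_address, histogram) keys from the date histogram example.
KEYS = [
    ("100 First St" + SF, 1523404800000), ("101 First St" + SF, 1523404800000),
    ("123 Baker St" + SF, 1524009600000), ("1700 Powell St" + SF, 1524009600000),
    ("355 First St" + SF, 1523318400000), ("900 Union St" + SF, 1524009600000),
    ("961 Union St" + SF, 1523404800000), ("961 Union St" + SF, 1523491200000),
]


def compare_keys(a, b, orders):
    """Compare composite keys source by source, honoring each source's "asc"/"desc"."""
    for x, y, order in zip(a, b, orders):
        if x != y:
            result = -1 if x < y else 1
            return result if order == "asc" else -result
    return 0


def page_after(keys, orders, size, after=None):
    """Return the next `size` keys strictly after `after` in composite order."""
    ordered = sorted(keys, key=cmp_to_key(lambda a, b: compare_keys(a, b, orders)))
    if after is not None:
        ordered = [k for k in ordered if compare_keys(k, after, orders) > 0]
    return ordered[:size]


orders = ("desc", "asc")  # addresses in descending order, days oldest first
first = page_after(KEYS, orders, 3)
second = page_after(KEYS, orders, 3, after=first[-1])
print(first)
print(second)
```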

Composite Aggregations in Elasticsearch

The composite aggregation is a powerful new feature in Elasticsearch, allowing you to paginate sequentially over large aggregation result sets with predictable response times. It is also used by other upcoming Elasticsearch features, such as data rollups (building optimized indices from aggregated data) and SQL GROUP BY (paginating over large sets of grouped results).

We encourage you to learn more about the feature through the Elasticsearch documentation on composite aggregations, and download the latest version of Elasticsearch to try the examples above!