City Bikes, Part Two

UPDATE: This article refers to our hosted Elasticsearch offering by an older name, Found. Please note that Found is now known as Elastic Cloud.

Reindexing and Query Optimization with Filters

In this article we will look at basic memory optimization of our queries, and while doing so, we will write a small script for reindexing our data.

City Bikes Continued…

In the previous City Bikes article we left the index with the following mappings:

{
    "mappings": {
        "bikerack": {
            "properties": {
                "timestamp": {
                    "format": "date_time_no_millis",
                    "type": "date"
                },
                "bicycles": {
                    "type": "long"
                },
                "weekday": {
                    "type": "string"
                },
                "location": {
                    "properties": {
                        "lon": {
                            "type": "double"
                        },
                        "lat": {
                            "type": "double"
                        }
                    }
                },
                "name": {
                    "type": "string"
                },
                "locks": {
                    "type": "long"
                },
                "hourOfDay": {
                    "type": "string"
                }
            }
        }
    }
}

In retrospect, this mapping is not ideal. The only field I gave any thought to was the timestamp, but it could actually have been auto-detected as well. Now that we have some data and some idea of what queries we would like, let's have a closer look at our mappings.

The timestamp field is a date and date_time_no_millis has ample resolution, so we'll leave that as is. The bicycles and locks fields have been correctly auto-detected as numerical types, but the default integral type of long is wasting memory. Looking at our data it is evident that the largest racks have about 30 bikes or locks, so the values will fit nicely into one byte. The weekday and hourOfDay fields were detected as strings because I mistakenly quoted their values in the initial version of the parser. With values from 1 to 7 and 0 to 23, they also fit into the smallest integer type available, byte. The location field is stored as an object with lon and lat fields of type double. This is actually one of the formats Elasticsearch accepts for a geo_point, but it is not detected as such by the default mappings. By changing this to a geo_point we get more options for using location when querying and filtering. Lastly, the name field is detected as a string, which is correct, but it uses the default analyzer. As we will see later on in this article, we're better off changing it to not_analyzed.

Reindexing

Using the command below, we create a new index with the new mappings. Once it's created, we change the parse script from the previous article to push documents to the new index, oslo3. Of course, in a production setup it's preferable to use an index alias so that you don't have to alter other systems just because of a name change.

curl -XPOST 'http://<cluster_id>.foundcluster.com:9200/oslo3' -d '{
    "settings" : {
        "number_of_shards" : 1
    },
    "mappings" : {
        "bikerack" : {
            "_source" : { "enabled" : true },
            "properties" : {
                "timestamp": {
                    "format": "date_time_no_millis",
                    "type": "date"
                },
                "bicycles": {
                    "type": "byte"
                },
                "weekday": {
                    "type": "byte"
                },
                "location": {
                    "type": "geo_point"
                },
                "name": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "locks": {
                    "type": "byte"
                },
                "hourOfDay": {
                    "type": "byte"
                }
            }
        }
    }
}'
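
As mentioned above, an index alias makes a rename like this transparent to other systems. A minimal sketch of how that could look (the alias name citybikes is illustrative and not part of the original setup):

# the alias name "citybikes" is just an example
curl -XPOST 'http://<cluster_id>.foundcluster.com:9200/_aliases' -d '{
    "actions": [
        { "add": { "index": "oslo3", "alias": "citybikes" } }
    ]
}'

Clients can then address citybikes, and the alias can later be repointed at a newer index without touching any of them.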

Now we have to add all the previously indexed documents to the new index. Luckily, Elasticsearch provides most of what we need; we just have to write a small script to tie it all together. First, our script will use the _source field, which contains the original document sent to Elasticsearch. Second, we will use the scan search type, which allows us to traverse all the documents in the index in an efficient manner, and lastly we will combine that with the scroll feature, which allows us to retrieve the result of the scan in appropriately sized chunks. Then we simply push each chunk to the new index using the bulk API. Now, in order to use the bulk API I have switched from using wabisabi to elastic4s. Elastic4s is more feature-complete as it wraps the Java client, but it does require using the transport protocol. Luckily, Found has recently launched support for the transport protocol, although it is still in a private beta.

Setting up a Client Connection

Add the found-transport-module and elastic4s to your build path using your favorite build tool. Both are found in Maven Central as "no.found.elasticsearch" % "elasticsearch-transport-module" and "com.sksamuel.elastic4s" % "elastic4s_2.10", respectively. Using them together is just a matter of creating a TransportClient with the appropriate settings and then wrapping it in an ElasticClient, like this:

  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "<api key>")
    .put("cluster.name", "<cluster name>")
    .put("client.transport.ignore_cluster_name", false)
    .build();
  val client = {
    val address = new InetSocketTransportAddress("<cluster_id>.foundcluster.com", 9343);
    ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
  }

Getting the Data

Now we are going to get the data by issuing a scan query on the old index with a specified scroll size. Note that the first request does not return any documents, just a scroll id. We then use the scroll id with the search scroll endpoint to retrieve the first chunk of documents. This chunk also includes the scroll id for the next chunk, and so on. To make the whole shebang easy to process without having to hold everything in memory, we wrap it up in a Scala Stream like this:

  def getData() = {
    val res = client.execute {
      search in "oslo" searchType Scan scroll "10m" size 500
    }
    println("Executed first query")
    def fetch(previous: Future[SearchResponse]) = {
      previous.flatMap {
        res =>
          {
            client.searchScroll(res.getScrollId(), "10m")
          }
      }
    }
    def toStream(current: Future[SearchResponse]): Stream[SearchResponse] = {
      val result = Await.result(current, Duration.Inf)
      if (result.getHits().getHits().length > 0) {
        result #:: toStream(fetch(current))
      } else {
        Stream.empty
      }
    }
    toStream(fetch(res))
  }
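
For reference, the scan and scroll flow performed by the client corresponds roughly to the REST calls below; the scroll id is a placeholder for the value returned by the previous response, and note that with the scan search type the size parameter applies per shard:

# Start the scan: returns a scroll id, but no documents
curl -XGET 'http://<cluster_id>.foundcluster.com:9200/oslo/_search?search_type=scan&scroll=10m&size=500' -d '{
    "query": { "match_all": {} }
}'

# Fetch the next chunk, repeating with the newest scroll id until no hits are returned
curl -XGET 'http://<cluster_id>.foundcluster.com:9200/_search/scroll?scroll=10m' -d '<scroll_id>'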

Pushing the Data

Elastic4s wraps all return types in scala.concurrent.Futures. This makes it easy to execute operations against Elasticsearch in a non-blocking and asynchronous fashion, but it also implies out-of-order completion. For both illustration and potential debugging purposes, I iterate the data stream together with the documentCounts stream. Using the map function, I transform the documents in each scroll result into a list of indexing operations, which is then fed to the bulk method of ElasticClient, as seen in the following code:

  def documentCounts(i: Int): Stream[Int] = i #:: documentCounts(i + 500)
  val indexingResults = for {
    (result, i) <- getData.zip(documentCounts(0))
    val bulkResult = client.bulk(
      result.getHits().getHits().map(_.getSourceAsString()).map {
        s => index into "oslo3/bikerack" source new StringSource(s)
      }: _*)
    val _ = bulkResult.onComplete {
      case Success(r) => {
        if (r.hasFailures()) {
          println(s"Indexing failure batch($i): ${r.buildFailureMessage()}")
        } else {
          println(s"Indexing success batch($i): [${r.getTook()}], [${r.getItems().length}]")
        }
      }
      case Failure(e) => {
        println(s"Indexing transport error batch($i): ${e.getLocalizedMessage()}")
      }
    }
  } yield bulkResult
  Await.result(Future.sequence(indexingResults), Duration.Inf)

This results in one bulk operation for each scroll chunk, or one read and one write operation for every 500 documents.
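
On the wire, each of these bulk operations corresponds roughly to the REST format below, with an action line followed by a source line per document. The document shown is an illustrative sample, not taken from the real data:

# the source document below is a made-up example matching the oslo3 mapping
curl -XPOST 'http://<cluster_id>.foundcluster.com:9200/_bulk' -d '
{ "index": { "_index": "oslo3", "_type": "bikerack" } }
{ "timestamp": "2013-06-07T18:00:00+02:00", "name": "92-Blindernveien 5", "bicycles": 5, "locks": 10, "weekday": 5, "hourOfDay": 18, "location": { "lat": 59.94, "lon": 10.72 } }
'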

The complete reindexing script:

import scala.Array.canBuildFrom
import scala.collection.immutable.Stream.consWrapper
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.Duration
import scala.util.Failure
import scala.util.Success
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.ElasticDsl.index
import com.sksamuel.elastic4s.ElasticDsl.search
import com.sksamuel.elastic4s.SearchType.Scan
import com.sksamuel.elastic4s.source.Source
import scala.concurrent.ExecutionContext
object Reindexer extends App {
  import ExecutionContext.Implicits.global
  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "<api key>")
    .put("cluster.name", "<cluster name>")
    .put("client.transport.ignore_cluster_name", false)
    .build();
  val client = {
    val address = new InetSocketTransportAddress("<cluster_id>.foundcluster.com", 9343);
    ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
  }
  def getData() = {
    val res = client.execute {
      search in "oslo" searchType Scan scroll "10m" size 500
    }
    println("Executed first query")
    def fetch(previous: Future[SearchResponse]) = {
      previous.flatMap {
        res =>
          {
            client.searchScroll(res.getScrollId(), "10m")
          }
      }
    }
    def toStream(current: Future[SearchResponse]): Stream[SearchResponse] = {
      val result = Await.result(current, Duration.Inf)
      if (result.getHits().getHits().length > 0) {
        result #:: toStream(fetch(current))
      } else {
        Stream.empty
      }
    }
    toStream(fetch(res))
  }
  def documentCounts(i: Int): Stream[Int] = i #:: documentCounts(i + 500)
  val indexingResults = for {
    (result, i) <- getData.zip(documentCounts(0))
    val bulkResult = client.bulk(
      result.getHits().getHits().map(_.getSourceAsString()).map {
        s => index into "oslo3/bikerack" source new StringSource(s)
      }: _*)
    val _ = bulkResult.onComplete {
      case Success(r) => {
        if (r.hasFailures()) {
          println(s"Indexing failure batch($i): ${r.buildFailureMessage()}")
        } else {
          println(s"Indexing success batch($i): [${r.getTook()}], [${r.getItems().length}]")
        }
      }
      case Failure(e) => {
        println(s"Indexing transport error batch($i): ${e.getLocalizedMessage()}")
      }
    }
  } yield bulkResult
  Await.result(Future.sequence(indexingResults), Duration.Inf)
  client.close()
  println("Done")
}
class StringSource(override val json: String) extends Source

After having successfully reindexed all documents from the Oslo index, the new index takes up only 150MB, as opposed to the 180MB of the old index. Saving 30MB is no big deal, but it does reduce the size by about 17%, and given that the index currently only holds about two months of data, those 17% will one day mean that I can postpone upgrading the cluster to a plan with more memory. Many developers have made the mistake of using the smallest number type only to get integer overflow issues in production a year later, but in this case I expect the number of racks and the density of the racks to grow rather than the size of the racks. And as the number of racks grows, more documents will have to be indexed every minute.
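
One way to check index sizes like these is the indices stats API; for example, a request along these lines returns a store section with the size of each index:

curl 'http://<cluster_id>.foundcluster.com:9200/oslo,oslo3/_stats?pretty=true'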

Optimizing Queries

As the hourOfDay field is now numeric, we can use it directly as the key field of the histogram facet:

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true -XPOST -d '{
"query": {
    "match": {"name": "92-Blindernveien 5"}
},
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_field" : "hourOfDay",
                "value_field" : "bicycles",
                "interval" : 1
            }
        }
    }
}'

As the name field is no longer analyzed, we can use a term filter for matching the names. The main difference between using a filter and a query is that filters don't produce relevance scores, they only include or exclude documents; on the other hand, filters are better suited for caching. Technically, we could have used a term filter before as well, but then we would have had to find terms unique to each name. For instance, when searching for observations of the rack "92-Blindernveien 5" we would have had to use one or more of the terms "92", "blindernveien" or "5". The term "5" would not have been very useful in this scenario, as it is shared with the names of several other racks. Now that the name field is no longer analyzed, it is simply indexed exactly as written, and we can use the term filter with the entire name like this:

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true -XPOST -d '{
"query": {
    "filtered": {
        "filter": {
            "term": {"name": "92-Blindernveien 5"}
        }
    }
},
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_field" : "hourOfDay",
                "value_field" : "bicycles",
                "interval" : 1
            }
        }
}   
}'

There is one more optimization that can easily be applied to our query. As you may have noticed, it still includes a hits section with documents that we are not really interested in. They were nice to have when we tested our query, to see that it matched the documents we wanted for the facet, but once we're confident about that, we're only interested in the facet itself. One option is to set the query size to zero, but Elasticsearch actually has a specific search type that is appropriate for this kind of analysis. If the search type "count" is specified, no documents are retrieved when the query is executed against each shard; only the document count and the facets are returned.

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {
            "term": {"name": "92-Blindernveien 5"}
        }
    }
},
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_field" : "hourOfDay",
                "value_field" : "bicycles",
                "interval" : 1
            }
        }
}   
}'

Using Location in Queries

Let's make use of the location. I have always had a hypothesis that the racks in the city center have a different usage pattern than the rest. Let's test the geo_distance filter:

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {        
            "geo_distance": {"distance": "500m", "location": [10.742771,59.91082]}
        }
    }
},
"facets" : {
        "unique_racks" : {
            "terms" : {
                "field" : "name",
                "size" : "50"
            }
        }
}   
}'

This lists the names of the racks within 500 meters of the city center coordinate. Remember that we changed the name field to not_analyzed? That's the reason the exact name, as indexed, becomes a single term. Had we not made this change, the terms facet would have output terms like 5, 92 and blindernveien. Once we have a filter (or query, for that matter) it's simple to generate the histogram facet for those racks, giving us the average values for the racks in the city center as a whole.

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {        
            "geo_distance": {"distance": "500m", "location": [10.742771,59.91082]}
        }
    }
},
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_field" : "hourOfDay",
                "value_field" : "bicycles",
                "interval" : 1
            }
        }
}  
}'

Of course, when aggregating statistics for several racks at once, we should account for the fact that the racks have different capacities. By using the value_script argument we can calculate the fill percentage of each rack. One thing to note is that you have to use either key_script and value_script or key_field and value_field; you cannot mix the two. As in the previous article, I multiply the weekday number by one hundred and add the hour of day, so that the histogram facet gets one unique number for each hour of the week while still being fairly easy to read. Now we get the average fill ratio for every rack in the city center for every hour of the week:

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {        
            "geo_distance": {"distance": "500m", "location": [10.742771,59.91082]}
        }
    }
},
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_script" : "doc['weekday'].value.intValue() * 100 + doc['hourOfDay'].value",
                "value_script" : "doc['bicycles'].value.doubleValue() / (doc['bicycles'].value + doc['locks'].value)",
                "interval" : 1
            }
        }
}  
}'

Having set my mind on visualizing this data so that I might finally get some answers from my dataset, I put all the values into Excel. However, one problem emerged right away: some of the hours had NaN (Not a Number) values, like this:

"key" : 217,
"count" : 779,
"min" : 0.0,
"max" : 0.9166666666666666,
"total" : "NaN",
"total_count" : 779,
"mean" : "NaN"

After refreshing my rusty Java knowledge I soon realized that this was probably due to dividing zero by zero. Combining a few term filters with an and filter gave me the proof:

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true -XPOST -d '{
"query": {
    "filtered": {
        "filter": {
            "and": [
                {"geo_distance": {"distance": "500m", "location": [10.742771,59.91082]}},
                {"term": {"hourOfDay": 17}},
                {"term": {"weekday": 2}},
                {"term": {"bicycles": 0}},
                {"term": {"locks": 0}}
            ]
        }
    }
} 
}'

At this specific day and hour there was one rack in the city center with no locks and no bicycles. Clearly, the fill ratio is undefined when the rack is both full and empty at the same time; in this case NaN is the correct answer for that rack, but it destroys our aggregated values, as any operation on a NaN value results in NaN. For the sake of this calculation, let's just skip these observations by wrapping our geo_distance filter in a bool filter and adding the term filters to the must_not section.

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {
            "bool": {
                "must": {"geo_distance": {"distance": "500m", "location": [10.742771,59.91082]}},
                "must_not": {
                    "and": [
                        {"term": {"bicycles": 0}},
                        {"term": {"locks": 0}}
                    ]
                }
            }
        }
    }
}, 
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_script" : "doc['weekday'].value.intValue() * 100 + doc['hourOfDay'].value",
                "value_script" : "doc['bicycles'].value.doubleValue() / (doc['bicycles'].value + doc['locks'].value)",
                "interval" : 1
            }
        }
}  
}'

In the above query we are still using filters instead of queries, but the way the filters are composed is actually suboptimal in terms of caching. This is a piece of information I personally had a hard time extracting from the official documentation, but it is explained very well in this blog post. The short answer is that most filters can be represented as bitsets. Bitsets are a very efficient way of telling whether a document id is included in a collection or not. The difference between the bool filter and the and, or and not filters is that the bool filter is capable of utilizing the bitset caches, whereas the others work on a document-by-document basis. The advantage of the latter is that filters which are not capable of producing bitsets may end up processing fewer documents. The filters that don't support bitsets are the geo filters, filters with scripts and the numeric range filter. All other filters should be wrapped in a bool filter. With this knowledge we rearrange the filter composition and get:

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {
            "and": [
                "bool": {
                    "must_not": {
                        "bool": {
                            "must": [            
                               {"term": {"bicycles": 0}},
                               {"term": {"locks": 0}}
                            ]
                        }
                    }
                },
                {"geo_distance": {"distance": "500m", "location": [10.742771,59.91082]}}
            ]
        }
    }
}, 
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_script" : "doc['weekday'].value.intValue() * 100 + doc['hourOfDay'].value",
                "value_script" : "doc['bicycles'].value.doubleValue() / (doc['bicycles'].value + doc['locks'].value)",
                "interval" : 1
            }
        }
}  
}'

Now our data looks good. Let's compare this with the racks outside the 500m radius by replacing the geo_distance filter with a slightly more flexible alternative, the geo_distance_range filter. The geo_distance_range filter allows us to specify a lower boundary, an upper boundary, or both.

curl http://<cluster_id>.foundcluster.com:9200/oslo3/bikerack/_search?pretty=true\&search_type=count -XPOST -d '{
"query": {
    "filtered": {
        "filter": {
            "and": [
                "bool": {
                    "must_not": {
                        "bool": {
                            "must": [            
                               {"term": {"bicycles": 0}},
                               {"term": {"locks": 0}}
                            ]
                        }
                    }
                },
                {"geo_distance_range": {"from": "500m", "location": [10.742771,59.91082]}}
            ]
        }
    }
}, 
"facets" : {
        "histo1" : {
            "histogram" : {
                "key_script" : "doc['weekday'].value.intValue() * 100 + doc['hourOfDay'].value",
                "value_script" : "doc['bicycles'].value.doubleValue() / (doc['bicycles'].value + doc['locks'].value)",
                "interval" : 1
            }
        }
}  
}'

With clean data it's easy to do the visualization, and the trend is clear: almost every time the fill ratio increases in the city center, it decreases outside the city center, and vice versa.

Average fill level of bike racks inside and outside city centre

Conclusion

This wraps up our work on the city bikes scenario for now. We have seen the importance of defining appropriate mappings, both in terms of query flexibility and performance. We have also discussed the difference between a filter and a query, and of course, we have shown how to reindex after changing the index mappings. For future work, I plan to write an article where I'll look at implementing an indexing strategy similar to that of Logstash, which makes it easy to purge old documents. If you have any questions or ideas for topics that would be cool to investigate in the City Bikes series, please leave a comment below.