Roy Russo is the Vice President of Engineering at Predikto, Inc, a predictive analytics startup based in Atlanta, GA. Roy was the Co-Founder and VP of Product Management for Atlanta-based Marketing Automation vendor, LoopFuse; recently acquired by Atlanta-based SalesFusion, Inc. Roy also helped co-found JBoss Portal, a JSR-168 compliant enterprise Java Portal, and represented JBoss on the Java Content Repository, JSR-170.
Predikto processes large volumes of data for asset-intensive industries in order to predict equipment failure. This information allows customers to pro-actively apply prescriptive maintenance, thereby avoiding downtime. The Predikto Enterprise Platform combines applies advanced data science and machine learning techniques and algorithms for customers across industries such as oil & gas, rail, fleet, manufacturing, and data gathered from the Industrial Internet of Things. It processes large volumes of data from disparate sources in heterogeneous formats, something that Elasticsearch is well-suited to store and query on.
Elasticsearch is a key component of the Predikto Enterprise Platform. Horizontal scalability, product maturity, and tooling all factored in the decision to adopt Elasticsearch at Predikto.
Mapping Distributed Assets
GeoJSON support is an important feature of Elasticsearch. For example, a railroad company may want to predict asset failure on moving locomotives, train cars, and even train car doors to prevent unnecessary stops that cost millions of dollars annually. Predikto factors the location of the asset (sometimes an actively moving asset like a train or fleet vehicle) and correlates the location with weather forecast data also stored in Elasticsearch, as weather often plays a role in device failure. Since Elasticsearch supports the GeoJSON standard out-of-the-box, it allowed Predikto to standardize sensor and device location data in GeoJSON format as well. Now, correlating latitude and longitude coordinates with weather data is a trivial matter.
The other benefit to having all location data standardized with GeoJSON and stored in Elasticsearch was that it made visualization simple. The Predikto user interface includes a large map, allowing users to visualize their assets as they move in real-time and highlight those assets that are likely to fail. Heavy use of the aggregations API enables a clean way to do time-series modeling,especially with features like terms, date_histogram, and top hits APIs.
Standardizing the format of geo-location data helped remove the custom transformation of location data for the user interface and the predictive analysis process.
Integrating Elasticsearch with Spark for big data ETL
There is no widely-adopted standard format for sensor data, although many IoT vendors are working on one. This means that the massive amounts of data flowing in to the Predikto system are in any manner of format, from CSV, TSV, log line, xml, and even some legacy formats that defy logic at first glance. All of this data must be transformed into a standard format in order for our machine learning algorithms and predictive analytics models to process.
This is where Apache Spark comes in. Being able to process large batches of data in memory, while reading and writing from and to Elasticsearch using the es-hadoop library maintained by Elastic enabled Predikto to have smooth ETL processes that improved performance from the previous, disk-bound ETL process by a factor of 10. Massaging the original data into the Predikto standard format often requires substantial processing power. In this instance, Elasticsearch, as a primary datasource, is having to perform heavy read operations, and finally, heavy write operations from dozens of Spark workers asynchronously during the ETL process.
The last point above helped remove a stumbling-block we ran in to when writing asynchronously from many Spark workers to Amazon S3. Because of S3’s eventual-consistency object store, the Spark master was unable to combine to worker-written file parts, as they were not immediately visible, thus causing the entire Spark process to fail. In the past, we remedied the issue by saving the output to a mounted EBS volume, and then copying the file to S3. This was error-prone and not performant at scale. Instead, we make heavy use of the bulk API to insert documents from Spark workers now in to Elasticsearch.
In the end, the Predikto standard format is a JSON-formatted representation which fits nicely with our use of Elasticsearch. Since the format is standardized, so are many of the Elasticsearch mappings, templates, analyzers, and tokenizers. This makes on-boarding new sensor data a trivial manner, thus differentiating us from many of our labor-intensive competitors.
Elasticsearch and Predikto’s dynamic reporting dashboards
Elasticsearch solved two problems for us with Spark.
- Act as a horizontally scalable document-store that supports native integration.
- Act as a fast-read document-store with dynamic querying capabilities for a BI-like dashboard.
The Predikto user-interface features a bird’s eye view of customers’ distributed assets using the data stored in Elasticsearch. Location visualization is made possible using LeafletJS, and Elasticsearch. Since LeafletJS and Elasticsearch both support the GeoJSON standard, point and aggregate plotting on the map is a trivial manner, facilitated by the elasticsearch-js SDK.
In addition to a mapping view, the Predikto interface allows users to analyze data in a myriad of different ways: plotting time-series data, grouping attributes, and even drilling down into any specific device’s current state, health score, and probability of failure. Dynamic querying capabilities come natural to Elasticsearch, and not having to micro-manage a query plan and design tables in advance allows Predikto ultimate flexibility in designing new features and reports for the user interface. However, it’s important to add that Elasticsearch would likely have not been adopted for use in advanced reporting at Predikto, if it were not for the recently-added aggregations API. Aggregations are a fundamental part of our user interface, as customers are monitoring real-time asset health across hundred or thousands of pieces of equipment.
Elasticsearch has proven to be an asset in our microservice-based big data architecture, allowing Predikto to scale smoothly in infrastructure and personnel. The simple and well-documented Elasticsearch APIs have made it easy for us to access to data in different ways and make Elasticsearch a core part of our run-time and reporting software.
Scaling Elasticsearch affordably
As a startup engineering team, time is at a premium and never our friend. We needed a document store that fit all of the above needs, yet was easy to implement and scale with. We wanted to avoid an anti-pattern; many of the solutions we evaluated, although simple to get started with, would become an anchor as we scaled. Elasticsearch has a great reputation and a variety of large scale deployments that vouch for its ability to scale gracefully.
The Elasticsearch AWS plugin helped get us started in avoiding split brain scenarios and in making node discovery a breeze. Now, we host several clusters across several AWS regions - some production, and some for testing and staging new releases. We implemented many of the well known optimization best practices discussed on the Elasticsearch blog and published articles, such as JVM tuning (set ES_MIN_MEM and ES_MAX_MEM equally), enabling mlockall, and adjusting index flushing timing. JVM tuning is an art, better left to experts, but we happen to have plenty of those on hand. Network topology was also somewhere we made adjustments, by employing client nodes to act as load balancers across the cluster. Although much of the Elasticsearch operations are bulk writes, even in the face of peak time usage, the most complex of queries will return in sub-second time.
Over time, as we have added more sensor data, our cluster has scaled to several dozen nodes, backed by large EBS volumes mounted on EC2 instances. Adding new nodes to the cluster has proven simple and efficient with Elasticsearch’s autoscaling capability of re-distributing shards across the cluster.
This is only the beginning
As we build out new features in the reporting dashboards and grow our customer base, our use of Elasticsearch today is only just beginning. We foresee a heavier utilization of Elasticsearch at every level. Operating at sensor-data scale is no easy task, but Elasticsearch removed many of the headaches associated with early-stage scaling of our architecture, and we see it as an integral part of our processes over the next billion documents and thereafter.