Migrating 1 billion log lines from OpenSearch to Elasticsearch


What are the current options to migrate from OpenSearch to Elasticsearch®?

OpenSearch is a fork of Elasticsearch 7.10 that has diverged quite a bit from Elasticsearch since then, resulting in a different set of features and also different performance, as this benchmark shows (hint: it’s currently much slower than Elasticsearch).

Given the differences between the two solutions, restoring a snapshot from OpenSearch is not possible, nor is reindex-from-remote, so our only option is to put something in between that reads from OpenSearch and writes to Elasticsearch.

This blog will show you how easy it is to migrate from OpenSearch to Elasticsearch for better performance and less disk usage!


1 billion log lines

We are going to use part of the data set we used for the benchmark, which takes about half a terabyte on disk, including replicas, and spans one week (January 1–7, 2023).

We have a total of 1,009,165,775 documents that take 453.5GB of space in OpenSearch, including the replicas. That’s roughly 241.2 bytes per document on the primary shards. This is going to be important later when we enable a couple of optimizations in Elasticsearch that will bring this total size way down without sacrificing performance!
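As a quick sanity check on the per-document figure, here is a minimal sketch of the arithmetic. It assumes that the "gb" reported by _cat/indices is a binary unit (1024³ bytes) and that primaries account for exactly half of the total because every index has one replica; both are assumptions made for this illustration, not statements from the benchmark. Under them, the result comes out at roughly 241 bytes per document.

```python
# Totals quoted in the text above.
TOTAL_DOCS = 1_009_165_775
TOTAL_STORE_GB = 453.5  # primaries plus one replica each

# Assumption: "gb" means 1024**3 bytes, as in the _cat APIs.
BYTES_PER_GB = 1024 ** 3

# Assumption: with one replica per primary, primaries hold half of the total.
primary_bytes = TOTAL_STORE_GB * BYTES_PER_GB / 2

bytes_per_doc = primary_bytes / TOTAL_DOCS
print(f"{bytes_per_doc:.1f} bytes per document")
```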

This billion log line data set is spread over nine indices that are part of a datastream we are calling logs-myapplication-prod. We have primary shards of about 25GB in size, according to the best practices for optimal shard sizing. A GET _cat/indices shows us the indices we are dealing with:

index                              docs.count pri rep pri.store.size store.size
.ds-logs-myapplication-prod-000049  102519334   1   1         22.1gb     44.2gb
.ds-logs-myapplication-prod-000048  114273539   1   1         26.1gb     52.3gb
.ds-logs-myapplication-prod-000044  111093596   1   1         25.4gb     50.8gb
.ds-logs-myapplication-prod-000043  113821016   1   1         25.7gb     51.5gb
.ds-logs-myapplication-prod-000042  113859174   1   1         24.8gb     49.7gb
.ds-logs-myapplication-prod-000041  112400019   1   1         25.7gb     51.4gb
.ds-logs-myapplication-prod-000040  113362823   1   1         25.9gb     51.9gb
.ds-logs-myapplication-prod-000038  110994116   1   1         25.3gb     50.7gb
.ds-logs-myapplication-prod-000037  116842158   1   1         25.4gb     50.8gb

Both OpenSearch and Elasticsearch clusters have the same configuration: 3 nodes with 64GB RAM and 12 CPU cores. Just like in the benchmark, the clusters are running in Kubernetes.

Moving data from A to B

Typically, moving data from one Elasticsearch cluster to another is as easy as a snapshot and restore if the clusters are on compatible versions, or a reindex from remote if you need real-time synchronization and minimal downtime. These methods do not apply when migrating data from OpenSearch to Elasticsearch because the projects have diverged significantly since the 7.10 fork. However, there is one method that will work: scrolling.

Scrolling

Scrolling involves using an external tool, such as Logstash®, to read data from the source cluster and write it to the destination cluster. This method provides a high degree of customization, allowing us to transform the data during the migration process if needed. Here are a few advantages of using Logstash:

  • Easy parallelization: It’s really easy to write concurrent jobs that can read from different “slices” of the indices, essentially maximizing our throughput.
  • Queuing: Logstash automatically queues documents before sending.
  • Automatic retries: In the event of a failure or an error during data transmission, Logstash will automatically attempt to resend the data; moreover, it will stop querying the source cluster as often, until the connection is re-established, all without manual intervention.

Scrolling allows us to do an initial search and to keep pulling batches of results from Elasticsearch until there are no more results left, similar to how a “cursor” works in relational databases.

A scrolled search takes a snapshot in time by freezing the segments that make up the index at the moment the request is made, preventing those segments from being merged away. As a result, the scroll doesn’t see any changes made to the index after the initial search request.
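The scroll loop described above can be sketched in a few lines. This is only an illustration, not how Logstash implements it: it assumes a client object whose search, scroll, and clear_scroll methods take keyword arguments in the style of the opensearch-py and elasticsearch-py clients, and FakeClient is a hypothetical in-memory stand-in so the sketch runs without a cluster.

```python
from typing import Any, Dict, Iterator, List


def scroll_all(client: Any, index: str, page_size: int = 500,
               keep_alive: str = "5m") -> Iterator[Dict[str, Any]]:
    """Yield every hit in `index`, pulling one page at a time."""
    # The initial search opens the scroll context (the frozen view of the index).
    page = client.search(index=index, scroll=keep_alive, size=page_size,
                         body={"query": {"match_all": {}}})
    try:
        while page["hits"]["hits"]:
            yield from page["hits"]["hits"]
            # Each scroll call returns the next page and renews the keep-alive.
            page = client.scroll(scroll_id=page["_scroll_id"], scroll=keep_alive)
    finally:
        # Free the search context rather than waiting for it to time out.
        client.clear_scroll(scroll_id=page["_scroll_id"])


class FakeClient:
    """Hypothetical in-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, docs: List[Dict[str, Any]]):
        self.docs = docs
        self.cleared = False

    def search(self, index, scroll, size, body):
        # Snapshot the documents now; later changes to self.docs are not seen.
        self._snapshot, self._size, self._pos = list(self.docs), size, 0
        return self._next_page()

    def scroll(self, scroll_id, scroll):
        return self._next_page()

    def clear_scroll(self, scroll_id):
        self.cleared = True

    def _next_page(self):
        hits = self._snapshot[self._pos:self._pos + self._size]
        self._pos += self._size
        return {"_scroll_id": "fake-scroll-id", "hits": {"hits": hits}}


client = FakeClient([{"_id": str(i)} for i in range(7)])
ids = [hit["_id"] for hit in scroll_all(client, "logs-myapplication-prod", page_size=3)]
print(ids)
```

The loop stops on the first empty page, and the finally clause releases the scroll context even if the consumer stops iterating early.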

Migration strategies

Reading from A and writing to B can be slow without optimization because it involves paginating through the results and transferring each batch over the network to Logstash, which assembles the documents into another batch and then transfers those batches over the network again to Elasticsearch, where the documents are indexed. So when it comes to such large data sets, we must be very efficient and extract every bit of performance where we can.

Let’s start with the facts — what do we know about the data we need to transfer? We have nine indices in the datastream, each with about 100 million documents. Let’s test with just one of the indices and measure the indexing rate to see how long it takes to migrate. The indexing rate can be seen by activating the monitoring functionality in Elastic® and then navigating to the index you want to inspect.

Scrolling in the deep
The simplest approach for transferring the log lines over would be to have Logstash scroll over the entire data set in one go and check back later when it finishes. Here we will introduce our first two variables: PAGE_SIZE and BATCH_SIZE. The former is how many records we are going to bring from the source every time we query it, and the latter is how many documents are going to be assembled together by Logstash and written to the destination index.
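To make the difference between the two variables concrete, here is a conceptual sketch that regroups pages read from the source into batches of a different size. The function name rebatch is hypothetical, and Logstash's real queue and worker model is more involved than this; the point is only that PAGE_SIZE and BATCH_SIZE are independent of each other.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rebatch(pages: Iterable[List[T]], batch_size: int) -> Iterator[List[T]]:
    """Regroup pages read from the source into batches of `batch_size`."""
    batch: List[T] = []
    for page in pages:          # each page holds up to PAGE_SIZE documents
        for doc in page:
            batch.append(doc)
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:                   # flush the final, possibly smaller, batch
        yield batch


pages = [[1, 2, 3], [4, 5, 6], [7]]   # PAGE_SIZE = 3
batches = list(rebatch(pages, batch_size=2))
print(batches)
```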

Deep scrolling

With such a large data set, the scroll slows down as the pagination gets deeper. The indexing rate starts at 6,000 docs/second and steadily drops to 700 docs/second. Without any optimization, it would take us 19 days (!) to migrate the 1 billion documents. We can do better than that!

Indexing rate for a deep scroll

Slice me nice
We can optimize scrolling by using an approach called Sliced scroll, where we split the index in different slices to consume them independently.

Here we will introduce our last two variables: SLICES and WORKERS. SLICES is the number of slices each index is split into, and WORKERS is the number of Logstash pipeline workers. The number of slices cannot be too small, as performance then decreases drastically over time, and it cannot be too big, as the overhead of maintaining the scrolls would counter the benefits of smaller searches.

Sliced scroll
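As a rough illustration of what a sliced scroll looks like at the request level, the sketch below builds one search body per slice. The helper name sliced_scroll_bodies is hypothetical; in this migration the Logstash input plugin takes care of slicing through its slices setting, so this only shows the shape of the requests.

```python
from typing import Any, Dict, List, Optional


def sliced_scroll_bodies(slices: int,
                         query: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """Build one search body per slice for a sliced scroll."""
    if slices < 1:
        raise ValueError("slices must be at least 1")
    query = query or {"match_all": {}}
    if slices == 1:
        # A single slice is just a regular scroll; no "slice" clause is needed.
        return [{"query": query}]
    # Slice ids run from 0 to slices - 1; "max" is the total number of slices.
    return [{"slice": {"id": i, "max": slices}, "query": query}
            for i in range(slices)]


bodies = sliced_scroll_bodies(4)
print([b["slice"] for b in bodies])
```

Each body would be sent as its own scroll request, typically from a separate thread, so that the slices are consumed independently.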

Let’s start by migrating a single index (out of the nine we have) with different parameters to see what combination gives us the highest throughput.

SLICES  PAGE_SIZE  WORKERS  BATCH_SIZE  Average Indexing Rate
     3        500        3         500  13,319 docs/sec
     3      1,000        3       1,000  13,048 docs/sec
     4        250        4         250  10,199 docs/sec
     4        500        4         500  12,692 docs/sec
     4      1,000        4       1,000  10,900 docs/sec
     5        500        5         500  12,647 docs/sec
     5      1,000        5       1,000  10,334 docs/sec
     5      2,000        5       2,000  10,405 docs/sec
    10        250       10         250  14,083 docs/sec
    10        250        4       1,000  12,014 docs/sec
    10        500        4       1,000  10,956 docs/sec

It looks like we have a good set of candidates for maximizing the throughput for a single index, between 12K and 14K documents per second. That doesn't mean we have reached our ceiling. Even though search operations are single threaded and every slice will trigger sequential search operations to read data, that does not prevent us from reading several indices in parallel.
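To see why reading several indices in parallel matters, here is a back-of-the-envelope estimate. It assumes, optimistically, that every index sustains the best single-index rate measured above even when all nine are migrated at once; real throughput will depend on how the clusters cope with the combined load. Under that assumption the whole migration takes a little over 2 hours instead of roughly 20.

```python
def hours_to_migrate(doc_count: int, docs_per_second: float) -> float:
    """Estimated wall-clock hours at a constant indexing rate."""
    return doc_count / docs_per_second / 3600


BEST_RATE = 14_083               # docs/sec, best single-index run above
LARGEST_INDEX = 116_842_158      # docs in .ds-logs-myapplication-prod-000037
ALL_DOCS = 1_009_165_775

# One index at a time, back to back.
sequential = hours_to_migrate(ALL_DOCS, BEST_RATE)
# Assumption: all nine indices run in parallel and each sustains BEST_RATE,
# so the largest index determines the total duration.
parallel = hours_to_migrate(LARGEST_INDEX, BEST_RATE)

print(f"sequential: {sequential:.1f} h, parallel: {parallel:.1f} h")
```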

By default, the maximum number of open scrolls is 500 — this limit can be updated with the search.max_open_scroll_context cluster setting, but the default value is enough for this particular migration.
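A quick way to check that the default is enough is to count the scroll contexts the migration keeps open. The sketch assumes one open context per slice per index, which seems reasonable here because every index has a single primary shard; the value 1000 in the settings body is purely illustrative.

```python
DEFAULT_MAX_OPEN_SCROLLS = 500   # default quoted in the text above


def open_scroll_contexts(indices: int, slices_per_index: int) -> int:
    """Assumption: each slice of each index keeps its own scroll context open."""
    return indices * slices_per_index


needed = open_scroll_contexts(indices=9, slices_per_index=10)
print(needed, needed <= DEFAULT_MAX_OPEN_SCROLLS)

# If the limit ever had to be raised, this body could be sent with
# PUT _cluster/settings (the value 1000 is only an illustration).
settings_body = {"persistent": {"search.max_open_scroll_context": 1000}}
```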


Let’s migrate

Preparing our destination indices

We are going to create a datastream called logs-myapplication-reindex to write the data to, but before indexing any data, let’s ensure our index template and index lifecycle management configurations are properly set up. An index template acts as a blueprint for creating new indices, allowing you to define various settings that should be applied consistently across your indices.

Index lifecycle management policy
Index lifecycle management (ILM) is equally vital, as it automates the management of indices throughout their lifecycle. With ILM, you can define policies that determine how long data should be retained, when it should be rolled over into new indices, and when old indices should be deleted or archived. Our policy is really straightforward:

PUT _ilm/policy/logs-myapplication-lifecycle-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "25gb"
          }
        }
      },
      "warm": {
        "min_age": "0d",
        "actions": {
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      }
    }
  }
}

Index template (and saving 23% in disk space)
Since we are here, we’re going to go ahead and enable Synthetic Source, a clever feature that lets us skip storing the original JSON document while still reconstructing it on demand from the other data stored for each field (such as doc values).

For our example, enabling Synthetic Source resulted in a remarkable 23.4% improvement in storage efficiency, reducing the size required to store a single document from 241.2 bytes in OpenSearch to just 185 bytes in Elasticsearch.

Our full index template is therefore:

PUT _index_template/logs-myapplication-reindex
{
  "index_patterns": [
    "logs-myapplication-reindex"
  ],
  "priority": 500,
  "data_stream": {},
  "template": {
    "settings": {
      "index": {
        "lifecycle.name": "logs-myapplication-lifecycle-policy",
        "codec": "best_compression",
        "number_of_shards": "1",
        "number_of_replicas": "1",
        "query": {
          "default_field": [
            "message"
          ]
        }
      }
    },
    "mappings": {
      "_source": {
        "mode": "synthetic"
      },
      "_data_stream_timestamp": {
        "enabled": true
      },
      "date_detection": false,
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "agent": {
          "properties": {
            "ephemeral_id": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "id": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "name": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "type": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "version": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "aws": {
          "properties": {
            "cloudwatch": {
              "properties": {
                "ingestion_time": {
                  "type": "keyword",
                  "ignore_above": 1024
                },
                "log_group": {
                  "type": "keyword",
                  "ignore_above": 1024
                },
                "log_stream": {
                  "type": "keyword",
                  "ignore_above": 1024
                }
              }
            }
          }
        },
        "cloud": {
          "properties": {
            "region": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "data_stream": {
          "properties": {
            "dataset": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "namespace": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "type": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "ecs": {
          "properties": {
            "version": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "event": {
          "properties": {
            "dataset": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "id": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "ingested": {
              "type": "date"
            }
          }
        },
        "host": {
          "type": "object"
        },
        "input": {
          "properties": {
            "type": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "log": {
          "properties": {
            "file": {
              "properties": {
                "path": {
                  "type": "keyword",
                  "ignore_above": 1024
                }
              }
            }
          }
        },
        "message": {
          "type": "match_only_text"
        },
        "meta": {
          "properties": {
            "file": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "metrics": {
          "properties": {
            "size": {
              "type": "long"
            },
            "tmin": {
              "type": "long"
            }
          }
        },
        "process": {
          "properties": {
            "name": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "tags": {
          "type": "keyword",
          "ignore_above": 1024
        }
      }
    }
  }
}

Building a custom Logstash image

We are going to use a containerized Logstash for this migration because both clusters are sitting on a Kubernetes infrastructure, so it's easier to just spin up a Pod that will communicate to both clusters.

Since OpenSearch is not an official Logstash input, we must build a custom Logstash image that contains the logstash-input-opensearch plugin. Let’s use the base image from docker.elastic.co/logstash/logstash:8.10.0 and just install the plugin:

FROM docker.elastic.co/logstash/logstash:8.10.0

USER logstash
WORKDIR /usr/share/logstash
RUN bin/logstash-plugin install logstash-input-opensearch

Writing a Logstash pipeline

Now we have our Logstash Docker image, and we need to write a pipeline that will read from OpenSearch and write to Elasticsearch.

The input

input {
    opensearch {
      hosts => ["os-cluster:9200"]
      ssl => true
      ca_file => "/etc/logstash/certificates/opensearch-ca.crt"
      user => "${OPENSEARCH_USERNAME}"
      password => "${OPENSEARCH_PASSWORD}"
      index => "${SOURCE_INDEX_NAME}"
      slices => "${SOURCE_SLICES}"
      size => "${SOURCE_PAGE_SIZE}"
      scroll => "5m"
      docinfo => true
      docinfo_target => "[@metadata][doc]"
    }
}

Let’s break down the most important input parameters. Most of the values are supplied through environment variables here:

  • hosts: Specifies the host and port of the OpenSearch cluster. In this case, it’s connecting to “os-cluster” on port 9200.
  • index: Specifies the index in the OpenSearch cluster from which to retrieve logs. In this case, it’s “logs-myapplication-prod” which is a datastream that contains the actual indices (e.g., .ds-logs-myapplication-prod-000049).
  • size: Specifies the maximum number of logs to retrieve in each request.
  • scroll: Defines how long a search context will be kept open on the OpenSearch server. In this case, it’s set to “5m,” which means each request must be answered and a new “page” asked within five minutes.
  • docinfo and docinfo_target: These settings control whether document metadata should be included in the Logstash output and where it should be stored. In this case, document metadata is being stored in the [@metadata][doc] field — this is important because the document’s _id will be used as the destination id as well.

The ssl and ca_file settings are highly recommended if you are migrating between clusters that sit in different infrastructures (for example, separate cloud providers). You don’t need to specify a ca_file if your TLS certificates are signed by a public authority, which is likely the case if you are using a SaaS and your endpoint is reachable over the internet. In that case, ssl => true alone would suffice. In our case, all our TLS certificates are self-signed, so we must also provide the Certificate Authority (CA) certificate.

The (optional) filter
We could use this to drop or alter the documents to be written to Elasticsearch if we wanted, but we are not going to, as we want to migrate the documents as is. We are only removing extra metadata fields that Logstash includes in all documents, such as "@version" and "host". We are also removing the original "data_stream" as it contains the source data stream name, which might not be the same in the destination.

filter {
    mutate {
        remove_field => ["@version", "host", "data_stream"]
    }
}

The output
The output is really simple: we are going to name our datastream logs-myapplication-reindex, and we use the document id of the original documents in document_id to ensure there are no duplicate documents. In Elasticsearch, datastream names follow the convention <type>-<dataset>-<namespace>, so our logs-myapplication-reindex datastream has “myapplication” as dataset and “reindex” as namespace.

output {
    elasticsearch {
        hosts => "${ELASTICSEARCH_HOST}"

        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"

        document_id => "%{[@metadata][doc][_id]}"

        data_stream => "true"
        data_stream_type => "logs"
        data_stream_dataset => "myapplication"
        data_stream_namespace => "reindex"
    }
}
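The naming convention can be checked with a few lines of code. This is a minimal sketch with a hypothetical helper, data_stream_name; the hyphen check reflects the rule in Elastic's data stream naming scheme that the individual components must not contain "-", since the hyphen is the separator.

```python
def data_stream_name(type_: str, dataset: str, namespace: str) -> str:
    """Compose a data stream name as <type>-<dataset>-<namespace>."""
    for part in (type_, dataset, namespace):
        # Components must be non-empty and must not contain the separator.
        if not part or "-" in part:
            raise ValueError(f"invalid data stream component: {part!r}")
    return f"{type_}-{dataset}-{namespace}"


name = data_stream_name("logs", "myapplication", "reindex")
print(name)
```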

Deploying Logstash

We have a few options to deploy Logstash: it can be deployed locally from the command line, as a systemd service, via docker, or on Kubernetes.

Since both of our clusters are deployed in a Kubernetes environment, we are going to deploy Logstash as a Pod referencing our Docker image created earlier. Let’s put our pipeline inside a ConfigMap along with some configuration files (pipelines.yml and logstash.yml).

In the below configuration, we have SOURCE_INDEX_NAME, SOURCE_SLICES, SOURCE_PAGE_SIZE, LOGSTASH_WORKERS, and LOGSTASH_BATCH_SIZE conveniently exposed as environment variables so you just need to fill them out.

apiVersion: v1
kind: Pod
metadata:
  name: logstash-1
spec:
  containers:
  - name: logstash
    image: ugosan/logstash-opensearch-input:8.10.0
    imagePullPolicy: Always
    env: 
      - name: SOURCE_INDEX_NAME
        value: ".ds-logs-myapplication-prod-000037"
      - name: SOURCE_SLICES
        value: "10"
      - name: SOURCE_PAGE_SIZE
        value: "500"
      - name: LOGSTASH_WORKERS
        value: "4"
      - name: LOGSTASH_BATCH_SIZE
        value: "1000"
      - name: OPENSEARCH_USERNAME
        valueFrom:
          secretKeyRef:
            name: os-cluster-admin-password
            key: username
      - name: OPENSEARCH_PASSWORD
        valueFrom:
          secretKeyRef:
            name: os-cluster-admin-password
            key: password
      - name: ELASTICSEARCH_USERNAME
        value: "elastic"
      - name: ELASTICSEARCH_PASSWORD
        valueFrom:
          secretKeyRef:
            name: es-cluster-es-elastic-user
            key: elastic
    resources:
      limits:
        memory: "4Gi"
        cpu: "2500m"
      requests: 
        memory: "1Gi"
        cpu: "300m"
    volumeMounts:
      - name: config-volume
        mountPath: /usr/share/logstash/config
      - name: etc
        mountPath: /etc/logstash
        readOnly: true
  volumes:  
  - name: config-volume
    projected:
      sources:
      - configMap:
          name: logstash-configmap
          items:
            - key: pipelines.yml
              path: pipelines.yml
            - key: logstash.yml
              path: logstash.yml
  - name: etc
    projected:
      sources:
      - configMap:
          name: logstash-configmap
          items:
            - key: pipeline.conf
              path: pipelines/pipeline.conf
      - secret:
          name: os-cluster-http-cert
          items:
            - key: ca.crt
              path: certificates/opensearch-ca.crt
      - secret:
          name: es-cluster-es-http-ca-internal 
          items:
            - key: tls.crt
              path: certificates/elasticsearch-ca.crt
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
data:
  pipelines.yml: |
    - pipeline.id: reindex-os-es
      path.config: "/etc/logstash/pipelines/pipeline.conf"
      pipeline.batch.size: ${LOGSTASH_BATCH_SIZE}
      pipeline.workers: ${LOGSTASH_WORKERS}
  logstash.yml: |
    log.level: info
    pipeline.unsafe_shutdown: true
    pipeline.ordered: false
  pipeline.conf: |
    input {  
        opensearch {
          hosts => ["os-cluster:9200"]
          ssl => true
          ca_file => "/etc/logstash/certificates/opensearch-ca.crt"
          user => "${OPENSEARCH_USERNAME}"
          password => "${OPENSEARCH_PASSWORD}"
          index => "${SOURCE_INDEX_NAME}"
          slices => "${SOURCE_SLICES}"
          size => "${SOURCE_PAGE_SIZE}"
          scroll => "5m"
          docinfo => true
          docinfo_target => "[@metadata][doc]"
        }
    }

    filter {
        mutate {
            remove_field => ["@version", "host", "data_stream"]
        }
    }

    output {
        elasticsearch {
            hosts => "https://es-cluster-es-http:9200"
            ssl => true
            ssl_certificate_authorities => ["/etc/logstash/certificates/elasticsearch-ca.crt"]
            ssl_verification_mode => "full"

            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"

            document_id => "%{[@metadata][doc][_id]}"

            data_stream => "true"
            data_stream_type => "logs"
            data_stream_dataset => "myapplication"
            data_stream_namespace => "reindex"
        }
    }

That’s it.

After a couple of hours, we had successfully migrated 1 billion documents from OpenSearch to Elasticsearch and saved over 20% on disk storage along the way (up to about 23% per index)! Now that we have the logs in Elasticsearch, how about extracting actual business value from them? Logs contain so much valuable information: we can not only do all sorts of interesting things with AIOps, like automatically categorizing those logs, but also extract business metrics and detect anomalies in them. Give it a try.

OpenSearch                                                   Elasticsearch
Index                                    docs  size (bytes)  Index                                   docs  size (bytes)   Diff.
.ds-logs-myapplication-prod-000037  116842158   27285520870  logs-myapplication-reindex-000037  116842158   21998435329  21.46%
.ds-logs-myapplication-prod-000038  110994116   27263291740  logs-myapplication-reindex-000038  110994116   21540011082  23.45%
.ds-logs-myapplication-prod-000040  113362823   27872438186  logs-myapplication-reindex-000040  113362823   22234641932  22.50%
.ds-logs-myapplication-prod-000041  112400019   27618801653  logs-myapplication-reindex-000041  112400019   22059453868  22.38%
.ds-logs-myapplication-prod-000042  113859174   26686723701  logs-myapplication-reindex-000042  113859174   21093766108  23.41%
.ds-logs-myapplication-prod-000043  113821016   27657006598  logs-myapplication-reindex-000043  113821016   22059454752  22.52%
.ds-logs-myapplication-prod-000044  111093596   27281936915  logs-myapplication-reindex-000044  111093596   21559513422  23.43%
.ds-logs-myapplication-prod-000048  114273539   28111420495  logs-myapplication-reindex-000048  114273539   22264398939  23.21%
.ds-logs-myapplication-prod-000049  102519334   23731274338  logs-myapplication-reindex-000049  102519334   19307250001  20.56%
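The Diff. column appears to be the difference between the two sizes relative to their mean. That is an inference from the numbers, not something the post states. The sketch below reproduces the value for index 000037 under that assumption and, for comparison, also computes the reduction relative to the OpenSearch size alone, which comes out somewhat smaller.

```python
def symmetric_diff_pct(source_bytes: int, dest_bytes: int) -> float:
    """Difference between two sizes relative to their mean, in percent."""
    mean = (source_bytes + dest_bytes) / 2
    return (source_bytes - dest_bytes) / mean * 100


def reduction_pct(source_bytes: int, dest_bytes: int) -> float:
    """Space saved relative to the source size, in percent."""
    return (source_bytes - dest_bytes) / source_bytes * 100


# Sizes in bytes for index 000037, taken from the table above.
opensearch_size = 27_285_520_870
elasticsearch_size = 21_998_435_329

print(f"{symmetric_diff_pct(opensearch_size, elasticsearch_size):.2f}%")
print(f"{reduction_pct(opensearch_size, elasticsearch_size):.2f}%")
```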

Interested in trying Elasticsearch? Start our 14-day free trial.

The release and timing of any features or functionality described in this post remain at Elastic's sole discretion. Any features or functionality not currently available may not be delivered on time or at all.