Engineering

The Go client for Elasticsearch: Configuration and customization

In a previous blog, we saw that the seemingly simple job of an Elasticsearch client — moving data between the calling code and the cluster — is actually quite complicated under the hood. Naturally, as much as we try to make the default behaviour of the client optimal for the majority of scenarios, there are situations where you want to configure, customize, or enable/disable certain features.

The Go client constructor accepts the elasticsearch.Config{} type, which provides a number of options to control the behaviour:

$ go doc -short github.com/elastic/go-elasticsearch/v7.Config
type Config struct {
  Addresses []string // A list of Elasticsearch nodes to use.
  // ...
}

Let's review the available options and examples of their usage.

Endpoints and security

The first thing you might want to do, unless you're just experimenting with the client locally, is to point it to a remote cluster. The most straightforward way of doing that is to export the ELASTICSEARCH_URL environment variable with a comma-separated list of node URLs. This follows the "Twelve-Factor" recommendation, leaves the configuration entirely out of the codebase, and plays well with cloud functions/lambdas and container orchestration systems such as Kubernetes.

// In main.go
es, err := elasticsearch.NewDefaultClient()
// ...

// On the command line
$ ELASTICSEARCH_URL=https://foo:bar@es1:9200 go run main.go

// For Google Cloud Function
$ gcloud functions deploy myfunction --set-env-vars ELASTICSEARCH_URL=https://foo:bar@es1:9200 ...

To configure the cluster endpoints directly (e.g., when you load them from a configuration file, or retrieve them from a secrets management system such as Vault), use the Addresses field, passing a slice of strings with the URLs of the nodes you want to connect to, and the Username and Password fields for authentication:

var (
  clusterURLs = []string{"https://es1:9200", "https://es2:9200", "https://es3:9200"}
  username    = "foo"
  password    = "bar"
)
cfg := elasticsearch.Config{
  Addresses: clusterURLs,
  Username:  username,
  Password:  password,
}
es, err := elasticsearch.NewClient(cfg)
// ...

Use the APIKey field for authentication with API keys, which are easier to manage via the Elasticsearch API or Kibana than usernames and passwords.
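
For instance, a minimal sketch of the configuration (the APIKey value below is a placeholder, standing in for the Base64-encoded "id:api_key" pair returned when you create an API key):

```go
cfg := elasticsearch.Config{
  Addresses: []string{"https://es1:9200"},
  APIKey:    "VnVhQ2ZHY0JDZGJrUW...", // Placeholder for a real Base64-encoded API key
}
es, err := elasticsearch.NewClient(cfg)
// ...
```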

When using Elasticsearch Service on Elastic Cloud, you can point the client to the cluster by using the Cloud ID:

cfg := elasticsearch.Config{
  CloudID: "my-cluster:dXMtZWFzdC0xLZC5pbyRjZWM2ZjI2MWE3NGJm...",
  APIKey:  "VnVhQ2ZHY0JDZGJrUW...",
}
es, err := elasticsearch.NewClient(cfg)
// ...

Note: Don't forget that you still need to provide the authentication credentials when using the Cloud ID.

A common need in custom deployments or security-focused demos is to provide a certificate authority to the client so it can verify the server certificate. This can be achieved in multiple ways, but the most straightforward option is to use the CACert field, passing it a slice of bytes with the certificate payload:

cert, _ := ioutil.ReadFile("path/to/ca.crt")
cfg := elasticsearch.Config{
  // ...
  CACert: cert,
}
es, err := elasticsearch.NewClient(cfg)
// ...

Have a look at the _examples/security folder of the repository for a full demo, complete with generating custom certificates with the elasticsearch-certutil command-line utility and starting the cluster with corresponding configuration.

Global headers

In specific scenarios, such as when using token-based authentication or when interacting with proxies, you might need to add HTTP headers to client requests. While you can add them to individual API calls with the WithHeader() method, it's more convenient to set them globally, in the client configuration:

cfg := elasticsearch.Config{
  // ...
  Header: http.Header(map[string][]string{
    "Authorization": {"Bearer dGhpcyBpcyBub3QgYSByZWFs..."},
  }),
}
es, err := elasticsearch.NewClient(cfg)
// ...

Logging and metrics

During development, it is crucial to closely follow what is being sent to the Elasticsearch cluster and what is being received. The easiest way of achieving that is to simply print the details of the request and response to the console or a file. The Go client package provides several distinct logger components. For debugging during development, the estransport.ColorLogger is perhaps the most useful one — it prints succinct, formatted, and colorized information to the console:

cfg := elasticsearch.Config{
  Logger: &estransport.ColorLogger{Output: os.Stdout},
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()
// > GET http://localhost:9200/ 200 OK 11ms

By default, the request and response bodies are not printed — to include them, set the corresponding logger options:

cfg := elasticsearch.Config{
  Logger: &estransport.ColorLogger{
    Output:             os.Stdout,
    EnableRequestBody:  true,
    EnableResponseBody: true,
  },
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()
// > GET http://localhost:9200/ 200 OK 6ms
// >     « {
// >     «   "name" : "es1",
// >     «   "cluster_name" : "go-elasticsearch",
// >     ...

Note: The estransport.TextLogger component prints the information without using any special characters and colors.

When asking for help in our forums, it is often useful to present the list of operations in an agnostic format so they can be easily "replayed" locally, without having to understand or install a particular programming language. An ideal way of doing that is to format the output as a sequence of executable curl commands with the estransport.CurlLogger:

cfg := elasticsearch.Config{
  Logger: &estransport.CurlLogger{
    Output:             os.Stdout,
    EnableRequestBody:  true,
    EnableResponseBody: true,
  },
}
es, _ := elasticsearch.NewClient(cfg)
es.Index(
  "test",
  strings.NewReader(`{"title" : "logging"}`),
  es.Index.WithRefresh("true"),
  es.Index.WithPretty(),
  es.Index.WithFilterPath("result", "_id"),
)
// > curl -X POST -H 'Content-Type: application/json' 'http://localhost:9200/test/_doc?pretty&filter_path=result%2C_id&refresh=true' -d \
// > '{
// > "title": "logging"
// > }'
// > # => 2020-07-23T13:12:05Z [201 Created] 65ms
// > # {
// > #  "_id": "_YPNe3MBdF-KdkKEZqF_",
// > #  "result": "created"
// > # }

When logging the client operations in production, plain text output is not suitable, because you want to store the logs as structured data, quite possibly in Elasticsearch itself. In this case, you can use the estransport.JSONLogger to output the entries as JSON documents in an Elastic Common Schema (ECS)-compatible format:

cfg := elasticsearch.Config{
  Logger: &estransport.JSONLogger{Output: os.Stdout},
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()
// > {"@timestamp":"2020-07-23T13:12:05Z","event":{"duration":10970000},"url":{"scheme":"http","domain":"localhost","port":9200,"path":"/","query":""},"http":{"request":{"method":"GET"},"response":{"status_code":200}}}

In keeping with the spirit of client extensibility, all the listed loggers are just implementations of the estransport.Logger interface. To use a custom logger, simply implement this interface:

$ go doc -short github.com/elastic/go-elasticsearch/v7/estransport.Logger
type Logger interface {
  LogRoundTrip(*http.Request, *http.Response, error, time.Time, time.Duration) error
  // ...
}

The _examples/logging/custom.go example demonstrates how to use the github.com/rs/zerolog package as the logging "driver" by implementing the interface for a CustomLogger type.

Another feature of the client, useful in production or for debugging, is the ability to export various metrics about itself: the number of requests and failures, the response status codes, and the details about the connections. Set the EnableMetrics option to true, and use the Metrics() method to retrieve the information. The _examples/instrumentation/expvar.go example shows an integration with the expvar package.
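
A minimal sketch of the idea, assuming the Requests and Failures fields on the estransport.Metrics value returned by the v7 client:

```go
cfg := elasticsearch.Config{
  EnableMetrics: true,
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()

m, _ := es.Metrics()
fmt.Printf("Requests: %d, Failures: %d\n", m.Requests, m.Failures)
```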

Note: The _examples/instrumentation folder contains interactive demos of integrating the Go client with Elastic APM and OpenCensus as well.

Retries

We’ve seen how the client manages connections and retries requests under specific conditions. Now let's have a look at the related configuration options.

By default, the client retries a request up to three times; to set a different limit, use the MaxRetries field. To change the list of response status codes that should be retried, use the RetryOnStatus field. Together with the RetryBackoff option, you can use it to retry requests when the server sends a 429 Too Many Requests response:

cfg := elasticsearch.Config{
  RetryOnStatus: []int{429, 502, 503, 504},
  RetryBackoff:  func(i int) time.Duration {
    // A simple exponential delay
    d := time.Duration(math.Exp2(float64(i))) * time.Second
    fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
    return d
  },
}
es, err := elasticsearch.NewClient(cfg)
// ...
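
Relatedly, the retry limit mentioned above can be raised, or retries switched off entirely, through the MaxRetries and DisableRetry fields (a sketch):

```go
cfg := elasticsearch.Config{
  MaxRetries: 5,         // Retry each request up to five times instead of three
  // DisableRetry: true, // Or disable the retry behaviour altogether
}
es, err := elasticsearch.NewClient(cfg)
// ...
```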

Because the RetryBackoff function only returns a time.Duration, you can provide a more robust backoff implementation by using a third-party package such as github.com/cenkalti/backoff:

import "github.com/cenkalti/backoff/v4"

retryBackoff := backoff.NewExponentialBackOff()
retryBackoff.InitialInterval = time.Second

cfg := elasticsearch.Config{
  RetryOnStatus: []int{429, 502, 503, 504},
  RetryBackoff: func(i int) time.Duration {
    if i == 1 {
      retryBackoff.Reset()
    }
    d := retryBackoff.NextBackOff()
    fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
    return d
  },
}
es, err := elasticsearch.NewClient(cfg)
// ...

Discovering nodes

The DiscoverNodesOnStart and DiscoverNodesInterval settings control whether the client should perform node discovery ("sniffing") during initialization or periodically; both are disabled by default.
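
For example, to discover nodes when the client is created and then re-discover them periodically (the interval below is an arbitrary value, not a recommendation):

```go
cfg := elasticsearch.Config{
  DiscoverNodesOnStart:  true,            // Perform discovery during initialization
  DiscoverNodesInterval: 5 * time.Minute, // Re-discover nodes every five minutes
}
es, err := elasticsearch.NewClient(cfg)
// ...
```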

Note: Enable node discovery only when the client is connected to the cluster directly, not when the cluster is behind a proxy, which is also the case when using Elasticsearch Service.

The ConnectionPoolFunc field allows you to provide a custom connection pool implementation satisfying the estransport.ConnectionPool interface. However, this is generally useful only in complicated and esoteric network topologies.

A custom connection selector, passed through the Selector field, is more practically useful in certain situations, such as the AWS zone-aware selector we looked at in the previous article, or a weighted round-robin selector.

The most invasive configuration option is the Transport field, which allows you to completely replace the default HTTP transport used by the package, namely http.DefaultTransport. You might want to do this when you need to configure timeouts, TLS or proxy settings, or any other low-level HTTP details:

cfg := elasticsearch.Config{
  // ...
  Transport: &http.Transport{
    Proxy:                 ...,
    MaxIdleConnsPerHost:   ...,
    ResponseHeaderTimeout: ...,
    DialContext:           (&net.Dialer{
      Timeout:   ...,
      KeepAlive: ...,
    }).DialContext,
    TLSClientConfig: &tls.Config{
      MinVersion: ...,
      // ...
    },
  },
}
es, err := elasticsearch.NewClient(cfg)
// ...

Custom transport

Because the Transport field accepts any implementation of http.RoundTripper, it is possible to pass a custom implementation. Let's look at an example where we count the number of requests, following the _examples/customization.go demo:

type CountingTransport struct {
  count uint64
}

func (t *CountingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  atomic.AddUint64(&t.count, 1)
  return http.DefaultTransport.RoundTrip(req)
}

tp := CountingTransport{}
cfg := elasticsearch.Config{Transport: &tp}
es, err := elasticsearch.NewClient(cfg)
// ...
fmt.Printf("%80s\n", fmt.Sprintf("Total Requests: %d", atomic.LoadUint64(&tp.count)))

Typically, there's no need to replace the default transport with a custom implementation, with one specific exception: mocking the client in unit tests. In the following example, the mockTransport type defines a RoundTripFunc field, which allows each test to return a specific response.

// mockTransport defines a mocked transport for unit tests
type mockTransport struct {
  RoundTripFunc func(req *http.Request) (*http.Response, error)
}

// RoundTrip implements http.RoundTripper
func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  return t.RoundTripFunc(req)
}

func TestClientSuccess(t *testing.T) {
  tp := &mockTransport{
    RoundTripFunc: func(req *http.Request) (*http.Response, error) {
      // Return a successful mock response
      return &http.Response{
        Status:     "200 OK",
        StatusCode: 200,
        Body:       ioutil.NopCloser(strings.NewReader("HELLO")),
      }, nil
    },
  }

  cfg := elasticsearch.Config{Transport: tp}
  es, _ := elasticsearch.NewClient(cfg)
  res, _ := es.Info()

  t.Log(res)
}

The mocked response is printed out when the test is executed:

$ go test -v tmp/client_mocking_test.go
// === RUN   TestClientSuccess
//     TestClientSuccess: client_mocking_test.go:42: [200 OK] HELLO
// --- PASS: TestClientSuccess (0.00s)
// ...

Note: In specific situations, it may be desirable to replace the HTTP client from the standard library with a more performant one, such as github.com/valyala/fasthttp, which can deliver a measurable performance improvement. Run the benchmarks in _examples/fasthttp to measure the difference in your own environment.

This concludes our overview of the Go client configuration options. In the next part, we will focus on different ways of encoding and decoding JSON payloads, and the esutil.BulkIndexer helper.