Interceptors
Interceptors are middleware functions that can modify HTTP requests and responses on every call to Elasticsearch. They are useful for injecting authentication credentials, adding custom headers, implementing observability, and handling challenge-response authentication protocols.
An interceptor wraps the transport's round-trip function. It receives the next function in the chain and returns a new function that can modify the request before calling next, and modify the response after.
The type signatures are defined in the elastictransport package:
type RoundTripFunc func(*http.Request) (*http.Response, error)
type InterceptorFunc func(next RoundTripFunc) RoundTripFunc
Interceptors are configured in elasticsearch.Config and applied at client creation. They cannot be changed after the transport is created.
es, err := elasticsearch.NewClient(elasticsearch.Config{
    Interceptors: []elastictransport.InterceptorFunc{
        myFirstInterceptor(),
        mySecondInterceptor(),
    },
})
Interceptors are applied left to right for requests and right to left for responses. In the example above:
- myFirstInterceptor modifies the request first.
- mySecondInterceptor modifies the request second, then sends it to Elasticsearch.
- mySecondInterceptor sees the response first.
- myFirstInterceptor sees the response last.
sequenceDiagram
participant App as Application
participant I1 as Interceptor_1
participant I2 as Interceptor_2
participant ES as Elasticsearch
App->>I1: request
I1->>I2: request
I2->>ES: request
ES-->>I2: response
I2-->>I1: response
I1-->>App: response
When credentials may change at runtime — for example, during token refresh or credential rotation — an interceptor can inject the latest credentials into each request dynamically.
func DynamicAuthInterceptor(provider *CredentialProvider) elastictransport.InterceptorFunc {
    return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc {
        return func(req *http.Request) (*http.Response, error) {
            username, password := provider.Get()
            req.SetBasicAuth(username, password)
            return next(req)
        }
    }
}
- Wrap the next function in the chain.
- Retrieve the latest credentials at call time.
- Set the Authorization header on the outgoing request.
- Pass the request to the next interceptor (or transport).
The CredentialProvider uses a sync.RWMutex so credentials can be updated safely from another goroutine:
type CredentialProvider struct {
    mu       sync.RWMutex
    username string
    password string
}

func (p *CredentialProvider) Get() (string, string) {
    p.mu.RLock()
    defer p.mu.RUnlock()
    return p.username, p.password
}

func (p *CredentialProvider) Update(username, password string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.username = username
    p.password = password
}
Usage:
// The zero value of CredentialProvider is ready to use; seed it with Update.
authProvider := &CredentialProvider{}
authProvider.Update("user1", "password1")

es, err := elasticsearch.NewClient(elasticsearch.Config{
    Addresses: []string{"https://localhost:9200"},
    Interceptors: []elastictransport.InterceptorFunc{
        DynamicAuthInterceptor(authProvider),
    },
})

// Later, rotate credentials. All future requests use the new credentials.
authProvider.Update("user2", "password2")
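One way to check the rotation behavior without a running cluster is to call the interceptor with a stub in place of the transport. The sketch below repeats the provider and interceptor with locally declared types so that it compiles on its own; userSeenDownstream is a hypothetical helper, not part of the client.

```go
package main

import (
	"net/http"
	"sync"
)

// Local copies of the elastictransport signatures.
type RoundTripFunc func(*http.Request) (*http.Response, error)
type InterceptorFunc func(next RoundTripFunc) RoundTripFunc

type CredentialProvider struct {
	mu       sync.RWMutex
	username string
	password string
}

func (p *CredentialProvider) Get() (string, string) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.username, p.password
}

func (p *CredentialProvider) Update(username, password string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.username, p.password = username, password
}

func DynamicAuthInterceptor(provider *CredentialProvider) InterceptorFunc {
	return func(next RoundTripFunc) RoundTripFunc {
		return func(req *http.Request) (*http.Response, error) {
			req.SetBasicAuth(provider.Get())
			return next(req)
		}
	}
}

// userSeenDownstream (hypothetical helper) sends one request through the
// interceptor and returns the basic-auth username that reaches the stub.
func userSeenDownstream(ic InterceptorFunc) (string, error) {
	var user string
	stub := func(req *http.Request) (*http.Response, error) {
		user, _, _ = req.BasicAuth()
		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
	}
	req, err := http.NewRequest(http.MethodGet, "http://localhost:9200/", nil)
	if err != nil {
		return "", err
	}
	resp, err := ic(stub)(req)
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return user, nil
}
```

Because the interceptor reads from the provider on every call, the same interceptor value reports the new username as soon as Update returns.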
In multi-tenant applications or impersonation scenarios, different requests may need different credentials. An interceptor can read credentials from the request's context.Context:
type basicAuthKey struct{}

type basicAuthValue struct {
    username string
    password string
}

// WithBasicAuth attaches credentials to a context.
func WithBasicAuth(ctx context.Context, username, password string) context.Context {
    return context.WithValue(ctx, basicAuthKey{}, basicAuthValue{username, password})
}

func ContextAuthInterceptor() elastictransport.InterceptorFunc {
    return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc {
        return func(req *http.Request) (*http.Response, error) {
            if auth, ok := req.Context().Value(basicAuthKey{}).(basicAuthValue); ok {
                req.SetBasicAuth(auth.username, auth.password)
            }
            return next(req)
        }
    }
}
- Override credentials only if the context contains them.
- Requests without context credentials proceed with the client's default auth.
Usage:
es, err := elasticsearch.NewClient(elasticsearch.Config{
    Username: "default_user",
    Password: "default_password",
    Interceptors: []elastictransport.InterceptorFunc{
        ContextAuthInterceptor(),
    },
})

// Uses default credentials
res, err := es.Info()
if err != nil {
    log.Fatal(err)
}
defer res.Body.Close()

// Uses per-request credentials
ctx := WithBasicAuth(context.Background(), "tenant_a", "tenant_a_secret")
res, err = es.Info(es.Info.WithContext(ctx))
if err != nil {
    log.Fatal(err)
}
defer res.Body.Close()
Challenge-response authentication (Kerberos/SPNEGO)
Interceptors can also handle challenge-response protocols. The following example implements Kerberos/SPNEGO authentication by retrying a request when a 401 challenge is received:
func KerberosInterceptor(tokenProvider func() (string, error)) elastictransport.InterceptorFunc {
    return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc {
        return func(req *http.Request) (*http.Response, error) {
            resp, err := next(req)
            if err != nil {
                return nil, err
            }
            if resp.StatusCode == http.StatusUnauthorized {
                authHeader := resp.Header.Get("WWW-Authenticate")
                if strings.HasPrefix(authHeader, "Negotiate") {
                    resp.Body.Close()
                    token, err := tokenProvider()
                    if err != nil {
                        return nil, fmt.Errorf("failed to obtain Kerberos token: %w", err)
                    }
                    retryReq := req.Clone(req.Context())
                    // Clone shares the already-consumed body with the original request.
                    // Rebuild it when the request provides GetBody so the payload is resent.
                    if req.GetBody != nil {
                        body, err := req.GetBody()
                        if err != nil {
                            return nil, fmt.Errorf("failed to reset request body: %w", err)
                        }
                        retryReq.Body = body
                    }
                    retryReq.Header.Set("Authorization", "Negotiate "+token)
                    return next(retryReq)
                }
            }
            return resp, nil
        }
    }
}
- Send the initial request without authentication.
- Close the 401 response body before retrying.
- Obtain a Kerberos token (in production, use a library like gokrb5).
- Retry the request with the Authorization: Negotiate header.
Custom observability via interceptors
While the client has built-in OpenTelemetry support, interceptors can add custom observability. Here is a logging interceptor that records request and response details:
func LoggingInterceptor() elastictransport.InterceptorFunc {
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
    return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc {
        return func(req *http.Request) (*http.Response, error) {
            start := time.Now()
            logger.Info("elasticsearch request started",
                slog.String("method", req.Method),
                slog.String("url", req.URL.String()),
            )
            resp, err := next(req)
            duration := time.Since(start)
            if err != nil {
                logger.Error("elasticsearch request failed",
                    slog.Duration("duration", duration),
                    slog.String("error", err.Error()),
                )
                return nil, err
            }
            logger.Info("elasticsearch request completed",
                slog.Int("status_code", resp.StatusCode),
                slog.Duration("duration", duration),
            )
            return resp, nil
        }
    }
}
You can compose multiple interceptors for logging, metrics, and tracing:
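es, err := elasticsearch.NewClient(elasticsearch.Config{
    Interceptors: []elastictransport.InterceptorFunc{
        LoggingInterceptor(),
        // MetricsInterceptor and TracingInterceptor stand in for your own
        // interceptors; they are not defined in this section.
        MetricsInterceptor(requestCounter, requestDuration),
        TracingInterceptor(tracer),
    },
})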
Prefer using the built-in OpenTelemetry instrumentation over custom interceptors for observability when possible. The built-in support follows OpenTelemetry semantic conventions and provides richer span attributes.
- Keep interceptors lightweight. Interceptors run on every request. Avoid expensive operations like disk I/O or network calls inside the hot path.
- Ordering matters. Place authentication interceptors before observability interceptors so that traces capture the final request state.
- Always call next. Failing to call next(req) will prevent the request from reaching Elasticsearch. Only skip next if you intentionally want to short-circuit (e.g., returning a cached response).
- Close response bodies on retry. If your interceptor retries a request (like the Kerberos example), always close the original response body to avoid resource leaks.
- Use req.Clone(ctx) for retries. When retrying, clone the request to avoid mutating the original. Clone does not give the copy a fresh body, so a request with a payload also needs its body rebuilt, for example from req.GetBody.
- Handle errors from next. If next returns an error, return it to the caller rather than swallowing it.