Elastic's Managed OTLP Endpoint lets your Python app send traces, metrics, and logs directly to Elastic Cloud using the standard OpenTelemetry SDK and just two environment variables. There is no Collector to deploy, configure, or maintain. This post walks through a fully instrumented Flask API with log-to-trace correlation, from first span to Kibana service map.
Prerequisites
- An Elastic Cloud account (Serverless or Cloud Hosted v9.0+)
- Python 3.9+
- Kibana's built-in sample eCommerce orders dataset (Add data > Sample data > Sample eCommerce orders)
Install the required packages:
pip install flask elasticsearch python-dotenv \
opentelemetry-api opentelemetry-sdk \
opentelemetry-exporter-otlp
What is the Elastic managed OTLP endpoint?
The Elastic Managed OTLP Endpoint is a hosted ingestion layer that accepts standard OTLP over HTTP or gRPC directly from your application, with no collector required.
OpenTelemetry uses OTLP (OpenTelemetry Protocol) to transport telemetry data.
In a traditional setup, your application sends data to an OpenTelemetry Collector, which then forwards it to a backend like Elasticsearch.
The Collector handles batching, retries, and routing, but it also means one more component to deploy, configure, and maintain.
Elastic's managed OTLP endpoint, which recently became generally available (GA), removes that step.
Your application sends OTLP data directly to an endpoint that Elastic operates for you.
The endpoint accepts standard OTLP over HTTP or gRPC, backed by a managed ingestion layer that handles scaling, buffering, and durability.
You use the same opentelemetry-exporter-otlp packages you would use with any backend.
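To make "standard OTLP over HTTP" a little more concrete: with the HTTP transport, exporters start from one base endpoint and append a fixed path per signal. The sketch below mirrors that convention in plain Python. The hostname is a made-up placeholder and `signal_url` is a hypothetical helper written for illustration, not a function from any SDK.

```python
# Per-signal paths defined by the OTLP/HTTP specification.
OTLP_HTTP_PATHS = {
    "traces": "v1/traces",
    "metrics": "v1/metrics",
    "logs": "v1/logs",
}


def signal_url(base_endpoint: str, signal: str) -> str:
    """Hypothetical helper: join a base OTLP endpoint with a signal path."""
    if signal not in OTLP_HTTP_PATHS:
        raise ValueError(f"unknown signal: {signal!r}")
    # Tolerate a trailing slash on the base endpoint.
    return base_endpoint.rstrip("/") + "/" + OTLP_HTTP_PATHS[signal]


# Placeholder endpoint, not a real deployment URL.
base = "https://example-project.ingest.example.com:443"
for name in OTLP_HTTP_PATHS:
    print(name, "->", signal_url(base, name))
```

In practice the exporter does this for you, which is why a single endpoint variable is enough for all three signals.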
Building a Python Flask API to instrument with OpenTelemetry
We'll build a small REST API in Python with Flask that lists and looks up eCommerce orders stored in Elasticsearch.
As a data source, we'll use Kibana's built-in sample data.
You can find the complete application code in the companion repository.
The walkthrough below builds the code step by step, but you can clone the repo and follow along if you prefer.
Before adding telemetry, let's look at the base application: a Flask API that queries Kibana's sample eCommerce index.
It has two endpoints: list recent orders and look up a single order by ID.
Create a file called app.py:
import os

from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from flask import Flask, jsonify

load_dotenv()

# Elasticsearch client
es = Elasticsearch(
    hosts=[os.environ["ES_URL"]],
    api_key=os.environ["ES_API_KEY"],
)
INDEX = "kibana_sample_data_ecommerce"

app = Flask(__name__)


@app.route("/orders")
def list_orders():
    response = es.search(
        index=INDEX,
        size=10,
        sort=[{"order_date": "desc"}],
        aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}},
    )
    hits = response["hits"]["hits"]
    total_revenue = response["aggregations"]["total_revenue"]["value"]
    orders = [
        {
            "order_id": h["_source"]["order_id"],
            "customer": h["_source"]["customer_full_name"],
            "total": h["_source"]["taxful_total_price"],
            "date": h["_source"]["order_date"],
        }
        for h in hits
    ]
    return jsonify({"orders": orders, "total_revenue": total_revenue})


@app.route("/orders/<order_id>")
def get_order(order_id):
    response = es.search(
        index=INDEX,
        size=1,
        query={"term": {"order_id": order_id}},
    )
    hits = response["hits"]["hits"]
    if not hits:
        return jsonify({"error": "Order not found"}), 404
    return jsonify(hits[0]["_source"])


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)
Create a .env file with your Elasticsearch credentials and the OTLP endpoint settings.
To get the OTLP endpoint and API key, follow the Quickstart: Send OTLP data to Elastic guide.
OTEL_EXPORTER_OTLP_ENDPOINT=
OTEL_EXPORTER_OTLP_HEADERS=Authorization=ApiKey <api_key>
ES_URL=
ES_API_KEY=
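A blank or mistyped value in this file tends to surface later as a confusing export or connection error, so a small startup check can help. The sketch below uses only the standard library. `missing_settings` and `parse_otlp_headers` are hypothetical helpers, and the sample values are placeholders rather than real credentials. The parser assumes the comma-separated `key=value` format used by OTEL_EXPORTER_OTLP_HEADERS and splits on the first `=` only, so values that themselves contain `=` (such as base64 padding) survive intact.

```python
REQUIRED_VARS = (
    "OTEL_EXPORTER_OTLP_ENDPOINT",
    "OTEL_EXPORTER_OTLP_HEADERS",
    "ES_URL",
    "ES_API_KEY",
)


def missing_settings(env) -> list:
    """Return the names of required variables that are unset or blank."""
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


def parse_otlp_headers(raw: str) -> dict:
    """Parse 'k1=v1,k2=v2' into a dict, splitting each pair on the first '='."""
    headers = {}
    for pair in raw.split(","):
        if not pair.strip():
            continue  # ignore empty segments such as a trailing comma
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"malformed header pair: {pair!r}")
        headers[key.strip()] = value.strip()
    return headers


# Placeholder values for illustration only.
sample_env = {
    "OTEL_EXPORTER_OTLP_ENDPOINT": "https://example.invalid",
    "OTEL_EXPORTER_OTLP_HEADERS": "Authorization=ApiKey not-a-real-key==",
    "ES_URL": "",
}
print(missing_settings(sample_env))
print(parse_otlp_headers(sample_env["OTEL_EXPORTER_OTLP_HEADERS"]))
```

In the real application you would call `missing_settings(os.environ)` right after `load_dotenv()` and stop early if the list is not empty.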
This is a working API with no instrumentation. The next sections add traces, metrics, and logs one signal at a time.
Sending traces from Python
Let's start with traces using the Python OpenTelemetry SDK.
We'll wrap the Elasticsearch calls in spans so we can see how long each query takes.
Set up the tracer
Add the following to app.py, below the load_dotenv() call:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME

# Create a resource identifying your service
resource = Resource.create({
    SERVICE_NAME: "my-python-app"
})

# Set up the tracer provider with OTLP export
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
OTLPSpanExporter() reads the endpoint and headers from the environment variables we set earlier, so no arguments are needed in the code. It reads them when it is constructed, which is why this setup has to run after load_dotenv() has loaded the .env file.
Add spans to your endpoints
Now wrap each route handler in a parent span and nest a child span around the Elasticsearch call. This gives you a two-level trace: one span for the HTTP request and one for the database query inside it.
@app.route("/orders")
def list_orders():
    with tracer.start_as_current_span("list-orders") as span:
        with tracer.start_as_current_span("es.search") as es_span:
            es_span.set_attribute("db.system", "elasticsearch")
            es_span.set_attribute("db.elasticsearch.index", INDEX)
            response = es.search(
                index=INDEX,
                size=10,
                sort=[{"order_date": "desc"}],
                aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}},
            )
        hits = response["hits"]["hits"]
        total_revenue = response["aggregations"]["total_revenue"]["value"]
        orders = [
            {
                "order_id": h["_source"]["order_id"],
                "customer": h["_source"]["customer_full_name"],
                "total": h["_source"]["taxful_total_price"],
                "date": h["_source"]["order_date"],
            }
            for h in hits
        ]
        span.set_attribute("orders.returned", len(orders))
        return jsonify({"orders": orders, "total_revenue": total_revenue})
Each request creates a parent span (list-orders) with a child span (es.search) wrapping the Elasticsearch call.
The db.system and db.elasticsearch.index attributes are what Kibana uses to recognize the call as an Elasticsearch query in the trace waterfall.
Sending metrics from Python
Metrics reveal patterns across all requests: how many calls hit each endpoint, how response times trend over time. Let's add a counter and a histogram to capture that.
Set up the meter
Add the following to app.py:
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

# Set up the metric reader and provider
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(),
    export_interval_millis=10000  # Export every 10 seconds
)
meter_provider = MeterProvider(
    resource=resource,
    metric_readers=[metric_reader]
)
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter(__name__)
Create instruments
# Counter for total requests
request_counter = meter.create_counter(
    name="app.requests.total",
    description="Total number of requests",
    unit="1"
)

# Histogram for request duration
request_duration = meter.create_histogram(
    name="app.request.duration",
    description="Request duration in seconds",
    unit="s"
)
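One detail worth knowing when you record durations in seconds: a histogram stores counts per bucket, not individual values. The sketch below uses only the standard library to show how a handful of made-up durations would be bucketed. It assumes the default explicit bucket boundaries from the OpenTelemetry metrics SDK specification, so your SDK version or backend configuration may behave differently. `bucket_counts` is a hypothetical helper, not an SDK function.

```python
from bisect import bisect_left

# Default explicit bucket boundaries from the OpenTelemetry metrics SDK
# specification (an assumption here; check your own SDK configuration).
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 750,
                      1000, 2500, 5000, 7500, 10000]


def bucket_counts(values, boundaries):
    """Count values per bucket; each bucket includes its upper boundary."""
    counts = [0] * (len(boundaries) + 1)
    for v in values:
        counts[bisect_left(boundaries, v)] += 1
    return counts


# Hypothetical request durations in seconds.
durations_s = [0.012, 0.048, 0.120, 0.350, 1.8]

# With the defaults, every sub-5-second request lands in the same bucket.
print(bucket_counts(durations_s, DEFAULT_BOUNDARIES))

# Boundaries sized for sub-second latencies spread the same data out.
latency_boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5]
print(bucket_counts(durations_s, latency_boundaries))
```

If that loss of resolution matters for your dashboards, the SDK's Views mechanism is the usual place to configure different boundaries.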
Record metrics in your endpoints
Update the route handler to record metrics:
import time


@app.route("/orders")
def list_orders():
    start_time = time.time()
    with tracer.start_as_current_span("list-orders") as span:
        with tracer.start_as_current_span("es.search") as es_span:
            es_span.set_attribute("db.system", "elasticsearch")
            es_span.set_attribute("db.elasticsearch.index", INDEX)
            response = es.search(
                index=INDEX,
                size=10,
                sort=[{"order_date": "desc"}],
                aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}},
            )
        hits = response["hits"]["hits"]
        total_revenue = response["aggregations"]["total_revenue"]["value"]
        orders = [
            {
                "order_id": h["_source"]["order_id"],
                "customer": h["_source"]["customer_full_name"],
                "total": h["_source"]["taxful_total_price"],
                "date": h["_source"]["order_date"],
            }
            for h in hits
        ]
        duration = time.time() - start_time
        span.set_attribute("orders.returned", len(orders))
        request_counter.add(1, {"endpoint": "/orders", "status": "200"})
        request_duration.record(duration, {"endpoint": "/orders"})
        return jsonify({"orders": orders, "total_revenue": total_revenue})
Sending logs from Python
Traces and metrics tell you what happened and how often. Logs capture the details: error messages, parameter values, and debug context. OpenTelemetry bridges Python's standard logging module so your existing log statements are exported as OTLP data alongside traces and metrics.
When you emit a log inside a span context, the SDK automatically attaches the trace and span IDs, so you can jump from a log line to the exact trace that produced it.
Set up the log provider
import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter

# Set up the logger provider
logger_provider = LoggerProvider(resource=resource)
logger_provider.add_log_record_processor(
    BatchLogRecordProcessor(OTLPLogExporter())
)
set_logger_provider(logger_provider)

# Bridge Python's logging to OTel
handler = LoggingHandler(
    level=logging.INFO,
    logger_provider=logger_provider
)
logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(logging.StreamHandler())  # Also print to terminal
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
The StreamHandler() ensures logs also appear in your terminal while developing.
Without it, logs only go to Elastic and the terminal stays silent, which can be confusing during testing.
Emit logs from your endpoints
@app.route("/orders")
def list_orders():
    start_time = time.time()
    with tracer.start_as_current_span("list-orders") as span:
        logger.info("Listing recent orders")
        with tracer.start_as_current_span("es.search") as es_span:
            es_span.set_attribute("db.system", "elasticsearch")
            es_span.set_attribute("db.elasticsearch.index", INDEX)
            response = es.search(
                index=INDEX,
                size=10,
                sort=[{"order_date": "desc"}],
                aggs={"total_revenue": {"sum": {"field": "taxful_total_price"}}},
            )
        hits = response["hits"]["hits"]
        total_revenue = response["aggregations"]["total_revenue"]["value"]
        orders = [
            {
                "order_id": h["_source"]["order_id"],
                "customer": h["_source"]["customer_full_name"],
                "total": h["_source"]["taxful_total_price"],
                "date": h["_source"]["order_date"],
            }
            for h in hits
        ]
        duration = time.time() - start_time
        span.set_attribute("orders.returned", len(orders))
        logger.info(
            "Orders listed",
            extra={"orders.returned": len(orders), "duration_s": round(duration, 4)}
        )
        request_counter.add(1, {"endpoint": "/orders", "status": "200"})
        request_duration.record(duration, {"endpoint": "/orders"})
        return jsonify({"orders": orders, "total_revenue": total_revenue})
Because the logger.info() calls happen inside a span context, OpenTelemetry automatically attaches the trace ID and span ID to each log record.
In Kibana, this means you can jump from a log line directly to the related trace.
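It can also help to see what the `extra={...}` argument does on the Python side. The standard library copies each key onto the `LogRecord` as an attribute, which is where a bridging handler can pick the values up and forward them as structured fields. The sketch below demonstrates this with a hypothetical `CapturingHandler` and no OpenTelemetry dependency. How the real `LoggingHandler` maps these fields is an SDK detail that this example does not attempt to reproduce.

```python
import logging

# Attribute names present on every LogRecord, taken from a blank record,
# plus the two names that formatters may add later.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class CapturingHandler(logging.Handler):
    """Collects each record's message plus any fields passed via `extra`."""

    def __init__(self):
        super().__init__()
        self.captured = []

    def emit(self, record):
        extras = {
            key: value
            for key, value in vars(record).items()
            if key not in _STANDARD_ATTRS
        }
        self.captured.append((record.getMessage(), extras))


demo_logger = logging.getLogger("extra_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo isolated from root handlers
capture = CapturingHandler()
demo_logger.addHandler(capture)

demo_logger.info("Orders listed", extra={"orders.returned": 10, "duration_s": 0.0421})
print(capture.captured)
```

Note that `extra` keys must not collide with built-in record attribute names such as `message` or `name`; the logging module raises a `KeyError` in that case.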
Running the instrumented Flask app and generating traffic
With traces, metrics, and logs wired up, the application is fully instrumented. Let's run it, send a few requests, and verify the data arrives in Kibana.
python app.py
Note: Flask's built-in development server is fine for this tutorial. For production workloads, use a WSGI server like gunicorn.
Generate some traffic:
curl http://localhost:5001/orders
curl http://localhost:5001/orders/584677
curl http://localhost:5001/orders/does-not-exist
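Three requests are enough to confirm the pipeline works, but charts are easier to read with more data points. If you prefer a script to repeated curl calls, the following standard-library sketch sends the same three requests in a loop. `fetch_status` and `generate_traffic` are hypothetical helper names, and the base URL assumes the app is running locally on port 5001 as above.

```python
import urllib.error
import urllib.request

BASE_URL = "http://localhost:5001"
PATHS = ["/orders", "/orders/584677", "/orders/does-not-exist"]


def fetch_status(url, timeout=5.0):
    """Return the HTTP status code for a GET request, including 4xx/5xx."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # urllib raises on 4xx/5xx; the status code is still what we want.
        return err.code


def generate_traffic(base_url, paths, rounds=1, fetch=fetch_status):
    """Request every path `rounds` times and return (path, status) pairs."""
    results = []
    for _ in range(rounds):
        for path in paths:
            results.append((path, fetch(base_url + path)))
    return results
```

With the Flask app running, `generate_traffic(BASE_URL, PATHS, rounds=20)` sends 60 requests and returns the status code observed for each one.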
After a few seconds, open Kibana and go to Observability > APM > Services.
You should see my-python-app listed.
Viewing traces, metrics, and logs in Kibana
Now that telemetry data is flowing, let's walk through where each signal lands in Kibana and how they connect to each other.
Traces
Go to APM > Services > my-python-app > Transactions. Click on a list-orders transaction to open the trace waterfall.
You should see the parent span list-orders with a child span es.search.
The db.system and db.elasticsearch.index attributes we set earlier appear in the span details, and Kibana recognizes the child as an Elasticsearch query.
Failed transactions
For error visibility, open a get-order transaction for the /orders/does-not-exist request.
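The transaction details show event.outcome: failure along with the order.id: does-not-exist attribute, both set by the instrumented get_order handler (not shown above; see the complete code in the companion repository).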
This confirms that error status and custom attributes propagate correctly through the managed endpoint.
Correlated logs
Go to Discover and filter by service.name: my-python-app.
Expand a log document to see the body.text, trace_id, and span_id fields.
Because we emitted logs inside a span context, every log record carries the trace ID.
You can copy the trace_id value and go to APM to jump directly to the trace that produced that log.
Metrics
In Discover, switch to the metrics-* data view.
You should see the app.requests.total counter and app.request.duration histogram arriving at regular intervals.
You can also create a Lens visualization with app.requests.total on the Y axis and @timestamp on the X axis to see request volume over time.
Service map
Kibana builds the service map automatically from the span attributes we set earlier.
Because we tagged our Elasticsearch spans with db.system: elasticsearch, Kibana draws the dependency between my-python-app and elasticsearch, giving you a visual overview of your service and its backends.
Conclusion: full OpenTelemetry observability in Python without a collector
In this walkthrough, we instrumented a Flask API with traces, metrics, and logs using the standard OpenTelemetry SDK and two environment variables, then verified all three signals in Kibana with log-to-trace correlation built in.
The managed OTLP endpoint takes care of scaling, buffering, and durability, so you can focus on your application instead of operating ingestion infrastructure.
From here, consider auto-instrumenting with EDOT Python to remove the manual spans, or adding custom metrics that match your domain.
Next steps
- Explore the Elastic Managed OTLP Endpoint documentation for advanced configuration.
- Try opentelemetry-instrumentor EDOT Python to auto-instrument Flask and the Elasticsearch client without manual spans.
- Check the OpenTelemetry Python SDK docs for additional instrumentation options.
- For fine-grained control over histogram temporality, see the managed OTLP endpoint documentation.