Manual instrumentation with OpenTelemetry for Python applications


DevOps and SRE teams are transforming the process of software development. While DevOps engineers focus on efficient software applications and service delivery, SRE teams are key to ensuring reliability, scalability, and performance. These teams must rely on a full-stack observability solution that allows them to manage and monitor systems and ensure issues are resolved before they impact the business.  

Observability across the entire stack of modern distributed applications requires data collection, processing, and correlation often in the form of dashboards. Ingesting all system data requires installing agents across stacks, frameworks, and providers — a process that can be challenging and time-consuming for teams who have to deal with version changes, compatibility issues, and proprietary code that doesn't scale as systems change.      

Thanks to OpenTelemetry (OTel), DevOps and SRE teams now have a standard way to collect and send data that doesn't rely on proprietary code and that is backed by a large support community, which reduces vendor lock-in.

In a previous blog, we also reviewed how to use the OpenTelemetry demo and connect it to Elastic®, as well as some of Elastic’s capabilities with OpenTelemetry and Kubernetes. 

In this blog, we will show how to use manual instrumentation for OpenTelemetry with the Python service of our application called Elastiflix. This approach is slightly more complex than using automatic instrumentation.

The beauty of this is that there is no need for the otel-collector! This setup enables you to slowly and easily migrate an application to OTel with Elastic according to a timeline that best fits your business.

Application, prerequisites, and config

The application that we use for this blog is called Elastiflix, a movie streaming application. It consists of several micro-services written in .NET, NodeJS, Go, and Python.

Before we instrument our sample application, we will first need to understand how Elastic can receive the telemetry data.


All of Elastic Observability’s APM capabilities are available with OTel data. Some of these include:

  • Service maps
  • Service details (latency, throughput, failed transactions)
  • Dependencies between services, distributed tracing
  • Transactions (traces)
  • Machine learning (ML) correlations
  • Log correlation

In addition to Elastic’s APM and a unified view of the telemetry data, you will also be able to use Elastic’s machine learning capabilities to reduce analysis time, and alerting to help reduce MTTR.

Prerequisites

View the example source code

The full source code, including the Dockerfile used in this blog, can be found on GitHub. The repository also contains the same application without instrumentation. This allows you to compare each file and see the differences.

The following steps will show you how to instrument this application and run it on the command line or in Docker. If you are interested in a more complete OTel example, take a look at the docker-compose file here, which will bring up the full project.

Before we begin, let’s look at the non-instrumented code first.

This is our simple Python Flask application that can receive a GET request. (This is a portion of the full main.py file.)

from flask import Flask, request
import sys

import logging 
import redis 
import os
import ecs_logging
import datetime
import random
import time 

redis_host = os.environ.get('REDIS_HOST') or 'localhost'
redis_port = os.environ.get('REDIS_PORT') or 6379

application_port = os.environ.get('APPLICATION_PORT') or 5000

app = Flask(__name__)

# Get the Logger
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)

# Add an ECS formatter to the Handler
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
logger.addHandler(handler)
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.getLogger('werkzeug').addHandler(handler)

r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/favorites', methods=['GET'])
def get_favorite_movies():
    user_id = str(request.args.get('user_id'))   

    logger.info('Getting favorites for user ' + user_id, extra={
        "event.dataset": "favorite.log",
        "user.id": request.args.get('user_id')
    })
     
    favorites = r.smembers(user_id)
    
    # convert to list
    favorites = list(favorites)
    logger.info('User ' + user_id + ' has favorites: ' + str(favorites), extra={
        "event.dataset": "favorite.log",
        "user.id": user_id
    })
    return { "favorites": favorites}

logger.info('App startup')
app.run(host='0.0.0.0', port=application_port)
logger.info('App Stopped')

Step-by-step guide

Step 0. Log in to your Elastic Cloud account

This blog assumes you have an Elastic Cloud account — if not, follow the instructions to get started on Elastic Cloud.


Step 1. Install and initialize OpenTelemetry

As a first step, we’ll need to add some additional libraries to our application. 

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import Resource

This code imports necessary OpenTelemetry libraries, including those for tracing, exporting, and instrumenting specific libraries like Flask, Requests, and Redis.

Next we read the variables:

OTEL_EXPORTER_OTLP_HEADERS
OTEL_EXPORTER_OTLP_ENDPOINT 

And then initialize the exporter.

otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')

otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')

exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)
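The headers variable holds comma-separated key=value pairs, with values optionally URL-encoded (for example, `Bearer%20<token>`). The sketch below shows roughly how such a string maps to a header dictionary. It is an illustration only: `parse_otlp_headers` is a hypothetical helper, not part of the OpenTelemetry API, and the exporter performs its own parsing when it is handed the raw string.

```python
from urllib.parse import unquote


def parse_otlp_headers(raw):
    """Turn 'Key=Value,Other=Value' into a dict (hypothetical helper)."""
    headers = {}
    for item in raw.split(","):
        if "=" not in item:
            continue  # skip malformed or empty entries
        key, value = item.split("=", 1)
        # gRPC metadata keys are lowercase; values may be URL-encoded
        headers[key.strip().lower()] = unquote(value.strip())
    return headers


example_headers = parse_otlp_headers("Authorization=Bearer%20abc123")
```

Splitting on the first `=` only keeps values that themselves contain `=` intact.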

In order to pass additional parameters to OpenTelemetry, we will read the OTEL_RESOURCE_ATTRIBUTES variable and convert it into an object.

resource_attributes = os.environ.get('OTEL_RESOURCE_ATTRIBUTES') or 'service.version=1.0,deployment.environment=production'
key_value_pairs = resource_attributes.split(',')
result_dict = {}

for pair in key_value_pairs:
    key, value = pair.split('=')
    result_dict[key] = value
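The loop above assumes every pair is well formed. As a slightly more defensive sketch, the same parsing can tolerate stray whitespace, empty entries, and values that contain `=`. The function name `parse_resource_attributes` is our own and is not part of OpenTelemetry.

```python
def parse_resource_attributes(raw):
    """Parse 'k1=v1,k2=v2' into a dict, skipping malformed pairs."""
    attributes = {}
    for pair in raw.split(","):
        pair = pair.strip()
        if not pair or "=" not in pair:
            continue  # ignore empty or malformed entries
        key, value = pair.split("=", 1)  # split on the first '=' only
        attributes[key.strip()] = value.strip()
    return attributes


parsed = parse_resource_attributes(
    "service.version=1.0, deployment.environment=production,,note=a=b"
)
```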

Next, we use these parameters to populate the resource configuration.

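# The 'favorite' fallback is an assumption for when service.name is not supplied
otel_service_name = result_dict.get('service.name', 'favorite')

resourceAttributes = {
     "service.name": otel_service_name,
     "service.version": result_dict['service.version'],
     "deployment.environment": result_dict['deployment.environment']
}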

resource = Resource.create(resourceAttributes)

We then set up the trace provider using the previously created resource. The trace provider will allow us to create spans later after getting a tracer instance from it.

Additionally, we specify the use of BatchSpanProcessor. The span processor is an interface that provides hooks for span start and end method invocations.

In OpenTelemetry, different span processors are offered. The BatchSpanProcessor batches spans and sends them in bulk. Multiple span processors can be active at the same time; in the Python SDK, each call to add_span_processor registers another one. See the OpenTelemetry documentation.

Additionally, we added the resource module. This allows us to specify attributes such as service.name, version, and more. See OpenTelemetry semantic conventions documentation for more details.

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace.get_tracer(otel_service_name)
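To make the batching idea concrete, here is a toy, standard-library-only sketch of a processor that buffers finished spans and hands them to an exporter in groups. It is not the SDK's implementation: `ToyBatchProcessor` and its parameters are hypothetical, spans are represented by plain strings, and the real BatchSpanProcessor also exports on a timer from a background thread.

```python
class ToyBatchProcessor:
    """Illustrative only: buffers finished spans and exports them in batches."""

    def __init__(self, export, max_batch_size=3):
        self._export = export              # callable that receives a list of spans
        self._max_batch_size = max_batch_size
        self._buffer = []

    def on_start(self, span_name):
        # Hook invoked when a span starts; nothing to do in this toy version
        pass

    def on_end(self, span_name):
        # Hook invoked when a span ends: queue it, export once the batch is full
        self._buffer.append(span_name)
        if len(self._buffer) >= self._max_batch_size:
            self.force_flush()

    def force_flush(self):
        # Export whatever is buffered, even if the batch is not full
        if self._buffer:
            self._export(list(self._buffer))
            self._buffer.clear()


exported_batches = []
toy_processor = ToyBatchProcessor(exported_batches.append, max_batch_size=3)
for name in ["a", "b", "c", "d"]:
    toy_processor.on_start(name)
    toy_processor.on_end(name)
toy_processor.force_flush()  # flush the final, partially filled batch
```

Four spans with a batch size of three result in two export calls instead of four, which is the point of batching.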

Finally, because we are using Flask and Redis, we also add the following, which allows us to automatically instrument both Flask and Redis.

Technically you could consider this “cheating.” We are using some parts of the Python auto-instrumentation. However, it’s generally a good approach to resort to using some of the auto-instrumentation modules. This saves you a lot of time, and in addition, it ensures that functionality like distributed tracing will work automatically for any requests you receive or send.

FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
RedisInstrumentor().instrument()
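Distributed tracing works across services because the instrumentation propagates trace context in request headers; by default OpenTelemetry uses the W3C Trace Context `traceparent` header. The sketch below parses a version `00` header with the standard library, purely to show what travels between services. `parse_traceparent` is a hypothetical helper; the instrumentation modules handle this for you.

```python
import re

# version-traceid-parentid-flags, all lowercase hex (W3C Trace Context, version 00)
TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def parse_traceparent(header):
    """Return the fields of a traceparent header, or None if it is invalid."""
    match = TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # Version ff and all-zero ids are invalid per the specification
    if version == "ff" or trace_id == "0" * 32 or parent_id == "0" * 16:
        return None
    return {
        "version": version,
        "trace_id": trace_id,
        "parent_id": parent_id,
        "sampled": bool(int(flags, 16) & 0x01),
    }


context = parse_traceparent(
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
)
```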

Step 2. Add custom spans

Now that we have everything added and initialized, we can add custom spans.

If we want to have additional instrumentation for a part of our app, we simply wrap the code of the /favorites GET handler in a Python with statement:

with tracer.start_as_current_span("get_favorite_movies", set_status_on_exception=True) as span:
        ...

The wrapped code is as follows:

@app.route('/favorites', methods=['GET'])
def get_favorite_movies():
    # add artificial delay if enabled (delay_time is set elsewhere in the full main.py)
    if delay_time > 0:
        time.sleep(max(0, random.gauss(delay_time/1000, delay_time/1000/10)))

    with tracer.start_as_current_span("get_favorite_movies") as span:
        user_id = str(request.args.get('user_id'))   

        logger.info('Getting favorites for user ' + user_id, extra={
            "event.dataset": "favorite.log",
            "user.id": request.args.get('user_id')
        })
        
        favorites = r.smembers(user_id)
        
        # convert to list
        favorites = list(favorites)
        logger.info('User ' + user_id + ' has favorites: ' + str(favorites), extra={
            "event.dataset": "favorite.log",
            "user.id": user_id
        })
        return { "favorites": favorites}

Additional code

In addition to modules and span instrumentation, the sample application also checks some environment variables at startup. When sending data to Elastic without an OTel collector, the OTEL_EXPORTER_OTLP_HEADERS variable is required as it contains the authentication. The same is true for OTEL_EXPORTER_OTLP_ENDPOINT, the host where we’ll send the telemetry data.

otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')
# fail if secret token not set
if otel_exporter_otlp_headers is None:
    raise Exception('OTEL_EXPORTER_OTLP_HEADERS environment variable not set')


otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')
# fail if server url not set
if otel_exporter_otlp_endpoint is None:
    raise Exception('OTEL_EXPORTER_OTLP_ENDPOINT environment variable not set')
else:
    exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)
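If more required variables are added later, the two checks above can be folded into one helper that reports everything that is missing at once. This is a sketch of one possible approach, not what the sample application does; `missing_settings` and `load_settings` are hypothetical names.

```python
import os

REQUIRED_VARS = ("OTEL_EXPORTER_OTLP_HEADERS", "OTEL_EXPORTER_OTLP_ENDPOINT")


def missing_settings(environ, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not environ.get(name)]


def load_settings(environ=None):
    """Return the required settings, or raise naming every missing variable."""
    environ = os.environ if environ is None else environ
    missing = missing_settings(environ)
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {name: environ[name] for name in REQUIRED_VARS}


settings = load_settings({
    "OTEL_EXPORTER_OTLP_HEADERS": "Authorization=Bearer%20abc123",
    "OTEL_EXPORTER_OTLP_ENDPOINT": "https://example.invalid:443",
})
```

Passing the environment in as a dictionary keeps the check easy to test without touching the real process environment.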

Final code
For comparison, this is the instrumented code of our sample application. You can find the full source code on GitHub.

from flask import Flask, request
import sys

import logging 
import redis 
import os
import ecs_logging
import datetime
import random
import time 

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

#Using grpc exporter since per the instructions in OTel docs this is needed for any endpoint receiving OTLP.

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
#from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware
from opentelemetry.sdk.resources import Resource

redis_host = os.environ.get('REDIS_HOST') or 'localhost'
redis_port = os.environ.get('REDIS_PORT') or 6379
otel_traces_exporter = os.environ.get('OTEL_TRACES_EXPORTER') or 'otlp'
otel_metrics_exporter = os.environ.get('OTEL_METRICS_EXPORTER') or 'otlp'
environment = os.environ.get('ENVIRONMENT') or 'dev'
otel_service_version = os.environ.get('OTEL_SERVICE_VERSION') or '1.0.0'
resource_attributes = os.environ.get('OTEL_RESOURCE_ATTRIBUTES') or 'service.version=1.0,deployment.environment=production'

otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')
# fail if secret token not set
if otel_exporter_otlp_headers is None:
    raise Exception('OTEL_EXPORTER_OTLP_HEADERS environment variable not set')
#else:
#    otel_exporter_otlp_fheaders= f"Authorization=Bearer%20{secret_token}"

otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')
# fail if server url not set
if otel_exporter_otlp_endpoint is None:
    raise Exception('OTEL_EXPORTER_OTLP_ENDPOINT environment variable not set')
else:
    exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)


key_value_pairs = resource_attributes.split(',')
result_dict = {}

for pair in key_value_pairs:
    key, value = pair.split('=')
    result_dict[key] = value

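resourceAttributes = {
     # .get() with fallbacks avoids a KeyError when an attribute is not supplied;
     # the 'favorite' fallback name is an assumption
     "service.name": result_dict.get('service.name', 'favorite'),
     "service.version": result_dict.get('service.version', '1.0'),
     "deployment.environment": result_dict.get('deployment.environment', 'production')
     # Add more attributes as needed
}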

resource = Resource.create(resourceAttributes)


provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace.get_tracer("favorite")


application_port = os.environ.get('APPLICATION_PORT') or 5000

app = Flask(__name__)


FlaskInstrumentor().instrument_app(app)
#OpenTelemetryMiddleware().instrument()
RequestsInstrumentor().instrument()
RedisInstrumentor().instrument()

#app.wsgi_app = OpenTelemetryMiddleware(app.wsgi_app)

# Get the Logger
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)

# Add an ECS formatter to the Handler
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
logger.addHandler(handler)
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.getLogger('werkzeug').addHandler(handler)

r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/favorites', methods=['GET'])
def get_favorite_movies():
    with tracer.start_as_current_span("get_favorite_movies") as span:
        user_id = str(request.args.get('user_id'))   

        logger.info('Getting favorites for user ' + user_id, extra={
            "event.dataset": "favorite.log",
            "user.id": request.args.get('user_id')
        })
        
        favorites = r.smembers(user_id)
        
        # convert to list
        favorites = list(favorites)
        logger.info('User ' + user_id + ' has favorites: ' + str(favorites), extra={
            "event.dataset": "favorite.log",
            "user.id": user_id
        })
        return { "favorites": favorites}

logger.info('App startup')
app.run(host='0.0.0.0', port=application_port)
logger.info('App Stopped')

Step 3. Running the Docker image with environment variables

As specified in the OTel documentation, we will use environment variables to pass in the configuration values that enable the application to connect with Elastic Observability’s APM server.

Because Elastic accepts OTLP natively, we just need to provide the endpoint and authentication that tell the OTel exporter where to send the data, as well as some other environment variables.

Getting Elastic Cloud variables
You can copy the endpoints and token from Kibana® under the path `/app/home#/tutorial/apm`.


You will need to copy the following environment variables:

OTEL_EXPORTER_OTLP_ENDPOINT
OTEL_EXPORTER_OTLP_HEADERS

Build the image

docker build -t python-otel-manual-image .

Run the image

docker run \
       -e OTEL_EXPORTER_OTLP_ENDPOINT="<REPLACE WITH OTEL_EXPORTER_OTLP_ENDPOINT>" \
       -e OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <REPLACE WITH TOKEN>" \
       -e OTEL_RESOURCE_ATTRIBUTES="service.version=1.0,deployment.environment=production,service.name=python-favorite-otel-manual" \
       -p 5000:5000 \
       python-otel-manual-image

You can now issue a few requests in order to generate trace data. Note that these requests are expected to return an error, as this service relies on a connection to Redis that you don’t currently have running. As mentioned before, you can find a more complete example using docker-compose here.

curl localhost:5000/favorites
# or alternatively issue a request every second

while true; do curl "localhost:5000/favorites"; sleep 1; done;
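If you would rather generate traffic from Python, the same loop can be sketched with the standard library. The helper names below are our own, and the `user_id` query parameter is added because the handler reads it from the request; the base URL assumes the service is reachable on port 5000.

```python
import time
import urllib.error
import urllib.request
from urllib.parse import urlencode


def favorites_url(base_url, user_id):
    """Build the /favorites URL for a given user."""
    return f"{base_url}/favorites?{urlencode({'user_id': user_id})}"


def http_status(url):
    """Return the HTTP status code, or None if the service is unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status
    except urllib.error.HTTPError as error:
        return error.code  # error responses still produce trace data
    except urllib.error.URLError:
        return None


def send_requests(base_url, user_ids, fetch=http_status, pause_seconds=1.0):
    """Issue one request per user id, pausing between requests."""
    statuses = []
    for user_id in user_ids:
        statuses.append(fetch(favorites_url(base_url, user_id)))
        time.sleep(pause_seconds)
    return statuses


# Example call (requires the container to be running):
# send_requests("http://localhost:5000", ["1", "2", "3"])
```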

Step 4. Explore traces, metrics, and logs in Elastic APM

Now that the service is instrumented, you should see the following output in Elastic APM when looking at the transactions section of your Python service:

[Screenshot: transactions view of the Python service in Elastic APM]

Notice how this is slightly different from the auto-instrumented version, as we now also have our custom span in this view.

[Screenshot: Elastic APM view showing the custom span]

Is it worth it?

This is the million-dollar question. Depending on what level of detail you need, it's potentially necessary to manually instrument. Manual instrumentation lets you add custom spans, custom labels, and metrics where you want or need them. It allows you to get a level of detail that otherwise would not be possible and is oftentimes important for tracking business-specific KPIs.

Your operations, and whether you need to troubleshoot or analyze the performance of specific parts of the code, will dictate when and what to instrument. But it’s helpful to know that you have the option to manually instrument.

You may have noticed that we didn’t instrument metrics yet; that is a topic for another blog. We discussed logs in a previous blog.

Conclusion

In this blog, we discussed the following:

  • How to manually instrument Python with OpenTelemetry 
  • How to properly initialize OpenTelemetry and add a custom span
  • How to easily set the OTLP ENDPOINT and OTLP HEADERS with Elastic without the need for a collector

Hopefully, this provides an easy-to-understand walk-through of instrumenting Python with OpenTelemetry and how easy it is to send traces into Elastic. 

Don’t have an Elastic Cloud account yet? Sign up for Elastic Cloud and try out the instrumentation capabilities discussed above. I would be interested in getting your feedback about your experience in gaining visibility into your application stack with Elastic.

The release and timing of any features or functionality described in this post remain at Elastic's sole discretion. Any features or functionality not currently available may not be delivered on time or at all.