Streaming POST Requests to Elastic APM in Ruby with http.rb and IO.pipe

To keep the memory footprint of our Elastic APM agents small, the agents stream events as they happen via long-running POST requests to APM Server. The server accepts ndjson: JSON objects separated by newline (\n) characters. Can’t get simpler than that.
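
To make the format concrete, here is a minimal sketch of encoding and decoding ndjson with Ruby's standard json library. The event hashes are made up for illustration and are not the agent's real event types.

```ruby
require 'json'

# Hypothetical events, for illustration only
events = [
  { number: 1 },
  { number: 2 },
  { number: 3 }
]

# Serialize each event as one single-line JSON document ending in "\n"
ndjson = events.map { |event| "#{JSON.generate(event)}\n" }.join

puts ndjson

# Decoding is the reverse: parse one line at a time
decoded = ndjson.each_line.map { |line| JSON.parse(line) }
```

Because every line is a complete JSON document, the receiver can process each event as soon as its newline arrives, without waiting for the rest of the body.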

Streaming in Ruby can be tricky. A regular HTTP request blocks the calling thread until the request is finished, holding back everything else that thread would do. This is of course not what we want in this case, as our app should continue to serve our users while it is sending its events one by one to APM Server.

We instead start the request in a thread and pipe the events from our main program thread to the request thread using Ruby’s built-in IO.pipe. The wonderful Ruby gem http.rb supports IO objects as request bodies, so we’ll depend on that.

require 'bundler/inline'
# Needed for `to_json` below
require 'json'

gemfile do
  source 'https://rubygems.org'
  gem 'http'
end

read_pipe, write_pipe = IO.pipe

# Unfortunately for our case, http.rb calls `rewind` on the IO
# object after the request ends, but pipes don't have such a method.
# We'll add a method stub to circumvent this:
read_pipe.define_singleton_method(:rewind) { nil }

# First we make an HTTP client and set the Transfer-Encoding
# header to let the server know that we'll be sending in chunks
client = HTTP.headers({ 'Transfer-Encoding' => 'chunked' }) 

# In a thread we open the request and give it the read pipe
request_thread = Thread.new do
  # This thread is blocked on the request below while the main
  # program thread continues on
  client.post('https://example.com', body: read_pipe).flush
end

# Next let's simulate 10 events with a bit of a pause between them
1.upto(10) do |number|
  data = { number: number }
  # ndjson separates objects with a newline character
  write_pipe.write("#{data.to_json}\n")
  sleep 0.2
end

# Then when we're done sending our data, we'll close the pipe
# which will also end the request
write_pipe.close

# When the request's body is closed the request thread will reach
# its end. We'll wait for it before we end the script.
request_thread.join
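
The hand-off between the two threads can be tried without any network involved. In the sketch below a plain reader thread stands in for the HTTP request (an assumption made for illustration; http.rb reads the pipe in its own way), which shows how closing the write end is what lets the other thread finish.

```ruby
read_pipe, write_pipe = IO.pipe

received = []

# The reader thread stands in for the HTTP request: it blocks on the
# pipe until data arrives and finishes once the write end is closed.
reader_thread = Thread.new do
  read_pipe.each_line { |line| received << line.chomp }
  read_pipe.close
end

# The main thread keeps running and hands over lines as it produces them
1.upto(3) do |number|
  write_pipe.write("event #{number}\n")
end

# Closing the write end signals end-of-file to the reader
write_pipe.close
reader_thread.join

p received
```

If `write_pipe.close` were left out, `each_line` would never see end-of-file and `reader_thread.join` would wait forever, which is the same reason the script above closes the pipe before joining the request thread.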

This is a constructed example script, of course. In a real-world instance of the Ruby APM agent, requests have timeouts and byte size limits; when either is reached, the agent ends the current request and starts a new one.
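
One way such limits could be layered on top of the pipe is sketched below. This is not the agent's actual implementation: `RotatingWriter`, its parameters, and the StringIO sinks are all hypothetical names chosen for illustration. In a real setup the block would presumably open a new pipe and start a new request thread instead.

```ruby
require 'stringio'

# Hypothetical helper: writes lines to a sink and swaps in a fresh
# sink once a byte or time limit has been reached.
class RotatingWriter
  attr_reader :rotations

  # The block must return a new writable IO each time it is called
  def initialize(max_bytes:, max_seconds:, &open_sink)
    @max_bytes = max_bytes
    @max_seconds = max_seconds
    @open_sink = open_sink
    @rotations = 0
    @sink = nil
  end

  def write(line)
    rotate if @sink.nil? || limit_reached?
    @bytes += @sink.write(line)
  end

  def close
    @sink&.close
    @sink = nil
  end

  private

  def limit_reached?
    @bytes >= @max_bytes || (now - @opened_at) >= @max_seconds
  end

  # Closing the old sink is what would end the current request
  def rotate
    close
    @sink = @open_sink.call
    @bytes = 0
    @opened_at = now
    @rotations += 1
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Usage: StringIO objects stand in for the write ends of new pipes
sinks = []
writer = RotatingWriter.new(max_bytes: 20, max_seconds: 10) do
  StringIO.new.tap { |io| sinks << io }
end

5.times { |i| writer.write("{\"number\":#{i}}\n") }
writer.close

puts "sinks opened: #{writer.rotations}"
```

The limit is checked before each write rather than after, so a sink can overshoot `max_bytes` by one line. That keeps every event whole within a single request.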

The agent also compresses the stream with gzip. Let’s add that to our example:

require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'
  gem 'http'
end

# We'll need the built-in zlib library, and json for `to_json`
require 'json'
require 'zlib'

read_pipe, write_pipe = IO.pipe

read_pipe.define_singleton_method(:rewind) { nil }

# Switch the write pipe to binary mode, as gzip output is binary data
write_pipe.binmode
client = HTTP.headers({
  'Transfer-Encoding' => 'chunked',
  # Add Content-Encoding header to let the server know we are
  # gzip'ing now
  'Content-Encoding' => 'gzip'
}) 
request_thread = Thread.new do
  client.post('https://example.com/', body: read_pipe).flush
end

# Initialize a new GZip pipe and put it in front of our existing
# write pipe
gzip_pipe = Zlib::GzipWriter.new(write_pipe)
(1..10).each do |number|
  data = { number: number }
  # Writing looks the same but uses the gzip pipe instead
  gzip_pipe.write("#{data.to_json}\n")
  sleep 0.2
end

# Close the gzip pipe instead. This writes the gzip footer and
# also closes the underlying write pipe.
gzip_pipe.close

request_thread.join
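
To check that the compressed stream decodes cleanly, the same pipe setup can be run locally with a Zlib::GzipReader on the read end. The reader thread here is only a stand-in for the server, assumed for illustration.

```ruby
require 'json'
require 'zlib'

read_pipe, write_pipe = IO.pipe
read_pipe.binmode
write_pipe.binmode

# Stand-in for the server: decompress whatever arrives on the pipe
reader_thread = Thread.new do
  lines = []
  gzip_reader = Zlib::GzipReader.new(read_pipe)
  gzip_reader.each_line { |line| lines << line.chomp }
  gzip_reader.close
  lines
end

gzip_pipe = Zlib::GzipWriter.new(write_pipe)
1.upto(3) do |number|
  gzip_pipe.write("#{JSON.generate({ number: number })}\n")
end

# Writes the gzip footer and closes the write pipe underneath
gzip_pipe.close

# Thread#value waits for the thread and returns the block's result
decoded = reader_thread.value
p decoded
```

One thing to keep in mind is that Zlib::GzipWriter buffers compressed output, so small writes may not reach the other end of the pipe until `flush` or `close` is called.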

There you have it

Streaming requests in Ruby might require a bit more effort than in, say, JavaScript with its evented nature, but this isn’t too bad, is it? Because events are streamed as they happen, the agent can release its objects as soon as they are sent. The Ruby APM agent has used this approach since version 2.0.

Adding Elastic APM to your Ruby app is as easy as ever, requiring as little as adding the gem to your Gemfile. Learn how and get the agent streaming metrics for your app today. As always, if you have any questions or feedback, start a new thread on our APM forum.