Curious about how to write more idiomatic concurrent code in Go? It’s not always easy or intuitive, even if you’ve done lots of concurrent programming in other languages. I’ve been lucky to have worked in a well-written code base, and had the expert advice of Beats core area lead Steffen Siering along the way.
In this post I’ll walk you through how we implemented a new scheduler for Heartbeat that is part of our upcoming 7.6.0 release. I’ll cover the high-level concurrency strategy behind this scheduler and some of the challenges we faced along the way. This post assumes a general familiarity with concurrent programming, and focuses on the high-level approach to concurrency within our scheduler.
If you don’t already know, Heartbeat is the agent behind the Uptime app. It regularly pings a site via ICMP, TCP, or HTTP on a schedule to determine whether it’s up or down, and also reports some stats on its performance. The Heartbeat scheduler is in charge of dispatching those tasks at a predetermined interval. So our scheduler performs a job much like cron or
For Heartbeat’s purposes we need a scheduler that:
- Is reliable (our old one had some issues that surfaced in high concurrency)
- Is fast and memory efficient
- Supports both the addition and removal of jobs without stopping the scheduler
- Manages spawned sub-tasks (essentially continuations) from multi-part jobs
- Supports limiting the maximum number of running sub-tasks
A naive initial attempt
My initial attempt to refactor the scheduler was going to be so easy to write. I had the bright idea to lean on how lightweight goroutines are and take an approach that, while seemingly naive, would be beautifully minimal and leverage the efficiency of Go’s goroutine scheduler and time.Timer. I was very wrong here.
In this scheme, for each scheduled task we’d launch a goroutine that immediately blocked on a time.Timer, looping after each task invocation. This meant that for N defined jobs, we’d always have N goroutines, mostly in a blocked state waiting for their timers to trigger. My thinking was that goroutines are super-lightweight, and time.Timer is efficient. So this should just work, would be easy to implement, would be easy to check for correctness, and would be the sort of thing that you could get working in rough shape in an afternoon. Well, it turns out only the last part was correct. After building a PoC of this, it turned out that goroutines are not as light as I’d hoped, and neither is the
This naive approach used much more memory (at least 20% more) than the old timer, ran slower, and used more CPU in intensive use cases. I didn’t keep exact stats because the results were clearly terrible, and it was obvious that we’d need to take a more robust approach. My hope that lightweight Go internals would let me write dead simple code was completely off.
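For readers who want to picture the naive scheme, here is a minimal sketch of it. It is not the PoC code; runEvery and demoNaive are hypothetical names, and the job count and intervals are arbitrary values chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// runEvery is a hypothetical helper: one goroutine and one time.Timer per job.
// The goroutine spends nearly all of its life blocked on the timer channel.
func runEvery(ctx context.Context, interval time.Duration, job func()) {
	go func() {
		timer := time.NewTimer(interval)
		defer timer.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				job()
				timer.Reset(interval) // safe: the timer fired and its channel was drained
			}
		}
	}()
}

// demoNaive starts 50 jobs on a 5ms interval, lets them tick for 100ms,
// and returns the total number of invocations.
func demoNaive() int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var runs int64
	for i := 0; i < 50; i++ {
		runEvery(ctx, 5*time.Millisecond, func() { atomic.AddInt64(&runs, 1) })
	}
	time.Sleep(100 * time.Millisecond)
	return atomic.LoadInt64(&runs)
}

func main() {
	// N jobs always cost N goroutines and N timers, however idle they are.
	fmt.Println("invocations:", demoNaive())
}
```

The code is short, which was the appeal, but the cost grows linearly with the number of jobs even when nothing is running.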
A more robust approach
After trying, and failing, to find a silver bullet to fix the previous approach, I realized I needed to start over. It became apparent that we had to manage resources more explicitly, and try to minimize time.Timer instances and goroutines. The standard approach is to use a priority queue to keep track of which task is due to execute next. This is a classic approach to the problem, used in other timer libraries. Implementing it in a thread-safe way was quite a bit more effort. After a lot more work than my naive approach, we wound up with something that performed significantly better than the old scheduler in terms of both CPU and memory utilization.
The new implementation combines:
- A priority queue implemented via a heap, storing the functions that dispatch tasks.
- Go’s weighted semaphore (semaphore.Weighted, from golang.org/x/sync/semaphore) for limiting concurrent task execution.
- Go’s WaitGroup to wait for all of a job’s sub-tasks to return.
- Go’s atomic CompareAndSwap functions to transition between states.
- Go’s Context to provide an easy method of terminating any ongoing tasks from the root of the code.
Our scheduler code is split into several packages:
- scheduler contains the high level scheduler which accepts new jobs, and handles their removal. It also contains the high level logic around requeuing jobs, executing their sub-tasks, and limiting concurrency.
- scheduler/schedule represents the definition of a schedule, and offers a Next(now) time.Time method that returns the next scheduled execution based on a given now value.
- scheduler/timerqueue is a thread-safe queue that lets you schedule tasks for a single execution making efficient use of timers.
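To make the Next(now) contract concrete, here is a small sketch of what a fixed-interval schedule could look like. The Schedule interface and intervalSchedule type are assumptions made for this example, not the types from the scheduler/schedule package.

```go
package main

import (
	"fmt"
	"time"
)

// Schedule mirrors the Next(now) time.Time method described above.
// The interface name itself is an assumption for this example.
type Schedule interface {
	Next(now time.Time) time.Time
}

// intervalSchedule is a hypothetical schedule that fires every Interval.
type intervalSchedule struct {
	Interval time.Duration
}

// Next is a pure function of now, which makes schedules easy to test.
func (s intervalSchedule) Next(now time.Time) time.Time {
	return now.Add(s.Interval)
}

// demoNext computes the next run for a fixed, known instant.
func demoNext() string {
	var s Schedule = intervalSchedule{Interval: 10 * time.Second}
	now := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)
	return s.Next(now).Format(time.RFC3339)
}

func main() {
	fmt.Println(demoNext()) // prints 2020-01-01T12:00:10Z
}
```

Passing now in, rather than calling time.Now() inside the schedule, keeps the clock under the caller's control.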
Minimizing timer instances with a priority queue
A key part of keeping things efficient is minimizing the number of running time.Timer instances. We only really need one timer, activating when the next scheduled task is ready, and then a check to see how many tasks are due to be executed. We only need to reset this time.Timer if a new task is scheduled that’s due to be executed before any other tasks. So, what we need is a way of tracking our tasks in order of execution.
The perfect data structure for this is a priority queue, or heap. You can find our implementation in Heartbeat’s timerqueue/heap.go. With a heap, we get O(log n) insertion and removal of elements, O(1) peeking at the highest priority item, and O(log n) popping of the highest priority item (peek + delete).
This means every time a new item is scheduled, adding it is a matter of:
- Peeking to see if it is scheduled to run before the current highest priority item: O(1)
- Pushing it onto the queue: O(log n)
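The peek-then-push steps above can be sketched with the standard library's container/heap. The task and taskHeap types and the schedule helper are hypothetical names for this example; Heartbeat's own heap in timerqueue/heap.go may be structured differently.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// task is a hypothetical queue entry: when to run, and a label for what runs.
type task struct {
	runAt time.Time
	name  string
}

// taskHeap implements heap.Interface as a min-heap ordered by runAt.
type taskHeap []task

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].runAt.Before(h[j].runAt) }
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(task)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// schedule pushes t and reports whether t is now the earliest task,
// i.e. whether a single shared timer would need to be reset.
func schedule(h *taskHeap, t task) bool {
	isEarliest := h.Len() == 0 || t.runAt.Before((*h)[0].runAt) // peek: O(1)
	heap.Push(h, t)                                             // push: O(log n)
	return isEarliest
}

// demoOrder schedules three tasks out of order and pops them back in due order.
func demoOrder() []string {
	base := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	h := &taskHeap{}
	schedule(h, task{runAt: base.Add(30 * time.Second), name: "http"})
	schedule(h, task{runAt: base.Add(10 * time.Second), name: "icmp"})
	schedule(h, task{runAt: base.Add(20 * time.Second), name: "tcp"})
	var order []string
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(task).name)
	}
	return order
}

func main() {
	fmt.Println(demoOrder()) // prints [icmp tcp http]
}
```

Note that nothing here is safe for concurrent use; this type would need a lock or a single owning goroutine around it.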
Go comes with a heap implementation, but Go’s heap implementation is not thread-safe. The TimerQueue type we created provides thread-safe methods built on top of Go’s heap operations. We spawn a background goroutine that listens for events that mutate the heap, thus serializing access to the underlying data structure. If you’re familiar with the active object pattern, this should ring a bell.
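A stripped-down sketch of that idea follows: a single goroutine owns both the heap and one timer, and every other goroutine talks to it over a channel. This timerQueue is a hypothetical simplification, not Heartbeat's TimerQueue; among other shortcuts it runs tasks inline and uses an arbitrary one-hour idle timeout.

```go
package main

import (
	"container/heap"
	"context"
	"fmt"
	"time"
)

type timerTask struct {
	runAt time.Time
	fn    func()
}

type taskHeap []*timerTask

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].runAt.Before(h[j].runAt) }
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(*timerTask)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// timerQueue is a simplified, hypothetical stand-in for a thread-safe queue.
type timerQueue struct {
	pushCh chan *timerTask
	done   <-chan struct{}
}

func newTimerQueue(ctx context.Context) *timerQueue {
	q := &timerQueue{pushCh: make(chan *timerTask), done: ctx.Done()}
	go q.loop()
	return q
}

// Push may be called from any goroutine; it never touches the heap directly.
func (q *timerQueue) Push(runAt time.Time, fn func()) bool {
	select {
	case q.pushCh <- &timerTask{runAt: runAt, fn: fn}:
		return true
	case <-q.done:
		return false
	}
}

// loop is the only goroutine that reads or writes the heap and the timer.
func (q *timerQueue) loop() {
	var h taskHeap
	timer := time.NewTimer(time.Hour) // re-armed below; initial value is arbitrary
	defer timer.Stop()
	for {
		select {
		case <-q.done:
			return
		case t := <-q.pushCh:
			heap.Push(&h, t)
		case <-timer.C:
		}
		// Run everything that is due (inline here, for brevity).
		for now := time.Now(); h.Len() > 0 && !h[0].runAt.After(now); {
			heap.Pop(&h).(*timerTask).fn()
		}
		next := time.Hour // idle: nothing scheduled
		if h.Len() > 0 {
			next = time.Until(h[0].runAt)
		}
		if !timer.Stop() {
			select {
			case <-timer.C: // drain a stale tick before resetting
			default:
			}
		}
		timer.Reset(next)
	}
}

// demoQueue pushes three tasks out of order and records the order they run in.
func demoQueue() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	q := newTimerQueue(ctx)
	out := make(chan string, 3)
	start := time.Now()
	q.Push(start.Add(60*time.Millisecond), func() { out <- "third" })
	q.Push(start.Add(20*time.Millisecond), func() { out <- "first" })
	q.Push(start.Add(40*time.Millisecond), func() { out <- "second" })
	return []string{<-out, <-out, <-out}
}

func main() {
	fmt.Println(demoQueue())
}
```

Because only loop touches the heap, no mutex is needed; the channel send in Push is the synchronization point.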
Running recursive tasks using weighted semaphores and wait groups
When Heartbeat executes a job, there may be more than one sub-task invoked as a result. For instance, if mode:all is set, Heartbeat will ping each IP returned from the DNS lookup as a separate sub-task. Heartbeat supports limiting the number of running sub-tasks (not jobs) via the scheduler.limit option. The core logic around this can be found in scheduler.runRecursiveTask.
Let’s examine the way Heartbeat limits task execution here. You may be wondering why we limit tasks, not jobs. The reason is that we want users to be able to limit the number of file descriptors in use at one time. One task will use at most one file descriptor. This lets users plan around OS file descriptor limits with more predictability. Go’s weighted semaphore (semaphore.Weighted, from golang.org/x/sync/semaphore) is a great fit for our needs here. It lets you define a quantity of some resource, then reserve some portion of that capacity. If a goroutine attempts to grab more of the resource than is available, it blocks until enough has been released. In our case we grab one unit per sub-task.
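The sketch below shows the one-unit-per-sub-task idea using only the standard library. To keep it dependency-free it swaps in a buffered channel as a counting semaphore rather than semaphore.Weighted; with the real type, the acquire and release lines would be calls to Acquire(ctx, 1) and Release(1). The runLimited function and the sleep that stands in for a network check are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs n sub-tasks with at most limit running at once and
// returns the highest concurrency it observed.
func runLimited(n, limit int) int64 {
	sem := make(chan struct{}, limit) // one slot per allowed sub-task
	var wg sync.WaitGroup
	var running, peak int64

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire one unit; blocks while all slots are taken
			defer func() { <-sem }() // release the unit when the sub-task ends

			cur := atomic.AddInt64(&running, 1)
			for { // record a new peak if this is the most we have seen
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			time.Sleep(2 * time.Millisecond) // stand-in for a network check
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(20, 3)) // never more than 3
}
```

All 20 goroutines exist at once, but only three ever hold a slot, which is the property that matters when each slot maps to a file descriptor.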
We use Go’s WaitGroup to determine when all of a job’s sub-tasks are done so we can do some post-processing. WaitGroup lets you block a goroutine until a counter tracking some number of in-flight tasks reaches zero. One nice property of WaitGroup is that if you’re kicking off N tasks, you can just call Add(N) once, instead of invoking Add(1) N times.
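As a small illustration of that property, here is a sketch that fans out one sub-task per address and waits for all of them. The pingAll function and its ping callback are hypothetical; they are not part of Heartbeat.

```go
package main

import (
	"fmt"
	"sync"
)

// pingAll runs one sub-task per address, waits for all of them,
// and returns how many reported the target as up.
func pingAll(addrs []string, ping func(string) bool) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	up := 0

	wg.Add(len(addrs)) // one call, instead of wg.Add(1) inside the loop
	for _, addr := range addrs {
		go func(addr string) {
			defer wg.Done()
			if ping(addr) {
				mu.Lock()
				up++
				mu.Unlock()
			}
		}(addr)
	}
	wg.Wait() // blocks until the counter is back to zero
	return up // post-processing can safely happen from here on
}

func main() {
	addrs := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
	up := pingAll(addrs, func(a string) bool { return a != "10.0.0.2" })
	fmt.Println(up, "of", len(addrs), "up") // prints 2 of 3 up
}
```

Calling Add before starting the goroutines also avoids the classic race where Wait runs before any Add has happened.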
Using atomic operations to handle state transitions
The scheduler has a simple life cycle defined by a simple state machine. It can be in either a
stopped state, with no backwards transitions. To manage these transitions, we use atomic compare-and-swap (CAS) operations, which are wrapped in Beats’ utility atomic package. We use this utility package to ease some of the awkwardness of using Go’s atomic package directly.
The nice thing about using CAS for a state machine is that you can ensure that a transition happens exactly once. See Scheduler.Start for an example of how we use it to start the scheduler at most once, no matter how many times Start() is invoked.
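Here is a minimal sketch of a forward-only state machine guarded by CAS. It uses the standard sync/atomic package directly rather than Beats' wrapper, and the state names (stateIdle, stateRunning, stateStopped) and the scheduler type are assumptions for illustration, not the ones in Heartbeat.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical state names; transitions only ever move forward.
const (
	stateIdle int32 = iota
	stateRunning
	stateStopped
)

var errNotIdle = errors.New("scheduler was already started or stopped")

type scheduler struct {
	state  int32
	starts int32 // how many times the startup work actually ran
}

// Start performs the idle -> running transition at most once.
func (s *scheduler) Start() error {
	if !atomic.CompareAndSwapInt32(&s.state, stateIdle, stateRunning) {
		return errNotIdle // someone else won the race, or we are already stopped
	}
	atomic.AddInt32(&s.starts, 1) // stand-in for real startup work
	return nil
}

// Stop performs the running -> stopped transition at most once.
func (s *scheduler) Stop() bool {
	return atomic.CompareAndSwapInt32(&s.state, stateRunning, stateStopped)
}

// raceStart calls Start from n goroutines and reports how often startup ran.
func raceStart(n int) int32 {
	s := &scheduler{}
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			_ = s.Start()
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&s.starts)
}

func main() {
	fmt.Println("startup ran", raceStart(100), "time(s)") // prints startup ran 1 time(s)
	s := &scheduler{}
	fmt.Println(s.Start() == nil, s.Stop(), s.Start() == nil) // prints true true false
}
```

The last line shows the forward-only property: once stopped, a second Start fails because the state is no longer idle.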
Using context to shut down spawned tasks
The scheduler controls the lifecycle of most Heartbeat operations. As a result, it’s responsible for shutting down all user defined monitors when asked. There’s no way to say ‘shut down any goroutines you may have spawned’ in Go. You have to let those goroutines gently know it’s time to shut down.
An idiomatic way to do this is by passing Go’s context.Context throughout your code. By passing this type throughout the scheduling code, we can easily abort the entire tree of downstream processes by cancelling the top level context. The next time a spawned goroutine does a select, it can attempt a channel read off (Context).Done() and stop processing immediately. This does require you to always check (Context).Done() any time you may block in a select, though. It’s a little cumbersome, and can potentially be tricky to integrate with non-channel based concurrency like mutexes and weighted semaphores, but it’s a great way to write code that can respond to shutdown requests correctly. Additionally, network and I/O libraries usually take a context, meaning that our shutdown can be correctly handled outside of code we’ve directly written.
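The pattern can be sketched as follows: every blocking select includes a ctx.Done() case, and cancelling the root context unblocks every goroutine derived from it. The worker and stopAll functions are hypothetical names for this example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// worker blocks in a select and always includes ctx.Done() as one of the cases.
func worker(ctx context.Context, jobs <-chan func()) {
	for {
		select {
		case <-ctx.Done():
			return // shutdown was requested somewhere up the context tree
		case job := <-jobs:
			job()
		}
	}
}

// stopAll starts n workers on child contexts, cancels the root,
// and returns how many workers exited.
func stopAll(n int) int {
	root, cancel := context.WithCancel(context.Background())
	jobs := make(chan func()) // nothing is ever sent; workers simply block
	var wg sync.WaitGroup
	var exited int64

	wg.Add(n)
	for i := 0; i < n; i++ {
		child, childCancel := context.WithCancel(root) // child of the root context
		go func() {
			defer wg.Done()
			defer childCancel()
			worker(child, jobs)
			atomic.AddInt64(&exited, 1)
		}()
	}

	cancel()  // cancelling the root cancels every child context
	wg.Wait() // returns only because each worker saw ctx.Done()
	return int(atomic.LoadInt64(&exited))
}

func main() {
	fmt.Println("workers stopped:", stopAll(5)) // prints workers stopped: 5
}
```

If any worker omitted the ctx.Done() case, wg.Wait() here would hang forever, which is exactly the failure mode the discipline is meant to prevent.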
Wrapping things up
I hope this has been a useful look at a high-level concurrency strategy for Go code. If you have any questions, comments, or feedback, drop us a line on the Heartbeat topic on Discuss. And if you’d like to see Heartbeat in action, check it out on demo.elastic.co.