A network of data consumers and producers with Go, UDP, Redis and ZeroMQ

Ismael Celis

This post was originally published on the New Bamboo blog, before New Bamboo joined thoughtbot in London.


The final setup

Introduction

I’m building a fleet of small network daemons using Go, UDP, Redis and ZeroMQ to collect and aggregate metrics from a multi-server application.

Context and motivation

For a side project I run (multi-tenant, e-commercey), I wanted to collect some basic usage and server data but wasn’t sure what solution to go for. Should I go with a combination of Statsd + Graphite? Maybe get all functional with Riemann?

The only thing I was sure of was that I wanted a place to send the data to, and to worry about what to do with it later. I also wanted something loosely coupled enough to let me experiment with different approaches without committing too much to a particular stack or framework (if we can’t experiment on our own projects, then what’s the point?).

I also wanted to give Go a try, if I’m perfectly honest. And Redis. And ZeroMQ, and all the cool things that I don’t often get to play with.

So this is what I did. I built a series of small, focused Go daemons that talk to each other over the internal network.

An events hub

The first one listens to events broadcast by other apps over UDP. @sjltaylor and I originally started working on this during one of our company hackdays. We chose UDP so that emitting events wouldn’t have performance implications for the producers (this is what Statsd does). Of course the risk is that you might not get all events, but that’s something I can live with at this point.

Go allows you to nicely hide the complexity of network services running concurrently in the same process:

// start up UDP daemon
daemon, err := udp.NewDaemon(udpHost)
if err != nil {
	log.Fatal(err)
}

// setup websockets hub
wshub := ws.HandleWebsocketsHub("/ws")

// Push incoming UDP events on to the websocket clients
daemon.Subscribe(wshub.Notifier)

// Start up PUB ZMQ client
fanoutObserver := fanout.NewZmq(zmqAddress)

// Push incoming UDP events down ZMQ pub/sub socket
daemon.Subscribe(fanoutObserver.Notifier)

The full repository is available on GitHub.
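The udp, ws and fanout packages are my own and live in that repository. Stripped down, though, a hub like this is essentially a UDP listen loop that decodes each datagram and fans it out to whoever subscribed. Here is a minimal sketch of that idea (the event shape, subscriber signature and port are illustrative assumptions, not the actual package):

// Sketch: the essence of a UDP listen-and-dispatch loop
package main

import (
	"encoding/json"
	"log"
	"net"
)

// Subscriber is a simplified stand-in for the hub's notifier functions
type Subscriber func(event map[string]interface{})

func listen(addr string, subscribers []Subscriber) error {
	udpAddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		return err
	}
	conn, err := net.ListenUDP("udp", udpAddr)
	if err != nil {
		return err
	}
	defer conn.Close()

	buf := make([]byte, 4096)
	for {
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			log.Println("read error:", err)
			continue
		}
		var event map[string]interface{}
		if err := json.Unmarshal(buf[:n], &event); err != nil {
			log.Println("bad payload:", err)
			continue
		}
		// fan the decoded event out to every subscriber
		for _, s := range subscribers {
			s(event)
		}
	}
}

func main() {
	logger := func(event map[string]interface{}) { log.Println("got event:", event["type"]) }
	log.Fatal(listen(":5555", []Subscriber{logger}))
}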

Once it gets the event data, the events hub daemon doesn’t do much with it: it pushes it down a websocket server running in the same process, and it forwards the stream down a ZeroMQ pub/sub socket. This daemon is all about publishing a consolidated event stream to whoever is interested in it.

Producers and consumers

In my Ruby apps, sending JSON-encoded UDP events to the hub is as simple as:

require 'socket'
require 'active_support/json'

socket = UDPSocket.new
event = ActiveSupport::JSON.encode(type: 'request', time: Time.now, app: app_name, account: account_name)
socket.send event, 0, UDP_HOST, UDP_PORT

Next, I wrote another Go HTTP app that serves up a 1-pixel GIF and collects client-side request data. I embed this, along with some JavaScript, in my customers’ pages to collect pageview and uniques data across multiple domains. (Could I have used Google Analytics instead, or any number of mature analytics solutions? Certainly. Would that have been as much fun? No chance.) This web analytics server does little more than collect the data into a standardised format, parse user-agent info and push it down the UDP socket on which my events-hub daemon is listening.

// UDP client publishes events over UDP
pub := udp.NewPublisher(udpHost)

// HTTP router
router := mux.NewRouter()

// Bind an HTTP request handler that publishes parsed GET requests to UDP publishers
// PageviewsHandler is a function generator
router.HandleFunc("/r/{app_name}/{account_name}/{type}", PageviewsHandler(gifPath, pub)).Methods("GET")
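As the comment says, PageviewsHandler is a function generator: it closes over the gif path and the UDP publisher and returns a plain handler. Roughly along these lines (a sketch only; the Publisher interface, package name and event fields are assumptions, not the real handler):

// Sketch of the handler generator; types and field names are illustrative
package tracker

import (
	"net/http"
	"time"

	"github.com/gorilla/mux"
)

// Publisher stands in for whatever interface the udp publisher exposes
type Publisher interface {
	Publish(event map[string]interface{})
}

// PageviewsHandler closes over the gif path and publisher and returns a plain handler
func PageviewsHandler(gifPath string, pub Publisher) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)

		// build a simplified event from the route and request
		event := map[string]interface{}{
			"type":    vars["type"],
			"app":     vars["app_name"],
			"account": vars["account_name"],
			"ua":      r.UserAgent(),
			"time":    time.Now(),
		}

		// fire-and-forget publish over UDP; losing the odd event is acceptable
		pub.Publish(event)

		// serve the 1-pixel tracking gif
		w.Header().Set("Content-Type", "image/gif")
		http.ServeFile(w, r, gifPath)
	}
}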

I also wrote a few other scripts that collect basic server metrics and periodically push them onto the UDP events hub.
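These scripts are trivial: a periodic metrics pusher boils down to a ticker, a JSON payload and a UDP write. Something along these lines (the hub address, interval, metric and field names are illustrative, not the actual scripts):

// Sketch: push a load-average metric to the events hub every 30 seconds
package main

import (
	"encoding/json"
	"log"
	"net"
	"os"
	"strings"
	"time"
)

func main() {
	conn, err := net.Dial("udp", "10.0.0.1:5555") // hub address is illustrative
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	host, _ := os.Hostname()

	for range time.Tick(30 * time.Second) {
		event := map[string]interface{}{
			"type": "load",
			"host": host,
			"load": readLoadAvg(),
			"time": time.Now(),
		}
		payload, err := json.Marshal(event)
		if err != nil {
			continue
		}
		// best-effort write; as with the other producers, drops are acceptable
		conn.Write(payload)
	}
}

// readLoadAvg parses the 1-minute load average from /proc/loadavg (Linux only)
func readLoadAvg() string {
	b, err := os.ReadFile("/proc/loadavg")
	if err != nil {
		return ""
	}
	fields := strings.Fields(string(b))
	if len(fields) == 0 {
		return ""
	}
	return fields[0]
}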

So this is how all sorts of data get into a central hub, but I haven’t done anything with it yet. I have the producers but no consumers on the other end. On to that.

The first consumer I wrote is a client-side JavaScript app that listens on the hub’s websocket and displays some basic real-time charts. Having even this simple overview of the current state of a multi-server platform is quite satisfying. Especially if you wrote it yourself. This led me to start playing with a couple of potentially interesting ideas in JavaScript land, but that’s a blog post for another day.

Real-time chart for the full event stream. Colour-coded bars are different accounts

I also at this point have the full stream of data going into a ZeroMQ socket. I can now write scripts in any language that subscribe to all or parts of it and do different things such as aggregating, persisting, forwarding to other services, etc. The following example Ruby script subscribes to “pageview” events and just prints them out to the terminal.

require 'zmq'
require 'msgpack'

context     = ZMQ::Context.new
channel     = "pageview"
sub         = context.socket ZMQ::SUB

sub.connect "tcp://xxx.xx.xx.x:6000"

sub.setsockopt ZMQ::SUBSCRIBE, channel

puts "Listening to events"

while line = sub.recv
  chan,  msg = line.split ' ', 2
  event = MessagePack.unpack(msg)
  puts "##{chan}: #{event['data']['account']} tz #{event['data']['tz']}"
end

Event data is serialised using MessagePack. I did say I wanted to use ALL THE THINGS.
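On the wire, each message is just the channel name, a space, and the MessagePack-encoded event, which is why the subscriber above splits on the first space. A rough sketch of the publishing side in Go (using the pebbe/zmq4 and vmihailenco/msgpack packages as stand-ins; the hub’s actual fanout code may well differ):

// Sketch: publish channel-prefixed MessagePack payloads on a ZeroMQ PUB socket
package main

import (
	"log"

	zmq "github.com/pebbe/zmq4"
	"github.com/vmihailenco/msgpack/v5"
)

func main() {
	pub, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		log.Fatal(err)
	}
	defer pub.Close()

	if err := pub.Bind("tcp://*:6000"); err != nil {
		log.Fatal(err)
	}

	// an example event in the shape the subscribers expect
	event := map[string]interface{}{
		"data": map[string]interface{}{"account": "acme", "tz": "Europe/London"},
	}
	body, err := msgpack.Marshal(event)
	if err != nil {
		log.Fatal(err)
	}

	// channel prefix + space + MessagePack body, matching the subscriber's split
	msg := append([]byte("pageview "), body...)
	if _, err := pub.SendBytes(msg, 0); err != nil {
		log.Fatal(err)
	}
}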

This is nice. It means that I can still fairly easily push this data onto Statsd or other services if I choose to. And because all the individual parts communicate over the internal network, the whole thing can be scaled out by moving individual processes onto separate machines or VMs.

Aggregating

So I wanted to make this data useful to my customers. Pageviews and uniques are nice, straightforward metrics, and my product should offer a basic overview even before customers go and install Google Analytics (which they can).

To that end, I wrote yet another Go script that subscribes to “pageview” events and increments Redis hashes whose keys are composed of segments of the current date. So if your account is “acme” and today is September 5 2013, the following data structures are populated and incremented in Redis.

Current year

Hash name encodes account, event type and year. Keys are month number in the year.

"acme/pageview/2013": {
  "08": 100,
  "09": 101 // <= this key gets incremented in September
}

Current month

Hash name encodes account, event type, year and month. Keys are day number in the month.

"acme/pageview/2013/09": {
  "01": 10,
  "02": 5,
  "03": 3,
  "04": 23,
  "05": 101 // <= this key gets incremented today
}

… And so on with current hour, minute or whatever granularity I want. The increments are performed atomically using Redis’ HINCRBY Hash operation. In Go:

// increment current hour in day
dayKey := fmt.Sprintf("track/%s/%s/%s/%s/%s", key, evtType, yearAsString, monthAsString, dayAsString)
conn.HIncrBy(dayKey, hourAsString, 1)
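The other granularities follow exactly the same pattern. A sketch of the month and year counters, reusing the variables and the same (assumed) client wrapper from the snippet above:

// increment current month in the year hash (keys are month numbers)
yearKey := fmt.Sprintf("track/%s/%s/%s", key, evtType, yearAsString)
conn.HIncrBy(yearKey, monthAsString, 1)

// increment current day in the month hash (keys are day numbers)
monthKey := fmt.Sprintf("track/%s/%s/%s/%s", key, evtType, yearAsString, monthAsString)
conn.HIncrBy(monthKey, dayAsString, 1)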

Note that the hash names look like URL paths. This is because the same daemon exposes these structures as a simple JSON HTTP API, almost straight from Redis. Both writing and reading the data are very fast, and that’s enough for me to build the nice, simple charts my customers see on their dashboards.

Pageviews timeseries graph
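Reading one of these hashes back out and serving it as JSON takes only a handful of lines. A sketch of what such a handler might look like (using the redigo Redis client as a stand-in; the actual daemon’s client and routing may differ):

// Sketch: serve an aggregation hash straight from Redis as JSON
package stats

import (
	"encoding/json"
	"net/http"
	"strings"

	"github.com/gomodule/redigo/redis"
)

// StatsHandler answers e.g. GET /acme/pageview/2013/09 with the matching hash
func StatsHandler(pool *redis.Pool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		conn := pool.Get()
		defer conn.Close()

		// the URL path doubles as the hash name, e.g. "acme/pageview/2013/09"
		key := strings.TrimPrefix(r.URL.Path, "/")

		values, err := redis.StringMap(conn.Do("HGETALL", key))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(values)
	}
}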

The future

The nice thing about a general events infrastructure like this is that you can start thinking about useful use cases after the fact. For example, I might write a consumer that stores the full event history in Elasticsearch for easy storage and filtering, or small proxies that just forward the stream on to Librato, Mixpanel or any other third-party product I might want to test-drive. (In fact, bootic stathat is a tiny one that buffers events and POSTs them to Stathat periodically.)
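As a rough illustration of that last kind of consumer, a forwarder that batches events off the ZeroMQ socket and flushes them over HTTP might look like the following sketch (the hub address, channel, endpoint and payload are all hypothetical; this is not the actual bootic stathat code):

// Sketch: subscribe to the hub, buffer events, flush them over HTTP every 30s
package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"

	zmq "github.com/pebbe/zmq4"
)

func main() {
	sub, err := zmq.NewSocket(zmq.SUB)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Close()

	if err := sub.Connect("tcp://10.0.0.1:6000"); err != nil { // hub address is illustrative
		log.Fatal(err)
	}
	sub.SetSubscribe("pageview")

	events := make(chan []byte, 1024)

	// receive loop: push raw messages onto a channel
	go func() {
		for {
			msg, err := sub.RecvBytes(0)
			if err != nil {
				continue
			}
			events <- msg
		}
	}()

	// buffer and flush on a fixed interval
	var batch [][]byte
	ticker := time.NewTicker(30 * time.Second)
	for {
		select {
		case msg := <-events:
			batch = append(batch, msg)
		case <-ticker.C:
			flush(batch)
			batch = nil
		}
	}
}

// flush reports the batch size to a hypothetical stats endpoint (not the real Stathat API)
func flush(batch [][]byte) {
	if len(batch) == 0 {
		return
	}
	body, _ := json.Marshal(map[string]interface{}{"stat": "pageviews", "count": len(batch)})
	resp, err := http.Post("https://stats.example.com/collect", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Println("flush failed:", err)
		return
	}
	resp.Body.Close()
}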

Lightweight as it is, this architecture has the potential to lead to new products and more value for the end customer. It could be the basis for a more robust webhooks system that my customers could leverage to integrate with their own apps.

Go

Although this approach is conceptually language-agnostic, I’ve found that the Go language nicely suits this kind of small, fast, multi-transport daemon. Even though I’m by no means a Go expert, it’s allowed me to get really far in designing fast, concurrent network software. I’ll try to blog about this in more detail at a later date.

Update

I’ve given a talk at the London Ruby User Group meetup where I further explain the work illustrated in this post. You can watch the video and slides.