Writing a Server Sent Events server in Go

This post was originally published on the New Bamboo blog, before New Bamboo joined thoughtbot in London.



I took the opportunity of our last New Bamboo Hack Day to write a Server Sent Events server in Go. The idea was to allow third parties to subscribe to a pre-existing, internal events stream for an e-commerce platform. I also needed to authenticate this service via access tokens so only authorised users or programs can access it.

I based my implementation largely on this example, and my own finalised production code is here. In this article I’ll take you through some of the code to illustrate how elegantly Go can model network concurrency. The patterns described below can be adapted to other problems where a server needs to keep network connections open and broadcast messages to all or some of them. This includes WebSockets, chat servers, message brokers and others.

The server

The first step is to define a struct to represent the server instance and hold basic state.

// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcasts event data to all registered connections.
type Broker struct {

    // Events are pushed to this channel by the main events-gathering routine
    Notifier chan []byte

    // New client connections
    newClients chan chan []byte

    // Closed client connections
    closingClients chan chan []byte

    // Client connections registry
    clients map[chan []byte]bool
}

Notifier is a channel of byte slices, where each slice represents a single event. An external goroutine will push events to this channel as they come.

// This happens in a goroutine somewhere else.
broker.Notifier <- eventBytes

What and how events are pushed to the broker depends on your use case. My server receives JSON-encoded events from the outside via UDP and broadcasts them to any listeners.
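The article does not show how events arrive over UDP, so the following is only a minimal sketch of one way to feed the Notifier channel, assuming each datagram carries exactly one event. The helper names forwardUDP and roundTrip, and the loopback address, are made up for illustration.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// forwardUDP reads datagrams from conn and pushes each one, as a single
// event, onto the notifier channel. It returns once conn is closed.
// Hypothetical helper: the one-datagram-per-event framing is an assumption.
func forwardUDP(conn net.PacketConn, notifier chan<- []byte) {
	buf := make([]byte, 65535) // large enough for any UDP payload
	for {
		n, _, err := conn.ReadFrom(buf)
		if err != nil {
			return // connection closed or unrecoverable read error
		}
		// Copy the payload because buf is reused on the next read.
		event := make([]byte, n)
		copy(event, buf[:n])
		notifier <- event
	}
}

// roundTrip sends one datagram to a throwaway local listener and returns
// what came out of the notifier channel ("" on error or timeout).
func roundTrip(payload string) string {
	conn, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return ""
	}
	defer conn.Close() // closing the socket also stops forwardUDP

	notifier := make(chan []byte, 1)
	go forwardUDP(conn, notifier)

	sender, err := net.Dial("udp", conn.LocalAddr().String())
	if err != nil {
		return ""
	}
	defer sender.Close()
	if _, err := sender.Write([]byte(payload)); err != nil {
		return ""
	}

	select {
	case event := <-notifier:
		return string(event)
	case <-time.After(2 * time.Second):
		return "" // UDP gives no delivery guarantee
	}
}

func main() {
	fmt.Println(roundTrip(`{"type":"order.created"}`))
}
```

In a real server the listener would presumably be started with something like go forwardUDP(conn, broker.Notifier) once the broker exists.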

newClients is a channel that takes and registers newly opened HTTP connections. This is where things start getting interesting: newClients is a channel of channels! The reason will become clear in a second.

closingClients is notified when a client connection closes. Again, connections are themselves represented as channels.

clients is a map that holds the currently open connections, so incoming events can be broadcast to them.

Handling connections

Our Broker will implement Go’s http.Handler interface to handle HTTP connections. This will allow it to interop with other HTTP handlers such as routers or middleware (more on that later).

func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

The rw response writer itself is only required to comply with the basic http.ResponseWriter interface, but for a streaming server we also need to make sure that it supports the http.Flusher interface (so we can flush buffered data down the connection as it comes). Let’s check for support or bail out.

// Make sure that the writer supports flushing.
flusher, ok := rw.(http.Flusher)

if !ok {
    http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
    return
}

This is a good illustration of how Go interfaces allow you to compose behaviour into objects, and the Go standard library itself makes full use of this feature.
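To see the type assertion in isolation, here is a small sketch. It assumes nothing from the article beyond the assertion itself; plainWriter, canStream and checkWriters are hypothetical names. httptest.ResponseRecorder has a Flush method, whereas a wrapper that embeds only the http.ResponseWriter interface hides it.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// plainWriter embeds only the http.ResponseWriter interface, so any extra
// methods of the wrapped value (such as Flush) are not promoted.
type plainWriter struct {
	http.ResponseWriter
}

// canStream reports whether rw also satisfies http.Flusher, using the same
// type assertion as the handler.
func canStream(rw http.ResponseWriter) bool {
	_, ok := rw.(http.Flusher)
	return ok
}

// checkWriters runs the assertion against a recorder used directly and
// against the same recorder hidden behind plainWriter.
func checkWriters() (direct, wrapped bool) {
	rec := httptest.NewRecorder()
	return canStream(rec), canStream(plainWriter{ResponseWriter: rec})
}

func main() {
	direct, wrapped := checkWriters()
	fmt.Println(direct)  // true
	fmt.Println(wrapped) // false
}
```

This is worth keeping in mind with middleware: a wrapper like plainWriter placed around the response writer would make the broker answer with "Streaming unsupported!".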

Now we’ll set basic headers to support keep-alive HTTP connections (so clients don’t close them early) and the “text/event-stream” content type for browsers that support Server Sent Events via the EventSource API. We’ll also add a Cross-Origin Resource Sharing (CORS) header so pages served from other domains can still connect.

// Set the headers related to event streaming.
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
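The body of a text/event-stream response is plain text: each event is one or more data: lines followed by a blank line. The handler below writes single-line payloads. As a hedged sketch, a helper like the hypothetical formatEvent could also cope with payloads that contain newlines, by giving every line its own prefix.

```go
package main

import (
	"bytes"
	"fmt"
)

// formatEvent encodes payload as one Server Sent Events message.
// Every line of the payload gets its own "data: " prefix, and a blank
// line terminates the event. Hypothetical helper, not the article's code.
func formatEvent(payload []byte) string {
	var out bytes.Buffer
	for _, line := range bytes.Split(payload, []byte("\n")) {
		out.WriteString("data: ")
		out.Write(line)
		out.WriteByte('\n')
	}
	out.WriteByte('\n') // blank line marks the end of the event
	return out.String()
}

func main() {
	// Prints "data: first line\ndata: second line\n\n"
	fmt.Printf("%q\n", formatEvent([]byte("first line\nsecond line")))
}
```

On the receiving side, an EventSource client joins the data: lines of one event back together with newlines, so the payload arrives intact.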

Next, each new connection creates a channel of events that it will register with the Broker. It does this by passing its events channel to the broker’s newClients channel.

// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)

// Signal the broker that we have a new connection
broker.newClients <- messageChan

Before we carry on, let’s make sure we notify the broker if our connection dies for any reason. This is done by deferring a function until the current connection handler exits.

// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
    broker.closingClients <- messageChan
}()

Next, we also need to notify the broker when the connection is closed by the client (i.e. the client disconnects). We do this through the http.CloseNotifier interface, which our ResponseWriter also implements and which gives us just that functionality.

// Listen to connection close and un-register messageChan
notify := rw.(http.CloseNotifier).CloseNotify()

go func() {
    <-notify
    broker.closingClients <- messageChan
}()

The CloseNotifier#CloseNotify() method returns a channel that will emit a boolean when the connection closes. We launch an anonymous function as a goroutine to listen for that case without blocking the outer HTTP handler.

Finally, we wait for this connection’s messageChan to give us events in a loop and write the event data to the response writer.

// block waiting for messages broadcast on this connection's messageChan
for {
  // Write to the ResponseWriter
  // Server Sent Events compatible
  fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)

  // Flush the data immediately instead of buffering it for later.
  flusher.Flush()
}

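This loop blocks on messageChan waiting for the next event. Because the loop never ends, the connection handler stays open and keeps writing events to the socket as they arrive. Once the client disconnects and the broker drops its channel, no more events arrive and the handler simply stays parked on messageChan.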
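One way to let the handler return when the client goes away, so that the deferred clean-up actually runs, is to select on the request's context alongside messageChan. The http.CloseNotifier interface used earlier has since been deprecated in favour of Request.Context, whose Done channel is closed when the client's connection closes. The sketch below is a variant under those assumptions, not the article's code: streamEvents and simulate are hypothetical names, and simulate drives the handler through httptest rather than a real socket.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamEvents writes each message from messageChan to rw in Server Sent
// Events format and returns as soon as the request's context is cancelled.
func streamEvents(rw http.ResponseWriter, req *http.Request, messageChan <-chan []byte) {
	flusher, ok := rw.(http.Flusher)
	if !ok {
		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}
	for {
		select {
		case <-req.Context().Done():
			return // client is gone: any deferred clean-up can now run
		case msg := <-messageChan:
			fmt.Fprintf(rw, "data: %s\n\n", msg)
			flusher.Flush()
		}
	}
}

// simulate sends one message, then cancels the request the way a client
// disconnect would, and returns everything the handler wrote.
func simulate() string {
	ctx, cancel := context.WithCancel(context.Background())
	req := httptest.NewRequest("GET", "/", nil).WithContext(ctx)
	rec := httptest.NewRecorder()

	messageChan := make(chan []byte)
	done := make(chan struct{})
	go func() {
		streamEvents(rec, req, messageChan)
		close(done)
	}()

	messageChan <- []byte("hello") // unbuffered: the handler has received it
	cancel()
	<-done // the handler returned instead of blocking forever
	return rec.Body.String()
}

func main() {
	fmt.Printf("%q\n", simulate()) // "data: hello\n\n"
}
```

A caveat if this is wired to a broker that sends on client channels with a plain blocking send: a handler that has stopped reading messageChan but has not yet been unregistered can stall that send, so the unregistration and the broadcast need to be designed together.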

Running

So our broker runs a new Broker#ServeHTTP() connection handler for each new connection, and each handler notifies the broker’s channels when it is created and closed. Handlers also listen on their own messageChan channels for incoming event data. But we still have to glue these channels together and make them actually work. For that we set up a factory that creates a broker instance and sets it running, orchestrating all the different channels.

// Broker factory
func NewServer() (broker *Broker) {
  // Instantiate a broker
  broker = &Broker{
    Notifier:       make(chan []byte, 1),
    newClients:     make(chan chan []byte),
    closingClients: make(chan chan []byte),
    clients:        make(map[chan []byte]bool),
  }

  // Set it running - listening and broadcasting events
  go broker.listen()

  return
}

The key here is the Broker#listen() method. We launch it in a goroutine so it doesn’t block the main program. Its job is to select on the different channels and take the appropriate course of action.

// Listen on different channels and act accordingly
func (broker *Broker) listen() {
  for {
    select {
      case s := <-broker.newClients:
        // A new client has connected.
        // Register their message channel
        broker.clients[s] = true
        log.Printf("Client added. %d registered clients", len(broker.clients))

      case s := <-broker.closingClients:
        // A client has detached and we want to
        // stop sending them messages.
        delete(broker.clients, s)
        log.Printf("Removed client. %d registered clients", len(broker.clients))

      case event := <-broker.Notifier:
        // We got a new event from the outside!
        // Send event to all connected clients
        for clientMessageChan := range broker.clients {
          clientMessageChan <- event
        }
    }
  }
}
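As written, the broadcast loop sends on each client channel in turn, and a send on an unbuffered channel blocks until the receiver is ready, so one slow client delays everyone else. A possible variation, sketched here under the assumption that dropping events for slow clients is acceptable, is a non-blocking send. The broadcast function and its return value are illustrative only.

```go
package main

import "fmt"

// broadcast offers event to every registered client channel without
// blocking: a client that is not ready to receive simply misses the event.
// It returns the number of clients that accepted it.
func broadcast(clients map[chan []byte]bool, event []byte) int {
	delivered := 0
	for clientMessageChan := range clients {
		select {
		case clientMessageChan <- event:
			delivered++
		default:
			// Client is busy or gone: skip it rather than stall the broker.
		}
	}
	return delivered
}

func main() {
	ready := make(chan []byte, 1) // has room for one event
	stuck := make(chan []byte)    // unbuffered and nobody is receiving
	clients := map[chan []byte]bool{ready: true, stuck: true}

	fmt.Println(broadcast(clients, []byte("hello"))) // 1
	fmt.Println(string(<-ready))                     // hello
}
```

Whether to drop, buffer per client, or disconnect slow clients is a design decision that depends on how much your consumers can tolerate missing events.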

Go channels, used in this way, allow your programs to deal safely with concurrency while avoiding threads and locks. Lastly, we instantiate a broker and run it as an HTTP server in our main function. Because it implements the http.Handler interface, it can be passed to http.ListenAndServe() to start listening on a network address.

func main() {
  broker := NewServer()

  log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
}
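Because the broker is just an http.Handler, it can also be mounted on a router next to other routes. A minimal sketch with the standard library's ServeMux follows; eventsHandler is a stand-in for the Broker, and the /events and /health paths are invented for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// eventsHandler stands in for the Broker: any value with a ServeHTTP
// method satisfies http.Handler and can be mounted the same way.
type eventsHandler struct{}

func (eventsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	rw.Header().Set("Content-Type", "text/event-stream")
	fmt.Fprint(rw, "data: hello\n\n")
}

// newMux mounts the stream under /events and a plain health check under
// /health. Both paths are hypothetical.
func newMux(stream http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/events", stream)
	mux.HandleFunc("/health", func(rw http.ResponseWriter, req *http.Request) {
		fmt.Fprint(rw, "ok")
	})
	return mux
}

// get performs an in-memory request against h and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("GET", path, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux(eventsHandler{})
	fmt.Printf("%q\n", get(mux, "/events")) // "data: hello\n\n"
	fmt.Printf("%q\n", get(mux, "/health")) // "ok"
}
```

The mux is itself an http.Handler, so it could be passed to http.ListenAndServe() in place of the broker.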

Receiving events

The broker broadcasts whatever arrives on its Notifier channel to all open connections, but where does the notifier get events from? That will depend on your use case: it could be events from an external source, a message queue, other connections, etc. For the sake of illustration I’ll start a goroutine that pushes a timestamp event to the notifier every two seconds.

func main() {

  broker := NewServer()

  go func() {
    for {
      time.Sleep(time.Second * 2)
      eventString := fmt.Sprintf("the time is %v", time.Now())
      log.Println("Receiving event")
      broker.Notifier <- []byte(eventString)
    }
  }()

  log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))

}

Run it:

$ go run sse.go

Listen to the HTTP stream in your terminal:

$ curl localhost:3000

Try connecting from more than one terminal at once.

Finally, you can try in an EventSource-compliant browser:

var client = new EventSource("http://localhost:3000")
client.onmessage = function (msg) {
  console.log(msg)
}
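For a client outside the browser, the stream can also be consumed from Go. This is a rough sketch rather than a full EventSource implementation: readEvents and parseSample are hypothetical helpers, and only single-line data: fields are handled (no event:, id: or retry: fields, and no reconnection).

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
)

// readEvents scans an event stream and returns the payloads of the first
// limit events. Lines that do not start with "data: " are ignored.
func readEvents(stream io.Reader, limit int) []string {
	var events []string
	scanner := bufio.NewScanner(stream)
	for len(events) < limit && scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			events = append(events, strings.TrimPrefix(line, "data: "))
		}
	}
	return events
}

// parseSample runs readEvents over a canned two-event stream.
func parseSample() []string {
	return readEvents(strings.NewReader("data: one\n\ndata: two\n\n"), 2)
}

func main() {
	if len(os.Args) < 2 {
		// No URL given: parse the canned stream so the sketch runs anywhere.
		fmt.Println(parseSample())
		return
	}

	// With a URL argument, read the first three events from a live server.
	resp, err := http.Get(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	for _, event := range readEvents(resp.Body, 3) {
		fmt.Println(event)
	}
}
```

Run without arguments, it parses the built-in sample. Run with http://localhost:3000 as its only argument while the server above is running, it should print the first three events it receives and then exit.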

The full working Gist is here.

In future articles I’ll show how you can use Go’s http.Handler interface to add authentication to an HTTP streaming server.