Playing with Sockets and Processes in Elixir

Jon Yurek

I recently worked on a daemon application. No web stuff in this one. It served as a sink for real-time vehicle location data. It took all that data in on a normal TCP port. While I’ve done this kind of thing before, it’s been a while, and I hadn’t done it with Elixir before.

The actual mechanics of the port were simple and quite standard:

  1. Listen on a port.
  2. Accept incoming connections.
  3. Receive data.
  4. Pass it along.

This is the way things have been done since we came up with TCP. But since we’re sitting on top of the BEAM VM, we don’t have to worry about any of these being in the same process or stepping on each other’s toes. We can:

  1. Spawn a process to listen on a port.
  2. Spawn a process to accept incoming connections.
  3. Receive data and hand it to a process for buffering.
  4. Pass it along to another process.

Since each piece runs in its own process, a blocking call in one doesn’t hold up the others, even though we use synchronous functions throughout.
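As a minimal sketch of that idea: here a spawned process sits in a blocking `receive`, while the process that spawned it carries on immediately. `BlockingDemo` and `start_blocked_worker/1` are hypothetical names used only for illustration.

```elixir
defmodule BlockingDemo do
  # Spawns a process that blocks in `receive` until it is told to stop.
  # spawn/1 returns the new pid right away, so the caller never waits.
  def start_blocked_worker(reply_to) do
    spawn(fn ->
      receive do
        :stop -> send(reply_to, :worker_stopped)
      end
    end)
  end
end

worker = BlockingDemo.start_blocked_worker(self())
# This runs immediately, even though the worker is still waiting.
IO.puts("worker #{inspect(worker)} alive? #{Process.alive?(worker)}")
send(worker, :stop)

receive do
  :worker_stopped -> IO.puts("worker finished")
after
  1_000 -> IO.puts("worker never answered")
end
```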

:gen_tcp

To access the raw TCP sockets, we’ll be using :gen_tcp. It’s an Erlang module, because the need for this functionality has existed for a long time. You can tell you’re calling into Erlang from Elixir because the module name is an atom, and you call functions directly on it. The first thing we need to do is open a listening socket with listen/2. This is pretty straightforward:

defmodule Receiver do
  require Logger

  def start(port) do
    spawn fn ->
      # :binary delivers data as binaries (not charlists); reuseaddr: true
      # lets a restarted server rebind the same port right away.
      case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do
        {:ok, socket} ->
          Logger.info("Listening on port #{port}.")
          accept_connection(socket) # <--- We'll handle this next.
        {:error, reason} ->
          Logger.error("Could not listen: #{inspect(reason)}")
      end
    end
  end
end

Here, active: false puts the socket in passive mode: incoming data waits in the socket until we explicitly ask for it with :gen_tcp.recv/2. If we’d used true, the data would instead arrive as Elixir messages in our mailbox. That’s nice to have, but we don’t really need it, and it makes the workflow a little more complicated for what we’re trying to accomplish here. So we’ll go with active: false and ask for new data when we want it.
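For comparison, here is a minimal sketch of both modes from the receiving side, using a loopback connection on an OS-assigned port (port 0). With active: false the bytes come back from :gen_tcp.recv/3; with active: true they land in the owning process’s mailbox as a `{:tcp, socket, data}` message. `ActiveModes` and `roundtrip/2` are illustrative names, not part of the original code.

```elixir
defmodule ActiveModes do
  # Opens a listener on a free port, connects to it from this process,
  # sends `payload`, and returns what the accepted socket received.
  # Accepted sockets inherit the :active setting of the listening socket.
  def roundtrip(payload, active) do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: active, reuseaddr: true])
    {:ok, port} = :inet.port(listen)
    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
    {:ok, server} = :gen_tcp.accept(listen)
    :ok = :gen_tcp.send(client, payload)

    result =
      if active do
        # active: true -> the data shows up as a message in our mailbox.
        receive do
          {:tcp, ^server, data} -> data
        after
          1_000 -> :timeout
        end
      else
        # active: false -> we must ask; recv/3 blocks until data arrives
        # or the timeout (in milliseconds) expires.
        {:ok, data} = :gen_tcp.recv(server, 0, 1_000)
        data
      end

    Enum.each([client, server, listen], &:gen_tcp.close/1)
    result
  end
end

IO.inspect(ActiveModes.roundtrip("hello\n", false))
IO.inspect(ActiveModes.roundtrip("hello\n", true))
```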

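We don’t want waiting for connections to block everything else. :gen_tcp.listen/2 itself returns right away, but the :gen_tcp.accept/1 call we make next blocks until a client connects. To keep that from holding up the caller, we wrap the whole function body in spawn/1 so it runs in a different Process. Blocking inside that Process is exactly what we want; it just shouldn’t prevent anything else from running.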
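A quick way to check which call actually blocks: in this sketch, :gen_tcp.listen/2 returns straight away, while :gen_tcp.accept/2 (the variant that takes a timeout in milliseconds) waits for a client and gives up with `{:error, :timeout}` when none arrives. Port 0 lets the OS pick a free port; `WhatBlocks` is a hypothetical name, and :timer.tc/1 reports elapsed time in microseconds.

```elixir
defmodule WhatBlocks do
  # Times listen/2 and accept/2 when no client ever connects.
  def check(timeout_ms) do
    {listen_us, {:ok, socket}} =
      :timer.tc(fn -> :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true]) end)

    # accept/2 blocks until a client connects or `timeout_ms` passes.
    {accept_us, accept_result} = :timer.tc(fn -> :gen_tcp.accept(socket, timeout_ms) end)

    :ok = :gen_tcp.close(socket)
    %{listen_us: listen_us, accept_us: accept_us, accept_result: accept_result}
  end
end

IO.inspect(WhatBlocks.check(200))
```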

Acceptance

We’re in a Process listening for connections on the socket. When we have one, we want to be able to read data from it. But we also want to be able to handle more than one connection at a time. Never fear, we can spawn more processes!

We still need to implement accept_connection. Accepting a connection gives us our readable data source. Let’s fill that function in now.

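def accept_connection(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  pid = spawn(fn ->
    # Trap exits *before* starting the linked Buffer, so a Buffer crash
    # arrives as an :EXIT message instead of taking this process down.
    Process.flag(:trap_exit, true)
    {:ok, buffer_pid} = Buffer.create() # <--- this is next
    serve(client, buffer_pid) # <--- and then we'll cover this
  end)
  # Make the new process the socket's owner, so the connection isn't
  # closed if this acceptor process ever dies.
  :ok = :gen_tcp.controlling_process(client, pid)
  accept_connection(socket) # Go back to waiting for the next client.
end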

:gen_tcp.accept/1 waits for a connection to come in. When one does, we kick off a new process that creates a Buffer for that connection and immediately starts reading data in the serve function, which we’ll write below. Meanwhile, the accepting process goes back to waiting for the next client.

What’s Buffer, you ask? It’s a module describing a line-oriented in-memory buffer that will hold input, parse out lines from that input, and send them individually to your sink. The details of it aren’t too important right now, but the management of its process is.
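The post doesn’t show Buffer, so what follows is a hypothetical sketch that only matches how it is used here: create/0 returns `{:ok, pid}` for a process linked to the caller (which the :EXIT handling below depends on), and receive/2 hands it a chunk of data. The Agent-based state, the `create/1` variant taking a sink function, and printing lines as the default sink are all assumptions.

```elixir
defmodule Buffer do
  # Hypothetical sketch: an Agent holding the unfinished tail of the input.
  # Each complete line is passed to `sink`, a 1-arity function.
  def create, do: create(&IO.puts/1)

  def create(sink) do
    # Agent.start_link/1 links the Buffer to the caller, so a caller that
    # traps exits gets {:EXIT, pid, reason} if the Buffer dies.
    Agent.start_link(fn -> %{partial: "", sink: sink} end)
  end

  def receive(buffer_pid, data) do
    Agent.update(buffer_pid, fn %{partial: partial, sink: sink} = state ->
      # The last element after splitting is whatever follows the final
      # newline (an empty string if the chunk ended with one).
      {lines, [rest]} = String.split(partial <> data, "\n") |> Enum.split(-1)
      Enum.each(lines, sink)
      %{state | partial: rest}
    end)
  end
end

{:ok, buf} = Buffer.create()
:ok = Buffer.receive(buf, "lat=1,lon=2\nlat=3") # prints lat=1,lon=2
:ok = Buffer.receive(buf, ",lon=4\n")           # prints lat=3,lon=4
```

Agent.update/2 is synchronous, so Buffer.receive/2 returns only after the chunk has been split; Agent.cast/2 would let serve/2 move on sooner, at the cost of any backpressure.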

Window to the Soul

Inside this spawn, we create a Buffer for each socket connection and keep an eye on its process. In Erlang (and therefore in Elixir), processes are very lightweight: they are meant to be cheap to create and cheap to recreate.

To this end, there are two main ways to “care” about a process you create:

  1. spawn, which starts a process and doesn’t care at all about its status.
  2. spawn_link, which starts a process “linked” to this one. The link means that if one crashes, so does the other (unless it traps exits, which we’ll get to below). This is useful for making sure state doesn’t hang around, go stale, and take up memory.
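A small sketch of the difference: a parent that spawn_links a crashing child goes down with it, while one that uses plain spawn carries on. The parent is started with spawn_monitor/1 so we can see how it ended without a race; `LinkDemo` and `parent_fate/1` are illustrative names.

```elixir
defmodule LinkDemo do
  # Starts a parent that launches a child with `spawn_fun` (&spawn/1 or
  # &spawn_link/1); the child exits with :boom. Returns the parent's exit
  # reason, or :still_running if it survived.
  def parent_fate(spawn_fun) do
    {parent, ref} =
      spawn_monitor(fn ->
        spawn_fun.(fn -> exit(:boom) end)
        Process.sleep(:infinity)
      end)

    receive do
      {:DOWN, ^ref, :process, ^parent, reason} -> reason
    after
      200 ->
        Process.exit(parent, :kill)
        # Consume the :DOWN message produced by the kill.
        receive do
          {:DOWN, ^ref, :process, ^parent, :killed} -> :ok
        end

        :still_running
    end
  end
end

IO.inspect(LinkDemo.parent_fate(&spawn/1))
IO.inspect(LinkDemo.parent_fate(&spawn_link/1))
```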

What neither gives us by default, though, is a notification we can act on when the other process ends. Luckily, we do have some tools available to us.

  1. We can Process.monitor/1 any ol’ process we want, as long as we know its pid. If that process ends, we’ll get a {:DOWN, ref, :process, pid, reason} message. Monitors work whether or not the process is linked, which makes them a natural fit for plain spawned Processes.
  2. We can trap exits with Process.flag(:trap_exit, true) like we have in the code above. This only applies to linked Processes (started with spawn_link or a start_link function). When a linked Process dies, even normally, we receive an {:EXIT, pid, reason} message instead of dying along with it.
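Both kinds of notification can be seen side by side in a short sketch. The message shapes, `{:DOWN, ref, :process, pid, reason}` from a monitor and `{:EXIT, pid, reason}` when trapping exits, are standard; spawn_monitor/1 is simply spawn plus Process.monitor/1 in one atomic step. `WatchDemo` is an illustrative name.

```elixir
defmodule WatchDemo do
  # Monitor: works for any pid, linked or not; delivers a :DOWN message.
  def via_monitor do
    {pid, ref} = spawn_monitor(fn -> exit(:crashed) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      1_000 -> :timeout
    end
  end

  # Trapping exits: exit signals from *linked* processes become
  # {:EXIT, pid, reason} messages instead of killing the caller.
  def via_trap_exit do
    previous = Process.flag(:trap_exit, true)
    pid = spawn_link(fn -> exit(:crashed) end)

    result =
      receive do
        {:EXIT, ^pid, reason} -> {:exit, reason}
      after
        1_000 -> :timeout
      end

    # Restore the caller's original setting.
    Process.flag(:trap_exit, previous)
    result
  end
end

IO.inspect(WatchDemo.via_monitor())
IO.inspect(WatchDemo.via_trap_exit())
```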

As you can see, it’s this second option that we’re using in the Process we spawn for each connection. It relies on Buffer.create/0 starting the Buffer linked to that Process; with the link in place, we’ll know if the Buffer ever crashes.

Collaborate and Listen

We’re listening for and accepting connections on our socket. We’re trapping crashes in sub-Processes, and we’ve started up a Buffer to hold onto data while we receive it. So now we actually need to receive it. Since we started listening on the port with active: false, we need to ask for the data. We’ll use :gen_tcp.recv/2 for that; a length of 0 means “give me whatever bytes are available.”

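  def serve(socket, buffer_pid) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} ->
        buffer_pid = maybe_recreate_buffer(buffer_pid) # <-- coming up next
        Buffer.receive(buffer_pid, data)
        serve(socket, buffer_pid)
      {:error, reason} ->
        Logger.info("Socket terminating: #{inspect reason}")
        # This process is about to exit with :normal, which does NOT stop
        # a linked Buffer, so shut it down (and close the socket) explicitly.
        Process.exit(buffer_pid, :shutdown)
        :gen_tcp.close(socket)
    end
  end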

With no timeout, recv waits forever for data to come in (or for the connection to close). When data arrives, we make sure the Buffer is still running (or start a new one) and then hand the data over to it. Then we go back to waiting. Blocking like this is only harmless because each connection has its own Process.
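To see how that loop ends, here is a minimal loopback sketch: recv with length 0 returns whatever bytes are available, and once the peer hangs up it returns `{:error, :closed}`, the case that ends serve/2 above. The port is OS-assigned, and `RecvDemo`, `collect/1` and `read_all/2` are hypothetical names.

```elixir
defmodule RecvDemo do
  # Sends `chunks` from a client to a local listener, closes the client,
  # and returns {all_bytes_received, error_reason} from the server side.
  def collect(chunks) do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true])
    {:ok, port} = :inet.port(listen)
    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
    {:ok, server} = :gen_tcp.accept(listen)

    Enum.each(chunks, fn chunk -> :ok = :gen_tcp.send(client, chunk) end)
    :ok = :gen_tcp.close(client)

    result = read_all(server, [])
    :gen_tcp.close(server)
    :gen_tcp.close(listen)
    result
  end

  # Same shape as serve/2: keep calling recv until it returns an error.
  defp read_all(socket, acc) do
    case :gen_tcp.recv(socket, 0, 1_000) do
      {:ok, data} -> read_all(socket, [data | acc])
      {:error, reason} -> {IO.iodata_to_binary(Enum.reverse(acc)), reason}
    end
  end
end

IO.inspect(RecvDemo.collect(["lat=1,", "lon=2\n"]))
```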

The last piece of the puzzle is figuring out whether the Buffer Process is still running. As we said above, we’ll get an :EXIT message if it crashed, so we should check our mailbox.

  defp maybe_recreate_buffer(original_pid) do
    receive do
      {:EXIT, ^original_pid, _reason} ->
        {:ok, new_buffer_pid} = Buffer.create()
        new_buffer_pid
    after
      10 ->
        original_pid
    end
  end

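If an :EXIT for the current Buffer is already in our mailbox, or arrives within the next 10 milliseconds, we create a new Buffer. If not, we carry on with the one we have. Note that while the Buffer is healthy, this adds 10 ms of waiting to every read; after 0 would check the mailbox without waiting at all.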
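Here is the same mailbox check in isolation, as a minimal self-contained sketch. It assumes the calling process traps exits (as the connection process does), uses an Agent as a hypothetical stand-in for Buffer.create/0, and makes the timeout a parameter so the after 0 variant can be tried. `Recreate.maybe_recreate/3` is an illustrative name.

```elixir
defmodule Recreate do
  # Returns `pid` unchanged unless an {:EXIT, pid, _} message for it is
  # waiting (or arrives within `wait_ms`), in which case it calls
  # `start_fun` and returns the replacement pid.
  def maybe_recreate(pid, start_fun, wait_ms) do
    receive do
      {:EXIT, ^pid, _reason} ->
        {:ok, new_pid} = start_fun.()
        new_pid
    after
      wait_ms -> pid
    end
  end
end

Process.flag(:trap_exit, true)
start = fn -> Agent.start_link(fn -> [] end) end

{:ok, buffer} = start.()
# Nothing has crashed, so the same pid comes back without waiting.
^buffer = Recreate.maybe_recreate(buffer, start, 0)

# Kill the buffer, give the exit signal time to land, then check again.
Process.exit(buffer, :kill)
Process.sleep(50)
new_buffer = Recreate.maybe_recreate(buffer, start, 0)
IO.inspect({buffer, new_buffer, Process.alive?(new_buffer)})
```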

maybe_recreate_buffer/1 keeps the Buffer’s lifecycle invisible to the serve/2 function. Without a Buffer we can’t pipe data along to the functions that do real work on the information. With this in place, if the Buffer goes away, a new one is created the next time data arrives.

The Sum Total

Now we have a simple socket server that works with line-oriented protocols and can hand off fully-formed lines to that server’s parser. The Buffer is simple and probably unlikely to crash, but if it does, the connection recovers gracefully by starting a new one. We learned a bit about how exactly Processes relate to each other and how they can keep tabs on each other.

It’s important to note that this was written as an exploration of both sockets and Process interaction. A more fault-tolerant solution to this problem would very likely use Supervisors, but that was not the goal of the exercise.