Imagine that you and I built an online virtual world. Neo, Trinity, and Morpheus are exploring our world (trying to figure out the nature of reality and truth, and possibly doing some kung-fu). A model of our world needs to live on our server, but also on each of their client devices.
Neo’s device, for example, needs to have a complete and consistent model of our world at all times to accurately render what’s happening and predict what will happen next. If his model diverges from the server model, or from the models seen by Trinity and Morpheus, he’ll experience a glitch in the Matrix, and reality itself may start to unravel!
What we need to provide each client is a consistent, ordered sequence of events that communicate everything that happens in the world. As long as all clients see this same ordered sequence of events (and assuming we didn’t write bugs or non-determinism), each client should be able to produce an identical model. If any events are missing or arrive out of order, bad things might happen. We’d also like updates to arrive as quickly as humanly (machinely?) possible.
Open sourcing EventStream
At FullStory, we’ve needed to solve problems like this several times, and we’re happy to share our core abstraction: EventStream. An EventStream is an ordered sequence of events connecting a single publisher to multiple subscribers. In a traditional Go channel, multiple readers compete to pull events from the channel, but in EventStream every subscriber sees the entire sequence of events. It’s like an immutable, append-only log with one writer and multiple readers.
I’ll talk about more interesting, real-world use cases below, but for now consider BWAMP. (If you’re not already a BWAMP user, you can read more about it here.) Under the hood, BWAMP works like a simple online chat room with no history. Clients need to be told two kinds of things:
Chat: person X sent BWAMP Y
Membership: persons X, Y, and Z are currently in the room.
Chat events take advantage of EventStream’s fast, consistent updates, instantly pushing updates to all connected clients. We can be sure that everyone in the room is seeing the exact same series of BWAMPs.
Membership events communicate when someone joins or leaves the room, so that all clients can maintain an accurate and up-to-date membership list. When a client first connects to a room, the server sends the complete list of all the current members, which clients store locally. Whenever someone joins or leaves, the server publishes an appropriate join or leave event. Connected clients apply these mutations to their user list, and everyone now has the same updated user list.
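Here is a minimal sketch of how a client might apply those membership events. The `memberEvent` and `room` types are names I made up for illustration, not part of BWAMP or EventStream. Two clients that start from the same snapshot and apply the same events in the same order end up with the same list.

```go
package main

import (
	"fmt"
	"sort"
)

// memberEvent is a hypothetical join/leave mutation.
type memberEvent struct {
	join bool
	user string
}

// room is one client's local copy of the membership list.
type room struct {
	members map[string]bool
}

// newRoom seeds the local model from the server's initial snapshot.
func newRoom(snapshot []string) *room {
	r := &room{members: make(map[string]bool)}
	for _, u := range snapshot {
		r.members[u] = true
	}
	return r
}

// apply mutates the local model; it must be deterministic.
func (r *room) apply(e memberEvent) {
	if e.join {
		r.members[e.user] = true
	} else {
		delete(r.members, e.user)
	}
}

// list returns the members in sorted order for stable display.
func (r *room) list() []string {
	out := make([]string, 0, len(r.members))
	for u := range r.members {
		out = append(out, u)
	}
	sort.Strings(out)
	return out
}

func main() {
	events := []memberEvent{{true, "morpheus"}, {false, "neo"}}
	a := newRoom([]string{"neo", "trinity"})
	b := newRoom([]string{"neo", "trinity"})
	for _, e := range events {
		a.apply(e)
		b.apply(e)
	}
	fmt.Println(a.list(), b.list()) // [morpheus trinity] [morpheus trinity]
}
```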
We’ll talk more about this later in the section on Log-replicated Model, but first let’s talk about how EventStream is implemented.
How EventStream works
Under the hood, EventStream is a linked list connecting a single publisher to multiple subscribers. The publisher keeps a reference to the tail of the list, so that it can publish new events by appending new nodes to the list. The tail node is “unready”. Every prior node in the list is “ready”. Publishing a new event marks the current tail node ready and appends a new unready node to be the new tail.
Each subscriber keeps its own reference to the first node it has not yet consumed. Subscribers iterate towards the tail of the list as they consume events. In the steady state, all subscribers will have iterated to the tail of the list, waiting for the next event to be published. Old nodes become unreachable (in the GC sense) once all subscribers have consumed them.
An EventStream is a linked list.
In the diagram above, Neo has consumed all ready nodes. He is waiting for the publisher to provide the next value and mark the tail node as ready. Trinity and Morpheus lag behind, having more nodes to consume. All older nodes (not pictured) are unreachable.
Internally, each node holds three things: a ready channel, a value, and a next pointer.
The tail of the list is the only node in an “unready” state. Its ready channel is not yet closed, and its value and next pointer are both nil. To publish a new event we must:
Assign the new event value into the old tail’s value
Create a new node to become the new tail
Link the old tail to the new tail by assigning old tail’s next to be the new tail
Close the ready channel on the old tail, signaling subscribers
Closing the ready channel signals that a new event has been published, and serves as a data synchronization barrier in the Go memory model. Because every subscriber receives the same value sent through the stream, all event values need to be effectively immutable.
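The publishing steps can be sketched roughly as follows. This is an illustration under my own assumptions, not the library source: the `stream` type and the `drain` helper are hypothetical, and the sketch assumes a single publisher goroutine.

```go
package main

import "fmt"

// node is one link in the list. Field names follow the prose above.
type node struct {
	ready chan struct{} // closed once value and next are set
	value interface{}   // the event; nil while the node is unready
	next  *node         // the following node; nil while unready
}

func newNode() *node {
	return &node{ready: make(chan struct{})}
}

// stream is a hypothetical publisher handle that tracks the unready tail.
type stream struct {
	tail *node
}

func newStream() *stream {
	return &stream{tail: newNode()}
}

// publish performs the four steps in order. With a single publisher
// goroutine, tail needs no lock.
func (s *stream) publish(v interface{}) {
	old := s.tail
	old.value = v     // 1. assign the event value
	next := newNode() // 2. create the new tail
	old.next = next   // 3. link the old tail to the new tail
	s.tail = next
	close(old.ready) // 4. signal subscribers
}

// drain collects every ready event starting at n without blocking, and
// returns the first unready node it reaches.
func drain(n *node) ([]interface{}, *node) {
	var out []interface{}
	for {
		select {
		case <-n.ready:
			out = append(out, n.value)
			n = n.next
		default:
			return out, n
		}
	}
}

func main() {
	s := newStream()
	sub := s.tail // a subscriber starts at the current tail
	s.publish("hello")
	s.publish("world")
	events, _ := drain(sub)
	fmt.Println(events...) // hello world
}
```

Closing the channel last matters: a subscriber that observes the close is guaranteed to also observe the value and next pointer written before it.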
A typical subscriber is an HTTP or gRPC stream handler whose inner loop waits in a select statement for the current node’s ready channel to close.
As soon as the ready channel closes, all subscribers (parked on the select statement) unpark and process the new event. (Aside: if you’re curious about the Go select statement and why I’m using one here, see the section on Using select in this prior blog post.)
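A subscriber loop along these lines might look like the sketch below. The `consume` function and its `handle` callback are hypothetical stand-ins for a real stream handler, and the node type is redeclared so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
)

// node mirrors the structure described above; names are illustrative.
type node struct {
	ready chan struct{}
	value interface{}
	next  *node
}

// publish fills the tail, links a fresh unready tail, and closes ready.
func publish(tail *node, v interface{}) *node {
	tail.value = v
	tail.next = &node{ready: make(chan struct{})}
	close(tail.ready)
	return tail.next
}

// consume is the subscriber's inner loop. handle is a hypothetical callback
// standing in for a write to an HTTP or gRPC stream.
func consume(ctx context.Context, cur *node, handle func(interface{}) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // the caller went away or timed out
		case <-cur.ready:
			if err := handle(cur.value); err != nil {
				return err
			}
			cur = cur.next
		}
	}
}

// demo publishes two events to one subscriber, then cancels it.
func demo() ([]interface{}, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	head := &node{ready: make(chan struct{})}
	got := make(chan interface{})
	done := make(chan error, 1)
	go func() {
		done <- consume(ctx, head, func(v interface{}) error {
			got <- v
			return nil
		})
	}()
	tail := publish(head, "a")
	first := <-got
	publish(tail, "b")
	second := <-got
	cancel()
	return []interface{}{first, second}, <-done
}

func main() {
	fmt.Println(demo()) // [a b] context canceled
}
```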
For the simple case where you only need to select over the EventStream plus the context, we provide an Iterator decorator that yields a terser formulation.
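To show the idea, here is a sketch of what such a decorator could look like. The `iterator` type and its `Next` signature are my own assumptions for illustration; check the package for the real Iterator API, which may differ.

```go
package main

import (
	"context"
	"fmt"
)

// node mirrors the structure described above; names are illustrative.
type node struct {
	ready chan struct{}
	value interface{}
	next  *node
}

// publish fills the tail, links a fresh unready tail, and closes ready.
func publish(tail *node, v interface{}) *node {
	tail.value = v
	tail.next = &node{ready: make(chan struct{})}
	close(tail.ready)
	return tail.next
}

// iterator is a hypothetical decorator that hides the select statement.
type iterator struct {
	cur *node
}

// Next blocks until the next event is ready or ctx ends.
func (it *iterator) Next(ctx context.Context) (interface{}, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-it.cur.ready:
		v := it.cur.value
		it.cur = it.cur.next
		return v, nil
	}
}

// demo consumes two published events, then stops when the context ends.
func demo() ([]interface{}, error) {
	head := &node{ready: make(chan struct{})}
	publish(publish(head, "a"), "b")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	it := &iterator{cur: head}
	var seen []interface{}
	for {
		v, err := it.Next(ctx)
		if err != nil {
			return seen, err
		}
		seen = append(seen, v)
		if len(seen) == 2 {
			cancel() // simulate the client disconnecting
		}
	}
}

func main() {
	fmt.Println(demo()) // [a b] context canceled
}
```

The caller's loop shrinks to a call to Next plus an error check, at the cost of not being able to select over other channels at the same time.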
Log-replicated Model
So now we know what EventStream is and how it works, but how can we take advantage of it in the real world? The real power lies in the design patterns we build around EventStream. The primary design pattern we use is something I’ll call the “Log-replicated Model”.
To ensure that distributed clients reach the same, eventually consistent server state:
We define a model that we share between a server and all of its clients.
The server owns the original source-of-truth model, controls any changes to it, and publishes mutation events to an EventStream.
Each client gets an initial snapshot of the model plus a subscription to the EventStream.
A Log-replicated Model is much simpler than a Conflict-free Replicated Data Type (CRDT) or Operational Transformation (OT) model. In those more sophisticated models, distributed actors can apply concurrent mutations that must be merged together, potentially in different orders, to produce a consistent result. A Log-replicated Model has a single source of truth and a single ordered sequence of mutations. It’s much easier to implement correctly.
I’ll unpack this below, but first, I should offer a couple of real world examples of how we’ve put this into practice (beyond just BWAMP).
Real-world example #1: Solr monitor
At FullStory, we run hundreds of Apache Solr instances, containing tens of thousands of collections and hundreds of thousands of individual shard replicas. Solr keeps all of its configuration in Zookeeper, so if we need to know the IP address of a Solr instance, or better yet, which Solr node a specific collection and shard lives on, we have to ask Zookeeper.
But asking Zookeeper for routing information in the critical path of serving user queries added too much latency, and querying Zookeeper each time we needed to index a document would have made Zookeeper explode. We needed an up-to-date model of Solr’s routing information in each Solr client, so we wrote a sophisticated library using Zookeeper “watches” – which allow a client to read a piece of data, and then be notified the next time that piece of data changes.
This solution worked well at the time, and we even open-sourced it as solrmonitor. But we knew from the outset that this design would burden Zookeeper with O(N*M) load as FullStory grew: (number of collections) * (number of Solr client instances). Zookeeper’s load began to rise sharply as our indexing service scaled up each day. As each new client came online, it needed to fetch a copy of the cluster state from Zookeeper and set watches. The number of watches registered with Zookeeper grew into the millions. We needed a better design!
Turning solrmonitor into a service provided that better design. We took the existing solrmonitor library and embedded it into a new gRPC streaming service backed by EventStream. Then we replaced the embedded, ZK-based library in every other service with a gRPC client connected to the solrmonitor service. A small number of solrmonitor servers talking directly to Zookeeper act as a massive force multiplier, publishing a Log-replicated Model far more efficiently than Zookeeper could.
Can you tell when we started and finished rolling out to all clients?
Zookeeper watch counts dropped to acceptable levels, the induced-load spikes completely vanished, and the solrmonitor service itself needed fewer resources than we expected.
Real-world example #2: Webhook configuration
Webhooks allow FullStory customers to provide other applications with real-time information, delivering data to other applications as it happens. For example, customers may want to be notified via HTTP whenever a certain Custom Event is recorded.
Our indexing pipeline processes custom events, but it doesn’t know anything about webhooks. Instead, our pipeline notifies a dedicated microservice, capnhook, which is responsible for delivering webhooks. capnhook also owns and manages the customer configuration of webhooks: which events should trigger notifications for each customer.
Unfortunately, this design exemplified a common microservice problem: clean separation of concerns can lead to gross systemic inefficiencies. While FullStory processes millions of custom events per hour, only a tiny fraction trigger webhook notifications. Since our indexing pipeline could not tell the difference, it simply sent every custom event to capnhook. capnhook would then throw most of them away, but the sheer number of gRPC requests made capnhook an order of magnitude more expensive than it needed to be.
It dawned on us that the total size of our entire webhook configuration (across all customers) is actually pretty small. Small enough to easily fit into memory. Small enough to apply the Log-replicated Model approach, wire up an EventStream over gRPC, and simply publish the entire model to every instance in our indexing pipeline. Now that the indexing pipeline has an accurate local model, it can cheaply filter events and only call capnhook for events that actually trigger webhooks.
The most expensive no-ops you’ve ever seen.
Our indexing pipeline got more efficient, and the results for capnhook were huge:
gRPC requests dropped by 90%
CPU usage and resource requirements dropped by 90%
Logging size dropped by 95%
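The local filtering step is simple once the model lives in memory. The sketch below assumes a made-up shape for the configuration (customer ID mapped to a set of event names); the `webhookConfig` and `customEvent` types are hypothetical and do not reflect capnhook’s real schema.

```go
package main

import "fmt"

// customEvent is a hypothetical custom event seen by the indexing pipeline.
type customEvent struct {
	customer string
	name     string
}

// webhookConfig is an assumed local model: customer ID -> set of event
// names that have a webhook configured.
type webhookConfig map[string]map[string]bool

// triggers reports whether e should be forwarded. Indexing a missing
// customer yields a nil inner map, which safely reads as false.
func (c webhookConfig) triggers(e customEvent) bool {
	return c[e.customer][e.name]
}

// filter keeps only the events worth a gRPC call.
func filter(cfg webhookConfig, events []customEvent) []customEvent {
	var out []customEvent
	for _, e := range events {
		if cfg.triggers(e) {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	cfg := webhookConfig{"acme": {"checkout": true}}
	events := []customEvent{
		{"acme", "checkout"},
		{"acme", "scroll"},
		{"globex", "checkout"},
	}
	fmt.Println(filter(cfg, events)) // [{acme checkout}]
}
```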
How to write a Log-replicated Model
An idealized server side of a Log-replicated Model is a Go struct that holds the current Model and an EventStream of mutations behind a read/write mutex, and exposes a ReadAndSubscribe method.
Model abstractly represents some real, useful data model. We often design the model to be immutable and use copy-on-write to apply mutations. But sometimes it’s easier to make the model mutable and use copy-on-read instead. The important point is that once ReadAndSubscribe returns the Model back to the caller, it must be effectively immutable.
Mutation events must also be effectively immutable. As noted above, all stream values must be immutable to be safely shared among subscribers.
The read/write mutex (guarding both the snapshot and the EventStream) ensures that all clients observe a consistent association between the model and the stream. The snapshot is bound to a specific point in the stream: all previous mutations have been applied, and all subsequent mutations have not been applied. As long as mutations are applied deterministically, all subscribers will maintain identical models over time.
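A minimal sketch of such a server follows, using copy-on-write. Apart from Model and ReadAndSubscribe, which are named in the text, everything here is assumed for illustration: the `Mutation` type, the `Mutate` method, the `replay` helper, and the hand-rolled node list standing in for a real EventStream.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a stand-in for the EventStream linked list.
type node struct {
	ready chan struct{}
	value interface{}
	next  *node
}

// Model is an immutable snapshot; here, just a list of members.
type Model struct {
	Members []string
}

// Mutation is a hypothetical immutable mutation event.
type Mutation struct {
	Join bool
	User string
}

// apply is copy-on-write: it never modifies the receiver's slice.
func (m Model) apply(e Mutation) Model {
	next := make([]string, 0, len(m.Members)+1)
	for _, u := range m.Members {
		if u != e.User {
			next = append(next, u)
		}
	}
	if e.Join {
		next = append(next, e.User)
	}
	return Model{Members: next}
}

// Server guards the snapshot and the stream tail with one mutex so that
// the pair is always observed together.
type Server struct {
	mu    sync.RWMutex
	model Model
	tail  *node
}

func NewServer() *Server {
	return &Server{tail: &node{ready: make(chan struct{})}}
}

// ReadAndSubscribe returns a snapshot bound to a position in the stream:
// every earlier mutation is already applied, every later one is not.
func (s *Server) ReadAndSubscribe() (Model, *node) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.model, s.tail
}

// Mutate updates the model and publishes the mutation atomically.
func (s *Server) Mutate(e Mutation) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.model = s.model.apply(e)
	old := s.tail
	old.value = e
	old.next = &node{ready: make(chan struct{})}
	s.tail = old.next
	close(old.ready)
}

// replay applies every ready mutation from cur onward without blocking.
func replay(m Model, cur *node) Model {
	for {
		select {
		case <-cur.ready:
			m = m.apply(cur.value.(Mutation))
			cur = cur.next
		default:
			return m
		}
	}
}

// demo subscribes mid-stream and checks the client catches up.
func demo() (client, server Model) {
	s := NewServer()
	s.Mutate(Mutation{Join: true, User: "neo"})
	snap, cur := s.ReadAndSubscribe()
	s.Mutate(Mutation{Join: true, User: "trinity"})
	s.Mutate(Mutation{Join: false, User: "neo"})
	client = replay(snap, cur)
	server, _ = s.ReadAndSubscribe()
	return client, server
}

func main() {
	client, server := demo()
	fmt.Println(client.Members, server.Members) // [trinity] [trinity]
}
```

If the snapshot and the subscription were taken under separate locks, a mutation could slip in between them and the client would either miss it or apply it twice.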
EventStream and gRPC streams
We’ve talked about the underlying mechanics of EventStream and the Log-replicated Model, but how do we actually wire up clients and servers? At FullStory, we use gRPC for all our service-to-service communication, and gRPC server streams are an excellent fit for EventStream. gRPC streams backed by EventStream power all of the use cases I’ve talked about.
Smart clients utilizing gRPC streams are typically provided by the service owners, because getting the nuances exactly right can be tricky. We put together a sample application to demonstrate how to write both server and client implementations.
The chatterbox sample is a gRPC streaming application that implements a simple command line chat service. It illustrates all of the pieces we’ve talked about:
gRPC streams (both server streams and bi-di)
Smart gRPC stream clients
You can peruse the code at github.com/fullstorydev/go/examples/chatterbox, but I recommend trying it out yourself.
I don’t have space here to thoroughly describe everything in the sample, but I’ll try to hit some of the key points:
The monitoring client synchronously fetches a complete model from the server before returning control to the caller, and errors if it cannot. We write clients this way because we find it more robust to have microservices fail-fast on startup if they encounter problems. We don’t want a service instance to get into the “ready” state if it isn’t fully prepared to operate on a complete model.
After startup, the monitoring client runs an infinite loop, creating new gRPC streams whenever the old one is lost. New streams create a new model from scratch to replace the model from the previous stream (we may not be reconnecting to the same server instance, or even the same server version).
Client and Monitor intentionally duplicate a lot of code so that each example is clear.
Two different Model implementations illustrate copy-on-read vs. copy-on-write.
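The fail-fast startup and reconnect loop described in the first two points can be sketched without gRPC at all. Everything below is hypothetical and is not the chatterbox code: `stream` stands in for one gRPC server stream, `dialFunc` for the call that opens it, and `fake` for a server that delivers a few events and then drops the connection.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// stream stands in for one server stream: a snapshot, then mutations
// until events is closed (meaning the stream was lost).
type stream struct {
	snapshot []string
	events   <-chan string
}

type dialFunc func(ctx context.Context) (*stream, error)

type monitor struct {
	mu    sync.Mutex
	model []string
	done  chan struct{} // closed when the background loop exits
}

// Model returns a copy, so callers never share state with the loop.
func (m *monitor) Model() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]string(nil), m.model...)
}

func (m *monitor) set(model []string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.model = model
}

// start fails fast: no monitor is returned without a complete model.
func start(ctx context.Context, dial dialFunc) (*monitor, error) {
	first, err := dial(ctx)
	if err != nil {
		return nil, err
	}
	m := &monitor{model: first.snapshot, done: make(chan struct{})}
	go m.run(ctx, first, dial)
	return m, nil
}

func (m *monitor) run(ctx context.Context, s *stream, dial dialFunc) {
	defer close(m.done)
	for {
		for e := range s.events { // ends when the stream is lost
			m.set(append(m.Model(), e)) // apply one mutation
		}
		next, err := dial(ctx)
		for ; err != nil; next, err = dial(ctx) {
			if ctx.Err() != nil {
				return
			}
			time.Sleep(10 * time.Millisecond) // real clients back off
		}
		m.set(next.snapshot) // replace, never patch, the old model
		s = next
	}
}

func fake(snapshot []string, events ...string) *stream {
	ch := make(chan string, len(events))
	for _, e := range events {
		ch <- e
	}
	close(ch)
	return &stream{snapshot: snapshot, events: ch}
}

// demo runs the monitor against two short-lived streams, then stops it.
func demo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	m, err := start(ctx, func(context.Context) (*stream, error) {
		calls++
		switch calls {
		case 1:
			return fake([]string{"a"}, "b"), nil
		case 2:
			return fake([]string{"x"}, "y"), nil
		}
		cancel()
		return nil, errors.New("server gone")
	})
	if err != nil {
		return nil, err
	}
	<-m.done
	return m.Model(), nil
}

func main() { fmt.Println(demo()) } // [x y] <nil>
```

Note that nothing from the first stream survives the reconnect: the second snapshot replaces the old model wholesale.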
I hope you've found this article both interesting and useful. Ideally, your mind is already churning with ideas of how you might use EventStream to improve your own systems! If you find bugs or have suggestions, hit us up at github.com/fullstorydev/go with an issue or PR.
If you’re passionate about distributed systems and concurrency, or just want to work somewhere that is, we’re always looking for talented engineers. ❤️