In this series of articles we will highlight the thought process, mistakes, experiments, discoveries and ultimately design choices we followed to route our customers’ transactional notifications at different rates using Kafka and Cassandra.
Introduction
Our transactional API is made for 1-to-1 interactions and the ability to send transactional notifications has always been a core feature of Batch.
This API allows developers to programmatically send personally tailored push messages to large audiences as fast as their rate allows.
While transactional messages account for a modest percentage of the billions of notifications we send each month, they usually carry very high-value, time-sensitive content, hence the need for great reliability and ease of use. For instance, we handle rate limiting ourselves because we did not want developers to carry the weight of implementing it. Just send as you go and don’t worry about the rate.
In this article, broken down into two parts, I will detail our approach to building this mission-critical piece of infrastructure. The first part focuses on conception, the second on implementation. It assumes that you are already familiar with Cassandra and Kafka, or at least looking to improve your understanding of them.
The transactional throttler
Our entire infrastructure relies on microservices. Kafka is its backbone.
To route a single notification from its input source (the API) to the output (the cloud notification providers, namely Apple’s APNS and Google’s FCM), several services are involved.
Here we’ll focus on the Transactional Throttler shown below (you are looking at the last evolution). Its role is to buffer notifications and throttle them at each customer’s rate on the output end. Each batch of notifications is split into single notifications and enqueued into Kafka. To avoid notifications being sent too late (too many notifications for too low a rate), we enforce a default TTL of 6 hours; you can also specify your own, as long as it is smaller.
Genesis
Four years ago, when we were only working on the foundations of Batch and still talking to app developers to grasp their needs, it quickly became apparent that transactional notifications were a must-have.
We did not find the perfect design on our first attempt, far from it. When we first designed this application, we concluded that it had to be some kind of queue, stored somewhere; a per-customer queue to be more specific, because each customer had their own rate.
Back then, we were barely using Kafka, and only in its early version (0.8). We could have created one topic per customer, which would have greatly simplified our task, but multiplying the number of topics in Kafka was hard to maintain and absolutely not recommended. We did not choose Kafka.
Cassandra 1.2 on the other hand was already a key part of our stack. But the idea of using Cassandra as a queue was more of an anti-pattern than a great idea. Unlike with Kafka, it was hard to imagine how to synchronize concurrent consumers accessing the queue. For those reasons, we did not choose Cassandra either.
As far as I can recall, we ended up on a tight schedule in the early days (not an excuse though) as the first beta release of the platform was approaching. So we implemented what seemed to be the quickest and most obvious solution: a queue using MySQL. The API stored notifications directly into MySQL, and several instances of the throttler consumed this queue. By using transactions we were able to ensure safe concurrent access.
Over the following years, new customers kept flowing in. The more developers used this feature, the more our initial design became a burden. Not because MySQL is a bad product (it is still a key part of some of our products), but in our case it did not quite cut it. Here is why:
- Designing a queue in MySQL was not a great idea (either)
- MySQL doesn’t scale horizontally
- Replication is hard: you can set up slaves, but master switching is manual
- We had to fight lock contention: more customers means more inserts and more reads
- There is no TTL on rows, we had to delete rows on our own to keep the size of the database under control
- We were more comfortable working with Cassandra under heavy load than MySQL
As the product evolved, we scrambled to ship patches and fixes, always trying to make our use of MySQL more reliable, or at least to reduce the risk of failure and relieve the heavy load on it. But in the end it was not enough.
A complete redesign
The time had come to reconsider the throttler design for good and to synthesize all the new thoughts and ideas we had had during those early years.
Kafka had become unavoidable for communication between our different applications. No exception here: our first move was to introduce a Kafka topic between the API and the throttler. First, the response time of the API no longer depended on MySQL but on Kafka (what a relief!). Second, we were able to consume the same topic in parallel with a new consumer group and start building new experiments.
Then we analyzed the way the current throttler was working and found that only a few customers were sending enough notifications to exceed their rate. Consequently, many of the insertions into MySQL were useless: we could have sent those notifications directly.
From this point forward, instead of considering this product as a big stored queue, we started thinking of it as an overflow buffer. This truly was a change of mindset, because a buffer does not imply the same constraints: it can be temporarily unavailable or deactivated, it can have some lag, and it is obviously smaller than a queue. The product now looked more like a live sender with a buffer on the side.
The concept was pretty simple: each notification that would cause the customer rate to be exceeded is stored in a per-customer buffer; once the actual consumed rate drops again, the buffer is drained to the output, always honoring the customer rate.
An idea was born! :)
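To make the concept concrete, here is a minimal in-memory sketch in Python. It is not our implementation: the `OverflowThrottler` class, the `rate_per_tick` parameter and the idea of a discrete "tick" are assumptions made for illustration, and a plain list stands in for the output. It also assumes that new arrivals queue behind anything already buffered, so that ordering is preserved.

```python
from collections import deque


class OverflowThrottler:
    """Toy model: send directly while under the rate, buffer the overflow."""

    def __init__(self, rate_per_tick):
        self.rate_per_tick = rate_per_tick  # hypothetical: max sends per time window
        self.sent_this_tick = 0
        self.buffer = deque()               # per-customer overflow buffer
        self.output = []                    # stands in for the output

    def submit(self, notification):
        # Once something is buffered, new arrivals queue behind it to keep ordering.
        if not self.buffer and self.sent_this_tick < self.rate_per_tick:
            self._send(notification)
        else:
            self.buffer.append(notification)

    def tick(self):
        # A new time window starts: reset the budget, then drain the buffer.
        self.sent_this_tick = 0
        while self.buffer and self.sent_this_tick < self.rate_per_tick:
            self._send(self.buffer.popleft())

    def _send(self, notification):
        self.output.append(notification)
        self.sent_this_tick += 1


throttler = OverflowThrottler(rate_per_tick=2)
for n in ["n1", "n2", "n3", "n4", "n5"]:
    throttler.submit(n)
sent_first_window = list(throttler.output)   # only what the rate allowed
throttler.tick()
sent_second_window = list(throttler.output)  # plus what was drained afterwards
```

With a rate of two per window, the first two notifications go straight out and the other three wait in the buffer until later windows.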
Storing the buffer
We could reasonably have just rewritten the throttler to send notifications directly whenever the rate allowed it, and kept the current stack going. But we wanted to get rid of MySQL for resiliency and reliability reasons.
As it usually happens when you have to rewrite a service, the opportunity to rewrite it from scratch became the most obvious way to go, at least in my own experience.
Designing a buffer in Cassandra is still considered an anti-pattern, because a buffer still looks like a queue. Even so, we had the feeling that this was the right choice for us: we knew it would be distributed and reliable. After working with Cassandra for more than 5 years, we were confident that we would avoid the common mistakes and leverage the advantages of Cassandra for this design.
Attempting a design with Cassandra
Here is a summary of our initial thought process for using Cassandra as our buffer storage:
- We wanted to store a queue per customer: in the primary key, we would certainly have the customer id.
- We wanted the ability to read back the next sorted elements per customer using one query: we would likely use Cassandra wide rows for that.
- A wide row implying a clustering key, we needed a unique sortable key for this purpose: a time based UUID looked suitable for this.
- To avoid hotspots, we needed the data and the queries to be spread evenly over the Cassandra nodes. Having only the customer id as a partition key was not enough so we added a partition in which we could store a hash on any unique value (like the TUUID itself).
- We did not want to keep those rows indefinitely (after all this was only a buffer) so we defined a TTL: 3 days was greater than our 6 hours default TTL and enough for debugging purposes.
- Considering we are using Cassandra 3 we thought we could use the TimeWindowCompactionStrategy that would gather SSTables of the same period (we chose 1 day) and get rid of the entire SSTables once the TTL is reached.
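The hashing step from the list above could look like the following minimal Python sketch. The bucket count (`NUM_BUCKETS`), the choice of CRC32 and the helper names are assumptions made for illustration; the only point is that a stable hash of a unique value spreads one customer over several partition keys.

```python
import uuid
import zlib

NUM_BUCKETS = 16  # hypothetical bucket count, not a value from our setup


def bucket_for(time_uuid: uuid.UUID, num_buckets: int = NUM_BUCKETS) -> int:
    # CRC32 is stable across processes, unlike Python's built-in hash().
    return zlib.crc32(time_uuid.bytes) % num_buckets


def partition_key(customer_id: str, time_uuid: uuid.UUID) -> tuple:
    # (customer_id, bucket) plays the role of the composite partition key.
    return (customer_id, bucket_for(time_uuid))


keys = [partition_key("customer-42", uuid.uuid1()) for _ in range(1000)]
buckets_used = {bucket for _, bucket in keys}
```

A single busy customer then lands on many partition keys instead of one, which is what avoids the hotspot.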
So let us take a look at the first schema we produced, with a table we obviously named overflow_buffer:
CREATE TABLE overflow_buffer
(
    customer_id text,
    partition int,      -- hash bucket used to spread a customer over the nodes
    time timeuuid,      -- unique, sortable clustering key
    recipient text,
    message text,
    PRIMARY KEY ((customer_id, partition), time)
)
WITH CLUSTERING ORDER BY (time ASC) AND default_time_to_live = 259200  -- 3 days
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS'};
That seemed to be a good start.
- We would insert notifications in a way that will not generate hotspots on Cassandra.
- As long as we were not selecting expired rows (don’t ever select too many tombstones in Cassandra) we were able to retrieve the stored notifications and handle them.
Some polish
Before going further, we had already noticed two small modifications that would improve our first attempt:
- Cassandra keeps tombstones for a period before collecting and deleting them to avoid ghost rows, in other words to avoid deleted rows reappearing. The main solution is to run a repair on the column family before this deadline and ensure the tombstones have been correctly replicated on all replicas. But in our case we were using TTLs and we were not deleting anything. Setting gc_grace_seconds to 1 hour (3600 s) would be enough.
- Cassandra uses a bloom filter to quickly skip the SSTables that do not contain the key you are looking for. The compaction strategy we were using would gather all rows for one day into a small number of SSTables. If we appended the day (GMT) to the partition key, we would help Cassandra speed up lookups on selection, even though we would only be able to read the rows for one day per query, which sounded like an acceptable compromise.
CREATE TABLE overflow_buffer
(
    customer_id text,
    day int,            -- day (GMT) of the row, part of the partition key
    partition int,
    time timeuuid,
    recipient text,
    message text,
    PRIMARY KEY ((customer_id, day, partition), time)
)
WITH CLUSTERING ORDER BY (time ASC) AND default_time_to_live = 259200
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS'}
AND gc_grace_seconds = 3600;  -- 1 hour
Bringing concurrent access
Distributing the throttler over several instances was of course a main requirement. But distribution comes along with concurrent access and synchronisation issues.
At this point, we still did not know how we would keep track of the consumed items in our buffer, but we knew that solving the concurrency issue would help us with that later. Indeed, if two throttlers were consuming from and writing to the same buffer concurrently, how could we make sure their access would be synchronized? How could we prevent them from handling the same notification and sending it twice?
Apart from Lightweight Transactions, which are costly for the cluster, Cassandra did not offer any way to synchronize operations. So we had to be smarter and find another solution.
This is where Kafka took on its first major role.
Why not resolve concurrency by avoiding concurrency?
Taking advantage of partition assignment
As you may have noticed, we were already using partitioning with both Cassandra and Kafka: we spread our buffer over several partitions in Cassandra to avoid hotspots, and we introduced an input Kafka topic, consumed by several throttler instances, that was also split into several partitions by design (this is how distribution works with Kafka).
Fortunately, since version 0.9, Kafka automatically handles the assignment of partitions to each consumer in a consistent way. This means we had the guarantee that one and only one consumer of the group would consume a given partition at a time. The topic was still accessed concurrently, but all the complexity was handled by Kafka during the assignment phase. One caveat though: partitioning is what makes concurrency possible, but the number of partitions also caps it.
We thought it would be pretty handy to take advantage of this feature. We chose to use the same partitions for the buffer storage and for the input consumption: each input partition was associated with one partition of the buffer, in fact the same one. This way the buffer would have the same number of partitions as the input topic, and each time a partition was assigned to a consumer, this consumer would be in charge of:
- Reading from this partition
- Sending what has to be sent directly from this partition
- Buffering what could not be sent into the dedicated buffer for this partition and for each customer
- Reading back from the buffer for this partition
Big step! Using Kafka for distribution (or for atomicity) is something we’re doing a lot at Batch.
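The ownership rule can be illustrated with a small Python simulation. The round-robin `assign` function below is a simplification written for this article and is not Kafka's actual assignor; the instance names are hypothetical. What matters is the invariant: every partition has exactly one owner, and the buffer partition is the input partition.

```python
def assign(partitions, consumers):
    """Round-robin assignment: every partition gets exactly one owner."""
    members = sorted(consumers)
    return {p: members[i % len(members)] for i, p in enumerate(sorted(partitions))}


def buffer_partition(input_partition):
    # The buffer reuses the input partition number, so ownership carries over.
    return input_partition


partitions = range(6)
before = assign(partitions, {"throttler-a", "throttler-b", "throttler-c"})
after = assign(partitions, {"throttler-a", "throttler-b"})  # throttler-c left

owned_by_a = sorted(
    buffer_partition(p) for p, owner in after.items() if owner == "throttler-a"
)
```

After the rebalance, the partitions that belonged to the departed instance have a new single owner, which also becomes the only reader and writer of the matching buffer partitions.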
Sublimate the union between Kafka & Cassandra
However, as the design was making progress, a field in our Cassandra schema drew our attention: the time TUUID field. It no longer fit our needs perfectly.
The nice part of the TUUID type was its sortable property, which is why we had chosen it in the first place, but its drawback lay in the time-based part. The probability was extremely low, but under certain conditions we could lose the guarantee that all inserts would be read back in the same order.
In Java for example, within a single running instance, the UUIDs class provided by the standard Cassandra client library prevents you from generating a new tuuid from a clock that went back in time since the previous tuuid generation. But after a rebalance initiated by Kafka, the same partitions may be assigned to other instances whose clocks are slightly out of sync (which is not recommended if you are using Cassandra), and we would then have run the risk of producing non-continuous tuuid values.
Another flaw of using a TUUID field in our case was its uniqueness, which was also why we had chosen it. Indeed, while handling a record from Kafka, we can experience a lot of issues: unable to read or decode, unable to insert into the buffer, unable to write to the output topic, unable to commit, and so on.
This is where you have to choose wisely at which point in your process you commit the consumed offsets.
For instance, we could have chosen to commit right after reading from Kafka, but if an error occurred we would not have had the chance to retry. In fact, for a given record, the later we commit, the more chances we have to retry, but under one condition: the operations between the read and the commit have to be idempotent. Since each newly generated tuuid was (almost) guaranteed to be unique, in case of a redo we would not have been able to write into our buffer again in an idempotent manner.
We had to choose another value that would respect both the continuous ordering and the idempotency contract. Here again, because we had introduced this input topic, Kafka was already providing us with another unique, continuous and sortable value inside a given partition. This value is a sequential id number called the offset, which uniquely identifies each record within a partition. Looks perfect!
Choosing the offset completely transformed the ‘Cassandra buffer’ into a ‘Kafka dedicated buffer’, but we thought this was something we could easily afford, and that was exactly the main idea behind this rewrite.
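The difference between the two keys shows up as soon as a record is processed twice. In the Python sketch below, a dictionary stands in for the Cassandra table (writing the same primary key twice overwrites the row); the function names and the sample record are made up for illustration, and the `day` column is left out for brevity.

```python
import uuid


def write_with_timeuuid(table, customer_id, partition, message):
    # Each attempt generates a fresh key, so a retry adds a second row.
    table[(customer_id, partition, uuid.uuid1())] = message


def write_with_offset(table, customer_id, partition, offset, message):
    # The key comes from the Kafka record itself, so a retry overwrites the same row.
    table[(customer_id, partition, offset)] = message


by_timeuuid, by_offset = {}, {}
record = {"customer_id": "customer-42", "partition": 3, "offset": 1017, "message": "hello"}

for _attempt in range(2):  # first try, then a retry after a failed commit
    write_with_timeuuid(
        by_timeuuid, record["customer_id"], record["partition"], record["message"]
    )
    write_with_offset(
        by_offset, record["customer_id"], record["partition"],
        record["offset"], record["message"],
    )
```

The time-based key leaves two rows behind for the same notification, while the offset-based key leaves exactly one, which is what makes a late commit safe.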
The Cassandra table representing our buffer then looked like this:
CREATE TABLE overflow_buffer
(
    customer_id text,
    day int,
    partition int,      -- Kafka partition of the input topic
    offset bigint,      -- Kafka offset of the record within that partition
    recipient text,
    message text,
    PRIMARY KEY ((customer_id, day, partition), offset)
)
WITH CLUSTERING ORDER BY (offset ASC) AND default_time_to_live = 259200
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS'}
AND gc_grace_seconds = 3600;
The partition and the offset both come from Kafka.
Note that we kept the day column in the partition key. We will never have to select rows older than a few hours (beyond that they will have expired). At most, we’ll have to execute two requests to find a batch of rows.
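The "at most two requests" claim follows from the TTL being shorter than a day. Here is a small Python sketch of that reasoning; the `YYYYMMDD` encoding of the `day` column and the helper names are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

MAX_TTL = timedelta(hours=6)  # default notification TTL mentioned earlier


def day_bucket(moment: datetime) -> int:
    # Assumed encoding: YYYYMMDD in UTC, e.g. 20240131.
    utc = moment.astimezone(timezone.utc)
    return utc.year * 10000 + utc.month * 100 + utc.day


def days_to_query(now: datetime, max_ttl: timedelta = MAX_TTL) -> list:
    # Rows older than max_ttl are expired, so only these day buckets matter.
    oldest = day_bucket(now - max_ttl)
    today = day_bucket(now)
    return [oldest] if oldest == today else [oldest, today]


just_after_midnight = datetime(2024, 2, 1, 2, 0, tzinfo=timezone.utc)
midday = datetime(2024, 2, 1, 12, 0, tzinfo=timezone.utc)
```

Shortly after midnight the previous day may still hold live rows, so two partitions are read; the rest of the time one query is enough.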
Storing the buffer state
Great. We had one last requirement to meet: how do we know if a row has already been handled?
A wrong solution would have been to delete the handled records, because this would have led us to select tombstones, and as I mentioned earlier, if you do this you are headed for trouble. Selecting a row full of tombstones is a waste of time for Cassandra, and be sure the database will protect itself against it!
A first element of the answer would be to maintain another table with the last offset handled for each customer and partition. The primary key would certainly have looked like ((customer_id, partition)). This solution was perfectly acceptable but came with one drawback: at startup or after a rebalance, the scheduler had to read back all the last offsets of every customer for the partition it was now handling.
I have never been really comfortable with reading back an entire Cassandra table; I have always thought it was bad design. Besides, we had a much better solution!
Fortunately, Kafka comes with a huge feature, my favorite one: the changelog, a feature used by Kafka itself to store the committed offsets of each consumer group (since 0.8). The principle is simple: you produce your changes into a topic as key/value records (for instance the customer_id as key and an offset as value), and Kafka only retains, for each partition and each key, the last value.
How does it work? Kafka comes with two strategies to reduce the disk space used by a topic:
- By setting a retention and dropping stale records,
- By compacting a topic and keeping only the last item for each key (if the value is null, the key is deleted entirely). This is the strategy used for a changelog.
That’s the theory. In practice, compactions are triggered by different thresholds and with some delay. If you consume this topic from the beginning, there is a significant chance you will retrieve several changes per key and not, as we explained earlier, only the last one. No big deal, you just have to do the same: keep the last value for each key locally.
This looked perfect for us. We could store the last offset in a dedicated topic, using the customer_id as key and the same partition as the input topic (and, by extension, the buffer). To completely remove a key from the topic once the buffer became empty, we just had to push a null value. With this strategy, the startup procedure was much easier: we just had to read the topic from the beginning for each assigned partition and build the exhaustive list of customers, with their respective offsets, that still had items in their buffer.
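The startup procedure boils down to replaying the changelog and keeping the last value per key. A minimal Python sketch, in which a list of `(customer_id, offset)` tuples stands in for the records of one changelog partition and `None` plays the role of the null value (the function name and sample data are made up for illustration):

```python
def replay_changelog(records):
    """Rebuild the latest offset per customer from a (possibly uncompacted) log."""
    state = {}
    for customer_id, offset in records:
        if offset is None:
            state.pop(customer_id, None)  # null value: the buffer is empty again
        else:
            state[customer_id] = offset   # later records win
    return state


changelog = [
    ("customer-1", 10),
    ("customer-2", 7),
    ("customer-1", 12),    # supersedes offset 10
    ("customer-2", None),  # customer-2's buffer was fully drained
    ("customer-3", 4),
]
pending = replay_changelog(changelog)
```

Whether or not Kafka has already compacted the topic, the replay ends with the same state: only the customers that still have items in their buffer, each with their last offset.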
Conclusion
Well, at this particular moment we had a pretty good idea of where we were going, and how to handle the overflow buffer. I have to say that using Kafka as a changelog was very promising!
In the next part, which covers the implementation, we will discover that the changelog was a bit more complicated than what we had just planned. Indeed, to meet the requirements we had defined for this project, we would have to make some changes and improvements.
Thanks for reading so far! See you in the next part ;)
Nicolas DOUILLET
Head of Engineering @ Batch