At Tapjoy, analytics is core to our platform. On an average day, we’re processing over 2 million messages per minute through our analytics pipeline. These messages are generated by various user events in our platform and eventually aggregated for a close-to-realtime view of the system.
For our technology stack, we use Kafka to store the messages, Spark to consume and aggregate the data, and Postgres to store the aggregates. In order to deliver the data in a timely and accurate manner, we aimed to build a system that was scalable, highly available, performant, and cost-effective.
In this post, we look at how we handled the at-least-once semantics of our Kafka pipeline through real-time deduping in order to ensure the integrity and accuracy of the data.
Messaging systems tend to provide one of two types of delivery semantics for consumers: at-least-once and at-most-once. At-most-once semantics means messages may get lost; at-least-once semantics means messages may be duplicated. For critical systems, data loss is unacceptable. For example, if we’re reporting ad conversions that our partners will use to make spend decisions then our data must be accurate.
At Tapjoy, every messaging system we use provides at-least-once delivery guarantees: SQS, RabbitMQ, Kafka, etc. However, we want to process messages *exactly once* in order to avoid over-reporting analytics. This poses the question: if the same message gets written to Kafka twice, how do you ensure that message is only processed once by our client application code?
This is where dedupe comes into play.
Given the scale of the pipeline and the downstream dependence on accurate data, any dedupe solution must be:

- Scalable: able to keep pace with 2M+ messages per minute and grow with our traffic
- Highly available: the pipeline can't stop deduping because a single node failed
- Performant: adding minimal latency to our ETL jobs
- Cost-effective: able to retain 12–24 hours of message ids without an outsized memory bill
At a high level, the approach we're looking at isn't much different than a standard dedupe algorithm. For every message:

1. Look up the message's transaction id to see if it's already been processed
2. If it has, skip the message
3. If it hasn't, mark the transaction id as seen and process the message
This is the approach we use in a lot of places and it’s the general strategy we use here.
In Spark, ETL jobs are run in multiple stages. At any point, a stage may fail and need to be retried, or a job may get killed and the application needs to be restarted. The impact of this is that if a job dies *after* we've marked a transaction id as seen, we need to make sure the job can still process that same message when it's re-run.
To address this issue, our dedupe process is tweaked to the following:

1. Attempt to claim the transaction id by writing it along with the Kafka partition + offset of the message, succeeding only if the transaction id hasn't already been claimed
2. Read back which partition + offset owns the transaction id
3. If this message's partition + offset is the owner, process the message; otherwise, skip it as a dupe
The Kafka partition + offset is what we refer to as the “owner id” in our system — it’s basically which message in Kafka owns the transaction id that’s being processed. This allows ETL jobs to get retried multiple times if failures are encountered.
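To make the claim semantics concrete, here's a minimal sketch in plain Ruby, using an in-memory Hash in place of Redis (the `Deduper` class is illustrative, not our actual API):

```ruby
# Simulates the "claim owner" dedupe logic with an in-memory store.
# In production the store is Redis; a plain Hash shows the semantics.
class Deduper
  def initialize
    @owners = {} # transaction id => "partition,offset" owner id
  end

  # Returns true if this message should be processed, false if it's a dupe.
  # Safe to call repeatedly with the same owner id (e.g. on ETL retries).
  def process?(transaction_id, owner_id)
    @owners[transaction_id] ||= owner_id # claim only if unclaimed (like HSETNX)
    @owners[transaction_id] == owner_id  # winner check (like HGET)
  end
end

dedupe = Deduper.new
dedupe.process?("txn-1", "1,64942845052") # => true  (first claim wins)
dedupe.process?("txn-1", "1,64942845052") # => true  (same owner: retry is safe)
dedupe.process?("txn-1", "3,81418110694") # => false (duplicate from another offset)
```

Because the claim records *which* message owns the transaction id, a retried job re-running the owning message still sees itself as the winner.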
Other solutions, such as a memory-mapped hashmap in the Spark ETL process or our existing PostgreSQL cluster, were considered. However, each fell short in different areas, ranging from difficulty in scaling and providing high availability to ensuring efficient performance and data storage.
All things considered, Redis proved to be the best fit for our needs.
As mentioned earlier, we wanted a solution that allowed us to detect duplicate messages that were 12–24 hours apart. With 2 million messages / minute, memory quickly becomes a valuable resource. As a result, it was important for us to store data in the most effective manner in order to make best use of the available memory in Redis.
In the case of Tapjoy's analytics pipeline, message transaction ids are UUIDs and are mapped to the Kafka partition + offset where the message was first seen (e.g. partition 3, offset 81418110694).
In a naive implementation, the UUID keys are 36 bytes (32 characters with 4 dashes) and the values that represent the Kafka owner id are a comma-delimited string like “1,64942845052” (13 bytes). With an approximate 64 byte overhead per key in Redis, this would leave the amount of memory required for 24 hours of data to be:
(overhead + key_bytes + value_bytes) * keys_per_minute * 60 * 24
(64 + 36 + 13) * 2,000,000 * 60 * 24 = 300GB
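A quick sanity check of that arithmetic in Ruby:

```ruby
PER_KEY_OVERHEAD = 64        # approximate Redis overhead per top-level key (bytes)
KEY_BYTES        = 36        # UUID as a 36-character string
VALUE_BYTES      = 13        # owner id string, e.g. "1,64942845052"
KEYS_PER_MINUTE  = 2_000_000

bytes = (PER_KEY_OVERHEAD + KEY_BYTES + VALUE_BYTES) * KEYS_PER_MINUTE * 60 * 24
puts bytes                       # => 325440000000
puts (bytes / 1024.0 ** 3).round # => 303, i.e. roughly 300GB
```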
That much memory is going to cost a lot, so let's see if we can do better…
To get our first improvement in memory efficiency, consider the best way to represent the keys / values being stored.
Under the hood, UUIDs are actually made from 16 bytes of information. If we take those 16 bytes and store their raw binary representation, we get more than a 50% reduction in the memory usage of just the key data. This means that instead of representing a UUID as "ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf" (36 bytes), we can represent it as "\xce\x05\x96D\x18\xa0O'\xbc+\xc2\xa2\xd4\xd4\xe7\xbf" (16 bytes).
This translation can be done like so:

```scala
import java.nio.ByteBuffer

val uuid = java.util.UUID.fromString("ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf")
val bytes = ByteBuffer.allocate(16)
  .putLong(uuid.getMostSignificantBits)
  .putLong(uuid.getLeastSignificantBits)
  .array
// => Array(-50, 5, -106, 68, 24, -96, 79, 39, -68, 43, -62, -94, -44, -44, -25, -65)
```
Using this as a reference, we can use a similar algorithm to represent the owner id. We know the owner id is a combination of the partition id and the offset id. As a result, the question becomes: what’s the smallest number of bytes we need to represent both of those values?
Partitions are represented in Kafka as integers (4 bytes), but our maximum partition number is only 6. If we assume that we’ll never go above 32,767 partitions for a single Kafka topic, then we can represent the partition in 2 bytes (2¹⁵ – 1).
Offsets are represented in Kafka as longs (8 bytes), but the only requirement that we have is that we don’t see the same offset value within a partition over a 24 hour period. The reason we have this requirement is that we only intend on storing up to 24 hours worth of message ids in Redis. Therefore, if we end up effectively reusing offset values, it’s okay as long as it happens outside of that 24 hour window.
Given the above, if we target 6 bytes to represent the Kafka offset, this means we'll roll over every 2⁴⁸ messages (281,474,976,710,656 messages). This is way beyond what we normally see in a 24 hour period (we see about 2B messages).
This means our owner id effectively gets calculated like so:

```scala
import java.nio.ByteBuffer

val ownerId =
  ByteBuffer.allocate(2).putShort(partition.toShort).array ++
  ByteBuffer.allocate(8).putLong(offset).array.slice(2, 8)
```
The end result is that instead of requiring 13 bytes to represent Partition 1 / Offset 64942845052, it only requires 8 bytes.
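For reference, the same packing can be expressed and round-tripped in Ruby (a sketch, not our production code): a big-endian short for the partition plus the low 6 bytes of a big-endian long for the offset.

```ruby
# Pack a (partition, offset) pair into an 8-byte owner id:
# 2 bytes for the partition, 6 bytes for the offset (both big-endian).
def pack_owner_id(partition, offset)
  [partition].pack("s>") + [offset].pack("q>")[2, 6]
end

# Reverse the packing, for verification: re-pad the offset to 8 bytes.
def unpack_owner_id(owner_id)
  partition = owner_id[0, 2].unpack1("s>")
  offset    = ("\x00\x00" + owner_id[2, 6]).unpack1("q>")
  [partition, offset]
end

owner_id = pack_owner_id(1, 64942845052)
owner_id.bytesize         # => 8 (down from 13 bytes as "1,64942845052")
unpack_owner_id(owner_id) # => [1, 64942845052]
```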
Given all of this, the new calculation is:
(per_key_overhead + key_bytes + value_bytes) * keys_per_minute * 60 * 24
(64 + 16 + 8) * 2,000,000 * 60 * 24 = 230GB
Great, we’ve saved ourselves 70GB — but the amount of memory being stored is still pretty high! Let’s see if we can do any better…
At this point, the only thing we could potentially do to reduce the amount of memory required is reduce the number of keys that we’re actually storing. At first blush, this seems impossible — if we need to store 2B unique message ids in Redis, then how can you reduce the number of keys? This is where Redis’s Hash operations come into play (e.g. `HSET`, `HGET`, `HSETNX`, etc.).
Our original inspiration for using Redis’s Hash data type to reduce the total number of 1st-level Redis keys being stored came from a blog post by the Instagram Engineering team ([previously posted here](http://instagram-engineering.tumblr.com/post/12202313862/storing-hundreds-of-millions-of-simple-key-value)). Under the hood, Redis will try to use a [ziplist](http://download.redis.io/redis-stable/src/ziplist.c) data structure to store the keys/values for Hashes. This structure is significantly more memory efficient than using normal Redis keys if you don’t have different TTL requirements for each key.
To utilize Redis Hash keys, we effectively need to create buckets of message ids. In our case, we can bucket messages using a timestamp within the message. We know through our own testing, and Instagram’s own results, that we see diminishing returns with buckets containing more than 1,000 keys. We also don’t want to exceed the [hash-max-zipmap-entries](https://redis.io/topics/memory-optimization) value since the data would no longer be stored as a ziplist. In evaluating memory usage, we find that storing between 100 and 1,000 keys per bucket is ideal. This gives us room to grow without having too much of an impact on memory consumption.
If we target 100 keys per bucket, the next piece is to figure out how many buckets we’re going to need. That calculation is:
buckets = keys_per_minute / keys_per_bucket
buckets_per_min = 2,000,000 / 100 = 20,000
buckets_per_sec = 20,000 / 60 = 333
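A sketch of how a message id might be assigned to a bucket key (the key format and use of MD5 here are assumptions for illustration; our actual bucketing is derived from a timestamp within the message):

```ruby
require "digest"

keys_per_minute = 2_000_000
keys_per_bucket = 100
buckets_per_min = keys_per_minute / keys_per_bucket # => 20_000
buckets_per_sec = buckets_per_min / 60              # => 333

# Build the Redis Hash key ("bucket") for a message: the message's
# timestamp (to the second) plus a stable hash of the message id,
# spreading ids evenly across that second's buckets.
def bucket_key(timestamp, message_id, buckets_per_sec = 333)
  index = Digest::MD5.hexdigest(message_id).to_i(16) % buckets_per_sec
  "dedupe:#{timestamp.to_i}:#{index}"
end

bucket_key(Time.at(1_500_000_000), "ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf")
# => a key like "dedupe:1500000000:<0..332>"
```

Any message id then maps deterministically to one of ~333 buckets per second, each holding on the order of 100 fields.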
We can now run our new calculation for how much memory will be consumed:
((per_key_overhead + key_bytes + value_bytes) * keys_per_minute * 1440) +
((per_bucket_overhead + bucket_key_bytes) * buckets_per_minute * 1440)

((1 + 16 + 8) * 2,000,000 * 1440) + ((64 + 14) * 20,000 * 1440)
= 72,000,000,000 + 2,246,400,000 = 69GB
In the above formula:

- `per_bucket_overhead` is the same as the `per_key_overhead` used in previous calculations (roughly 64 bytes per top-level Redis key)
- `per_key_overhead` is now the number of additional bytes Redis's Hash data type requires to store each key within a bucket (roughly 1 byte)
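Checking the bucketed math in Ruby:

```ruby
hash_field_overhead = 1    # extra bytes per field inside a Redis Hash ziplist
key_bytes           = 16   # binary UUID
value_bytes         = 8    # packed partition + offset
bucket_overhead     = 64   # per top-level Redis key
bucket_key_bytes    = 14   # bucket key name

field_bytes  = (hash_field_overhead + key_bytes + value_bytes) * 2_000_000 * 1440
bucket_bytes = (bucket_overhead + bucket_key_bytes) * 20_000 * 1440
total        = field_bytes + bucket_bytes

puts field_bytes                 # => 72000000000
puts bucket_bytes                # => 2246400000
puts (total / 1024.0 ** 3).round # => 69
```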
With this change, we get our biggest savings — we’re now at 30% of previous usage!
Alright, now that we’ve got our technology and we know how we’re storing the data, the next piece is to figure out how we do it fast.
Recall we’re processing 2M messages per minute in our Spark cluster. Relative to everything else that happens in our ETL every minute, we have to make sure the dedupe process doesn’t add too much time to our jobs. If deduping was too expensive, our ETL could start falling behind.
For our system, we wanted to target a total time of 1s to be spent generating requests for Redis, storing the data, and removing the dupes every minute.
Despite the bucketed approach above, we still need to write 2M keys to Redis every minute. The naive implementation is to send an `HSETNX` (to claim the owner) followed by an `HGET` (to determine the winner) for every message. For example:

```ruby
message_ids.each do |message_id, owner_id|
  bucket = bucket_for(message_id)             # the message's Hash bucket key
  client.hsetnx(bucket, message_id, owner_id) # claim owner
  client.hget(bucket, message_id)             # determine winner
end
```
As you might imagine, this is pretty slow. This requires 4M separate commands to be processed by Redis and 4M responses to be processed by the client. Even if you were to shard the buckets across multiple Redis servers and parallelize commands, the performance is unacceptable.
For 2M messages sharded across 3 Redis instances, this takes about 26 seconds, way beyond what we're trying to target.
Redis offers pipelining that allows commands to continue to get processed by Redis even while the client is reading the results. As documented by Redis, this can have a pretty good impact on your performance. For example:
```ruby
client.pipelined do |pipeline|
  message_ids.each do |message_id, owner_id|
    bucket = bucket_for(message_id)               # the message's Hash bucket key
    pipeline.hsetnx(bucket, message_id, owner_id) # claim owner
    pipeline.hget(bucket, message_id)             # determine winner
  end
end
```
If we take the same approach as above, but with the commands sent via a pipeline instead, it takes about 3s to process that same 2M messages. Pretty good! — about an 8x performance improvement, though still above our target.
Redis supports Lua scripting that allows us to run logic as close to the data as possible (within the Redis process itself!) and control what actually gets returned to the client. The reasons Lua scripting might offer us a benefit here are:

- Many message ids can be batched into a single command, reducing round trips and per-command protocol overhead
- The claim and the winner check run server-side, so only the duplicate ids need to be returned, dramatically reducing the data sent back to the client
The dedupe Lua script we landed on ended up looking, in simplified form, like so:

```lua
-- KEYS[1]: the bucket key; ARGV: alternating message_id, owner_id pairs.
-- Returns the message ids that are duplicates.
local dupes = {}
for i = 1, #ARGV, 2 do
  local message_id, owner_id = ARGV[i], ARGV[i + 1]
  -- claim the owner if not already set
  redis.call('HSETNX', KEYS[1], message_id, owner_id)
  -- if existing owner matches or was not set, this message wins
  if redis.call('HGET', KEYS[1], message_id) ~= owner_id then
    table.insert(dupes, message_id)
  end
end
return dupes
```
By switching to a Lua script, we bring the time required down to 1.4s — about as close as we’re going to get to our target!
So far we’ve built a solution that addresses our needs today — great! At this point, the question is: how do we architect this solution to allow us to scale to 10x the traffic we’re handling today? Instead of 2 million messages / minute, how could we meet our memory and performance requirements while processing 20 million messages / minute? This is where sharding comes into play.
First, let's talk about memory requirements. If we assume that 10x the traffic means roughly 10x the memory usage over a 24 hour period, we're suddenly talking about storing 700GB worth of data in Redis. Since we use AWS, the largest instance available in ElastiCache only has 237GB of memory. This means we need a way to store message ids across multiple Redis instances, while always keeping the same message ids on the same instance.
At the same time, we don't necessarily want to scale up 237GB at a time. Instead, we want to add capacity as we need it, which means using smaller servers and keeping costs down. With a sharding solution, simply adding new Redis instances increases memory capacity, effectively giving us limitless room to store messages.
As we scale, there are going to be some concerns around performance. If we scale out to 10x traffic, we expect the processing time to dedupe all of those messages to be 10x as well (14 seconds instead of 1.4 seconds!). To meet our real-time needs, this is unacceptable.
The naive solution is to simply parallelize requests to our Redis cluster. However, there's a big gotcha here: Redis is single-threaded, so it can only process one request at a time. To truly parallelize requests and gain the performance benefits of doing so, we need to run them across multiple Redis instances.
So what does a sharding solution like this actually look like? Let's say, for example, we're going to use 3 Redis servers and 300 bucket keys per second. Based on the contents of a message, we have to choose a Redis instance and bucket shard to store the message id. To choose a shard, you typically run the message id through a hashing function and mod the result by the number of shards.
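As a sketch (MD5 is an assumption here; any stable hash function works):

```ruby
require "digest"

NUM_SHARDS = 3 # one per Redis server

# Stable mapping from a message id to a shard index: hash the id and
# mod by the shard count, so the same id always lands on the same shard.
def shard_for(message_id, num_shards = NUM_SHARDS)
  Digest::MD5.hexdigest(message_id).to_i(16) % num_shards
end

shard_for("ce059644-18a0-4f27-bc2b-c2a2d4d4e7bf") # => 0, 1, or 2
```

The key property is determinism: as long as the shard count is fixed, duplicate message ids always hash to the same Redis instance.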
If the number of shards changes, then the hashing function is going to start storing messages in different shards than we were before.
In order to account for this scenario, we effectively have the concept of a “previous” Redis configuration and a “current” Redis configuration. “Previous” configurations apply to all messages with a timestamp prior to some maximum time. Any messages with transaction times *after* that maximum time will start using the “current” (new) configuration. This allows us to effectively transition from one cluster to another or from one hashing algorithm to another without losing any of the data we’ve stored from previous hours.
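A minimal sketch of that cutover logic (the config shape and cutover time here are illustrative):

```ruby
# Each configuration knows its shard count (and, in reality, its Redis
# hosts and hashing scheme). Messages timestamped before the cutover
# use the previous config; everything after uses the current one.
PREVIOUS_CONFIG = { shards: 3 }
CURRENT_CONFIG  = { shards: 6 }
CUTOVER_TIME    = Time.utc(2017, 6, 1)

def config_for(message_timestamp)
  message_timestamp < CUTOVER_TIME ? PREVIOUS_CONFIG : CURRENT_CONFIG
end

config_for(Time.utc(2017, 5, 31)) # => { shards: 3 } (old data, old shards)
config_for(Time.utc(2017, 6, 2))  # => { shards: 6 } (new data, new shards)
```

Once the 24 hour retention window has fully passed the cutover time, the previous configuration can be dropped entirely.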
Deduping at scale is a hard problem and there are certainly other effective solutions employed elsewhere in the industry. In fact, even the folks at Kafka are working on a standard solution. Typically you hope that either your actions are idempotent or accuracy isn’t so critical that a complex system like this is necessary. In our case, this was important to ensuring the integrity of our analytics pipeline.
Hopefully, this post serves as a helpful example of tackling a thorny engineering problem (“Oh no, I have a huge pipeline of data flying around and I can’t keep track of whether I’ve seen this before!”) in a deliberate way.