Inspired by an excellent post by @tyler_treat, You Cannot Have Exactly-Once Delivery, I wanted to dive deeper into exactly-once processing, specifically in the context of Apache Storm and Trident. Reading Tyler's post and the subsequent Hacker News discussion, I am reminded of this quote by Dan Ariely, which, although aimed at describing how enigmatic big data is, I believe applies to exactly-once processing as well:
> Big data is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone... — Dan Ariely (@danariely), January 6, 2013
So, why is exactly-once so important anyway? And why is it impossible to achieve? To understand this, let's imagine a group of soldiers advancing on an enemy position. The position appears to be strongly fortified, so the soldiers send a message asking for artillery assistance: "Fire 20 Degrees South." No acknowledgement is received, so they send the message again: "Fire 20 Degrees South." Well, if it turns out the original message was received, the artillery crew will now have been told to adjust 20 degrees South twice, or 40 degrees in total, which would be very unfortunate. In this example, there is no way to know whether the original message was lost or only its acknowledgement was.
Storm & Trident Background
If you are familiar with Storm and Trident, you can skip over the next section.
Storm's documentation claims that Trident will allow you to achieve exactly-once processing. This is quite a claim; let's examine it.
Storm is an open-source, distributed, real-time stream-processing platform. Storm has a scalable architecture and strong fault-tolerance mechanisms that enable processing of unbounded data streams at scale. Storm guarantees that every event will be processed "at least once". On top of Storm, Trident provides a higher-level abstraction that theoretically guarantees exactly-once processing.
Below are some key concepts in Storm and Trident, most are excerpts directly from the documentation. Some of the terms are used both for Trident and for Storm. Note that in this post, I will always refer to these terms in Trident's context.
- Tuple - The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be of any type.
- Spout - A spout is a source of a data stream. Generally, spouts will read tuples from an external source and emit them.
- Function - A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple are appended to the original input tuple in the stream.
- State - A state is a way to persist or aggregate a stream in a backing store. In the scope of this blog post, we'll only consider states that are designed to persist the data, e.g. to a database or to a queue, as opposed to Map States, which are used for aggregations (more on those later).
A key difference between Trident and Storm is that Trident processes the stream in mini-batches, referred to as "transactions", whereas Storm performs tuple-by-tuple processing. This concept is very similar to database transactions: every transaction is assigned a transaction ID. Once all processing for a transaction is successfully completed, the transaction is considered successful; however, a failure in processing any of the transaction's tuples will cause the entire transaction to be retransmitted. For each State, Trident will call `beginCommit` at the beginning of the transaction and `commit` at the end of it.
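The batch lifecycle above can be sketched with a toy state class (a Python sketch mirroring the shape of Trident's Java `State` interface; the class name and batch data are made up for illustration):

```python
class LoggingState:
    """Toy state that records the calls Trident makes for each batch:
    beginCommit(txid) before the batch, then the updates, then commit(txid)."""

    def __init__(self):
        self.calls = []

    def begin_commit(self, txid):
        self.calls.append(("beginCommit", txid))

    def update_state(self, txid, tuples):
        # In Trident, a state updater writes each tuple of the batch here.
        for t in tuples:
            self.calls.append(("update", txid, t))

    def commit(self, txid):
        self.calls.append(("commit", txid))


def process_batch(state, txid, tuples):
    # The order Trident enforces: beginCommit, updates, commit.
    state.begin_commit(txid)
    state.update_state(txid, tuples)
    state.commit(txid)


state = LoggingState()
process_batch(state, 1, ["a", "b"])
process_batch(state, 2, ["c"])
print(state.calls[0])   # ('beginCommit', 1)
print(state.calls[-1])  # ('commit', 2)
```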
Exactly once processing with Storm and Trident
If we look at the image below from Trident's documentation ("yes" means exactly-once guaranteed), it seems fairly simple to achieve exactly-once semantics with Trident. All we need is a transactional spout and a transactional state and poof, exactly-once processing will automagically occur.
Unfortunately, this is not the case.
It is very difficult to write a transactional state. Even in Storm's own GitHub repository, there isn't a single transactional persisting state! (As stated above, Map States are outside the scope of this blog post, but they will be referenced later.)
Some googling reveals that there have been attempts to write transactional persisting states. Take for example KafkaState, which is also referenced in the Storm Real-Time Processing Cookbook (both written by the same author). However, using this will not actually provide exactly-once processing. In this specific example, if something is retransmitted after it has been written to Kafka, it will be written again, causing unwanted duplication.
Diving deeper, even if the backing store supports transactions, we still cannot write a proper transactional state.
Let's assume we want to persist our data stream to a transactional database. Theoretically, it should be very simple. When `beginCommit` is called, we will start a transaction. Every time `updateState` is called, we will write the tuple into the database (or just keep the tuples in memory and write them as a batch before committing). Finally, when `commit` is called, we will commit the changes to the database.
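This naive implementation can be sketched with SQLite standing in for the transactional database (a hypothetical `TransactionalDbState`; table and class names are made up, and Trident's real interface is Java):

```python
import sqlite3

class TransactionalDbState:
    """Naive 'transactional state': buffer the batch in memory,
    then write it inside a single database transaction on commit."""

    def __init__(self, conn):
        self.conn = conn
        self.buffer = []

    def begin_commit(self, txid):
        self.buffer = []

    def update_state(self, tuples):
        self.buffer.extend(tuples)

    def commit(self, txid):
        with self.conn:  # one DB transaction for the whole batch
            self.conn.executemany(
                "INSERT INTO events (txid, value) VALUES (?, ?)",
                [(txid, v) for v in self.buffer],
            )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (txid INTEGER, value TEXT)")
state = TransactionalDbState(conn)
state.begin_commit(1)
state.update_state(["a", "b"])
state.commit(1)
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 2
```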
But in practice this implementation does not work.
The first problem arises when trying to distribute the state in order to increase throughput. In doing so, the state's `commit` is now called several times. But wait! When should we commit the transaction to the database? There is no way to commit all of the transactions from the different threads atomically. Moreover, even the single-threaded implementation is flawed, as we cannot both commit to the database and send the ack back atomically. So we may fail after committing to the database, the transaction will be repeated by Trident, and here again we get unwanted duplication.
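This failure mode is easy to simulate: if the process dies after the database commit but before the ack reaches Trident, the batch is replayed and a non-idempotent store receives it twice (a sketch with a hypothetical in-memory append-only store):

```python
store = []  # non-idempotent backing store (append-only)

def process_batch(txid, tuples, crash_before_ack=False):
    """Write the batch, then ack. The write and the ack are not atomic."""
    store.extend(tuples)      # the 'commit' to the store succeeds...
    if crash_before_ack:
        return False          # ...but we die before the ack is sent
    return True               # ack reaches Trident

# First attempt commits, but the ack is lost, so Trident replays txid 1.
acked = process_batch(1, ["a", "b"], crash_before_ack=True)
if not acked:
    process_batch(1, ["a", "b"])  # replay of the same transaction

print(store)  # ['a', 'b', 'a', 'b'] -- unwanted duplication
```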
Fake it 'til you make it
At this point, we are in a pretty rough spot. We cannot guarantee exactly-once semantics, but there are real-life scenarios where this is very important. Idempotency to the rescue! In the artillery example, an idempotent protocol would define the message to be: "Aim to Azimuth 285 degrees." What's different about this message? It is an absolute rather than a relative measurement, meaning the resulting action would be the same regardless of the number of times the message gets processed.
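The difference is easy to see in a sketch: applying the relative command twice changes the outcome, while the absolute command can be applied any number of times (the azimuth values are the ones from the example above):

```python
def fire_relative(azimuth, degrees_south=20):
    # "Fire 20 Degrees South": relative adjustment -- NOT idempotent
    return azimuth + degrees_south

def aim_absolute(azimuth, target=285):
    # "Aim to Azimuth 285 degrees": absolute setting -- idempotent
    return target

a = fire_relative(fire_relative(265))  # message processed twice
b = aim_absolute(aim_absolute(265))    # message processed twice
print(a, b)  # 305 285 -- only the absolute command gives the same result
```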
Fortunately, we can use idempotent states to fake exactly-once semantics. This was already stated both by Tyler and by Trident's documentation -- all of the transactional state examples are in fact idempotent states. Going back to Storm's GitHub repository, transactional Map States do exist. With a key-value store, it's possible to store a value mapped to a key and its transaction ID. In case of retransmission, the processing can be repeated with the value from the previous transaction. These are also, in fact, idempotent states.
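The key-value scheme described above can be sketched as a counter state: each key stores its value together with the transaction ID that last wrote it, so a replayed transaction is detected and skipped instead of double-counted (a hypothetical `TransactionalMapState`; Storm's real Map States are Java):

```python
class TransactionalMapState:
    """Each key maps to (txid, count). If an update arrives with the
    same txid that produced the stored value, the batch is a replay
    and is skipped -- making the increment idempotent per transaction."""

    def __init__(self):
        self.store = {}  # key -> (txid, count)

    def increment(self, txid, key, amount):
        prev_txid, count = self.store.get(key, (None, 0))
        if prev_txid == txid:
            return count  # replayed batch: already applied, skip it
        self.store[key] = (txid, count + amount)
        return count + amount

state = TransactionalMapState()
state.increment(1, "clicks", 3)
state.increment(1, "clicks", 3)  # replay of txid 1: ignored
state.increment(2, "clicks", 2)
print(state.store["clicks"])  # (2, 5) -- not double-counted
```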
But writing idempotent states, while possible, is no easy task. A simple example from non-distributed systems is the TCP protocol, which guarantees "exactly once": regardless of packet loss or retransmissions, the application will receive every byte sent exactly once. TCP uses sequence numbers and strong ordering for idempotency. The illustration below demonstrates how TCP implements idempotency.
There is one sender thread and one receiver thread. The sender appends sequence number and length to each message, so the receiver knows which sequence numbers were already processed. If a message with past sequence number is retransmitted, it will be ignored by the receiver. This is a very simple implementation, but it is indeed idempotent (the entire Internet is based on this idempotency).
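The receiver's deduplication logic can be sketched as follows (a deliberately simplified model that assumes in-order delivery; it is not real TCP):

```python
class Receiver:
    """Accept only bytes past the highest sequence number seen so far,
    so retransmissions of already-processed data are ignored."""

    def __init__(self):
        self.next_seq = 0   # next byte offset we expect
        self.received = b""

    def on_segment(self, seq, data):
        end = seq + len(data)
        if end <= self.next_seq:
            return  # pure retransmission: already processed, ignore
        if seq > self.next_seq:
            return  # gap: out of order, dropped in this simple model
        # keep only the part we have not seen yet
        self.received += data[self.next_seq - seq:]
        self.next_seq = end

r = Receiver()
r.on_segment(0, b"hello ")
r.on_segment(0, b"hello ")  # retransmitted segment: ignored
r.on_segment(6, b"world")
print(r.received)  # b'hello world'
```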
An example of a distributed idempotent backing store would be a transactional database that enforces primary key uniqueness. More specifically, writing to a MySQL table with a primary key and using `INSERT ... IGNORE` guarantees one record per key, regardless of how many times this key was written.
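This can be demonstrated with SQLite, whose `INSERT OR IGNORE` is the equivalent of MySQL's `INSERT IGNORE` (a self-contained sketch; the table and key names are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_key TEXT PRIMARY KEY, value TEXT)")

def write_event(key, value):
    # Replays of the same key are silently dropped: the write is idempotent.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO events (event_key, value) VALUES (?, ?)",
            (key, value),
        )

write_event("tx1-tuple1", "a")
write_event("tx1-tuple1", "a")  # retransmission: no duplicate row
rows = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(rows)  # 1
```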
Another example of an idempotent distributed store is Kafka with log compaction. If you are reading up to this point, you are probably using Kafka (if you are not, I highly recommend it!). Kafka is a distributed commit log. With log compaction, it guarantees that if messages are written with keys, each key is unique in the tail of the log. Let's assume we use KafkaState and we are already using a transactional spout (see illustration below). Additionally, assume our function adds a unique key to each tuple and is deterministic, i.e. the same key is always generated for the same tuple. Then, the number of times we write this tuple to Kafka does not have any impact. If the red tuples in the illustration below were retransmitted and written twice into the topic, log compaction guarantees that the tail of the log would contain every tuple exactly once.
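The effect of log compaction on a retransmitted batch can be sketched like so (this models Kafka's behaviour in plain Python; it is not Kafka client code):

```python
def compact(log):
    """Keep only the latest record for each key -- what Kafka's log
    compaction guarantees for the tail of a compacted topic."""
    latest = {}
    for key, value in log:
        latest[key] = value  # later writes of the same key win
    return list(latest.items())

# A batch written twice because of retransmission: duplicates collapse.
log = [("k1", "a"), ("k2", "b"), ("k1", "a"), ("k2", "b")]
print(compact(log))  # [('k1', 'a'), ('k2', 'b')] -- each key exactly once
```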
The irony of transactional states is that the whole concept of "use transactional state to guarantee exactly once" is irrelevant with idempotent states.
While it is true that the transaction ID can be helpful, especially for stateful aggregations, it is not always necessary when writing an idempotent state. The above example of a KafkaState that writes to a topic with log compaction enabled utilizes an idempotent state without using transaction IDs at all. Therefore, let's revisit the image from Trident's documentation, now adding idempotent state methods. The state does not need to be transactional in order to fake exactly-once semantics:
So let's put this all together. Using Storm without Trident, with an idempotent bolt instead (e.g. one that writes to a compacted Kafka topic), provides stronger guarantees than using Trident with a non-transactional spout: Storm's at-least-once guarantee becomes exactly-once with an idempotent bolt, whereas Trident can only guarantee at-most-once with a non-transactional spout.
In short, Storm and Trident are no exception: true exactly-once processing is just not possible. It can, however, be "faked" thanks to idempotency.