
Transactions have been supported in Apache Kafka® for several years now, and the concept itself has been around in relational databases for several decades. Even though transactions in Kafka are similar to transactions in relational databases, there are some notable differences. Let's start studying Kafka transactions by looking at the picture below, where we show just one partition to keep things simple. In practice, a transaction in Kafka can span several topics and partitions. In our example, there are nine messages in the partition, with the offset of each message shown inside the small square in the top left-hand corner.
In the picture we see two transactions, one with two messages (M1-M2) and another with five messages (M3-M7). Each transaction ends with what is known as a control message (more details on that later). The control message indicates whether the transaction was committed or rolled back (aborted). The picture does not show whether the transactions were committed or aborted, as the messages would be identical in either case (except for the contents of the control message). The key takeaway from the picture is that either all messages of a transaction are returned to a consumer or none are, provided the consumer reads only committed data (isolation levels are discussed later). For example, such a consumer can never read message M1 but not M2.
Another important thing to notice is that even when a transaction is aborted, its messages are still stored in the partition log. They are not magically erased or deleted from the log.
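The behaviour described above can be illustrated with a small, self-contained model. This is only a sketch of the idea, not how Kafka is implemented: the `PartitionLogModel` class, the `Entry` record and the `committedView` method are names invented for this example, and it assumes (the picture does not say) that the first transaction was committed and the second one aborted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of one partition log; NOT Kafka's real data structures.
class PartitionLogModel {

    enum Kind { DATA, COMMIT_MARKER, ABORT_MARKER }

    // One log entry: its offset, the transaction it belongs to, and its kind.
    record Entry(long offset, String txn, Kind kind) {}

    // Returns the data entries whose transaction ended with a commit marker.
    // Entries of aborted or still-open transactions stay in the log but are skipped.
    static List<Entry> committedView(List<Entry> log) {
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind() == Kind.COMMIT_MARKER) {
                committed.add(e.txn());
            }
        }
        List<Entry> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind() == Kind.DATA && committed.contains(e.txn())) {
                visible.add(e);
            }
        }
        return visible;
    }

    // The log from the picture: T1 = M1-M2 (assumed committed), T2 = M3-M7 (assumed aborted).
    static List<Entry> exampleLog() {
        List<Entry> log = new ArrayList<>();
        long offset = 0;
        for (int i = 0; i < 2; i++) log.add(new Entry(offset++, "T1", Kind.DATA));
        log.add(new Entry(offset++, "T1", Kind.COMMIT_MARKER));
        for (int i = 0; i < 5; i++) log.add(new Entry(offset++, "T2", Kind.DATA));
        log.add(new Entry(offset++, "T2", Kind.ABORT_MARKER));
        return log;
    }

    public static void main(String[] args) {
        List<Entry> log = exampleLog();
        System.out.println("entries stored in the log: " + log.size());              // 9
        System.out.println("data entries returned: " + committedView(log).size());   // 2
    }
}
```

All nine entries remain in the log, yet only M1 and M2 are handed to a reader that wants committed data. The five aborted messages are skipped, not deleted.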
The first step in adding support for transactions in your application is to create a transactional producer. When you create a new producer, by default it will NOT be transactional. This avoids unnecessary overhead in your system unless your application really requires transactionality. Creating a transactional producer is easy, though. All you need to do is set the ProducerConfig.TRANSACTIONAL_ID_CONFIG property (transactional.id) in your producer properties.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Setting a transactional id is what makes the producer transactional.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-123");
Depending on your application, picking a good transactional id may not be so easy though. Some of the key requirements for the transactional id are the following
The last requirement is the hardest to implement, especially if you have many producer instances running on multiple hosts producing to the same topic. In other words, you can't just pick a random string and be done with it. If you have more than one transactional producer, you cannot use a fixed string either.
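One way to meet the constraints just mentioned (not random, not shared between producers) is to derive the id from something that is both unique per producer instance and stable across restarts. The sketch below reflects an assumption about a typical deployment, not a rule from Kafka: the `TransactionalIds` class, the `orders-service` name, the `-tx-` naming scheme and the idea of a numeric instance index (for example the ordinal of a Kubernetes StatefulSet pod) are all hypothetical.

```java
// Hypothetical helper; the naming scheme is an assumption, not a Kafka convention.
class TransactionalIds {

    // Builds an id that stays the same when instance N restarts,
    // but differs for every other instance of the application.
    static String forInstance(String applicationName, int instanceIndex) {
        if (applicationName == null || applicationName.isBlank()) {
            throw new IllegalArgumentException("applicationName must not be blank");
        }
        if (instanceIndex < 0) {
            throw new IllegalArgumentException("instanceIndex must not be negative");
        }
        return applicationName + "-tx-" + instanceIndex;
    }

    public static void main(String[] args) {
        // Instance 0 and instance 1 of the same application get distinct ids.
        System.out.println(forInstance("orders-service", 0)); // orders-service-tx-0
        System.out.println(forInstance("orders-service", 1)); // orders-service-tx-1
    }
}
```

The resulting string would be passed as the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG. Stability matters because Kafka uses the id to recognise a restarted producer and fence off its previous incarnation. Uniqueness matters because two live producers sharing an id would fence each other.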
It should also be mentioned that a transactional producer must be idempotent, which is what prevents retried sends from writing duplicate messages to the log. This is controlled by the ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG property, which is set to 'true' by default in recent client versions (3.0 and later), so you should not normally have to worry about it.
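If you prefer to spell these settings out rather than rely on defaults, the configuration could look like the sketch below. It uses plain string keys (the values behind the `ProducerConfig` constants) so that it compiles without the Kafka client on the classpath. The class name, the broker address and the transactional id are placeholders chosen for this example.

```java
import java.util.Properties;

// Sketch of an explicit transactional producer configuration.
class TransactionalProducerConfig {

    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Makes the producer transactional.
        props.put("transactional.id", transactionalId);
        // Required for transactions; stated explicitly instead of relying on the default.
        props.put("enable.idempotence", "true");
        // Idempotence in turn requires acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        build("localhost:9092", "producer-123")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned `Properties` object can be handed to the `KafkaProducer` constructor in place of the `props` used elsewhere in this article.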
After creating the transactional producer, you must call initTransactions to initialize the state of the producer. This must only be done once after creating the producer, no matter how many transactions your code will handle.
For each transaction, you must first call beginTransaction, then add some records and finally call commitTransaction to commit the messages. When to call the commit method really depends on your application logic, i.e. what records must appear together or not at all.
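    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // Call once per producer instance, before the first transaction.
    producer.initTransactions();

    producer.beginTransaction();
    producer.send(new ProducerRecord<>("mytopic", "key1", "value1"));
    producer.send(new ProducerRecord<>("mytopic", "key2", "value2"));
    // Both records are committed together, or not at all.
    producer.commitTransaction();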
As in most transactional systems, you should keep transactions as short and quick as you possibly can without breaking your application logic. You should always try to avoid calling any external systems (e.g. databases or other services) inside the transaction. As blocking I/O is orders of magnitude slower than running local code, it will extend the duration of the transaction considerably. Ideally you should call the external system first, then start the transaction, add your records and finally commit. Naturally, making external calls outside the transaction boundaries is not always possible, but it is something you should aim for.
In some cases you do not want to commit the records you have already added, but abort your transaction instead. Probably the most common case is when your code throws an exception, a situation where you would typically want to roll back the entire transaction. Another example is when your application logic encounters a condition where you do not want to proceed with the remaining records and want to discard the records already added.
With the Kafka producer, all you have to do is to call the abortTransaction method and the whole transaction will be rolled back.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();

    producer.beginTransaction();
    producer.send(new ProducerRecord<>("mytopic", "key1", "value1"));
    producer.send(new ProducerRecord<>("mytopic", "key2", "value2"));
    if (someCondition()) {
        // Roll back: neither record is ever returned as committed data.
        producer.abortTransaction();
    } else {
        producer.commitTransaction();
    }
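For the exception case mentioned above, the control flow can be sketched as a small commit-or-abort wrapper. To keep the example runnable without a broker, it is written against a hypothetical `Tx` interface whose three method names mirror those of `KafkaProducer`. The `CommitOrAbort` and `Recorder` classes are likewise invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CommitOrAbort {

    // Hypothetical stand-in for the transaction methods of KafkaProducer.
    interface Tx {
        void beginTransaction();
        void commitTransaction();
        void abortTransaction();
    }

    // Runs the work inside a transaction: commits if the work completes,
    // aborts and returns false if it throws a RuntimeException.
    static boolean run(Tx tx, Runnable work) {
        tx.beginTransaction();
        try {
            work.run();
            tx.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            tx.abortTransaction();
            return false;
        }
    }

    // Simple implementation that records which calls were made.
    static class Recorder implements Tx {
        final List<String> calls = new ArrayList<>();
        public void beginTransaction() { calls.add("begin"); }
        public void commitTransaction() { calls.add("commit"); }
        public void abortTransaction() { calls.add("abort"); }
    }

    public static void main(String[] args) {
        Recorder ok = new Recorder();
        run(ok, () -> { /* send records here */ });
        System.out.println(ok.calls); // [begin, commit]

        Recorder failed = new Recorder();
        run(failed, () -> { throw new IllegalStateException("boom"); });
        System.out.println(failed.calls); // [begin, abort]
    }
}
```

With the real client, not every exception should lead to an abort. The `KafkaProducer` documentation treats some exceptions, such as `ProducerFencedException`, as fatal: the producer should then be closed rather than asked to abort.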
The concept of isolation is very important in transactional systems, as it's one of the four ACID properties. In a nutshell, isolation dictates how the transactions of one user are visible to other users. There are three read phenomena that are generally unwanted (dirty reads, non-repeatable reads and phantom reads), and the isolation level determines which ones are possible and which ones are not. There are several isolation levels defined for database systems, but Kafka consumers only support two of them.
You set the consumer isolation level using the ConsumerConfig.ISOLATION_LEVEL_CONFIG property (isolation.level), which accepts read_uncommitted (the default) and read_committed. If you have no transactional producers, the value of this setting does not matter. If you do have transactional producers and your isolation level is set to read_committed, your consumer will only see messages that have been committed. Messages of transactions that have been aborted or are still pending will not be visible to the consumer. With read_uncommitted, the consumer sees all messages, including those of aborted and pending transactions.
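A consumer configuration with the stricter isolation level might look like the sketch below. As before, plain string keys are used so that the example compiles without the Kafka client library. The class name, the broker address and the group id are placeholders.

```java
import java.util.Properties;

// Sketch of a consumer configuration that hides uncommitted messages.
class ReadCommittedConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only return messages of committed transactions (the default is read_uncommitted).
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092", "my-group").getProperty("isolation.level"));
    }
}
```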
As mentioned earlier, when transactional producers are used, your partition logs will contain control messages in addition to your regular messages. The control messages indicate when a transaction has been completed (aborted or committed). They are never returned to the application by the standard client libraries, so you cannot see them directly. You can, however, verify their existence using standard Kafka tools. For example, write the following code and run it against a newly created, empty topic.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();

    // First transaction: two records, followed by a control message on commit.
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("mytopic", "tx1 key1", "tx1 value1"));
    producer.send(new ProducerRecord<>("mytopic", "tx1 key2", "tx1 value2"));
    producer.commitTransaction();

    // Second transaction: two more records and another control message.
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("mytopic", "tx2 key1", "tx2 value1"));
    producer.send(new ProducerRecord<>("mytopic", "tx2 key2", "tx2 value2"));
    producer.commitTransaction();
If you now look at the messages in the gradient fox web console, you will see the following messages.
As you can see, there is a gap (offset 2) between the offsets of the first and the second transaction. That 'gap' is actually the control message. You do not see the gap for the second control message, as the second control message is the last message in the partition.
If you now add one more message in the gradient fox console, you will see where the control message of the second transaction is located as well (offset 5).
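The offset arithmetic can be checked with a toy model of a single partition. This is a sketch for illustration only: the `OffsetGapModel` class and its methods are invented names, and it assumes the topic has exactly one partition so that every message lands in the same log.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of offset assignment in a single partition; not Kafka code.
class OffsetGapModel {
    private long nextOffset = 0;
    private final List<Long> dataOffsets = new ArrayList<>();

    // A regular message takes the next offset and is visible to consumers.
    void appendMessage() {
        dataOffsets.add(nextOffset++);
    }

    // A control message also takes an offset, but is never returned to the application.
    void appendControlMessage() {
        nextOffset++;
    }

    List<Long> visibleOffsets() {
        return List.copyOf(dataOffsets);
    }

    public static void main(String[] args) {
        OffsetGapModel log = new OffsetGapModel();
        for (int tx = 0; tx < 2; tx++) {
            log.appendMessage();
            log.appendMessage();
            log.appendControlMessage(); // written when the transaction commits
        }
        System.out.println(log.visibleOffsets()); // [0, 1, 3, 4]

        log.appendMessage(); // one more message added after both transactions
        System.out.println(log.visibleOffsets()); // [0, 1, 3, 4, 6]
    }
}
```

The missing offsets 2 and 5 are exactly the positions taken by the two control messages.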
There are several basic mistakes you might make when you first start writing your application code. Below is a list of some of the common error messages you could encounter in your application logs.