When you need to avoid duplicate delivery in Kafka, or keep state consistent across multiple topics, simple retries and ad-hoc deduplication only get you so far. Kafka's built-in Exactly-Once Semantics (EOS) combines producer idempotence with transactions so that records are processed without duplicates or losses.
This article walks through the topics that show up most often on CCDAK (Confluent Certified Developer for Apache Kafka) — transactional.id, the idempotent producer, read_committed, and sendOffsetsToTransaction — from both an implementation and a design perspective. The content is grounded in the official Kafka documentation and stable concepts.
Kafka's Exactly-Once Semantics (EOS) is built in two layers. The first layer is the idempotent producer (enable.idempotence=true), which removes duplicates caused by retries from a single producer session to a single partition. The second layer is transactions, which enable atomic writes across multiple partitions and topics, and let you commit consumer offsets in the same transaction.
Idempotence alone cannot guarantee consistency across producer restarts or multiple partitions. With a producer that has a transactional.id, you can commit or abort the messages and consumed offsets together as a single unit of work, and a read_committed consumer never sees an in-between state.
transactional.id is a stable identifier for what is conceptually the same logical producer. The transaction coordinator on the broker tracks a producer epoch per transactional.id to keep old and new producers consistent. When a producer restarts after a failure with the same transactional.id, it gets a new epoch and the old producer is fenced (invalidated).
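The epoch mechanism can be pictured with a small toy model. The sketch below is not Kafka's implementation: `FencingSketch`, `Coordinator`, `register`, and `accept` are hypothetical names, and it only assumes that the coordinator keeps one current epoch per transactional.id and rejects anything carrying an older one.

```java
import java.util.HashMap;
import java.util.Map;

public final class FencingSketch {
    /** Toy stand-in for the transaction coordinator: one current epoch per transactional.id. */
    public static final class Coordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        /** Models a producer (re)registering: first call yields epoch 0, each later call bumps it by 1. */
        public int register(String transactionalId) {
            return epochs.merge(transactionalId, 0, (old, unused) -> old + 1);
        }

        /** Accepts a request only if it carries the latest epoch for that transactional.id. */
        public boolean accept(String transactionalId, int epoch) {
            Integer current = epochs.get(transactionalId);
            return current != null && current == epoch;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        int oldEpoch = coordinator.register("orders-task-3"); // original producer
        int newEpoch = coordinator.register("orders-task-3"); // restarted producer, same ID
        System.out.println(coordinator.accept("orders-task-3", oldEpoch)); // false: fenced
        System.out.println(coordinator.accept("orders-task-3", newEpoch)); // true
    }
}
```

In the real client, the rejected side surfaces as a ProducerFencedException on the old producer.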
The design priorities are scale and collision avoidance. It is not enough to make the ID unique per instance — assign it deterministically 1:1 with a unit of work (e.g. an input partition or a Kafka Streams task) so the same transactional.id is reused across rebalances and restarts and EOS is preserved. Never generate it randomly.
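One way to make the ID deterministic is to derive it from the unit of work itself. The helper below is a minimal sketch under that assumption; the class `TransactionalIds`, the method `forInputPartition`, and the `appId-topic-partition` naming scheme are illustrative choices, not a Kafka convention.

```java
public final class TransactionalIds {
    private TransactionalIds() {}

    /** Returns the same ID for the same (application, input topic, partition) on every restart. */
    public static String forInputPartition(String appId, String topic, int partition) {
        if (appId == null || appId.isEmpty() || topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("appId and topic must be non-empty");
        }
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        return appId + "-" + topic + "-" + partition;
    }

    public static void main(String[] args) {
        // The instance that owns partition 3 of orders-in always gets this ID.
        System.out.println(forInputPartition("orders", "orders-in", 3)); // orders-orders-in-3
    }
}
```

Because the result depends only on the inputs, a replacement instance that takes over the same partition reuses the same transactional.id and fences its predecessor, whereas a random UUID would leave the old producer unfenced.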
The minimal transaction API flow is initTransactions -> beginTransaction -> send records -> (if needed) sendOffsetsToTransaction -> commitTransaction (or abortTransaction). On commit, the transaction coordinator writes commit markers to every partition involved, and a read_committed consumer sees only the committed records.
sendOffsetsToTransaction is the call that pulls consumer offsets into the same transaction across a read -> process -> write flow. Disable enable.auto.commit and let the application drive offset commits.
Figure: Transaction flow (conceptual). A read_committed consumer sees only committed records.
Minimal Java producer (transactional) example
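```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrdersEosApp {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence and transactions
        p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        p.put(ProducerConfig.ACKS_CONFIG, "all");
        // transactional.id must be stable and unique per unit of work
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-task-3");
        // A value above the broker's transaction.max.timeout.ms is rejected
        p.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

        // Consumer config (in the same process)
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-g");
        c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // try-with-resources closes both clients on any exit path
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("orders-in"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;
                try {
                    producer.beginTransaction();
                    for (ConsumerRecord<String, String> r : records) {
                        // Transform and send to a different topic
                        producer.send(new ProducerRecord<>("orders-out", r.key(), r.value()));
                    }
                    // Commit offsets within the same transaction (next offset to read = last + 1)
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition tp : records.partitions()) {
                        List<ConsumerRecord<String, String>> part = records.records(tp);
                        long lastOffset = part.get(part.size() - 1).offset();
                        offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                    }
                    // Pass the live group metadata (generation, member ID), not just the group name
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    // Unrecoverable, e.g. a newer producer with the same transactional.id is running:
                    // stop this process and let the work be reassigned
                    throw e;
                } catch (KafkaException e) {
                    // Recoverable: abort, then rewind so the same batch is polled again
                    producer.abortTransaction();
                    for (TopicPartition tp : records.partitions()) {
                        consumer.seek(tp, records.records(tp).get(0).offset());
                    }
                }
            }
        }
    }
}
```

To complete EOS, set isolation.level=read_committed on the consumer so that records from aborted transactions are skipped and records from open transactions stay hidden until they commit. Leaving it at the default read_uncommitted exposes in-progress state and leads to duplicate processing or inconsistencies.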
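The difference between the two isolation levels can be illustrated with a toy log. This is a conceptual sketch, not the consumer's real fetch logic: `IsolationLevelSketch`, `Entry`, and `TxnState` are hypothetical names, and the model assumes each record is tagged with the transaction that wrote it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class IsolationLevelSketch {
    public enum TxnState { OPEN, COMMITTED, ABORTED }

    /** One record in a toy partition log, tagged with the transaction that wrote it. */
    public static final class Entry {
        final String txnId;
        final String value;
        public Entry(String txnId, String value) {
            this.txnId = txnId;
            this.value = value;
        }
    }

    /** read_uncommitted: every record is returned, whatever its transaction's state. */
    public static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) out.add(e.value);
        return out;
    }

    /** read_committed: stop at the first record of an open transaction, skip aborted records. */
    public static List<String> readCommitted(List<Entry> log, Map<String, TxnState> states) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            TxnState state = states.get(e.txnId);
            if (state == TxnState.OPEN) break; // nothing past an open transaction is visible yet
            if (state == TxnState.COMMITTED) out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry("t1", "a"), new Entry("t2", "b"), new Entry("t1", "c"),
            new Entry("t3", "d"), new Entry("t1", "e"));
        Map<String, TxnState> states = Map.of(
            "t1", TxnState.COMMITTED, "t2", TxnState.ABORTED, "t3", TxnState.OPEN);
        System.out.println(readUncommitted(log));       // [a, b, c, d, e]
        System.out.println(readCommitted(log, states)); // [a, c]
    }
}
```

Note that "e" stays hidden in this model even though t1 is committed, because it sits behind the still-open t3. This is also why a long-running transaction delays every read_committed consumer of that partition.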
Disable auto commit and pull offsets into the producer's transaction via sendOffsetsToTransaction. That way the successful write and the offset advance commit together, so the processing never gets torn in half.
transaction.timeout.ms is the maximum lifetime of a transaction. Past this point the broker aborts it. The client-side value can never exceed the broker's transaction.max.timeout.ms, so avoid huge transactions and split processing into small batches.
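The relationship between the two settings can be checked before deployment. The sketch below is a hypothetical helper (`TxnTimeoutCheck` and `fitsBrokerLimit` are not part of the Kafka client); the constants reflect Kafka's default values of 60000 ms for transaction.timeout.ms and 900000 ms for transaction.max.timeout.ms, and your cluster may override the broker limit.

```java
public final class TxnTimeoutCheck {
    /** Default producer-side transaction.timeout.ms (1 minute). */
    public static final int DEFAULT_CLIENT_TIMEOUT_MS = 60_000;
    /** Default broker-side transaction.max.timeout.ms (15 minutes). */
    public static final int DEFAULT_BROKER_MAX_TIMEOUT_MS = 900_000;

    private TxnTimeoutCheck() {}

    /** True when the client timeout is positive and does not exceed the broker limit. */
    public static boolean fitsBrokerLimit(int clientTimeoutMs, int brokerMaxTimeoutMs) {
        return clientTimeoutMs > 0 && clientTimeoutMs <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(fitsBrokerLimit(DEFAULT_CLIENT_TIMEOUT_MS, DEFAULT_BROKER_MAX_TIMEOUT_MS)); // true
        System.out.println(fitsBrokerLimit(3_600_000, DEFAULT_BROKER_MAX_TIMEOUT_MS));                 // false
    }
}
```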
Transaction metadata is held in the internal topic __transaction_state, and commit/abort markers are also written to each data partition. A read_committed consumer filters aborted records so they stay invisible. On failover, when a new producer starts with the same transactional.id, the old producer is fenced (ProducerFencedException).
CCDAK keeps asking about the difference between idempotence and transactions, what transactional.id means, what read_committed does, and the purpose of sendOffsetsToTransaction. Don't just memorize options — being able to articulate which requirement calls for which feature is what earns you the points.
It also tests transactional.id design with multiple instances (stable, deterministic, unique) and how broker limits reject overly long transaction timeouts. Randomly generated transactional.id, auto commit left on, and leaving read_uncommitted in place are the classic wrong answers.
| Delivery guarantee | Requirements (essentials) | Resulting behavior |
|---|---|---|
| At-most-once | Offsets committed before the send completes (e.g. auto commit fires first) | Losses possible, but no duplicates |
| At-least-once | Retries plus manual commit (no transactions) | Duplicates possible |
| Exactly-once (EOS) | Idempotence + transactions + read_committed + sendOffsetsToTransaction | Committed records only (aborted records hidden) |
CCDAK
Question 1
You want to implement a flow that reads from orders-in, transforms records, writes to orders-out, and advances the offset, all with Exactly-Once delivery. The app fails over to another instance on failure. Which design is most appropriate?
Correct answer: B
EOS requires idempotence + transactions + read_committed, and offsets must be folded into the same transaction via sendOffsetsToTransaction. Assign transactional.id stably and uniquely per unit of work, reuse it across restarts, and rely on fencing to invalidate the old process on failover. A loses EOS because the ID changes; C can expose in-progress state; D collides and either gets fenced or causes inconsistencies.
Does the idempotent producer alone give you Exactly-Once?
No. Idempotence only deduplicates writes to a single partition. If you need atomicity across multiple partitions or topics, or to tie in consumer offsets, you must combine transactions backed by a transactional.id with a read_committed consumer.
Is it safer to set a longer transaction.timeout.ms?
Not necessarily. Any value above the broker's transaction.max.timeout.ms is rejected. Long-running transactions also hold back read_committed consumers, which cannot read past the first still-open transaction in a partition, and they make aborts and retries more expensive because more work is discarded and redone. The recommended pattern is to repeat begin -> commit in small units.
What happens if an old producer with the same transactional.id keeps running?
When a new producer starts with the same transactional.id, it is assigned a new epoch and the old producer receives ProducerFencedException, failing any further operations. This is fencing. The correct response is to stop or reinitialize the old process immediately.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.