Kafka Transactions in Practice: Exactly-Once Delivery and Designing transactional.id

2026-04-19
NicheeLab Editorial Team

When you need to avoid duplicate delivery in Kafka, or keep state consistent across multiple topics, simple retries and ad-hoc deduplication only get you so far. Kafka's Exactly-Once Semantics (EOS) combines producer idempotence with transactions so that records are processed without duplicates or losses.

This article walks through the topics that show up most often on CCDAK (Confluent Certified Developer for Apache Kafka) — transactional.id, the idempotent producer, read_committed, and sendOffsetsToTransaction — from both an implementation and a design perspective. The content is grounded in the official Kafka documentation and stable concepts.

EOS Basics: How the Idempotent Producer and Transactions Fit Together

Kafka's Exactly-Once Semantics (EOS) is built in two layers. The first layer is the idempotent producer (enable.idempotence=true), which removes duplicates from a single producer to a single partition. The second layer is transactions, which enable atomic writes across multiple partitions and topics, and let you commit consumer offsets in the same transaction.

Idempotence alone cannot guarantee consistency across producer restarts or multiple partitions. With a producer that has a transactional.id, you can commit or abort the messages and consumed offsets together as a single unit of work, and a read_committed consumer never sees an in-between state.

  • The idempotent producer guarantees per-partition deduplication of resent messages
  • Transactions provide atomicity across multiple partitions and topics
  • Completing EOS requires isolation.level=read_committed on the consumer side
  • Include consumer offsets in the transaction via sendOffsetsToTransaction
  • CCDAK loves to test the distinction: idempotence = single partition; transactions = multiple partitions plus offset coordination
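
The two layers can be seen in configuration alone. The sketch below is illustrative: the class and method names are hypothetical helpers, not part of the Kafka client, and serializers are left out for brevity. Only the string keys (enable.idempotence, acks, transactional.id, enable.auto.commit, isolation.level) are actual Kafka settings.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Kafka client): builds the configurations
// for the two EOS layers using plain string keys.
final class EosProducerConfigs {
    private EosProducerConfigs() {}

    // Layer 1: idempotence only. Retried sends are deduplicated per partition.
    static Properties idempotentOnly(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("enable.idempotence", "true");
        p.setProperty("acks", "all"); // idempotence requires acks=all
        return p;
    }

    // Layer 2: transactions. Adding transactional.id on top of layer 1
    // enables atomic writes across partitions and topics.
    static Properties transactional(String bootstrapServers, String transactionalId) {
        Properties p = idempotentOnly(bootstrapServers);
        p.setProperty("transactional.id", transactionalId);
        return p;
    }

    // Consumer side: the app controls offsets and reads only committed data.
    static Properties readCommittedConsumer(String bootstrapServers, String groupId) {
        Properties c = new Properties();
        c.setProperty("bootstrap.servers", bootstrapServers);
        c.setProperty("group.id", groupId);
        c.setProperty("enable.auto.commit", "false");
        c.setProperty("isolation.level", "read_committed");
        return c;
    }
}
```

The only difference between the two producer configurations is transactional.id, which is why the exam distinction comes down to what that one setting adds.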

Designing transactional.id and Fencing

transactional.id is a stable identifier for what is conceptually the same logical producer. The transaction coordinator on the broker tracks a producer epoch per transactional.id to keep old and new producers consistent. When a producer restarts after a failure with the same transactional.id, it gets a new epoch and the old producer is fenced (invalidated).

The design priorities are stability across restarts and collision avoidance at scale. It is not enough to make the ID unique per instance: assign it deterministically, 1:1 with a unit of work (e.g. an input partition or a Kafka Streams task), so that the same transactional.id is reused across rebalances and restarts and EOS is preserved. Never generate it randomly.

  • Stable and deterministic: reuse the same transactional.id for the same unit of work across restarts
  • Unique: never collide across parallel producers
  • Fencing: when a new producer initializes with the same transactional.id, the old one fails with ProducerFencedException on its next transactional operation
  • Expiration: an idle transactional.id is eligible for cleanup after the broker-side transactional.id.expiration.ms
  • Broker constraint: the client's transaction.timeout.ms cannot exceed the broker's transaction.max.timeout.ms
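
One way to get a stable, deterministic, and unique ID is to derive it from the unit of work itself. The following is a minimal sketch under assumptions: the class name, the appId parameter, and the "appId-topic-partition" naming scheme are all hypothetical choices, not a Kafka convention.

```java
// Hypothetical helper: derives a transactional.id from the unit of work
// (here, one input partition) instead of generating it randomly.
final class TransactionalIds {
    private TransactionalIds() {}

    // The same inputs always yield the same ID, so a restarted worker that
    // picks up the same input partition reuses the ID and fences its predecessor.
    static String forInputPartition(String appId, String topic, int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        return appId + "-" + topic + "-" + partition;
    }
}
```

For example, forInputPartition("orders", "orders-in", 3) returns "orders-orders-in-3" on every call, while a different partition number yields a different ID, so parallel workers do not collide.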

API Workflow: init / begin / send / sendOffsetsToTransaction / commit

The minimal transaction API flow is initTransactions -> beginTransaction -> send records -> (if needed) sendOffsetsToTransaction -> commitTransaction (or abortTransaction). This writes commit markers to every involved partition, and a read_committed consumer sees only the committed records.

sendOffsetsToTransaction is the call that pulls consumer offsets into the same transaction across a read -> process -> write flow. Disable enable.auto.commit and let the application drive offset commits.

  • Call initTransactions once at process startup — it handshakes with the transaction coordinator
  • Keep the beginTransaction to commit/abort scope tight. Avoid overly long transactions
  • On failure, call abortTransaction to discard in-flight writes
  • Send order is preserved within each partition; idempotence makes retries safe
  • With consumer isolation.level=read_committed, aborted records stay invisible

Transaction flow (conceptual)

[Figure: the app (producer with transactional.id) calls initTransactions / beginTransaction against the transaction coordinator, then writes to Topic A (P0..Pn), Topic B (P0..Pm), and the offsets topic (__consumer_offsets). Commit markers are atomic.]

A read_committed consumer sees only committed records.
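
To make that visibility rule concrete, here is a toy model of a single partition log. It is a sketch, not how the client is implemented: the Entry and Kind types are hypothetical, non-transactional records are not modeled, and each transaction label is assumed to have at most one marker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition log as seen by a read_committed reader.
final class ReadCommittedModel {
    enum Kind { DATA, COMMIT, ABORT }

    // One log entry: a data record, or a control marker that ends a transaction.
    static final class Entry {
        final String txn;
        final Kind kind;
        final String value;

        Entry(String txn, Kind kind, String value) {
            this.txn = txn;
            this.kind = kind;
            this.value = value;
        }
    }

    // Returns the values visible under read_committed in this toy model.
    static List<String> readCommitted(List<Entry> log) {
        // Pass 1: find each transaction's outcome from its marker, if any.
        Map<String, Kind> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                outcome.put(e.txn, e.kind);
            }
        }
        // Pass 2: emit committed data in log order, skip aborted data, and
        // stop at the first record of a transaction that is still open.
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue;
            }
            Kind k = outcome.get(e.txn);
            if (k == null) {
                break; // open transaction: the reader cannot go past this point
            }
            if (k == Kind.COMMIT) {
                visible.add(e.value);
            }
        }
        return visible;
    }
}
```

In this model, data from an aborted transaction is dropped, and data that follows the first record of an open transaction stays invisible even if its own transaction has already committed.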

Minimal Java producer (transactional) example

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalCopy {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence and transactions
        p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        p.put(ProducerConfig.ACKS_CONFIG, "all");
        // transactional.id must be stable and unique per unit of work
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-task-3");
        // Too large a value will be rejected by the broker limit
        p.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

        // Consumer config (same process): manual offsets, committed data only
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-g");
        c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // try-with-resources closes both clients on any exit path
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            // Handshake with the transaction coordinator; fences older producers
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("orders-in"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;

                try {
                    producer.beginTransaction();
                    for (ConsumerRecord<String, String> r : records) {
                        // Transform and send to a different topic
                        producer.send(new ProducerRecord<>("orders-out", r.key(), r.value()));
                    }
                    // Commit offsets within the same transaction:
                    // the offset to commit is the last processed offset + 1
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition tp : records.partitions()) {
                        List<ConsumerRecord<String, String>> part = records.records(tp);
                        long lastOffset = part.get(part.size() - 1).offset();
                        offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                    }
                    // Pass the consumer's live group metadata rather than a hand-built instance
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    // Fatal for this producer (e.g. a newer producer with the same
                    // transactional.id is running): do not abort, just stop
                    throw e;
                } catch (KafkaException e) {
                    // Discard in-flight writes, then stop; a restart resumes
                    // from the last committed offsets
                    producer.abortTransaction();
                    throw e;
                }
            }
        }
    }
}

Consumer Settings and Tying Offsets into the Transaction

To complete EOS, set isolation.level=read_committed on the consumer. Records from aborted transactions are then filtered out, and records from a transaction that is still open are not returned until it commits. Leaving it at the default read_uncommitted exposes in-progress state and leads to duplicate processing or inconsistencies.

Disable auto commit and pull offsets into the producer's transaction via sendOffsetsToTransaction. That way the successful write and the offset advance commit together, so the processing never gets torn in half.

  • isolation.level=read_committed (consumer)
  • enable.auto.commit=false so the app controls commits
  • Don't forget sendOffsetsToTransaction(offsets, groupMetadata)
  • Design for rebalance resilience: tie transactional.id to a task or input partition
  • When integrating with external sinks (e.g. databases), avoid two-phase commit — either keep EOS bounded to Kafka -> Kafka, or design a separate transaction strategy on the external side

Operations: Timeouts, Monitoring, and Failure Recovery

transaction.timeout.ms is the maximum lifetime of a transaction. Past this point the broker aborts it. The client-side value can never exceed the broker's transaction.max.timeout.ms, so avoid huge transactions and split processing into small batches.
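
The relationship between the two timeouts can be checked before the producer is even created. This is a sketch only: the helper is hypothetical, and the real rejection happens on the broker during initTransactions. The constants reflect the documented defaults (60 seconds on the client, 15 minutes on the broker), which your cluster may override.

```java
// Hypothetical pre-flight check for the client/broker timeout relationship.
final class TransactionTimeouts {
    // Documented default for the client's transaction.timeout.ms (1 minute).
    static final long DEFAULT_CLIENT_TIMEOUT_MS = 60_000L;
    // Documented default for the broker's transaction.max.timeout.ms (15 minutes).
    static final long DEFAULT_BROKER_MAX_MS = 900_000L;

    private TransactionTimeouts() {}

    // Returns clientTimeoutMs if it is positive and within the broker cap,
    // otherwise throws so the misconfiguration surfaces at startup.
    static long validated(long clientTimeoutMs, long brokerMaxTimeoutMs) {
        if (clientTimeoutMs <= 0) {
            throw new IllegalArgumentException("transaction.timeout.ms must be positive");
        }
        if (clientTimeoutMs > brokerMaxTimeoutMs) {
            throw new IllegalArgumentException(
                "transaction.timeout.ms=" + clientTimeoutMs
                + " exceeds transaction.max.timeout.ms=" + brokerMaxTimeoutMs);
        }
        return clientTimeoutMs;
    }
}
```

With the defaults, validated(60_000, 900_000) passes, while a client value of 1,000,000 ms against the same broker cap is refused.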

Transaction metadata is held in an internal topic (__transaction_state), and commit/abort markers are also written to each data partition. A read_committed consumer filters aborted records so they stay invisible. On failover, when a new producer starts with the same transactional.id, the old producer is fenced (ProducerFencedException).

  • Monitoring focus: TransactionCoordinator/Producer metrics (commit/abort counts, errors, latency)
  • Understand transactional.id.expiration.ms and plan for the GC of unused IDs
  • Abort policy: call abortTransaction explicitly and draw clear retry boundaries
  • Batch granularity: size so the batch finishes within the timeout
  • SLA: EOS is great but has overhead — apply it only to flows that truly need it
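
Batch granularity can be enforced with a small chunking step, so that each chunk maps to one begin -> commit cycle. The helper below is a hypothetical sketch; in a real consumer loop the max.poll.records setting is another way to bound how many records a single poll, and therefore a single transaction, can contain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits work into small units, one per transaction.
final class TransactionBatches {
    private TransactionBatches() {}

    // Splits records into consecutive chunks of at most maxPerTransaction items,
    // preserving the original order.
    static <T> List<List<T>> split(List<T> records, int maxPerTransaction) {
        if (maxPerTransaction <= 0) {
            throw new IllegalArgumentException("maxPerTransaction must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxPerTransaction) {
            int end = Math.min(i + maxPerTransaction, records.size());
            chunks.add(new ArrayList<>(records.subList(i, end)));
        }
        return chunks;
    }
}
```

Splitting five records with a limit of two yields three chunks of sizes 2, 2, and 1, each small enough to commit or abort cheaply.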

Exam Prep and Pitfalls (CCDAK)

CCDAK keeps asking about the difference between idempotence and transactions, what transactional.id means, what read_committed does, and the purpose of sendOffsetsToTransaction. Don't just memorize options — being able to articulate which requirement calls for which feature is what earns you the points.

It also tests transactional.id design with multiple instances (stable, deterministic, unique) and how broker limits reject overly long transaction timeouts. Randomly generated transactional.id, auto commit left on, and leaving read_uncommitted in place are the classic wrong answers.

  • Idempotence alone: dedup from a single producer to a single partition
  • Transactions: atomic writes across multiple partitions plus simultaneous offset commits
  • read_committed: hides aborted records
  • transactional.id: stable identifier and the key to fencing
  • Make sure you understand the relationship between send order, retries, and acks=all

Delivery guarantee | Requirements (essentials) | What is visible
At-most-once | Auto commit before send, or commit-before-send | Losses possible, but no duplicates
At-least-once | Retries plus manual commit (no transactions) | Duplicates possible
Exactly-once (EOS) | Idempotence + transactions + read_committed + sendOffsetsToTransaction | Committed records only (aborted records hidden)
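
The three guarantees differ only in where the offset commit sits relative to the output. The simulation below is a toy sketch with hypothetical names and no Kafka involved: it injects one crash at a chosen record, restarts from the committed offset, and returns what ended up in the output.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of one crash and restart under three commit orderings.
final class DeliverySimulation {
    enum Mode { COMMIT_FIRST, PROCESS_FIRST, ATOMIC }

    private DeliverySimulation() {}

    // crashAt is the index of the record during which the single crash occurs;
    // pass -1 for a run without a crash.
    static List<String> run(List<String> input, int crashAt, Mode mode) {
        List<String> output = new ArrayList<>();
        int committed = 0;      // next offset to read after a restart
        boolean crashed = false;
        int pos = 0;
        while (pos < input.size()) {
            boolean crashNow = !crashed && pos == crashAt;
            if (crashNow) {
                crashed = true;
            }
            switch (mode) {
                case COMMIT_FIRST:   // at-most-once: commit, then crash before output
                    committed = pos + 1;
                    if (!crashNow) output.add(input.get(pos));
                    break;
                case PROCESS_FIRST:  // at-least-once: output, then crash before commit
                    output.add(input.get(pos));
                    if (!crashNow) committed = pos + 1;
                    break;
                case ATOMIC:         // exactly-once: output and commit succeed or fail together
                    if (!crashNow) {
                        output.add(input.get(pos));
                        committed = pos + 1;
                    }
                    break;
            }
            // After a crash, processing resumes from the committed offset.
            pos = crashNow ? committed : pos + 1;
        }
        return output;
    }
}
```

For the input a, b, c with a crash at index 1, COMMIT_FIRST yields a, c (b is lost), PROCESS_FIRST yields a, b, b, c (b is duplicated), and ATOMIC yields a, b, c.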

Check Your Understanding

Question 1 (CCDAK)

You want to implement a flow that reads from orders-in, transforms records, writes to orders-out, and advances the offset, all with Exactly-Once delivery. The app fails over to another instance on failure. Which design is most appropriate?

  A. Generate a random transactional.id on every startup and set only enable.idempotence=true
  B. Assign and reuse a fixed transactional.id per unit of work (e.g. input partition), use idempotence + transactions, and configure sendOffsetsToTransaction and read_committed
  C. Leave enable.auto.commit=true and manually send to orders-out after processing
  D. Share the same transactional.id across multiple instances and send concurrently to boost throughput

Correct answer: B

EOS requires idempotence + transactions + read_committed, and offsets must be folded into the same transaction via sendOffsetsToTransaction. Assign transactional.id stably and uniquely per unit of work, reuse it across restarts, and rely on fencing to invalidate the old process on failover. A loses EOS because the ID changes on every startup, so the old producer is never fenced, and idempotence alone does not cover offsets. C commits offsets independently of the writes, so a crash can lose or duplicate records. D makes the instances fence each other, because only one producer per transactional.id can be active at a time.

Frequently Asked Questions

Does the idempotent producer alone give you Exactly-Once?

No. Idempotence only deduplicates writes to a single partition. If you need atomicity across multiple partitions or topics, or to tie in consumer offsets, you must combine transactions backed by a transactional.id with a read_committed consumer.

Is it safer to set a longer transaction.timeout.ms?

Not necessarily. Any value above the broker's transaction.max.timeout.ms is rejected. A long-running transaction also holds back read_committed consumers, which cannot read past the first open transaction in a partition, and an abort throws away more work that then has to be redone. The recommended pattern is to repeat begin -> commit in small units.

What happens if an old producer with the same transactional.id keeps running?

When a new producer starts with the same transactional.id, it is assigned a new epoch, and the old producer fails with ProducerFencedException on its next transactional operation. This is fencing. The exception is not recoverable for that producer instance, so the correct response is to close it and stop the old process rather than retry.
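
The epoch mechanism can be illustrated with a toy coordinator. This is a simplified model under assumptions, not the broker's implementation: the class is hypothetical, epochs start at 1 here for readability, and IllegalStateException stands in for ProducerFencedException.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the transaction coordinator's epoch bookkeeping.
final class FencingModel {
    // Latest epoch handed out per transactional.id.
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Models initTransactions: bumps and returns the epoch for this transactional.id.
    int init(String transactionalId) {
        return latestEpoch.merge(transactionalId, 1, Integer::sum);
    }

    // Models a transactional write: only the holder of the latest epoch may proceed.
    void write(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        if (latest == null || epoch < latest) {
            throw new IllegalStateException(
                "fenced: " + transactionalId + " with stale epoch " + epoch);
        }
    }
}
```

If the same transactional.id is initialized twice, the second call returns a higher epoch. A write carrying the first epoch is then rejected, while a write carrying the second epoch goes through.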
