Kafka

Kafka Transactions 実践ガイド: Exactly-Once 送信と transactional.id の設計

2026-04-19
NicheeLab編集部

Kafkaで「二重配信を避けたい」「複数トピックにまたがって一貫性を保ちたい」という要件に対し、単なる再試行や手動デデュープでは限界があります。公式のExactly-Onceセマンティクス(EOS)は、プロデューサの冪等化とトランザクションを組み合わせて、重複ないし欠損のない処理を実現します。

本稿は、CCDAK(Confluent Certified Developer for Apache Kafka)に頻出の論点である transactional.id、idempotent producer、read_committed、sendOffsetsToTransaction などを、実装と設計の双方から整理します。内容はKafka公式ドキュメントと安定概念に基づきます。

EOSの基本: 冪等プロデューサとトランザクションの関係

KafkaのExactly-Onceセマンティクス(EOS)は二層構えです。第一層は冪等プロデューサ(enable.idempotence=true)で、単一プロデューサから単一パーティションへの重複を排除します。第二層がトランザクションで、複数パーティション・複数トピック間の原子的な書き込みと、オフセットの同時コミットを可能にします。

冪等化だけではプロデューサ再起動やマルチパーティションへの一貫性は担保できません。transactional.id を持つプロデューサのトランザクションを使うと、1つの処理単位で『メッセージ群+消費オフセット』をまとめてコミットまたはアボートでき、read_committed なコンシューマからは中途半端な状態が見えません。

  • 冪等プロデューサは重複排除(同一メッセージの再送)をパーティション単位で保証
  • トランザクションは複数パーティション・トピックを横断した原子性を提供
  • EOSを完結させるにはコンシューマ側の isolation.level=read_committed が必須
  • オフセットは sendOffsetsToTransaction でトランザクションに含める
  • CCDAKでは『冪等=単一パーティション』『トランザクション=複数パーティション+オフセット連携』を問われやすい

transactional.id の設計とフェンシング

transactional.id は『同一論理プロデューサ』を表す安定識別子です。ブローカー上のトランザクション・コーディネータは transactional.id ごとにエポック(producer epoch)を管理し、新旧プロデューサの整合性を保ちます。障害後に同じ transactional.id で起動すると、新しいエポックが付与され、古いプロデューサはフェンシング(無効化)されます。

設計の勘所は、スケールと衝突回避です。インスタンス数ぶんユニークにするだけではなく、処理単位(例: 入力パーティション、Kafka Streams のタスク)と1対1で決定的に割り当てると、リバランスや再起動時でも同じ transactional.id を再利用でき、EOSが維持されます。ランダム生成は厳禁です。

  • 安定・決定的: transactional.id は再起動後も同一処理単位に同じ値を再利用
  • ユニーク: 並列プロデューサ間で衝突させない
  • フェンシング: 同じ transactional.id の新プロデューサ起動で旧プロデューサは ProducerFencedException
  • 有効期限: transactional.id はブローカー側の transactional.id.expiration.ms 経過でクリーンアップ対象
  • ブローカー制約: クライアントの transaction.timeout.ms は broker の transaction.max.timeout.ms を超えられない

APIワークフロー: init/begin/send/sendOffsetsToTransaction/commit

トランザクションAPIの最小手順は、initTransactions → beginTransaction → レコード送信 → (必要なら)sendOffsetsToTransaction → commitTransaction(または abortTransaction)です。これにより、関連するすべてのパーティションにコミットマーカーが書かれ、read_committed なコンシューマにはコミット済みのみが見えます。

sendOffsetsToTransaction は『読み取り→処理→書き込み』の一連の処理で、コンシューマのオフセットを同じトランザクションに含めるための呼び出しです。enable.auto.commit は無効化し、アプリ側でオフセットコミットを制御します。

  • initTransactions はプロセス起動時に1回。トランザクション・コーディネータと握手
  • beginTransaction 〜 commit/abort のスコープを明確に。長すぎるトランザクションは避ける
  • 失敗時は abortTransaction で中途半端な書き込みを破棄
  • 送信順序は保持されるが、冪等化と組み合わせて再送を安全化
  • コンシューマの isolation.level=read_committed でアボート済みレコードは非表示

トランザクションの流れ(概念)

App (Producer with transactional.id)initTransactions / beginTransactionTransaction CoordinatorTopic AP0..PnTopic BP0..PmOffsets(__consumer_offsets)トランザクションの流れ(Commit markers は atomic)

read_committed Consumer は commit 済みレコードのみを参照します。

Java プロデューサ(トランザクション)の最小実装例

Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 冪等化とトランザクション
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
p.put(ProducerConfig.ACKS_CONFIG, "all");
// transactional.id は処理単位に対して安定・一意に
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-task-3");
// 長すぎるとブローカーの上限に拒否される
p.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
    producer.initTransactions();

    // コンシューマ設定の一例(同プロセスで扱う場合)
    Properties c = new Properties();
    c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    c.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-g");
    c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
    consumer.subscribe(Collections.singletonList("orders-in"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) continue;

        producer.beginTransaction();
        for (ConsumerRecord<String, String> r : records) {
            // 変換して別トピックへ
            ProducerRecord<String, String> out = new ProducerRecord<>("orders-out", r.key(), r.value());
            producer.send(out);
        }
        // 同一トランザクションでオフセットもコミット
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> part = records.records(tp);
            long lastOffset = part.get(part.size() - 1).offset();
            offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("orders-g"));
        producer.commitTransaction();
    }
} catch (ProducerFencedException e) {
    // 同じ transactional.id の新プロデューサが先に稼働 -> このプロセスは停止して再配置
    throw e;
} catch (KafkaException e) {
    // 失敗時はアボートして再試行
    // producer.abortTransaction(); // 適切な場所で呼び出す
    throw e;
}

コンシューマ設定とオフセットのトランザクション連携

EOSを完結させるには、コンシューマ側で isolation.level=read_committed を設定し、未コミットやアボート済みのレコードを読み飛ばす必要があります。デフォルトの read_uncommitted のままだと、途中の状態を可視化してしまい、二重処理や不整合の原因になります。

オフセットは auto commit を無効化し、sendOffsetsToTransaction を使ってプロデューサのトランザクションに含めます。これにより、書き込み成功とオフセット前進が同時にコミットされ、処理の『ちぎれ』が発生しません。

  • isolation.level=read_committed(コンシューマ)
  • enable.auto.commit=false でアプリ制御
  • sendOffsetsToTransaction(offsets, groupMetadata) を忘れない
  • リバランスに強い設計: transactional.id をタスクや入力パーティションに結び付ける
  • 外部シンク(DBなど)と整合させる場合は二段階コミットでなく『Kafka→Kafka』の範囲にEOSを閉じるか、外部側のトランザクション戦略を別途検討

運用: タイムアウト、監視、失敗時のリカバリ

transaction.timeout.ms はトランザクションの最大継続時間です。これを超えるとブローカーがトランザクションをアボートします。クライアント側の設定はブローカーの transaction.max.timeout.ms を上回れないため、長大トランザクションを避け、処理を小さなバッチに分割します。

トランザクションのメタデータは内部トピックに保持され、コミット/アボートのマーカーが各データパーティションにも書き込まれます。read_committed コンシューマはアボート済みレコードを見えないようにフィルタします。フェイルオーバ時は、同じ transactional.id を持つ新プロデューサが起動すると古いプロデューサはフェンシングされます(ProducerFencedException)。

  • 監視の要点: TransactionCoordinator/Producer 指標(コミット/アボート数、エラー、レイテンシ)
  • transactional.id.expiration.ms を理解し、未使用IDのGCを待つ運用に備える
  • 中断時の方針: 明確に abortTransaction し、再試行の境界を作る
  • バッチ粒度: タイムアウト内に収まるサイズに調整
  • SLA: EOSは便利だがオーバーヘッドあり。必要なフローに限定して適用

試験対策と落とし穴(CCDAK向け)

CCDAKでは、冪等化とトランザクションの違い、transactional.id の意味、read_committed の効果、sendOffsetsToTransaction の目的が頻出です。オプションの暗記だけでなく、どの要件に何が必要かを言語化できると得点しやすくなります。

また、マルチインスタンス時の transactional.id 設計(安定・決定的・一意)や、ブローカー上限によるトランザクションタイムアウトの拒否も問われます。ランダムな transactional.id、auto commit 併用、read_uncommitted 放置は典型的な誤りです。

  • 冪等のみ: 単一プロデューサ→単一パーティションの重複抑止
  • トランザクション: 複数パーティション原子化+オフセットコミットの同時性
  • read_committed: アボート済みレコードを不可視化
  • transactional.id: 安定識別子+フェンシングの鍵
  • 送信順序・再送・ack=all の関係を整理しておく
配信保証必要条件(要点)可視化されるデータ
At-most-once自動コミット先行や送信前コミット欠損はあり得るが重複はしない
At-least-once再試行・手動コミット(トランザクションなし)重複の可能性あり
Exactly-once(EOS)冪等+トランザクション+read_committed+sendOffsetsToTransactionコミット済みのみ(アボートは非表示)

問題で確認

CCDAK

問題 1

Kafkaで『orders-in を読み、加工して orders-out へ書き込み、同時にオフセットを進める』フローをExactly-Onceで実現したい。アプリは障害時に別インスタンスへフェイルオーバーする。最も適切な設計はどれか?

  1. 各起動ごとにランダムな transactional.id を生成し、enable.idempotence=true のみ設定する
  2. 固定の transactional.id を処理単位(例: 入力パーティション)に割り当てて再利用し、冪等+トランザクションを使い、sendOffsetsToTransaction と read_committed を設定する
  3. enable.auto.commit=true のまま、処理後に手動で orders-out へ送信する
  4. 複数インスタンスで同じ transactional.id を共有し、同時に送信してスループットを上げる

正解: B

EOSには冪等+トランザクション+read_committed が必要で、オフセットは sendOffsetsToTransaction で同一トランザクションに含めます。transactional.id は処理単位に対して安定・一意に割り当て再起動時も再利用し、フェイルオーバー時はフェンシングで旧プロセスを無効化します。AはIDが変わるためEOSが維持できず、Cは途中露出が起き得ます。Dは衝突し、フェンシングされるか不整合を招きます。

よくある質問

冪等プロデューサだけでExactly-Onceになりますか?

なりません。冪等化は単一パーティションに対する重複排除に留まります。複数パーティション・トピック間の原子性やオフセット連携が必要なら、transactional.id によるトランザクションと read_committed を併用してください。

transaction.timeout.ms を長くすれば安全ですか?

必ずしも安全ではありません。ブローカーの transaction.max.timeout.ms を超える値は拒否されますし、長大なトランザクションはロック・メタデータ・再試行コストを増やし、アボート時の巻き戻しも重くなります。短い単位で begin→commit を繰り返す設計が推奨です。

同じ transactional.id を持つ旧プロデューサが動き続けたらどうなりますか?

同じ transactional.id の新プロデューサが起動すると、新しいエポックが付与され、旧プロデューサは ProducerFencedException を受けて以降の操作が失敗します。これがフェンシングです。旧プロセスは速やかに停止または再初期化するのが正解です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (100件)
© 2026 NicheeLab All rights reserved.