Kafka

CCDAK対策: KafkaのExactly-Once SemanticsをProducer・Consumer・Streamsで実現する要点

2026-04-19
NicheeLab編集部

KafkaでのExactly-Onceは、単なる「重複しない」ではなく、プロデューサの冪等性とトランザクション、コンシューマの読み取り分離(read_committed)、および(必要に応じて)Kafka Streamsの状態管理を組み合わせて初めて成立します。

本稿は公式ドキュメントの安定機能に基づき、Producer/Consumer/StreamsそれぞれでEOSをどう構成し、どこに注意すべきかを具体的にまとめました。試験対策用の比較表・図・サンプルコード・演習問題も含みます。

Exactly-Onceの定義と前提整理

KafkaにおけるExactly-Once Semantics(EOS)は、メッセージの重複排除とオフセット管理を一貫させ、再試行や障害時でも「入力1回に対して出力も1回」を保証するための仕組みです。単にProducerの冪等性(enable.idempotence)だけでは不十分で、読み取り・書き込み・オフセットコミットを1つの原子的な単位に束ねるトランザクションが鍵となります。

EOSの適用範囲はKafka内です。外部DBやSaaSに対する最終的なExactly-Onceは、相手側のトランザクションや冪等キー設計(Outboxパターン等)が別途必要になります。Kafka内でのEOSを成立させる基本は、Transactional Producerとread_committedなConsumer、もしくはKafka Streamsのprocessing.guarantee=exactly_once_v2です。

  • 冪等性: 同一パーティション内の再試行による重複送信を抑止
  • トランザクション: 送信レコードとオフセットコミットを原子的に保護
  • 読み取り分離: Consumerはread_committedで中断(Abort)トランザクションを不可視化
  • Kafka Streams: 内部トピックと状態ストアでEOSを包括的に実現

ProducerでのEOS: 冪等性とトランザクションの使い分け

冪等プロデューサ(enable.idempotence=true)は、同一パーティション内での重複生成を抑止します。これは再試行や一時的なブローカ障害時に有効ですが、読み取り側(コンシューマ)のオフセット管理までは関知しません。つまり、冪等性だけでは「取り込み→変換→出力→オフセット更新」を原子的にはできません。

そこでTransactional Producerの出番です。transactional.idを設定し、beginTransactionからcommitTransactionまでの間に send(出力) と sendOffsetsToTransaction(読み取ったオフセットを同一トランザクションに含める) を行います。これにより、万一の障害時でも「出力とオフセット」が同時にコミットされるか、あるいは同時に中断(Abort)され、Exactly-Onceが成立します。トランザクション協調はブローカ側のTransaction Coordinatorが担い、同一transactional.idに対するフェンシングでゾンビプロデューサを排除します。

  • 必須・推奨設定例: enable.idempotence=true, acks=all, retries>0, max.in.flight.requests.per.connection<=5, transactional.id=任意固定値
  • ブローカ整合性: min.insync.replicas>=2 と acks=all の組み合わせで耐障害性を確保
  • トランザクションタイムアウト: producer.transaction.timeout.ms は broker の transaction.max.timeout.ms 以下に
  • 異常時: ProducerFencedException, OutOfOrderSequenceException 発生時は abortTransaction で巻き戻し

Transactional ProducerによるConsume-Transform-Produceの原子化

poll()beginTransactionsend(records)sendOffsetsToTransactioncommitTransactionConsumerread_committedTransaction CoordinatorProducertxn.id=X, idempotent=trueTopic Partitions__consumer_offsets

Java: Transactional Producerでオフセットを同一トランザクションに含める

Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.ACKS_CONFIG, "all");
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
p.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
p.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(p);
producer.initTransactions();

KafkaConsumer<String, String> consumer = /* read_committed で初期化 */ null;

try {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  if (!records.isEmpty()) {
    producer.beginTransaction();
    for (ConsumerRecord<String, String> r : records) {
      // 変換して出力
      ProducerRecord<String, String> out = new ProducerRecord<>("out-topic", r.key(), transform(r.value()));
      producer.send(out);
    }
    // 読み取ったオフセットを同一トランザクションでコミット
    Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(records);
    producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("cg-transform"));
    producer.commitTransaction();
  }
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
  // フェンシングや順序違反: トランザクションを中断
  producer.abortTransaction();
  throw e;
} catch (Exception e) {
  producer.abortTransaction();
  throw e;
}

Consumer側の要点: read_committedとオフセット管理

ConsumerがExactly-Onceを享受するには、isolation.level=read_committed を設定して、中断(Abort)されたトランザクション内のレコードを見えなくすることが必須です(既定はread_uncommitted)。これにより、Transactional ProducerがAbortした場合に不整合なレコードを取り込むことを防げます。

単純な取り込み(読み取って外部へ書く)でEOSを実現したい場合、外部系が同一トランザクションを共有しない限り、Kafka内のEOSだけでは不十分です。Kafka内でのconsume-transform-produceのパイプラインであれば、先述のTransactional Producerと sendOffsetsToTransaction を組み合わせるのが定石です。自動コミット(enable.auto.commit=true)を有効のまま使うと、出力が失敗してもオフセットだけ前進する恐れがあるため、EOSの文脈では避けます。

  • isolation.level=read_committed を明示設定
  • enable.auto.commit=false とし、オフセットはProducerトランザクションで送信
  • 外部出力にEOSを広げるには、相手側のトランザクション or 冪等キー設計が必要

Java: Consumerの基本設定 (read_committed)

Properties c = new Properties();
c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
c.put(ConsumerConfig.GROUP_ID_CONFIG, "cg-transform");
c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);

Kafka StreamsでのEOS: processing.guarantee=exactly_once_v2

Kafka Streamsは、アプリケーションの状態ストア(ロックステート/ロックステートストア)をchangelogトピックでバックアップし、内部的にTransactional Producerとread_committed Consumerを組み合わせてEOSを提供します。実務では processing.guarantee=exactly_once_v2 の設定が推奨です。これは従来のexactly_once(旧式)に対してレイテンシと可用性のバランスが改善されています。

内部トピック(状態のchangelog・再分配のrepartition)のreplication.factorやcleanup.policy=compactの適切な設定、アプリの再起動やリバランス時のフェンシング動作を理解しておくと、障害時の挙動が読みやすくなります。トポロジの出力先ごとにトランザクションがまとまり、Abortされたレコードは読み取り側のread_committedによって不可視になります。

  • processing.guarantee=exactly_once_v2 を明示
  • 内部トピックの replication.factor と min.insync.replicas を適切に
  • アプリID(application.id)は安定値にし、ステートを正しく復元
  • レコードキー設計(正しいパーティショニング)で順序保証を維持

Java: StreamsのEOS設定と最小トポロジ

Properties s = new Properties();
s.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
s.put(StreamsConfig.APPLICATION_ID_CONFIG, "payments-app");
s.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> in = b.stream("in");
KStream<String, String> out = in.mapValues(v -> enrich(v));
out.to("out");

KafkaStreams streams = new KafkaStreams(b.build(), s);
streams.start();

配信保証の比較とCCDAKで問われやすいポイント

配信保証は at-most-once, at-least-once, exactly-once の3類型で整理します。CCDAKでは、設定の組み合わせと副作用(重複・遅延・可視性)のトレードオフ理解が頻出です。

特に、冪等性だけではEOSにならない点、read_committedを設定しないConsumerはAbort済みレコードも読む点、Streamsでのexactly_once_v2指定と内部トピックの耐障害性設定は押さえておきましょう。

  • 冪等性=重複送信抑止(Producer内)であり、オフセット原子化にはトランザクションが必要
  • read_committedでAbort済みレコードを不可視化
  • StreamsのEOSは状態ストアと内部トピック設計が前提
保証送信側の要件読み取り側の要件重複と可視性
At-most-onceacks=0|1, 冪等性/Txなし, 早めのオフセットコミット特に指定なし重複は基本なしだが取りこぼし発生しうる
At-least-onceacks=all 推奨, 冪等性は任意, 再試行あり重複排除はアプリで対応重複あり(再処理前提), 取りこぼしは原則回避
Exactly-onceenable.idempotence=true, transactional.id 設定, acks=all, sendOffsetsToTransactionisolation.level=read_committed, 自動コミット無効Abortは不可視、重複排除はKafka内で担保

運用・トラブルシュート: フェンシング/タイムアウト/メトリクス

Transactional Producerは、同一transactional.idの多重起動で片方がProducerFencedExceptionとなり停止します。これはゾンビ防止の正しい挙動です。ローリングデプロイではインスタンス数とトランザクションの境界を意識しましょう。

タイムアウト(transaction.timeout.ms)超過やブローカ側の不可でAbortが増えると、待ち時間と再処理が増加します。トピックのmin.insync.replicas、ネットワーク、ストレージIO、GCなどを併せて観測します。重要メトリクスはtransaction関連のコミット/アボート率、プロデューサのretries・batchサイズ、コンシューマのレイテンシ(gauge)などです。

  • 例外別初動: ProducerFenced→プロセス停止と再配置、OutOfOrderSequence→abort後に再初期化
  • ブローカ設定との整合: transaction.max.timeout.ms と producer.transaction.timeout.ms
  • 内部トピック(Streams): replication.factor とクリーンアップポリシーを監視
  • 再試行時の順序維持: max.in.flight.requests.per.connection<=5 を遵守

Java: 例外時の安全な中断と再初期化の雛形

try {
  producer.beginTransaction();
  // ... send
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // 同一transactional.idの別インスタンスにフェンスされた
  closeQuietly(producer);
  System.exit(1); // 再スケジューリングへ
} catch (KafkaException e) {
  // それ以外はabortして再試行判断
  safeAbort(producer);
  // backoff + 再初期化
  producer.initTransactions();
}

問題で確認

CCDAK

問題 1

Kafkaで、入力トピックを読み取り変換して出力トピックへ書き戻すアプリにおいて、Kafka内でExactly-Onceを成立させる最小構成として正しいものはどれか。

  1. 冪等プロデューサ(enable.idempotence=true)のみ。Consumerは既定値のままでよい。
  2. トランザクションありのProducerと、Consumerはauto-commitを有効にする。read_uncommittedでよい。
  3. トランザクションありのProducerでsendOffsetsToTransactionを使い、Consumerはread_committedかつauto-commit無効。
  4. Kafka Streamsを使わず、Producer/Consumerとも既定設定のままにしてアプリ側で重複排除する。

正解: C

Exactly-OnceをKafka内で成立させるには、Transactional Producerで出力とオフセットを同一トランザクションに含め(sendOffsetsToTransaction)、Consumerはread_committedでAbort済みレコードを不可視化し、auto-commitを無効にする必要がある。冪等性のみやauto-commit有効ではEOSは満たせない。

よくある質問

enable.idempotence と transactional.id の違いは何ですか?

enable.idempotence=true は同一パーティション内の再試行による重複送信を抑止します。transactional.id を設定してトランザクションを使うと、送信レコード群とコンシューマのオフセットコミットを原子的にまとめられ、Consume-Transform-ProduceのEOSが実現します。冪等性だけではオフセットとの原子性は担保できません。

max.in.flight.requests.per.connection はいくつにすべきですか?

EOSでは再試行時の順序乱れを避けるため、idempotence有効時に最大5以下が要件です。クライアントのデフォルトや自動調整に依存せず、試験でも5以下を推奨と覚えておくと安全です。

Kafka Streamsのexactly_onceとexactly_once_v2の違いは?

exactly_once_v2は内部実装が改良され、従来のexactly_onceに比べてレイテンシや再平衡時の挙動が改善されています。現在はexactly_once_v2の指定が推奨です。どちらもEOSを提供しますが、試験ではv2の指定と内部トピック/状態管理の理解がポイントになります。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.