KafkaでのExactly-Onceは、単なる「重複しない」ではなく、プロデューサの冪等性とトランザクション、コンシューマの読み取り分離(read_committed)、および(必要に応じて)Kafka Streamsの状態管理を組み合わせて初めて成立します。
本稿は公式ドキュメントの安定機能に基づき、Producer/Consumer/StreamsそれぞれでEOSをどう構成し、どこに注意すべきかを具体的にまとめました。試験対策用の比較表・図・サンプルコード・演習問題も含みます。
KafkaにおけるExactly-Once Semantics(EOS)は、メッセージの重複排除とオフセット管理を一貫させ、再試行や障害時でも「入力1回に対して出力も1回」を保証するための仕組みです。単にProducerの冪等性(enable.idempotence)だけでは不十分で、読み取り・書き込み・オフセットコミットを1つの原子的な単位に束ねるトランザクションが鍵となります。
EOSの適用範囲はKafka内です。外部DBやSaaSに対する最終的なExactly-Onceは、相手側のトランザクションや冪等キー設計(Outboxパターン等)が別途必要になります。Kafka内でのEOSを成立させる基本は、Transactional Producerとread_committedなConsumer、もしくはKafka Streamsのprocessing.guarantee=exactly_once_v2です。
冪等プロデューサ(enable.idempotence=true)は、同一パーティション内での重複生成を抑止します。これは再試行や一時的なブローカ障害時に有効ですが、読み取り側(コンシューマ)のオフセット管理までは関知しません。つまり、冪等性だけでは「取り込み→変換→出力→オフセット更新」を原子的にはできません。
そこでTransactional Producerの出番です。transactional.idを設定し、beginTransactionからcommitTransactionまでの間に send(出力) と sendOffsetsToTransaction(読み取ったオフセットを同一トランザクションに含める) を行います。これにより、万一の障害時でも「出力とオフセット」が同時にコミットされるか、あるいは同時に中断(Abort)され、Exactly-Onceが成立します。トランザクション協調はブローカ側のTransaction Coordinatorが担い、同一transactional.idに対するフェンシングでゾンビプロデューサを排除します。
Transactional ProducerによるConsume-Transform-Produceの原子化
Java: Transactional Producerでオフセットを同一トランザクションに含める
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.ACKS_CONFIG, "all");
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
p.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
p.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
producer.initTransactions();
KafkaConsumer<String, String> consumer = /* read_committed で初期化 */ null;
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if (!records.isEmpty()) {
producer.beginTransaction();
for (ConsumerRecord<String, String> r : records) {
// 変換して出力
ProducerRecord<String, String> out = new ProducerRecord<>("out-topic", r.key(), transform(r.value()));
producer.send(out);
}
// 読み取ったオフセットを同一トランザクションでコミット
Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(records);
producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("cg-transform"));
producer.commitTransaction();
}
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
// フェンシングや順序違反: トランザクションを中断
producer.abortTransaction();
throw e;
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
ConsumerがExactly-Onceを享受するには、isolation.level=read_committed を設定して、中断(Abort)されたトランザクション内のレコードを見えなくすることが必須です(既定はread_uncommitted)。これにより、Transactional ProducerがAbortした場合に不整合なレコードを取り込むことを防げます。
単純な取り込み(読み取って外部へ書く)でEOSを実現したい場合、外部系が同一トランザクションを共有しない限り、Kafka内のEOSだけでは不十分です。Kafka内でのconsume-transform-produceのパイプラインであれば、先述のTransactional Producerと sendOffsetsToTransaction を組み合わせるのが定石です。自動コミット(enable.auto.commit=true)を有効のまま使うと、出力が失敗してもオフセットだけ前進する恐れがあるため、EOSの文脈では避けます。
Java: Consumerの基本設定 (read_committed)
Properties c = new Properties();
c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
c.put(ConsumerConfig.GROUP_ID_CONFIG, "cg-transform");
c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
Kafka Streamsは、アプリケーションの状態ストア(ロックステート/ロックステートストア)をchangelogトピックでバックアップし、内部的にTransactional Producerとread_committed Consumerを組み合わせてEOSを提供します。実務では processing.guarantee=exactly_once_v2 の設定が推奨です。これは従来のexactly_once(旧式)に対してレイテンシと可用性のバランスが改善されています。
内部トピック(状態のchangelog・再分配のrepartition)のreplication.factorやcleanup.policy=compactの適切な設定、アプリの再起動やリバランス時のフェンシング動作を理解しておくと、障害時の挙動が読みやすくなります。トポロジの出力先ごとにトランザクションがまとまり、Abortされたレコードは読み取り側のread_committedによって不可視になります。
Java: StreamsのEOS設定と最小トポロジ
Properties s = new Properties();
s.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
s.put(StreamsConfig.APPLICATION_ID_CONFIG, "payments-app");
s.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder b = new StreamsBuilder();
KStream<String, String> in = b.stream("in");
KStream<String, String> out = in.mapValues(v -> enrich(v));
out.to("out");
KafkaStreams streams = new KafkaStreams(b.build(), s);
streams.start();
配信保証は at-most-once, at-least-once, exactly-once の3類型で整理します。CCDAKでは、設定の組み合わせと副作用(重複・遅延・可視性)のトレードオフ理解が頻出です。
特に、冪等性だけではEOSにならない点、read_committedを設定しないConsumerはAbort済みレコードも読む点、Streamsでのexactly_once_v2指定と内部トピックの耐障害性設定は押さえておきましょう。
| 保証 | 送信側の要件 | 読み取り側の要件 | 重複と可視性 |
|---|---|---|---|
| At-most-once | acks=0|1, 冪等性/Txなし, 早めのオフセットコミット | 特に指定なし | 重複は基本なしだが取りこぼし発生しうる |
| At-least-once | acks=all 推奨, 冪等性は任意, 再試行あり | 重複排除はアプリで対応 | 重複あり(再処理前提), 取りこぼしは原則回避 |
| Exactly-once | enable.idempotence=true, transactional.id 設定, acks=all, sendOffsetsToTransaction | isolation.level=read_committed, 自動コミット無効 | Abortは不可視、重複排除はKafka内で担保 |
Transactional Producerは、同一transactional.idの多重起動で片方がProducerFencedExceptionとなり停止します。これはゾンビ防止の正しい挙動です。ローリングデプロイではインスタンス数とトランザクションの境界を意識しましょう。
タイムアウト(transaction.timeout.ms)超過やブローカ側の不可でAbortが増えると、待ち時間と再処理が増加します。トピックのmin.insync.replicas、ネットワーク、ストレージIO、GCなどを併せて観測します。重要メトリクスはtransaction関連のコミット/アボート率、プロデューサのretries・batchサイズ、コンシューマのレイテンシ(gauge)などです。
Java: 例外時の安全な中断と再初期化の雛形
try {
producer.beginTransaction();
// ... send
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 同一transactional.idの別インスタンスにフェンスされた
closeQuietly(producer);
System.exit(1); // 再スケジューリングへ
} catch (KafkaException e) {
// それ以外はabortして再試行判断
safeAbort(producer);
// backoff + 再初期化
producer.initTransactions();
}
CCDAK
問題 1
Kafkaで、入力トピックを読み取り変換して出力トピックへ書き戻すアプリにおいて、Kafka内でExactly-Onceを成立させる最小構成として正しいものはどれか。
正解: C
Exactly-OnceをKafka内で成立させるには、Transactional Producerで出力とオフセットを同一トランザクションに含め(sendOffsetsToTransaction)、Consumerはread_committedでAbort済みレコードを不可視化し、auto-commitを無効にする必要がある。冪等性のみやauto-commit有効ではEOSは満たせない。
enable.idempotence と transactional.id の違いは何ですか?
enable.idempotence=true は同一パーティション内の再試行による重複送信を抑止します。transactional.id を設定してトランザクションを使うと、送信レコード群とコンシューマのオフセットコミットを原子的にまとめられ、Consume-Transform-ProduceのEOSが実現します。冪等性だけではオフセットとの原子性は担保できません。
max.in.flight.requests.per.connection はいくつにすべきですか?
EOSでは再試行時の順序乱れを避けるため、idempotence有効時に最大5以下が要件です。クライアントのデフォルトや自動調整に依存せず、試験でも5以下を推奨と覚えておくと安全です。
Kafka Streamsのexactly_onceとexactly_once_v2の違いは?
exactly_once_v2は内部実装が改良され、従来のexactly_onceに比べてレイテンシや再平衡時の挙動が改善されています。現在はexactly_once_v2の指定が推奨です。どちらもEOSを提供しますが、試験ではv2の指定と内部トピック/状態管理の理解がポイントになります。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...