Kafkaで「二重配信を避けたい」「複数トピックにまたがって一貫性を保ちたい」という要件に対し、単なる再試行や手動デデュープでは限界があります。公式のExactly-Onceセマンティクス(EOS)は、プロデューサの冪等化とトランザクションを組み合わせて、重複ないし欠損のない処理を実現します。
本稿は、CCDAK(Confluent Certified Developer for Apache Kafka)に頻出の論点である transactional.id、idempotent producer、read_committed、sendOffsetsToTransaction などを、実装と設計の双方から整理します。内容はKafka公式ドキュメントと安定概念に基づきます。
KafkaのExactly-Onceセマンティクス(EOS)は二層構えです。第一層は冪等プロデューサ(enable.idempotence=true)で、単一プロデューサから単一パーティションへの重複を排除します。第二層がトランザクションで、複数パーティション・複数トピック間の原子的な書き込みと、オフセットの同時コミットを可能にします。
冪等化だけではプロデューサ再起動やマルチパーティションへの一貫性は担保できません。transactional.id を持つプロデューサのトランザクションを使うと、1つの処理単位で『メッセージ群+消費オフセット』をまとめてコミットまたはアボートでき、read_committed なコンシューマからは中途半端な状態が見えません。
transactional.id は『同一論理プロデューサ』を表す安定識別子です。ブローカー上のトランザクション・コーディネータは transactional.id ごとにエポック(producer epoch)を管理し、新旧プロデューサの整合性を保ちます。障害後に同じ transactional.id で起動すると、新しいエポックが付与され、古いプロデューサはフェンシング(無効化)されます。
設計の勘所は、スケールと衝突回避です。インスタンス数ぶんユニークにするだけではなく、処理単位(例: 入力パーティション、Kafka Streams のタスク)と1対1で決定的に割り当てると、リバランスや再起動時でも同じ transactional.id を再利用でき、EOSが維持されます。ランダム生成は厳禁です。
トランザクションAPIの最小手順は、initTransactions → beginTransaction → レコード送信 → (必要なら)sendOffsetsToTransaction → commitTransaction(または abortTransaction)です。これにより、関連するすべてのパーティションにコミットマーカーが書かれ、read_committed なコンシューマにはコミット済みのみが見えます。
sendOffsetsToTransaction は『読み取り→処理→書き込み』の一連の処理で、コンシューマのオフセットを同じトランザクションに含めるための呼び出しです。enable.auto.commit は無効化し、アプリ側でオフセットコミットを制御します。
トランザクションの流れ(概念)
read_committed Consumer は commit 済みレコードのみを参照します。
Java プロデューサ(トランザクション)の最小実装例
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 冪等化とトランザクション
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
p.put(ProducerConfig.ACKS_CONFIG, "all");
// transactional.id は処理単位に対して安定・一意に
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-task-3");
// 長すぎるとブローカーの上限に拒否される
p.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
producer.initTransactions();
// コンシューマ設定の一例(同プロセスで扱う場合)
Properties c = new Properties();
c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
c.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-g");
c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
consumer.subscribe(Collections.singletonList("orders-in"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if (records.isEmpty()) continue;
producer.beginTransaction();
for (ConsumerRecord<String, String> r : records) {
// 変換して別トピックへ
ProducerRecord<String, String> out = new ProducerRecord<>("orders-out", r.key(), r.value());
producer.send(out);
}
// 同一トランザクションでオフセットもコミット
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> part = records.records(tp);
long lastOffset = part.get(part.size() - 1).offset();
offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("orders-g"));
producer.commitTransaction();
}
} catch (ProducerFencedException e) {
// 同じ transactional.id の新プロデューサが先に稼働 -> このプロセスは停止して再配置
throw e;
} catch (KafkaException e) {
// 失敗時はアボートして再試行
// producer.abortTransaction(); // 適切な場所で呼び出す
throw e;
}EOSを完結させるには、コンシューマ側で isolation.level=read_committed を設定し、未コミットやアボート済みのレコードを読み飛ばす必要があります。デフォルトの read_uncommitted のままだと、途中の状態を可視化してしまい、二重処理や不整合の原因になります。
オフセットは auto commit を無効化し、sendOffsetsToTransaction を使ってプロデューサのトランザクションに含めます。これにより、書き込み成功とオフセット前進が同時にコミットされ、処理の『ちぎれ』が発生しません。
transaction.timeout.ms はトランザクションの最大継続時間です。これを超えるとブローカーがトランザクションをアボートします。クライアント側の設定はブローカーの transaction.max.timeout.ms を上回れないため、長大トランザクションを避け、処理を小さなバッチに分割します。
トランザクションのメタデータは内部トピックに保持され、コミット/アボートのマーカーが各データパーティションにも書き込まれます。read_committed コンシューマはアボート済みレコードを見えないようにフィルタします。フェイルオーバ時は、同じ transactional.id を持つ新プロデューサが起動すると古いプロデューサはフェンシングされます(ProducerFencedException)。
CCDAKでは、冪等化とトランザクションの違い、transactional.id の意味、read_committed の効果、sendOffsetsToTransaction の目的が頻出です。オプションの暗記だけでなく、どの要件に何が必要かを言語化できると得点しやすくなります。
また、マルチインスタンス時の transactional.id 設計(安定・決定的・一意)や、ブローカー上限によるトランザクションタイムアウトの拒否も問われます。ランダムな transactional.id、auto commit 併用、read_uncommitted 放置は典型的な誤りです。
| 配信保証 | 必要条件(要点) | 可視化されるデータ |
|---|---|---|
| At-most-once | 自動コミット先行や送信前コミット | 欠損はあり得るが重複はしない |
| At-least-once | 再試行・手動コミット(トランザクションなし) | 重複の可能性あり |
| Exactly-once(EOS) | 冪等+トランザクション+read_committed+sendOffsetsToTransaction | コミット済みのみ(アボートは非表示) |
CCDAK
問題 1
Kafkaで『orders-in を読み、加工して orders-out へ書き込み、同時にオフセットを進める』フローをExactly-Onceで実現したい。アプリは障害時に別インスタンスへフェイルオーバーする。最も適切な設計はどれか?
正解: B
EOSには冪等+トランザクション+read_committed が必要で、オフセットは sendOffsetsToTransaction で同一トランザクションに含めます。transactional.id は処理単位に対して安定・一意に割り当て再起動時も再利用し、フェイルオーバー時はフェンシングで旧プロセスを無効化します。AはIDが変わるためEOSが維持できず、Cは途中露出が起き得ます。Dは衝突し、フェンシングされるか不整合を招きます。
冪等プロデューサだけでExactly-Onceになりますか?
なりません。冪等化は単一パーティションに対する重複排除に留まります。複数パーティション・トピック間の原子性やオフセット連携が必要なら、transactional.id によるトランザクションと read_committed を併用してください。
transaction.timeout.ms を長くすれば安全ですか?
必ずしも安全ではありません。ブローカーの transaction.max.timeout.ms を超える値は拒否されますし、長大なトランザクションはロック・メタデータ・再試行コストを増やし、アボート時の巻き戻しも重くなります。短い単位で begin→commit を繰り返す設計が推奨です。
同じ transactional.id を持つ旧プロデューサが動き続けたらどうなりますか?
同じ transactional.id の新プロデューサが起動すると、新しいエポックが付与され、旧プロデューサは ProducerFencedException を受けて以降の操作が失敗します。これがフェンシングです。旧プロセスは速やかに停止または再初期化するのが正解です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...