Kafka の「オフセット」は単なる数値ではなく、コンシューマグループの“次に読む位置”を意味します。どのタイミングでコミットするかは、ロスや重複の挙動を左右します。
本稿は CCDAK の出題傾向(オフセットの意味、コミット方法、デリバリセマンティクス、リバランスの影響)に沿って、安定した公式挙動をベースに解説します。
Kafka のオフセットはパーティションごとの連番で、コンシューマグループは各パーティションに対する「次に読むオフセット」を __consumer_offsets(コンパクション対象の内部トピック)に保存します。ここに記録されるのは“最後に処理したレコードの次”である点が重要です。
コンシューマは起動時にグループコーディネータから最新コミットを取得し、その位置から読み直します。コミットが存在しない場合は auto.offset.reset(earliest または latest)に従って開始位置が決まります。フェッチ位置(次に取得予定)とコミット済み位置(再起動時の開始点)は区別して考えます。
オフセットとコミット位置の見取り図
Kafka コンシューマのコミットは自動(enable.auto.commit)か手動(commitSync/commitAsync)を選べます。どれを選ぶかで“ロスの可能性”と“重複の可能性”、さらにスループットや待ち時間が変わります。
at-least-once を守る基本は「副作用(外部書き込みなど)が完了してからコミット」です。処理前にコミットすると at-most-once(ロスの可能性あり)になり、コミットを遅らせるほど障害時の重複再処理が増えます。
| 戦略 | 典型設定/API | ロス/重複リスク | レイテンシ/オーバーヘッド |
|---|---|---|---|
| 自動コミット | enable.auto.commit=true, auto.commit.interval.ms | 処理後に障害→ロスの可能性あり。重複は状況次第 | 低い(簡便だが制御が粗い) |
| 手動・同期コミット | enable.auto.commit=false, commitSync() | ロスは最小化。障害再開で重複あり | 高め(ACK待ち) |
| 手動・非同期コミット | enable.auto.commit=false, commitAsync() | 非同期で一部コミット喪失の可能性。重複あり | 低め(スループット重視) |
| バッチ同期コミット | レコード N 件ごと or 間隔ごとに commitSync() | 重複はバッチ単位。ロスは最小化 | 中程度(折衷) |
| トランザクション連携 | プロデューサの sendOffsetsToTransaction + read_committed | 読取-書込-コミットを一体化。重複・ロスの抑制 | やや高い(設計が複雑) |
基本ループは「poll → 処理(外部へ確定書き込み) → コミット」。バッチ処理のまとまりを意識し、重い処理で max.poll.interval.ms を超過しないように調整します。必要なら pause/resume でバックプレッシャをかけます。
外部システムが冪等(同じキーでの upsert、重複排除)であるほど、at-least-once の重複影響は抑えられます。exactly-once が必要な場合はトランザクション(プロデューサの EOS と read_committed)を検討します。
Java Consumer(手動コミットで at-least-once)
// 必要ライブラリ: org.apache.kafka:kafka-clients
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "etl-payments-g1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("payments"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// パーティション順序を保ちつつ処理
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> r : tpRecords) {
// 1) 外部システムへ確定書き込み(冪等/再実行可能に)
writeToDbIdempotent(r.key(), r.value());
}
// 2) パーティション単位で最後のオフセット+1を同期コミット
long lastOffset = tpRecords.get(tpRecords.size() - 1).offset();
Map<TopicPartition, OffsetAndMetadata> commit = new HashMap<>();
commit.put(tp, new OffsetAndMetadata(lastOffset + 1));
try {
consumer.commitSync(commit);
} catch (CommitFailedException e) {
// リバランス直後等。次ループで再処理(at-least-once)
log.warn("commit failed, will retry by reprocessing", e);
}
}
}
}
// 参考: 過負荷時は consumer.pause(assignments) / resume を併用して max.poll.interval.ms 超過を回避メンバーの参加/離脱やサブスクライブトピックの変化でリバランスが発生します。パーティションが revoke される直前までに処理済みオフセットをコミットしていないと、引き継いだコンシューマが過去分を重複処理します(at-least-once 的には許容だが、重複が増える)。
ConsumerRebalanceListener を使い、onPartitionsRevoked で即時コミットするのが定石です。Cooperative Sticky アサイナーでも、段階的な revoke ごとに同様の配慮が必要です。poll の空転や処理時間超過で max.poll.interval.ms を越えると強制リバランスになりがちなので、pause/resume やバッチ粒度調整で回避します。
コミットが存在しない場合の開始位置は auto.offset.reset=earliest/latest で決まります。OutOfRange になった場合(保持期間切れなど)は明示的に seekToBeginning/seekToEnd で位置決めするか、コンシューマグループのオフセットをツールでリセットします。
再処理が必要な場合は、割り当て後に特定パーティションへ seek して過去オフセットから読み直すか、別の group.id で“新規コンシューマグループとして”読み込みます。いずれも at-least-once では重複が発生する前提で外部書き込みを冪等にしておくのが安全です。
コミットされるのは“次に読む位置”であり、最後に処理したレコードのオフセットそのものではありません。これを前提に、障害時の再開位置と重複の発生パターンを説明できる必要があります。
複数パーティションのオフセットを一括コミットしても、外部システムへの書き込みと原子的に結びつくわけではありません。外部副作用とコミットの間に障害が挟まれば重複は起こり得ます。exactly-once を目指すなら、トランザクション(sendOffsetsToTransaction と read_committed)を組み合わせる設計を選択します。
CCDAK
問題 1
外部データベースへ確定書き込みを行う Kafka コンシューマで、at-least-once を維持したい。最も適切な手法はどれか?
正解: B
処理(外部への確定書き込み)完了後に同期コミットするのが at-least-once の基本。B が正解。A と C は処理前コミットでロスの可能性があり、D は重複を根本的には防げない。
コミットされる値は“最後に処理したオフセット”ですか?それとも“次に読むオフセット”ですか?
“次に読むオフセット”です。例えば最後に処理したのが 4 なら、コミットするのは 5。再起動時は 5 から再開し、0〜4 は再処理されません。
重複を完全に消したいのですが、どうすれば良いですか?
コンシューマ側の手動コミットだけでは重複は起こり得ます。重複排除には、外部システムの冪等化(同一キーの upsert、一意制約)か、Kafka のトランザクション(プロデューサの EOS と sendOffsetsToTransaction、コンシューマは read_committed)を組み合わせる設計を検討します。
過去データを再処理したい場合、どのようにオフセットを戻せますか?
割り当て後に特定パーティションへ seek して戻す、あるいは管理ツール(kafka-consumer-groups --reset-offsets)でグループのオフセットを調整します。広範囲の再処理は新しい group.id を用いる方法も一般的です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...