Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

2026-04-19
NicheeLab編集部

Kafka の「オフセット」は単なる数値ではなく、コンシューマグループの“次に読む位置”を意味します。どのタイミングでコミットするかは、ロスや重複の挙動を左右します。

本稿は CCDAK の出題傾向(オフセットの意味、コミット方法、デリバリセマンティクス、リバランスの影響)に沿って、安定した公式挙動をベースに解説します。

オフセットとポジションの基本

Kafka のオフセットはパーティションごとの連番で、コンシューマグループは各パーティションに対する「次に読むオフセット」を __consumer_offsets(コンパクション対象の内部トピック)に保存します。ここに記録されるのは“最後に処理したレコードの次”である点が重要です。

コンシューマは起動時にグループコーディネータから最新コミットを取得し、その位置から読み直します。コミットが存在しない場合は auto.offset.reset(earliest または latest)に従って開始位置が決まります。フェッチ位置(次に取得予定)とコミット済み位置(再起動時の開始点)は区別して考えます。

  • オフセットはトピックではなく“パーティション”単位で管理される
  • コミットは“次に読む”位置(例: 最後に処理=4 ならコミット=5)
  • 起動・再均衡時はコミット済み位置から再開する
  • __consumer_offsets はログコンパクションにより最新エントリ中心に保持される

オフセットとコミット位置の見取り図

Topic: payments, Partition: 00123456789offsetlast processed = 4committed (next) = 5 (group G1)nextToFetch = 7in-flight(フェッチ済・未コミット)再起動時は committed=5 から再開。in-flight 未コミット中の障害では 5,6 が再処理され得る(at-least-once)

コミット戦略の比較と選び方

Kafka コンシューマのコミットは自動(enable.auto.commit)か手動(commitSync/commitAsync)を選べます。どれを選ぶかで“ロスの可能性”と“重複の可能性”、さらにスループットや待ち時間が変わります。

at-least-once を守る基本は「副作用(外部書き込みなど)が完了してからコミット」です。処理前にコミットすると at-most-once(ロスの可能性あり)になり、コミットを遅らせるほど障害時の重複再処理が増えます。

  • 処理後コミット=ロス防止(重複は許容)
  • 処理前コミット=低レイテンシだがロスの可能性(試験では誤りパターン)
  • バッチごとコミットでオーバーヘッドと再処理量の折り合いを取る
戦略典型設定/APIロス/重複リスクレイテンシ/オーバーヘッド
自動コミットenable.auto.commit=true, auto.commit.interval.ms処理後に障害→ロスの可能性あり。重複は状況次第低い(簡便だが制御が粗い)
手動・同期コミットenable.auto.commit=false, commitSync()ロスは最小化。障害再開で重複あり高め(ACK待ち)
手動・非同期コミットenable.auto.commit=false, commitAsync()非同期で一部コミット喪失の可能性。重複あり低め(スループット重視)
バッチ同期コミットレコード N 件ごと or 間隔ごとに commitSync()重複はバッチ単位。ロスは最小化中程度(折衷)
トランザクション連携プロデューサの sendOffsetsToTransaction + read_committed読取-書込-コミットを一体化。重複・ロスの抑制やや高い(設計が複雑)

at-least-once を実現する実装パターン

基本ループは「poll → 処理(外部へ確定書き込み) → コミット」。バッチ処理のまとまりを意識し、重い処理で max.poll.interval.ms を超過しないように調整します。必要なら pause/resume でバックプレッシャをかけます。

外部システムが冪等(同じキーでの upsert、重複排除)であるほど、at-least-once の重複影響は抑えられます。exactly-once が必要な場合はトランザクション(プロデューサの EOS と read_committed)を検討します。

  • enable.auto.commit=false で明示コミット
  • 処理完了後に commitSync(成功経路)。高スループット時は適宜 commitAsync を併用
  • 例外時は未コミットにして次回再処理(必要に応じて DLQ へ転送)
  • バッチサイズ(max.poll.records)と処理時間のバランスを調整

Java Consumer(手動コミットで at-least-once)

// 必要ライブラリ: org.apache.kafka:kafka-clients
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "etl-payments-g1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("payments"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // パーティション順序を保ちつつ処理
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            for (ConsumerRecord<String, String> r : tpRecords) {
                // 1) 外部システムへ確定書き込み(冪等/再実行可能に)
                writeToDbIdempotent(r.key(), r.value());
            }
            // 2) パーティション単位で最後のオフセット+1を同期コミット
            long lastOffset = tpRecords.get(tpRecords.size() - 1).offset();
            Map<TopicPartition, OffsetAndMetadata> commit = new HashMap<>();
            commit.put(tp, new OffsetAndMetadata(lastOffset + 1));
            try {
                consumer.commitSync(commit);
            } catch (CommitFailedException e) {
                // リバランス直後等。次ループで再処理(at-least-once)
                log.warn("commit failed, will retry by reprocessing", e);
            }
        }
    }
}

// 参考: 過負荷時は consumer.pause(assignments) / resume を併用して max.poll.interval.ms 超過を回避

リバランス時の落とし穴と対策

メンバーの参加/離脱やサブスクライブトピックの変化でリバランスが発生します。パーティションが revoke される直前までに処理済みオフセットをコミットしていないと、引き継いだコンシューマが過去分を重複処理します(at-least-once 的には許容だが、重複が増える)。

ConsumerRebalanceListener を使い、onPartitionsRevoked で即時コミットするのが定石です。Cooperative Sticky アサイナーでも、段階的な revoke ごとに同様の配慮が必要です。poll の空転や処理時間超過で max.poll.interval.ms を越えると強制リバランスになりがちなので、pause/resume やバッチ粒度調整で回避します。

  • onPartitionsRevoked で最後まで処理済みの位置をコミット
  • 処理が長い場合は pause してから外部書き込み→resume
  • session.timeout.ms/heartbeat.interval.ms はグループ安定性に影響
  • コミット頻度が高すぎるとオーバーヘッド増、低すぎると重複再処理が増える

オフセットのリセットと再処理の基本

コミットが存在しない場合の開始位置は auto.offset.reset=earliest/latest で決まります。OutOfRange になった場合(保持期間切れなど)は明示的に seekToBeginning/seekToEnd で位置決めするか、コンシューマグループのオフセットをツールでリセットします。

再処理が必要な場合は、割り当て後に特定パーティションへ seek して過去オフセットから読み直すか、別の group.id で“新規コンシューマグループとして”読み込みます。いずれも at-least-once では重複が発生する前提で外部書き込みを冪等にしておくのが安全です。

  • 一時的な巻き戻しは assign/seek(割り当て後)で制御
  • 広範囲のやり直しは kafka-consumer-groups の --reset-offsets を検討
  • auto.offset.reset は“コミットが無い時だけ”に適用される
  • 再処理の影響を局所化するため、対象パーティションを限定する

CCDAK 試験対策の要点と誤解しやすい点

コミットされるのは“次に読む位置”であり、最後に処理したレコードのオフセットそのものではありません。これを前提に、障害時の再開位置と重複の発生パターンを説明できる必要があります。

複数パーティションのオフセットを一括コミットしても、外部システムへの書き込みと原子的に結びつくわけではありません。外部副作用とコミットの間に障害が挟まれば重複は起こり得ます。exactly-once を目指すなら、トランザクション(sendOffsetsToTransaction と read_committed)を組み合わせる設計を選択します。

  • at-most-once=コミットが処理より先でロスの可能性あり
  • at-least-once=処理後にコミット、重複は許容
  • exactly-once=EOS(トランザクション)と冪等を前提に設計
  • max.poll.interval.ms 超過はリバランス→未コミット分の重複増
  • __consumer_offsets は内部トピック(直接更新しない)

問題で確認

CCDAK

問題 1

外部データベースへ確定書き込みを行う Kafka コンシューマで、at-least-once を維持したい。最も適切な手法はどれか?

  1. enable.auto.commit=true にして auto.commit.interval.ms を短くする
  2. enable.auto.commit=false とし、処理が完了した後に commitSync() する
  3. 処理前に commitAsync() してから書き込み、レイテンシを最小化する
  4. max.poll.records を 1 にすれば重複は発生しない

正解: B

処理(外部への確定書き込み)完了後に同期コミットするのが at-least-once の基本。B が正解。A と C は処理前コミットでロスの可能性があり、D は重複を根本的には防げない。

よくある質問

コミットされる値は“最後に処理したオフセット”ですか?それとも“次に読むオフセット”ですか?

“次に読むオフセット”です。例えば最後に処理したのが 4 なら、コミットするのは 5。再起動時は 5 から再開し、0〜4 は再処理されません。

重複を完全に消したいのですが、どうすれば良いですか?

コンシューマ側の手動コミットだけでは重複は起こり得ます。重複排除には、外部システムの冪等化(同一キーの upsert、一意制約)か、Kafka のトランザクション(プロデューサの EOS と sendOffsetsToTransaction、コンシューマは read_committed)を組み合わせる設計を検討します。

過去データを再処理したい場合、どのようにオフセットを戻せますか?

割り当て後に特定パーティションへ seek して戻す、あるいは管理ツール(kafka-consumer-groups --reset-offsets)でグループのオフセットを調整します。広範囲の再処理は新しい group.id を用いる方法も一般的です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.