Kafka

Kafka Consumer API 基礎: ポーリング・ハートビート・コミットを実務とCCDAK観点で整理

2026-04-19
NicheeLab編集部

Consumer は poll を中心に動作し、グループ維持はハートビート、処理の確定はオフセットコミットで行います。

設定値の相互作用(max.poll.interval.ms, session.timeout.ms, heartbeat.interval.ms, enable.auto.commit など)を正しく理解することが、試験でも実務でも重要です。

ポーリングの基本とフェッチの流れ

KafkaConsumer は poll(Duration) を定期的に呼び出し、ブローカーからフェッチ済みのデータを返します。戻り件数は max.poll.records などで制御され、ブローカー側の fetch.min.bytes や fetch.max.wait.ms とも相互作用します。poll はグループ管理の進行(再バランス時のコールバック実行やハートビートスレッドとの協調)にも必須です。

初回 subscribe 後、Consumer は group.id ごとにグループへ join し、パーティションが割り当てられてからポーリングが本格化します。フェッチはバックグラウンドで先行取得されるため、安定運用では「定期的に短い間隔で poll を呼ぶ」ことが最重要のベストプラクティスです。

  • 必ず短い周期で poll を呼ぶ(タイマーループやブロッキング処理の分離)
  • max.poll.records で1回の処理量を抑制してリバランスやコミット遅延を防ぐ
  • auto.offset.reset は新規購読時の開始位置(latest/earliest)を決める
  • ネットワーク・フェッチはバックグラウンドで行われるが、アプリ側の poll サイクルが遅いと処理全体が滞る

Consumer と Broker のやり取り(ポーリング/ハートビート/コミット)

poll() recordsheartbeatscommit offsetsProducerTopicP0 | P1 | P2BrokerConsumer Group G1C1 / C2Group Coordinator__consumer_offsetsinternal topic

最小のポーリングループ(Java)

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> r : records) {
      // アプリケーション処理
      handle(r);
    }
    // バッチ後にコミット(少量・高信頼なら同期)
    consumer.commitSync();
  }
} finally {
  consumer.close();
}

ハートビートとタイムアウトの関係

Consumer はグループ維持のために定期的にハートビートを送信します。Java クライアントではハートビートは内部スレッドで送信されますが、アプリ側の poll が著しく遅れると max.poll.interval.ms を超過し、再バランスの対象になります。session.timeout.ms はコーディネータがハートビート途絶を検出するまでの時間、heartbeat.interval.ms はハートビート送信間隔の目安です。

実務では、長時間処理や外部I/Oでアプリスレッドがブロックし、ハートビートは生きているが max.poll.interval.ms 超過でリバランス、という事象が頻出します。対策としては、バッチを小さくする、pause/resume を使いバックログを制御する、必要に応じて max.poll.interval.ms を引き上げる、などが現実的です。

  • session.timeout.ms は「ハートビートが来ないと判断するまで」の上限。短すぎると不安定、長すぎると障害検知が遅い。
  • heartbeat.interval.ms は通常 session.timeout.ms の1/3程度が目安。
  • max.poll.interval.ms は「次の poll 呼び出しまでの許容最大時間」。超過すると参加権を失いリバランス。
  • GCや外部API待ちなどの長時間ブロックは避け、必要ならワーカースレッドへハンドオフしつつ poll は継続。

ハートビート関連の主要設定(Java)

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);       // 障害検知の上限
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);      // 送信間隔の目安
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);     // poll 間隔の上限(業務に合わせて)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);            // 1回の処理件数を制御

オフセットコミット戦略の比較

コミットは __consumer_offsets に対するグループ単位の保存操作です。enable.auto.commit=true の場合、poll で返した最後尾までの位置を auto.commit.interval.ms ごとに自動コミットします。手動コミットでは commitSync は確実だが待ち時間があり、commitAsync は高速だが一部失敗を許容します。処理完了後にコミットするのが少なくとも 1 回以上(at-least-once)の基本です。

バッチ末尾で commitAsync を多用しつつ、終了時やパーティション剥奪時(revoke)に commitSync で締める、という併用パターンが現場ではよく使われます。

  • at-most-once: 先にコミット→その後に処理(失われ得る)。
  • at-least-once: 処理完了→コミット(重複はあり得る)。
  • Exactly-once: トランザクションを使う read-process-write の場合に達成可能(sendOffsetsToTransaction)。
戦略遅延/信頼性主な使い所注意点
commitSync()高遅延/高信頼低スループットだが確実性が重要なバッチ一時的なコーディネータ不調でも待つためレイテンシが伸びる
commitAsync()低遅延/ベストエフォート高スループットのストリーミング処理失敗時の再試行はコールバックで設計。順序の逆転に注意
enable.auto.commit=true低遅延/単純簡易な購読・検証用途処理完了前にコミットされやすく at-most-once になりがち

同期/非同期コミットの併用(Rebalance 対応)

AtomicBoolean running = new AtomicBoolean(true);
consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 再バランス直前は同期で確定
    consumer.commitSync();
  }
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});

try {
  while (running.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(300));
    process(records);
    consumer.commitAsync((offsets, ex) -> {
      if (ex != null) log.warn("async commit failed", ex);
    });
  }
} catch (WakeupException e) {
  // shutdown 要請
} finally {
  try { consumer.commitSync(); } finally { consumer.close(); }
}

配信保証と実務的な落としどころ

Consumer 単体でできるのは主に at-most-once か at-least-once のいずれかです。ほとんどの業務では、処理完了後にコミットする at-least-once を選び、重複は下流で冪等化(ユニークキー更新、アップサート、自然冪等な集計)で吸収します。

プロデューサーへ書き戻す read-process-write で重複も避けたい場合は、Idempotent Producer と Transactional Producer を使い、sendOffsetsToTransaction により出力とオフセットを同一トランザクションで確定します。これにより少なくとも Kafka 内では Exactly-once を満たせます。

  • 冪等化の基本: 主キー単位の upsert、自然冪等(例: 最大値更新)、重複検出キャッシュ
  • トランザクションは対象が Kafka 内に閉じる場合に効果的。外部DBと分散トランザクションは別設計が必要
  • レイテンシ要件と可用性に応じてコミット間隔を調整(小さくすると再処理量は減るがコミットオーバーヘッドは増える)

トランザクション連携の断片(参考: Java)

producer.initTransactions();
while (running) {
  ConsumerRecords<String, String> rs = consumer.poll(Duration.ofMillis(300));
  producer.beginTransaction();
  for (ConsumerRecord<String, String> r : rs) {
    ProducerRecord<String, String> out = transform(r);
    producer.send(out);
  }
  Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(rs);
  producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("g1"));
  producer.commitTransaction();
}

長時間処理とリバランス制御のパターン

処理が重い場合は、poll を止めずにワーカースレッドへハンドオフし、必要に応じて該当パーティションを pause してバックログを抑制します。完了したレコードのうち、各パーティションで連続して処理済みの最大オフセットまでを段階的にコミットします。

cooperative-sticky アサイナーを用いると再バランス時の停止時間が短くなる場合があります。いずれにせよ、onPartitionsRevoked で同期コミットして進捗を失わない設計が重要です。

  • max.poll.records を抑えて1サイクルの仕事量を限定
  • バックログが閾値を超えたパーティションだけ pause、減ったら resume
  • in-flight の最小連続オフセットまでコミットして再処理量を最小化
  • ワーカースレッドでの例外は検知して再試行やデッドレターへ迂回

pause/resume の例(Java)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
Set<TopicPartition> busy = calcBusyPartitions(inFlightMap);
if (!busy.isEmpty()) consumer.pause(busy);
processAsync(records, inFlightMap, doneQueue);
updateCommitsFrom(doneQueue, commitMap);
if (backlogLow(busy)) consumer.resume(busy);
consumer.commitAsync();

監視とトラブルシュート要点(CCDAK想定)

重要メトリクスの例: commit-latency(平均/最大)、commit-rate、heartbeat-rate、再バランス発生回数、レコード遅延(コンシューマラグ)。クライアントにより名称は異なりますが、これらをダッシュボード化すると効果的です。poll の呼び出し間隔が長くなっていないか(クライアントによっては last-poll-seconds-ago など)も確認します。

代表的な例外: CommitFailedException(リバランスで割り当て喪失中にコミット)、WakeupException(安全なシャットダウン用)、RebalanceInProgress 等。これらは想定内エラーとしてハンドリングし、ログをノイズ化させないのが実務的です。

  • コンシューマラグは group と topic-partition ごとに監視
  • 再バランス時の onPartitionsRevoked で同期コミットを徹底
  • シャットダウンは wakeup→ループ脱出→finally で commitSync→close
  • ネットワーク/ブローカー障害時はリトライで吸収されるが、待ち時間がSLAを超えないか確認

安全なシャットダウン(Java)

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  consumer.wakeup();
}));

try {
  while (running.get()) {
    ConsumerRecords<String, String> rs = consumer.poll(Duration.ofMillis(300));
    process(rs);
  }
} catch (WakeupException e) {
  // 正常な停止トリガ
} finally {
  try { consumer.commitSync(); } finally { consumer.close(); }
}

問題で確認

CCDAK

問題 1

高スループットのコンシューマが時々、外部API呼び出しで最大90秒ブロックします。max.poll.interval.ms=300000、session.timeout.ms=45000、heartbeat.interval.ms=3000、enable.auto.commit=false とします。少なくとも1回以上の処理保証を維持しつつ、不要なリバランスを避ける実装として最も適切なのはどれか。

  1. max.poll.records を抑え、処理中パーティションを pause し、処理完了後に同期コミットを行う
  2. enable.auto.commit を true にしてコミット負荷を下げる
  3. poll を5分ごとに1回だけ呼ぶようにし、長時間処理を1回でまとめる
  4. heartbeat.interval.ms を0に近づけてハートビート頻度を極端に上げる

正解: A

pause/resume でバックログを制御しつつ、処理完了後にコミットすれば at-least-once を維持しやすい。max.poll.interval.ms 内で継続的に poll を呼ぶ前提にも合致する。auto.commit を有効化すると処理前コミットが起きやすく at-most-once に寄る。poll 回数を減らすと max.poll.interval.ms 超過でリバランスを誘発しやすい。ハートビート頻度だけ上げてもアプリ側の poll 遅延は解決しない。

よくある質問

enable.auto.commit を本番で使ってもよいですか?

検証や簡易用途では有用ですが、多くの本番系では処理完了後の明示コミット(commitSync/commitAsync)を推奨します。auto.commit は処理前に位置が進む可能性があり、障害時にデータロス(at-most-once)を招きやすいためです。

ハートビートは poll と独立していますか?

Java コンシューマは内部スレッドでハートビート送信を行いますが、アプリが長時間 poll を呼ばないと max.poll.interval.ms を超過してリバランスされます。つまり、ハートビートは継続しても poll の頻度要件は満たす必要があります。

commitSync は遅いのですか?どのように使い分けますか?

commitSync はコーディネータからの応答を待つためレイテンシが増えますが、確実性が高いのが利点です。通常は commitAsync を多用し、重要な区切り(シャットダウン、再バランス直前など)で commitSync を使う併用がバランスの良い選択です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.