Consumer は poll を中心に動作し、グループ維持はハートビート、処理の確定はオフセットコミットで行います。
設定値の相互作用(max.poll.interval.ms, session.timeout.ms, heartbeat.interval.ms, enable.auto.commit など)を正しく理解することが、試験でも実務でも重要です。
KafkaConsumer は poll(Duration) を定期的に呼び出し、ブローカーからフェッチ済みのデータを返します。戻り件数は max.poll.records などで制御され、ブローカー側の fetch.min.bytes や fetch.max.wait.ms とも相互作用します。poll はグループ管理の進行(再バランス時のコールバック実行やハートビートスレッドとの協調)にも必須です。
初回 subscribe 後、Consumer は group.id ごとにグループへ join し、パーティションが割り当てられてからポーリングが本格化します。フェッチはバックグラウンドで先行取得されるため、安定運用では「定期的に短い間隔で poll を呼ぶ」ことが最重要のベストプラクティスです。
Consumer と Broker のやり取り(ポーリング/ハートビート/コミット)
最小のポーリングループ(Java)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> r : records) {
// アプリケーション処理
handle(r);
}
// バッチ後にコミット(少量・高信頼なら同期)
consumer.commitSync();
}
} finally {
consumer.close();
}Consumer はグループ維持のために定期的にハートビートを送信します。Java クライアントではハートビートは内部スレッドで送信されますが、アプリ側の poll が著しく遅れると max.poll.interval.ms を超過し、再バランスの対象になります。session.timeout.ms はコーディネータがハートビート途絶を検出するまでの時間、heartbeat.interval.ms はハートビート送信間隔の目安です。
実務では、長時間処理や外部I/Oでアプリスレッドがブロックし、ハートビートは生きているが max.poll.interval.ms 超過でリバランス、という事象が頻出します。対策としては、バッチを小さくする、pause/resume を使いバックログを制御する、必要に応じて max.poll.interval.ms を引き上げる、などが現実的です。
ハートビート関連の主要設定(Java)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 障害検知の上限
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 送信間隔の目安
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // poll 間隔の上限(業務に合わせて)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200); // 1回の処理件数を制御コミットは __consumer_offsets に対するグループ単位の保存操作です。enable.auto.commit=true の場合、poll で返した最後尾までの位置を auto.commit.interval.ms ごとに自動コミットします。手動コミットでは commitSync は確実だが待ち時間があり、commitAsync は高速だが一部失敗を許容します。処理完了後にコミットするのが少なくとも 1 回以上(at-least-once)の基本です。
バッチ末尾で commitAsync を多用しつつ、終了時やパーティション剥奪時(revoke)に commitSync で締める、という併用パターンが現場ではよく使われます。
| 戦略 | 遅延/信頼性 | 主な使い所 | 注意点 |
|---|---|---|---|
| commitSync() | 高遅延/高信頼 | 低スループットだが確実性が重要なバッチ | 一時的なコーディネータ不調でも待つためレイテンシが伸びる |
| commitAsync() | 低遅延/ベストエフォート | 高スループットのストリーミング処理 | 失敗時の再試行はコールバックで設計。順序の逆転に注意 |
| enable.auto.commit=true | 低遅延/単純 | 簡易な購読・検証用途 | 処理完了前にコミットされやすく at-most-once になりがち |
同期/非同期コミットの併用(Rebalance 対応)
AtomicBoolean running = new AtomicBoolean(true);
consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 再バランス直前は同期で確定
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(300));
process(records);
consumer.commitAsync((offsets, ex) -> {
if (ex != null) log.warn("async commit failed", ex);
});
}
} catch (WakeupException e) {
// shutdown 要請
} finally {
try { consumer.commitSync(); } finally { consumer.close(); }
}Consumer 単体でできるのは主に at-most-once か at-least-once のいずれかです。ほとんどの業務では、処理完了後にコミットする at-least-once を選び、重複は下流で冪等化(ユニークキー更新、アップサート、自然冪等な集計)で吸収します。
プロデューサーへ書き戻す read-process-write で重複も避けたい場合は、Idempotent Producer と Transactional Producer を使い、sendOffsetsToTransaction により出力とオフセットを同一トランザクションで確定します。これにより少なくとも Kafka 内では Exactly-once を満たせます。
トランザクション連携の断片(参考: Java)
producer.initTransactions();
while (running) {
ConsumerRecords<String, String> rs = consumer.poll(Duration.ofMillis(300));
producer.beginTransaction();
for (ConsumerRecord<String, String> r : rs) {
ProducerRecord<String, String> out = transform(r);
producer.send(out);
}
Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(rs);
producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("g1"));
producer.commitTransaction();
}処理が重い場合は、poll を止めずにワーカースレッドへハンドオフし、必要に応じて該当パーティションを pause してバックログを抑制します。完了したレコードのうち、各パーティションで連続して処理済みの最大オフセットまでを段階的にコミットします。
cooperative-sticky アサイナーを用いると再バランス時の停止時間が短くなる場合があります。いずれにせよ、onPartitionsRevoked で同期コミットして進捗を失わない設計が重要です。
pause/resume の例(Java)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
Set<TopicPartition> busy = calcBusyPartitions(inFlightMap);
if (!busy.isEmpty()) consumer.pause(busy);
processAsync(records, inFlightMap, doneQueue);
updateCommitsFrom(doneQueue, commitMap);
if (backlogLow(busy)) consumer.resume(busy);
consumer.commitAsync();重要メトリクスの例: commit-latency(平均/最大)、commit-rate、heartbeat-rate、再バランス発生回数、レコード遅延(コンシューマラグ)。クライアントにより名称は異なりますが、これらをダッシュボード化すると効果的です。poll の呼び出し間隔が長くなっていないか(クライアントによっては last-poll-seconds-ago など)も確認します。
代表的な例外: CommitFailedException(リバランスで割り当て喪失中にコミット)、WakeupException(安全なシャットダウン用)、RebalanceInProgress 等。これらは想定内エラーとしてハンドリングし、ログをノイズ化させないのが実務的です。
安全なシャットダウン(Java)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
}));
try {
while (running.get()) {
ConsumerRecords<String, String> rs = consumer.poll(Duration.ofMillis(300));
process(rs);
}
} catch (WakeupException e) {
// 正常な停止トリガ
} finally {
try { consumer.commitSync(); } finally { consumer.close(); }
}CCDAK
問題 1
高スループットのコンシューマが時々、外部API呼び出しで最大90秒ブロックします。max.poll.interval.ms=300000、session.timeout.ms=45000、heartbeat.interval.ms=3000、enable.auto.commit=false とします。少なくとも1回以上の処理保証を維持しつつ、不要なリバランスを避ける実装として最も適切なのはどれか。
正解: A
pause/resume でバックログを制御しつつ、処理完了後にコミットすれば at-least-once を維持しやすい。max.poll.interval.ms 内で継続的に poll を呼ぶ前提にも合致する。auto.commit を有効化すると処理前コミットが起きやすく at-most-once に寄る。poll 回数を減らすと max.poll.interval.ms 超過でリバランスを誘発しやすい。ハートビート頻度だけ上げてもアプリ側の poll 遅延は解決しない。
enable.auto.commit を本番で使ってもよいですか?
検証や簡易用途では有用ですが、多くの本番系では処理完了後の明示コミット(commitSync/commitAsync)を推奨します。auto.commit は処理前に位置が進む可能性があり、障害時にデータロス(at-most-once)を招きやすいためです。
ハートビートは poll と独立していますか?
Java コンシューマは内部スレッドでハートビート送信を行いますが、アプリが長時間 poll を呼ばないと max.poll.interval.ms を超過してリバランスされます。つまり、ハートビートは継続しても poll の頻度要件は満たす必要があります。
commitSync は遅いのですか?どのように使い分けますか?
commitSync はコーディネータからの応答を待つためレイテンシが増えますが、確実性が高いのが利点です。通常は commitAsync を多用し、重要な区切り(シャットダウン、再バランス直前など)で commitSync を使う併用がバランスの良い選択です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...