Consumer Group は Kafka のスケールアウトと耐障害性の中核です。並列処理の上限はパーティションで決まり、グループ調整(再バランス)が可用性とレイテンシを左右します。
本稿は CCDAK の出題観点を踏まえつつ、運用で効く設定と挙動の境界条件を、公式ドキュメント準拠の安定した概念に絞って整理します。
Consumer Group は同一 group.id の複数コンシューマで構成され、同一トピックの各パーティションはグループ内のいずれか1コンシューマにのみ割り当てられます。したがって、最大の並列度は「グループ全体で割り当てられるパーティション数」によって上限が決まります。
順序保証はパーティション単位です。グループで水平スケールしても、単一パーティション内の順序は保持されますが、パーティションをまたぐ全体順序は保証されません。必要な並列度は事前にパーティション設計で確保します。コンシューマはスレッドセーフではないため、基本は1スレッド=1インスタンスで運用し、内部での並列化が必要ならワーカーキュー方式などでメッセージ処理を分離します。
Consumer Group とパーティション割り当て(概念図)
Consumer Group はブローカー上の Group Coordinator により管理され、メンバーは heartbeats を送信して存続を示します。セッション切れや構成変更が起きると再バランスが走り、パーティション割当が見直されます。再バランス中は一時的にフェッチ/処理が停止しうるため、頻度と時間の最適化がスループットとレイテンシの鍵です。
主な発火要因は、メンバーの参加/離脱、サブスクライブトピック集合の変更、トピックのパーティション数変更、セッションタイムアウト/最大ポール間隔超過などです。設定は session.timeout.ms、heartbeat.interval.ms、max.poll.interval.ms の整合と、割当戦略の選択が中心です。
| 観点 | Eager(従来)再バランス | Cooperative(増分)再バランス | 試験・実務での要点 |
|---|---|---|---|
| 停止の度合い | 全メンバーが一旦すべてのパーティションを手放すため処理中断が大きい | 影響するパーティションのみ段階的に移動し中断を最小化 | 低遅延・安定運用では Cooperative が推奨されやすい |
| ムーブ量 | 割当の再計算後に広範な再配置が起こりやすい | 必要最小限の再配置に留まる | ローリングリスタート時の影響差が顕著 |
| リスナー対応 | onPartitionsRevoked で全体コミット/クリーンアップが必要になりがち | 小刻みな revoke/assign に合わせた最小限コミットが可能 | リバランスリスナー実装で差が出る |
| 適用・利点 | 実装が単純。小規模/短命グループでは許容可能 | 安定性・継続処理が重視される常時稼働に有効 | 明示設定が安全(デフォルトは実装/バージョン依存) |
割当戦略は partition.assignment.strategy にリスト形式で指定します。RangeAssignor、RoundRobinAssignor、StickyAssignor、CooperativeStickyAssignor などが代表例です。実運用では CooperativeStickyAssignor による増分再バランスが広く推奨されますが、デフォルトはクライアントやディストリビューションにより異なるため、明示設定が安全です(バージョン差異を避ける目的)。
静的メンバーシップ(group.instance.id の設定)は、同一メンバーが再起動しても同一 ID と見なされるため、ローリングリスタート時の再バランスを大幅に抑制できます。タイムアウト群は相互関係を理解して設定し、max.poll.interval.ms 超過による不要な離脱を防ぎます。
コンシューマのオフセットはデフォルトで内部トピック __consumer_offsets にコミットされます。enable.auto.commit を使うと定期的に自動コミットされますが、処理の成否と無関係に進むため、少なくとも重要処理では手動コミット(commitSync/commitAsync)を検討します。
配信セマンティクスはコミット順序に依存します。処理の後でコミットすれば少なくとも一度(at-least-once)。処理の前にコミットすれば高々一度(at-most-once)ですがロスの可能性。厳密な厳密一度(exactly-once)は Kafka のトランザクションと組み合わせたプロデューサ/コンシューマ連携が必要で、設計単位が広がります。
並列度を上げるには、原則としてパーティション数を増やすか、グループ内のコンシューマ数を増やします。後者は前者の上限に縛られるため、計画段階で所要のパーティション数を見積もることが重要です。過剰なパーティションはメタデータ/レプリケーション負荷を増やすため、根拠あるスケール設計が必要です。
フェッチ/処理の最適化として、max.poll.records によるバッチ処理、fetch.min.bytes と fetch.max.wait.ms による帯域効率化、pause/resume による一時的なバックプレッシャ制御などが有効です。処理スレッドは内部キューで水平分散し、オフセットコミットは処理完了基準で管理します。
運用では、再バランス指標(回数・所要時間)、lag、フェッチレイテンシ、ハートビート遅延を可視化します。障害対応やデプロイ時は、静的メンバーシップと Cooperative 戦略を併用し、リリース単位の影響を最小化します。
ローリングリスタート時は 1 台ずつ停止・復帰し、メンバーの復帰を待ってから次に進みます。静的メンバーが維持されていれば、割当の再計算は局所化され、中断は最小限で済みます。CLI の kafka-consumer-groups --describe や --reset-offsets は状態把握・修正に有用です。
Cooperative + 静的メンバーの Java Consumer(要点抜粋)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-consumer-g");
// Cooperative Sticky を明示(順序は優先度)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// 静的メンバーシップ(各インスタンスで一意な ID を設定)
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, System.getenv("POD_NAME"));
// タイムアウトは処理時間とネットワーク特性に合わせて整合
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
CountDownLatch latch = new CountDownLatch(1);
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 処理済みのオフセットを確実にコミット(at-least-once)
try { consumer.commitSync(); } catch (Exception e) { /* ログ */ }
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 必要に応じてシークや初期化
}
};
consumer.subscribe(Arrays.asList("orders"), listener);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> r : records) {
process(r); // アプリ固有処理(例外は握りつぶさず、DLQ 等へ)
}
consumer.commitAsync(); // 通常は非同期、終了/リボーク前は同期
}
} finally {
try { consumer.commitSync(); } catch (Exception ignore) {}
consumer.close();
}CCDAK
問題 1
トピック A は 6 パーティション。Consumer Group G に 8 インスタンスを起動し、CooperativeStickyAssignor と静的メンバーシップを設定した。最大の同時並列処理数と、ローリングリスタート時の挙動として最も正しい組み合わせはどれか。
正解: A
グループの最大並列度は割当可能なパーティション数(6)が上限。余剰の2インスタンスは未割当になる。Cooperative は段階的再配置で中断を最小化し、静的メンバーはローリング時の離脱/再参加による全面的な再割当を抑えるが、再バランス自体が完全に無くなるわけではない。
コンシューマ数を増やしてもスループットが伸びないのはなぜ?
合計パーティション数が並列度の上限だからです。未割当のコンシューマが発生します。まずは必要なパーティション数の設計見直し、もしくは処理のボトルネック(外部 DB や下流 API)の改善を検証してください。
順序保証を保ったまま並列度を上げる方法は?
キー設計で局所性を高め、パーティション数を増やします。順序保証はパーティション単位でのみ提供されるため、1 キー=1 パーティション内に収まるようハッシュ/キー設計を見直します。
再バランスによる停止時間を減らすには?
CooperativeStickyAssignor を明示、group.instance.id による静的メンバー化、max.poll.interval.ms を処理時間に見合う値へ、適切な session/heartbeat 設定、そして onPartitionsRevoked/Assigned を実装して最小限のコミット・再初期化に留めます。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...