Kafka Streams の状態管理は、集計・結合・ウィンドウなどの有状態処理の肝です。デフォルトで RocksDB をローカルに埋め込み、障害時は changelog トピックから復旧します。
本稿では、RocksDB の基本、再起動時の復旧フロー、スタンバイレプリカ、運用チューニング、試験(CCDAK)で問われやすいポイントを、実務に即して整理します。
Kafka Streams は、演算の中間結果を State Store に保存します。マテリアライズされた State Store は、同名の changelog トピックで冗長化され、障害時にリプレイして復旧できます。
デフォルトの永続ストアは RocksDB(プロセス内に組み込み)です。オンヒープのメモリストアも選べますが、プロセス終了で内容が消えるため、再起動後は changelog の全リプレイが必要になります。RocksDB はローカルディスクに永続化されるため、チェックポイントが一致していれば増分のみを適用して素早く復旧できます。
| ストア種別 | 永続性 | 再起動時の復旧 | パフォーマンス/I-O |
|---|---|---|---|
| In-memory(オンヒープ) | なし(プロセス終了で消える) | changelog を先頭から全リプレイ | 低レイテンシ、GC 影響を受ける |
| RocksDB(デフォルト) | あり(ローカルディスク) | チェックポイント以降の差分のみ適用が可能 | ディスク I/O、キャッシュで低レイテンシを実現 |
| RocksDB + スタンバイ | あり(複数ノードで温める) | フェイルオーバ時にスタンバイを昇格し差分を最小化 | 追加のネットワーク/ストレージ負荷 |
プロセスが再起動すると、各タスクはローカル state.dir 配下の RocksDB とチェックポイントファイル(changelog の適用済み位置)を確認します。ローカル状態が存在し、チェックポイントとタスク割当が一致すれば、changelog の差分だけを読み込んで整合性を回復し、処理を再開します。
ローカル状態が失われた場合やチェックポイントが不整合な場合は、該当ストアの changelog を先頭からリプレイして完全復旧します。Exactly-once v2 を有効にしていると、トランザクション境界で復旧が打ち切られ、ストアと出力トピックの一貫性が保たれます。協調的リバランス(ステートフルタスクのスティッキー配置)が有効なクラスターでは、同一ホストにタスクを残しやすいためローカル状態の再利用率が上がります。
RocksDB ストアの復旧パス
各 State Store は key-value の更新を changelog トピックに反映します。changelog はログ圧縮により、最新のキー更新が保持され、古いバージョンは削除されます。削除操作はトンブストーン(キーのみ、値は null)として書き込まれ、復旧時にも削除が正しく反映されます。
正確な復旧のためには、ストアと changelog で同一のシリアライザ/デシリアライザ(Serde)を使うこと、内部トピックのレプリケーション係数を十分に確保することが重要です。RocksDB 側のフラッシュタイミングやキャッシュは復旧の論理には影響しません。復旧は常に changelog の順序に従って適用されます。
num.standby.replicas を設定すると、各パーティションの State Store を別ノードでバックグラウンドに保温できます。フェイルオーバ時はスタンバイが Active に昇格し、差分のみの適用で復旧が完了します。代償としてネットワークとストレージのコストがかかります。
協調的リバランスではステートフルタスクのスティッキー配置により、スケールイン/アウト時も同一ホストにタスクが留まりやすく、ローカル state.dir の再利用によって復旧コストが抑えられます。コンテナ運用では state.dir を永続ボリュームに載せることで同様の効果が得られます。
state.dir は十分な I/O と容量を持つローカル永続ディスクに置き、コンテナならホストボリュームをマウントします。ファイルディスクリプタ上限や I/O スケジューラ設定も重要です。
RocksDB は RocksDBConfigSetter でライトバッファやブロックキャッシュを調整できます。キャッシュ(cache.max.bytes.buffering)とコミット間隔(commit.interval.ms)は、レイテンシと changelog 書き込み量のトレードオフです。復旧スループットは changelog 消費のネットワークとディスク I/Oが支配するため、max.partition.fetch.bytes などのコンシューマ設定も見直します。
Exactly-once v2(processing.guarantee=exactly_once_v2)を推奨。内部トピック(changelog/repartition)のレプリケーション係数を 3 に揃え、障害時のデータ損失を防ぎます。
設定例(Java Properties + Streams DSL)
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "payments-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// 永続ディスク配下に配置
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/state");
// 復旧短縮(コストとトレードオフ)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Exactly-once v2
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// キャッシュとコミット間隔
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 256 * 1024 * 1024L); // 256MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
// 内部トピックのレプリケーション(本番は 3 推奨)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// 復旧スループットに影響
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4 * 1024 * 1024); // 4MB
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payment> payments = builder.stream("payments");
KTable<String, Long> counts = payments
.groupByKey()
.count(Materialized.as("payments-counts")); // デフォルトで RocksDB + changelog
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();ローカル state.dir を誤って消すと、完全復旧(changelog 全リプレイ)になり時間を要します。時間短縮にはスタンバイやボリューム永続化、協調的リバランスを組み合わせます。
チェックポイントが壊れている場合は、ログに復旧フォールバックが記録され、先頭からのリプレイに切り替わります。整合性が取れないと判断したら、該当タスクのローカルディレクトリを削除して再生成させるのが確実です。
トポロジ変更でストア名や Serde を変えると、既存の changelog との互換性が崩れ復旧に失敗します。安全に初期化する必要があれば、アプリケーションリセットツールで入力オフセットと内部トピックをクリーンにしてから再デプロイします。
CCDAK
問題 1
Kafka Streams アプリがクラッシュ後、同一ホストで直ちに再起動された。RocksDB のローカル状態とチェックポイントが健全で、processing.guarantee は exactly_once_v2、num.standby.replicas=0。再起動時の復旧について最も正しい説明はどれか。
正解: A
RocksDB のローカル状態とチェックポイントが健全なら、Streams は changelog の差分だけを適用して増分復旧し、処理を再開する。復旧の真実は changelog であり、入力トピックのオフセットは用いない。スタンバイがなくても復旧は可能だが、ローカル状態がない場合は先頭からの完全リプレイが必要になる。
In-memory ストアに切り替えると復旧はどう変わる?
プロセス終了で状態が消えるため、再起動時は常に changelog を先頭から全リプレイする。ディスク I/O は減るが、復旧時間が長くなりやすい。堅牢性重視の本番では RocksDB を推奨。
チェックポイントが破損・不整合な場合は?
Streams は安全側に倒して先頭からの完全復旧にフォールバックする。繰り返す場合は該当タスクの state.dir 配下を削除して再生成し、ディスク健全性と Serde 互換性を確認する。
コンテナ運用で復旧を速くするコツは?
state.dir を永続ボリュームに置く、協調的リバランスでタスクのスティッキー性を確保、num.standby.replicas を 1 に、内部トピックのレプリケーションを 3 に、ブートストラップ時の I/O 帯域を確保する。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...