Kafkaはメッセージングの枠を超え、ストリーム処理プラットフォームとして確立しています。特にステートフル処理は、集約・Join・ステートストアを正しく理解してはじめて安定稼働します。
本稿ではKafka Streamsを軸に、公式ドキュメントの動作仕様に基づいて用語・アーキテクチャ・落とし穴を整理します。CCDAK(Confluent Certified Developer for Apache Kafka)の出題傾向にも触れながら、実務で迷わない判断材料を提示します。
Kafkaのステートフル処理は、演算に必要な状態をタスクごとにローカルのステートストアに保持し、変更履歴をKafkaのチェンジログトピックへ書き出すことで耐障害性を確保します。復旧時はチェンジログから状態をリプレイしてストアを再構築します。
代表的なステートフル演算は、集約(count/reduce/aggregate)、ウィンドウ集約、Join(KStream-KStream、KStream-KTable、KTable-KTable)です。ウィンドウ処理では、時間境界(ウィンドウサイズ)と遅延許容量(grace)が結果の確定タイミングを決めます。Joinではコーパーティション(同じキー・同じパーティション数)と必要に応じたリパーティションが成立条件です。
| ストア種別 | 主な用途 | 保持とキー構造 |
|---|---|---|
| KeyValueStore | KTableの現在値、ストリームの集約結果(非ウィンドウ) | キー→最新値を保持。チェンジログはコンパクション主体 |
| WindowStore | ウィンドウ集約、ストリーム-ストリームJoinのウィンドウバッファ | キー+時間境界でセグメント化。保持期間はウィンドウサイズ+grace+α |
| SessionStore | セッションウィンドウ(ギャップ閉塞)の集約 | キー+セッション境界(可変長)。マージにより境界が変動 |
Kafka Streamsにおけるステートフル処理の流れ
非ウィンドウ集約はKeyValueStoreに最新値を保持し、チェンジログはコンパクションで効率化されます。ウィンドウ集約はWindowStoreに時間セグメントごとの部分集計を保持し、保持期間はウィンドウサイズとgraceを含めて十分に設定します。
graceは遅延イベントをどこまで取り込むかを定義します。grace経過後のイベントはウィンドウ更新の対象外となるため、ビジネス要件(到着遅延の分布)に合わせて設定が必要です。
Kafka Streamsでのウィンドウ集約とJoinの例(Java)
StreamsBuilder builder = new StreamsBuilder();
Serde<String> stringSerde = Serdes.String();
Serde<Order> orderSerde = Serdes.serdeFrom(new OrderSerializer(), new OrderDeserializer());
Serde<Customer> customerSerde = Serdes.serdeFrom(new CustomerSerializer(), new CustomerDeserializer());
KStream<String, Order> orders = builder.stream("orders", Consumed.with(stringSerde, orderSerde));
KStream<String, Payment> payments = builder.stream("payments", Consumed.with(stringSerde, Serdes.serdeFrom(new PaymentSerializer(), new PaymentDeserializer())));
// 5分タンブリング + 1分grace
TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
KTable<Windowed<String>, Long> orderCount5m = orders
.groupByKey()
.windowedBy(windows)
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("order-count-5m")
.withKeySerde(stringSerde)
.withValueSerde(Serdes.Long())) ;
// KTableは現在値のスナップショットを保持
KTable<String, Customer> customers = builder.table(
"customers",
Consumed.with(stringSerde, customerSerde),
Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as("customers-store")
.withKeySerde(stringSerde).withValueSerde(customerSerde)
);
// KStream-KTable join(同一キー)。コーパーティションが必要。必要に応じてリパーティションが自動作成される
KStream<String, EnrichedOrder> enriched = orders.join(
customers,
(order, customer) -> EnrichedOrder.of(order, customer),
Joined.with(stringSerde, orderSerde, customerSerde)
);
// KStream-KStreamの時間制約付きJoin
KStream<String, JoinedEvent> joined = orders.join(
payments,
(o, p) -> JoinedEvent.of(o, p),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(stringSerde, orderSerde, Serdes.serdeFrom(new PaymentSerializer(), new PaymentDeserializer()))
);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();KStream-KStream Joinは時間制約が必須で、両側のレコードがウィンドウ内で出会う必要があります。演算内部では各側の一時バッファとしてWindowStoreが使われます。遅延イベントの扱いはgraceで制御されます。
KStream-KTable Joinは、ストリームのイベントを到着時点のKTableスナップショットで拡張します。KTable側はKeyValueStoreで現在値のみ参照され、ウィンドウは不要です。KTable-KTable Joinは両側の更新を伝播して結果テーブルを更新します。外部キーJoinなどの拡張はバージョン依存のため、実装前に公式ドキュメントで対応状況を確認してください。
Kafka StreamsはデフォルトでRocksDBをローカルストアとして用います。書き込みはコミット境界でバッチ適用され、更新はチェンジログトピックへ非同期複製されます。タスク再配置時やノード障害時は、チェンジログをリプレイしてRocksDBを再構築します。
高可用性のために、同一アプリケーショングループ内でスタンバイタスクを有効化できます。スタンバイはチェンジログを平行追従しており、フェイルオーバー時のウォームスタートに寄与します。保持やコンパクション設定は、テーブルかウィンドウかによって最適値が異なります。
グルーピングやJoinの前段でキーが変わる操作(map/flatMap/selectKeyなど)を行うと、既存のパーティション配置は不整合になります。この場合、Kafka Streamsは中間トピックを挟んで自動リパーティションします。試験では、コーパーティション成立の必須条件と、自動リパーティションが生じる代表ケースがよく問われます。
実務では、入口でキーを明示的に整える、トピック作成時にパーティション数を合わせる、必要な箇所だけselectKeyを行うといった設計で中間トピックの氾濫を防ぎます。
Exactly-once処理はTransactional producerとprocessing.guarantee=exactly_once_v2の組み合わせで実現されます。ステート更新と出力の原子的コミットにより二重計上を防ぎます。コミット間隔は遅延と書き込み負荷のトレードオフです。
ウィンドウ結果の確定タイミングは、ストリーム時間の前進とgrace経過に依存します。結果の最終化・集計確定を外部に出す場合、ksqlDBやアプリ側でsuppression相当の制御を設けることがあります。試験では、graceの意味、遅延イベントの扱い、EOSの前提条件が頻出です。
CCDAK
問題 1
KStreamとKTableのJoinを行うアプリで、orders(KStream)のキーをselectKeyで変更した直後にcustomers(KTable)と結合したい。正しい設計として最も適切なのはどれか。
正解: A
KStream-KTable Joinは結合キーの一致とコーパーティションが前提。selectKeyでキーを変えた場合、必要に応じて中間トピックによるリパーティションが挿入され、そのトピックのキーとcustomersのキーが一致していれば結合可能。KStream-KTableにJoinWindowsは不要(時間制約は用いない)。ブロードキャストは行われない。
KTableのチェンジログはなぜコンパクションが推奨されるのか?
KTableはキーの現在値を表すため、古い更新は意味を持ちません。コンパクションにより最新値だけを残せば、復旧のリプレイ量を削減し、ストレージ消費も抑えられます。
RocksDBではなくインメモリストアを使うと何が変わる?
低レイテンシを得られますが、容量制限により状態を大きく持てません。プロセス再起動時はチェンジログからの再構築が必要なのは同じで、ウォームアップ時間やGCの影響を考慮します。
ウィンドウのgraceを0にするとどうなる?
遅延イベントは受け付けず、ウィンドウ境界を超えた時点で結果がほぼ確定します。遅延の多いデータ源では結果欠損のリスクが高まるため、実測に基づき適切なgraceを設定してください。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...