KStream はイベントの連続(追記ログ)、KTable はキーごとの最新状態(チェンジログ)を表します。両者の違いを理解すると、ジョインや集約、トピック設計で迷いません。
CCDAK では、KStream/KTable の選択、ジョインの時間セマンティクス、トピックのコンパクション、トゥームストーン(null 値)などが頻出論点です。
KStream は、トピックに到着する各レコードをそのまま時間順に扱う抽象です。各レコードは独立しており、ストリーム処理(map、filter、branch、windowed aggregation など)に向きます。KStream は「事実のイベント列」を表現します。
KTable は、キーごとの最新の値を持つテーブルの抽象で、チェンジログ(更新の流れ)から最新像をマテリアライズします。KTable の入力にはログコンパクション対象のトピックが適しています。null 値はトゥームストーンとして扱われ、そのキーの行を削除します。
GlobalKTable は全パーティションを各タスクに複製する読み取り専用のテーブルで、KStream とのキー以外での結合に使えます(全ノードに全データを展開するためサイズに注意)。
| 抽象 | 表すもの | 主な用途 |
|---|---|---|
| KStream | イベントの連続(履歴) | フィルタ、マップ、ウィンドウ集約、ストリーム-ストリーム結合 |
| KTable | キーの最新状態(現在形) | アップサート、集約結果の保持、ストリーム-テーブル結合の参照側 |
| GlobalKTable | 全ノードに複製された読み取りテーブル | キー以外での参照結合(KStream-GlobalKTable) |
KStream と KTable の概念的な位置づけ
KStream/KTable の基本的な組み立て(Java, Kafka Streams DSL)
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-kstream-ktable");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events");
// KTable のソースは通常 compacted topic を推奨
KTable<String, String> profiles = builder.table("user-profiles");
KStream<String, String> enriched = events.leftJoin(
profiles,
(eventVal, profileVal) -> profileVal == null ? eventVal : eventVal + "|" + profileVal
);
enriched.to("events-enriched");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();KStream/KTable の正確な振る舞いはキーとパーティション整合性に依存します。ジョインや集約では、同一キーが同一パーティションに存在することが前提です。selectKey や groupBy を行うと、内部リパーティション・トピックが自動作成されます。
KTable を正しく最新像として保つには、入力トピックの cleanup.policy=compact を設定し、キーは安定(不変)であることが望まれます。値が null のレコードはトゥームストーンとなり、そのキーが削除されます。
Serdes(シリアライザ/デシリアライザ)はデータの互換性に直結します。デフォルト Serdes を設定しつつ、異なる型を扱う演算では明示的に Serdes を渡してください。
| 設計項目 | KStream の考え方 | KTable の考え方 | 試験の着眼点 |
|---|---|---|---|
| キーとパーティション | イベント送出時に正しいキー付与。必要なら selectKey 後に再パーティション | 同一キーの最新値を同一パーティションで維持 | ジョインはキー整合が前提。GlobalKTable は例外的に全体複製 |
| トピック方針 | 通常は delete/時間保持。履歴を蓄積 | compact または compact,delete で最新像+一定履歴 | KTable で compact を理解。null=delete の意味 |
| Serdes | デフォルト Serdes と演算ごとの Serdes 指定 | null 値対応の Serde 注意(Avro/JSON Schema での null 許容) | Serdes の不一致は試験の落とし穴 |
パーティション整合とタスク配置
Topic A (3 partitions) Topic B (3 partitions)
P0 -----> Task 0 <----- P0 P0 -----> Task 0 <----- P0
P1 -----> Task 1 <----- P1 P1 -----> Task 1 <----- P1
P2 -----> Task 2 <----- P2 P2 -----> Task 2 <----- P2
同じキーは同じハッシュ -> 同じパーティション -> 同じ Task でジョイン可能トピック作成(コンパクション)とキー整形の例
AdminClient admin = AdminClient.create(Map.of("bootstrap.servers", "localhost:9092"));
NewTopic compacted = new NewTopic("user-profiles", 3, (short) 3)
.configs(Map.of(
"cleanup.policy", "compact",
"min.cleanable.dirty.ratio", "0.1",
"segment.ms", "604800000" // 例: 7日
));
admin.createTopics(List.of(compacted));
StreamsBuilder b = new StreamsBuilder();
KStream<String, String> raw = b.stream("pageviews-raw");
KStream<String, String> keyed = raw.selectKey((k, v) -> extractUserId(v));
// キー変更 -> 内部リパーティション(throughで明示も可能)
KStream<String, String> repartitioned = keyed.through("pageviews-by-user");KStream はレコード単位の変換(map、filter、branch)に加え、ウィンドウ集約が可能です。ウィンドウ集約の結果は KTable(ウィンドウ付きの最新値)として表現されます。時間セマンティクスはイベント時刻を基本とし、遅延到着は許容期間(grace)内で取り込めます。
KTable は非ウィンドウのキー集約に向き、各更新で下流へ再計算の差分を流します(update 設定)。KTable 同士の結合・集約は、テーブルの変更イベントをトリガに再評価されます。
| 操作 | KStream での可否 | KTable での可否 | ウィンドウ/状態 |
|---|---|---|---|
| map/filter/branch | 可 | 不可(テーブルは更新駆動) | 非状態 |
| groupByKey + windowed aggregation | 可(結果は KTable) | 不可 | ウィンドウ+状態ストア |
| groupBy/aggregate(非ウィンドウ) | 可(結果は KTable) | 可(再計算で最新像) | 状態ストア |
| suppress(確定後に出力) | 可(集約後) | 可(集約後) | バッファ+境界確定 |
ウィンドウ時間線図(Tumbling の例)
time --->
|----[W1]----|----[W2]----|----[W3]----|
イベントは所属ウィンドウに集約され、ウィンドウ境界後に確定(grace を考慮)イベント時刻ベースのウィンドウ集約(Tumbling)
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> events = builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()));
TimeWindows windows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)); // 例: grace なし
KTable<Windowed<String>, Long> counts = events
.groupByKey()
.windowedBy(windows)
.count();
counts.toStream().to("clicks-per-minute", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));KStream-KTable は参照(ルックアップ)型の結合で、ストリーム側レコードの処理時点における KTable の最新値と結合します。時間ウィンドウは不要です。KTable の後続更新は過去の結合結果を遡及しません。
KStream-KStream はウィンドウ必須の結合で、同一キーのレコードがウィンドウ内に現れたときに結合します。左右どちらのストリームにも到着順序や遅延の影響を受けます。
KTable-KTable はテーブル同士の結合で、どちらかのテーブルでキーの値が更新されると結合結果が再計算され、差分が下流に流れます。
| 結合タイプ | 時間要件 | 出力の性質 |
|---|---|---|
| KStream-KTable | ウィンドウ不要(処理時点の参照) | KStream(結合結果レコードが流れる) |
| KStream-KStream | ウィンドウ必須(イベント時刻) | KStream(ウィンドウ内マッチで出力) |
| KTable-KTable | ウィンドウ不要(更新駆動) | KTable(差分更新が伝播) |
3 種のジョインの直感図
KStream --lookup--> KTable => 参照時点の最新値
KStream <==window==> KStream => 時間窓内で結合
KTable <==update==> KTable => 更新のたびに再計算KStream-KTable と KStream-KStream の例
StreamsBuilder b = new StreamsBuilder();
KStream<String, String> pageviews = b.stream("pageviews");
KTable<String, String> users = b.table("user-profiles");
// 参照型(ウィンドウ不要)
KStream<String, String> enriched = pageviews.leftJoin(users, (pv, u) -> u == null ? pv : pv + "|user=" + u);
enriched.to("pageviews-enriched");
// ストリーム同士の結合(ウィンドウ必須)
KStream<String, String> clicks = b.stream("clicks");
JoinWindows jw = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30));
KStream<String, String> joined = pageviews.join(
clicks,
(pv, cl) -> pv + "&" + cl,
jw
);
joined.to("pv-clicks-joined");Kafka Streams は状態ストア(既定では RocksDB ローカル)に集約・結合などの状態を保持し、変更は内部チェンジログ・トピックに複製されます。障害時にはチェンジログから再構築され、スタンバイ・レプリカを有効化するとフェイルオーバ時間を短縮できます。
マテリアライズでストア名を明示すると、インタラクティブクエリや外部キャッシュ統合にも備えられます。発行頻度はキャッシュ設定と commit.interval に影響されます。トランザクションと冪等プロデューサにより、状態更新と出力の一貫性(いわゆる exactly-once 処理セマンティクス)が提供されます。
| ストア種別 | 用途 | 永続化/復元ポイント | 補足 |
|---|---|---|---|
| KeyValueStore | 集約結果や KTable のマテリアライズ | チェンジログ・トピック | 最も一般的 |
| WindowStore | ウィンドウ集約の中間・結果 | チェンジログ+セグメント化 | Windowed<Key> をキーに持つ |
| SessionStore | セッションウィンドウの集約 | チェンジログ | 開始・終了時刻を内部管理 |
状態ストアとチェンジログの関係
マテリアライズと EOS(環境互換に注意)の設定例
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // 環境により EXACTLY_ONCE を選択
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760); // キャッシュ 10MB(例)
StreamsBuilder b = new StreamsBuilder();
KTable<String, Long> counts = b.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey()
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("event-counts")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
);
counts.toStream().to("events-counts");CCDAK では、KStream/KTable の役割、結合ごとの時間セマンティクス、トピックのコンパクション、トゥームストーンの意味、リパーティションの発生条件、状態管理と処理保証の設定が問われます。実務でも同じ点で障害・過負荷・不整合が起きやすいため、設計時に明確化しておきます。
特に KTable での null 値は削除を意味するため、スキーマ進化やデシリアライズ失敗で意図せず null を生成しないように注意してください。キャッシュが有効だと下流への出力が遅延するため、テストでは一時的にキャッシュを無効化して観測性を高める手もあります。
| 落とし穴 | 症状 | 回避策/確認ポイント |
|---|---|---|
| トピックが非コンパクション | KTable の状態が膨張・再起動で復元長時間 | cleanup.policy=compact(必要に応じて compact,delete)を設定 |
| キー不整合のままジョイン | 結合ヒット率が低い/ゼロ | selectKey で整合、パーティション数とハッシュを合わせる |
| 意図しない null 値 | KTable の行が削除される | スキーマで null 許容を明示、デシリアライズ失敗の監視 |
| キャッシュの影響を誤解 | 下流に結果が出ないように見える | テスト時は CACHE_MAX_BYTES=0、commit.interval を短縮 |
| 無計画な再パーティション | 内部トピック増加・スループット低下 | キー設計を事前に固定、必要最小限に selectKey/groupBy |
トゥームストーン(null)による削除の流れ
トゥームストーンの送出と観測性向上(テスト用)
Properties p = new Properties();
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // テスト時は即時フラッシュ
StreamsBuilder b = new StreamsBuilder();
KStream<String, String> updates = b.stream("user-updates");
// 削除条件を満たすと null を出力 -> compacted topic でトゥームストーン
updates
.mapValues((v) -> shouldDelete(v) ? null : v)
.to("user-profiles"); // cleanup.policy=compact 推奨
// KTable 側で null を検知して監視に送る
KTable<String, String> profiles = b.table("user-profiles");
profiles.toStream()
.filter((k, v) -> v == null)
.mapValues(v -> "deleted")
.to("audit-deletes");CCDAK
問題 1
ページビューのストリームに、ユーザープロファイルの最新情報を付与したい。プロファイルは userId キーでアップサートされ、削除は null 値(トゥームストーン)で表現される。適切な実装はどれか。
正解: A
参照型の付与は KStream-KTable が適切で、処理時点の KTable 最新像をルックアップできる。プロファイルはアップサートと削除(null)を表現するため compacted topic が望ましい。KStream-KStream はウィンドウが必要で参照結合の要件に合わない。KTable-KTable はテーブル変更で再計算されるため、イベント駆動の付与には不向き。
KTable と GlobalKTable の違いは?いつ GlobalKTable を使うべき?
KTable はパーティション分割された最新像で、結合にはキー整合が前提。GlobalKTable は全パーティションを各タスクに複製する読み取り専用テーブルで、KStream 側のフィールドを任意に参照キーにマッピングして結合できる。データ量が小さく、頻繁な参照を行いたい場合に有効(ただしメモリ/ネットワークコストに注意)。
KTable で null 値を受け取ると何が起きる?
null 値はトゥームストーンであり、そのキーに対応する行が削除される。コンパクションにより履歴は圧縮され、最終的に削除情報のみが保持または除去される。意図しない削除を避けるため、スキーマで null 許容を明示し、デシリアライズ失敗の監視を行うこと。
exactly-once 処理セマンティクスは KStream/KTable にどう関係する?
状態更新(KTable のマテリアライズや集約)と出力トピックへの書き込みを単一トランザクションでコミットし、一貫性を保つ。処理保証は processing.guarantee で設定し、ブローカー側の冪等/トランザクション機能が有効であることが前提。試験では概念と要件(状態・出力の原子的コミット)を理解しておく。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...