Kafka Streams の Join は、リアルタイム処理の表現力を大きく左右します。試験(CCDAK)でも出題頻度が高く、実務でも「どの Join を選ぶか」「どのようにキーとパーティションを合わせるか」でパフォーマンスと正確性が決まります。
本稿では、Stream-Stream、Stream-Table、GlobalKTable を比較し、ウィンドウ、タイムスタンプ伝播、再パーティショニング、tombstone など公式挙動に基づく注意点を整理します。
Kafka Streams の Join は大きく3つ。イベント同士を一定の時間窓で突き合わせる Stream-Stream、ストリームに最新状態を付与する Stream-Table、分散を意識せず参照できる GlobalKTable です。選択の軸は「時間整合が必要か」「参照は最新1行でよいか」「同一パーティションに揃えられるか」です。
CCDAK 観点では、どの Join がウィンドウ必須か、どの結合種(inner/left/outer)が使えるか、結果レコードのタイムスタンプが何に由来するか、再パーティショニングの必要性と GlobalKTable のトレードオフを問われやすいです。
| Join 種類 | 主キー要件 | ウィンドウ要否 | サポート結合種 |
|---|---|---|---|
| Stream-Stream (KStream-KStream) | 両ストリーム同一キー | 必須(前後幅を指定) | inner / left / outer |
| Stream-Table (KStream-KTable) | ストリームのキー = テーブルのキー | 不要(常に最新行参照) | inner / left |
| Stream-GlobalKTable (KStream-GlobalKTable) | ストリームのキー→参照キーへ変換 | 不要 | inner / left |
基本スケルトン(共通 Serde とビルダー)
StreamsBuilder builder = new StreamsBuilder();
// 共通の Serde 設定はプロパティ or StreamJoined/Joined で明示
// 以降のセクションで具体的な join を示します。Stream-Stream は両入力をウィンドウにバッファし、同一キーで時間が重なるペアを照合します。JoinWindow は前後幅(before/after)を持ち、状態ストアに保持された相手側レコードとマッチします。到着順が前後しても、ウィンドウ保持内かつ遅延許容範囲にあれば結合されます。
結果レコードのタイムスタンプは2入力の最大値です。join 種は inner/left/outer が選べ、左外部は左側のイベントを基準に相手が無い場合も出力、完全外部はどちらか片方のみでも出力します。内部的には両側に RocksDB の状態ストアが作られ、保持期間はウィンドウ幅(before+after)に遅延許容分を足した値が必要です。
Stream-Stream 窓付き結合の概念図
time --->
left (orders): --- o@t5 ----- o@t8 -------- o@t14 -----
[<-- before 2m -->|<-- after 3m -->]
right(payments): ---- p@t6 ---- p@t10 -- p@t13 -------------
例: before=2m, after=3m の場合
- o@t8 は [t6..t11] の支払いと突合 → p@t6, p@t10 が候補
- o@t14 は [t12..t17] の支払いと突合 → p@t13 が候補
結果レコード ts = max(o.ts, p.ts)KStream-KStream 窓付き結合の例 (Java DSL)
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Payment> payments = builder.stream("payments");
// 5分窓、前2分・後3分(メソッド名はバージョンで異なる場合あり)
JoinWindows win = JoinWindows.of(Duration.ofMinutes(5))
.before(Duration.ofMinutes(2))
.after(Duration.ofMinutes(3));
KStream<String, Enriched> joined = orders.join(
payments,
(o, p) -> Enriched.from(o, p),
win,
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
// left/outer も同様に leftJoin / outerJoin を使用Stream-Table は「イベント + 最新状態」の付与です。窓は不要で、結合時点のテーブルの最新値を参照します。inner と left が利用可能で、outer はありません。テーブル側の値が null(tombstone)であれば、inner はドロップ、left は右値 null で出力されます。
結果レコードのタイムスタンプはストリーム側のものが引き継がれます。実行時にはテーブルの changelog 由来のローカルストアを参照します。パーティション整合が必要なため、必要に応じてストリーム側に内部再パーティショニングが入ります。
KStream-KTable の例(ユーザー属性の付与)
KStream<String, PageView> views = builder.stream("pageviews");
KTable<String, UserAttr> users = builder.table("users-changelog");
KStream<String, EnrichedView> enriched = views.leftJoin(
users,
(view, user) -> EnrichedView.of(view, user) // user が null の場合も考慮
);
enriched.to("pageviews-with-user");GlobalKTable は、テーブルトピックの全パーティションを各タスクに複製し、ローカルで即時参照できる仕組みです。KStream-GlobalKTable は、ストリームレコードから参照キーを導出するための KeyValueMapper を渡し、外部キー参照(例: 注文の productId から製品カテゴリを引く)を容易にします。
利点は再パーティショニング不要・パーティション不一致でも動作する点。欠点は各タスクが全データを保持するためメモリ/ローカルストレージの負荷が増える点です。大規模ディメンションではサイズ見積もりと RocksDB・changelog の運用を前提に設計します。
KStream-GlobalKTable の例(外部キー参照)
KStream<String, Order> orders = builder.stream("orders");
GlobalKTable<String, Product> products = builder.globalTable("products-changelog");
KStream<String, EnrichedOrder> enriched = orders.leftJoin(
products,
// ストリームレコードから GlobalKTable の参照キーを導出
(orderKey, order) -> order.getProductId(),
(order, product) -> EnrichedOrder.of(order, product) // product が null もあり得る
);
enriched.to("orders-enriched");Join の多くはキー整合が前提です。Kafka Streams は必要に応じて内部の再パーティショニングトピックを作成し、キーに基づいてデータを並び替えます。これには追加のネットワーク・ストレージコストがかかるため、可能ならソーストピック段階で「同じキー関数・同じパーティション数」に揃えるのがベストです。
Stream-Stream の状態ストアは左右に作られ、保持期間はウィンドウ幅と遅延許容分に依存します。保持が短すぎると遅延イベントが結合されず、長すぎるとストレージを圧迫します。Stream-Table/GlobalKTable はテーブル側が changelog によって常に最新へ更新され、tombstone により削除が伝播します。
キー整形と再パーティショニングの明示例
KStream<String, Event> in = builder.stream("input");
// 外部キー join を見据えて、参照キーで再キー化
KStream<String, Event> byRefKey = in.selectKey((k, v) -> v.getRefId());
// 以降の join は byRefKey を使うと内部再パーティションを減らせる場合がある試験では、各 Join の適用条件と結果のタイムスタンプ、サポートされる結合種、tombstone の影響、GlobalKTable の特性が頻出です。実装 API 名はバージョン差分があるため、概念と挙動を優先的に押さえてください。
Serdes/Joined の明示で型・キーを安定化
KStream<String, L> left = builder.stream("left");
KStream<String, R> right = builder.stream("right");
KStream<String, Out> out = left.join(
right,
(l, r) -> combine(l, r),
JoinWindows.of(Duration.ofMinutes(3)),
StreamJoined.with(Serdes.String(), lSerde, rSerde) // 既定Serdeの誤適用を防ぐ
);CCDAK
問題 1
注文イベントと支払いイベントを結合し、注文発生から前後合計5分の範囲でマッチさせたい。支払いが無い注文も可視化したい。結果レコードのタイムスタンプは最も遅いイベント時刻としたい。どの設計が最も適切か?
正解: A
時間窓が必要かつ支払いが無い注文も出力したいので Stream-Stream の left join が要件に合致。結果 ts は Stream-Stream では2入力の最大時刻。Stream-Table/GlobalKTable は時間窓ではなく最新値参照であり、支払い未到着の可視化には適さない。
Stream-Stream で遅延到着はどこまで許容される?
Join ウィンドウの保持と(利用バージョンで設定可能な)遅延許容分に収まる範囲。これを超えるとマッチしません。状態ストアの保持期間を十分に確保してください。
KStream-KTable でパーティション数が一致していなくても動く?
動作しますが、ストリーム側に内部再パーティショニングが入ることがあります。ネットワーク・ストレージ負荷増となるため、可能なら事前にコパーティションを揃えるのが推奨です。
GlobalKTable は巨大ディメンションでも使える?
各タスクに全データを複製するため、サイズが大きいとメモリ/ローカルストレージ負荷が高くなります。サイズが大きい場合は通常の KTable でコパーティションを整える設計を優先的に検討してください。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...