ウィンドウ処理は「いつのデータをどこまでまとめるか」を決める時間軸の設計。Kafkaでは基本的にイベント時刻ベースで考えます。
Tumbling(固定長・非重複)、Hopping(固定長・重複)、Session(可変長・マージ)を区別して選ぶのが第一歩。
Kafkaのウィンドウ処理は、連続するストリームを時間で区切って集計・結合するための仕組みです。Kafka Streams DSLとksqlDBはいずれも原則としてイベント時刻(レコードのタイムスタンプ)を用います。ストリーム時間は、観測したイベント時刻の最大値で進みます。
遅延イベント(到着順が前後するデータ)を扱うには、ウィンドウ終端後にどの程度まで受け入れるか(Grace)を設定します。ウィンドウは「ストリーム時間が ウィンドウ終端+Grace に達した時点」で閉じられ、それ以降に属するレコードは遅着として破棄されます。
正しく時刻を扱うには、TimestampExtractor(Kafka Streams)やTIMESTAMP列(ksqlDB)を適切に指定し、期待するイベント時刻を付与することが重要です。
3種のウィンドウは境界の作り方と重なり方が決定的に異なります。要件(重複カウント可否、ユーザー行動のかたまり検出、レポートの粒度)に合わせて選択します。特に試験(CCDAK)では、定義と使いどころを問う設問が繰り返し出題されます。
Sessionは非活動ギャップで区切る可変長。遅延イベントを契機に別々のセッションがマージされることがある、という点が実務・試験ともに重要です。
| 種別 | 境界/長さ | オーバーラップ | Grace(遅延許容) |
|---|---|---|---|
| Tumbling | 固定長。例: 1分ごと [00:00,00:01) | なし(非重複) | ウィンドウ終端後、Graceまで受け入れ |
| Hopping | 固定長+スライド。例: 長さ5分・1分スライド | あり(同一イベントが複数ウィンドウに入る) | 終端後、Graceまで受け入れ(各ウィンドウごと) |
| Session | 可変長。非活動ギャップで区切り、活動で延長 | マージあり(遅延でセッション結合し得る) | セッション終端後、Graceまで受け入れ |
ウィンドウは、ストリーム時間がウィンドウ終端+Graceに到達するまで「開いたまま」です。ストリーム時間は観測したイベント時刻の最大値で進むため、将来時刻のレコードが到着すると過去ウィンドウが一気に閉じることがあります。
以下の図は、Tumblingウィンドウ [0,5), [5,10) と、Grace=2 の例です。ts=4 の遅延イベントがストリーム時間6の時点で到着しても、5+2=7 まではウィンドウ[0,5)が開いているため取り込まれます。
ウィンドウ終端とGraceのイメージ(Tumbling, Grace=2)
time --->
0 2 4 6 8 10
|----|----|----|----|----|
[ W1:0-5 ) [ W2:5-10 )
|-------------------------|-------------------------
イベント到着:
e1: ts=3, arrive@3 -> 属する: W1(オンタイム)
e2: ts=6, arrive@6 -> 属する: W2(オンタイム)=> stream-time = 6
e3: ts=4, arrive@6 -> 遅延だが W1 に取り込み可能(stream-time 6 < 5+Grace 7)
e4: ts=1, arrive@9 -> W1は既に閉鎖(stream-time>=7)=> 遅着として破棄Kafka Streams DSLでは、TimeWindows.ofSizeAndGrace と SessionWindows.ofInactivityGapAndGrace を用いてウィンドウ長とGraceを明示します。HoppingはadvanceByでスライド幅を指定します。Sessionではマージ関数(Aggregator Merger)が必須です。
ksqlDBでは、WINDOW句でTUMBLING/HOPPING/SESSIONを指定し、SIZE/ADVANCE BY/GRACE PERIODを合わせて宣言します。最終結果のみを取りたい場合はEMIT FINALを併用します(中間更新を抑制)。実際のバージョンにより句の細かな位置やサポートは変わることがあるため、運用前に該当バージョンのドキュメントを確認してください。
Kafka Streams DSL と ksqlDB のサンプル
// Kafka Streams (Java)
KStream<String, String> source = builder.stream("input");
// 1) Tumbling 1分 + Grace 5分(イベント時刻ベース)
KTable<Windowed<String>, Long> tumbling = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(5)))
.count();
// 必要ならウィンドウ閉鎖後のみ出力(中間を抑制)
KTable<Windowed<String>, Long> tumblingFinal = tumbling
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
// 2) Hopping(長さ5分・1分スライド)+ Grace 5分
KTable<Windowed<String>, Long> hopping = source
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1))
)
.count();
// 3) Session(非活動ギャップ5分・Grace 10分)
KTable<Windowed<String>, Long> sessions = source
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(10)))
.aggregate(
() -> 0L,
(key, value, agg) -> agg + 1,
(agg1, agg2) -> agg1 + agg2, // セッションマージ時の結合関数
Materialized.with(Serdes.String(), Serdes.Long())
);
// ksqlDB
-- ストリーム定義(イベント時刻としてtsを使用)
CREATE STREAM clicks (user_id VARCHAR, ts BIGINT)
WITH (KAFKA_TOPIC='clicks', VALUE_FORMAT='JSON', TIMESTAMP='ts');
-- Tumbling 1分 + Grace 5分、最終結果のみ
CREATE TABLE clicks_tumbling AS
SELECT user_id, COUNT(*) AS c
FROM clicks
WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 5 MINUTES)
GROUP BY user_id
EMIT FINAL;
-- Hopping(長さ5分・1分スライド)+ Grace 5分
CREATE TABLE clicks_hopping AS
SELECT user_id, COUNT(*) AS c
FROM clicks
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE, GRACE PERIOD 5 MINUTES)
GROUP BY user_id
EMIT FINAL;
-- Session(ギャップ60秒・Grace 5分)
CREATE TABLE clicks_session AS
SELECT user_id, COUNT(*) AS c
FROM clicks
WINDOW SESSION (60 SECONDS, GRACE PERIOD 5 MINUTES)
GROUP BY user_id
EMIT FINAL;
-- 注: 実運用ではバージョンによりWINDOW/GRACE句の詳細が異なる場合があります。ウィンドウ集計は内部に状態(ウィンドウストア)を保持します。保持期間は少なくとも「ウィンドウ長+Grace」以上が必要です。Kafka StreamsのofSizeAndGraceやofInactivityGapAndGraceを使う場合、この前提を満たす設定を行うのが実務上の定石です。
抑制(Suppression)はウィンドウ閉鎖まで中間結果を溜め、閉鎖時に最終結果のみを下流へ流します。untilWindowClosesは分かりやすい一方、バッファが増えメモリプレッシャーになるため、負荷・レイテンシ要件と合わせて評価します。ksqlDBではEMIT FINALが近い役割を担います。
出題の定番は、ウィンドウ種別の定義、イベント時刻と処理時刻の違い、Graceによる遅延許容、セッションマージの挙動です。特にSessionのマージ条件(非活動ギャップをまたぐ遅延により別セッションが結合する)と、ウィンドウの閉鎖条件(stream-time >= window-end + grace)は暗記レベルで押さえましょう。
Hoppingは重複カウントが前提である点、Suppression/EMIT FINALの使いどころ、保持期間が足りないと遅延を取り込めない点は実務でも誤解が多く、試験でもひっかけに使われます。
CCDAK
問題 1
Kafka Streamsでイベント時刻に基づく1分Tumbling集計を行い、ウィンドウ終端後5分まで遅延イベントを受け入れたい。最も適切な設定はどれか?
正解: A
遅延受け入れはGraceで制御する。ofSizeAndGrace(1分, 5分)が正解。advanceByはHoppingのスライド設定であり、遅延許容は設定しない。Suppressionは出力タイミングであり遅延の受け入れとは別。保持期間は十分に必要だが、Graceを0のままでは遅延を受け入れない。
Graceを超えて到着した遅着イベントはどうなる?捕捉する方法はある?
ウィンドウが閉鎖(stream-time >= window-end + grace)した後に到着したレコードは破棄されます(Kafka Streamsではメトリクスにカウントされる)。捕捉したい場合は、ウィンドウ前に独自の遅延判定(現在のstream-timeとレコード時刻の差など)で分岐し、別トピックに退避する等の処理を実装します。ksqlDBには標準の“遅着専用出力”はありません。
Sessionウィンドウのマージはいつ起きる?何が再計算される?
同一キーで、非活動ギャップをまたいでいた2つのセッションが、遅延イベントの到着などでつながるとマージが発生します。マージ時は開始・終了時刻が更新され、集計値はマージ関数(aggregator merger)で再計算されます。
イベント時刻と処理時刻のどちらを使うべき?
Kafka Streams/ksqlDBの設計意図はイベント時刻優先です。TimestampExtractorやTIMESTAMP列を適切に設定し、データに内在する時刻でウィンドウ化してください。処理時刻はレイテンシ評価や暫定可視化には使えますが、正確な集計には向きません。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...