StreamsとTasksは、Snowflakeでネイティブなデータパイプラインを構築するための中核機能です。Streamsがテーブルの変更差分(INSERT・UPDATE・DELETE)をキャプチャし、Tasksがその差分処理をスケジュール実行します。SnowPro試験ではData Engineering / Data Pipelineドメインで頻出のトピックです。
Streamは、ソーステーブルに対するDML変更を追跡するオブジェクトです。内部的にオフセットを持ち、前回の消費位置から現在までの差分レコードを提供します。各行には3つのメタデータ列が付与されます。
| メタデータ列 | 型 | 意味 |
|---|---|---|
| METADATA$ACTION | VARCHAR | INSERT または DELETE |
| METADATA$ISUPDATE | BOOLEAN | UPDATEの場合TRUE(DELETE+INSERTのペアとして記録) |
| METADATA$ROW_ID | VARCHAR | 行の一意識別子 |
-- Standard Stream(デフォルト:INSERT/UPDATE/DELETEすべて追跡)
CREATE OR REPLACE STREAM orders_stream
ON TABLE raw.orders;
-- Append-only Stream(INSERTのみ追跡、UPDATEとDELETEは無視)
CREATE OR REPLACE STREAM orders_append_stream
ON TABLE raw.orders
APPEND_ONLY = TRUE;
-- Streamの差分を確認
SELECT * FROM orders_stream;| Stream種類 | 追跡対象 | ユースケース |
|---|---|---|
| Standard | INSERT / UPDATE / DELETE | 完全なCDCパイプライン |
| Append-only | INSERTのみ | ログ・イベントデータの増分ロード |
| Insert-only | INSERTのみ(外部テーブル用) | 外部テーブルへの新規ファイル追加の追跡 |
CHANGES句を使うと、Streamオブジェクトを作成せずにテーブルの変更履歴をクエリできます。Streamと異なりオフセットを消費しないため、何度でも同じ差分を参照可能です。
-- 特定時刻以降の変更を取得
SELECT *
FROM raw.orders
CHANGES (INFORMATION => DEFAULT)
AT (TIMESTAMP => '2026-03-27 06:00:00'::TIMESTAMP_NTZ);
-- Statement IDを使った差分取得
SELECT *
FROM raw.orders
CHANGES (INFORMATION => APPEND_ONLY)
BEFORE (STATEMENT => '01b1a2b3-0601-1234-0000-abcdef012345');Taskは、SQL文やストアドプロシージャを指定スケジュールで自動実行するオブジェクトです。CRON式による柔軟なスケジューリングと、分単位の間隔指定に対応しています。
-- CRON式によるTask(毎日AM6:00 JST)
CREATE OR REPLACE TASK daily_merge_task
WAREHOUSE = ETL_WH
SCHEDULE = 'USING CRON 0 6 * * * Asia/Tokyo'
COMMENT = '日次の増分マージ処理'
AS
MERGE INTO mart.orders t
USING orders_stream s ON t.order_id = s.order_id
WHEN MATCHED AND METADATA$ACTION = 'DELETE' THEN DELETE
WHEN MATCHED AND METADATA$ISUPDATE = TRUE THEN
UPDATE SET t.amount = s.amount, t.updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND METADATA$ACTION = 'INSERT' THEN
INSERT (order_id, amount, created_at)
VALUES (s.order_id, s.amount, CURRENT_TIMESTAMP());
-- Taskの有効化(作成直後はSUSPENDED状態)
ALTER TASK daily_merge_task RESUME;WHEN SYSTEM$STREAM_HAS_DATA('stream_name')をTask定義に追加すると、Streamに未消費データがある場合のみTaskが実行されます。差分がないときの無駄なWarehouse起動を防止できます。
CREATE OR REPLACE TASK conditional_merge
WAREHOUSE = ETL_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
MERGE INTO mart.orders t USING orders_stream s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.amount = s.amount
WHEN NOT MATCHED THEN INSERT VALUES (s.order_id, s.amount);Tasksは親子関係を定義してDAG(有向非巡回グラフ)を構成できます。ルートTaskがスケジュールで起動し、子Taskは親の完了を待って順次実行されます。
-- ルートTask(スケジュールあり)
CREATE OR REPLACE TASK root_etl
WAREHOUSE = ETL_WH
SCHEDULE = '30 MINUTE'
AS SELECT 1;
-- 子Task(ルートの完了後に実行)
CREATE OR REPLACE TASK transform_orders
WAREHOUSE = ETL_WH
AFTER root_etl
AS
INSERT INTO mart.orders_agg
SELECT region, SUM(amount) FROM mart.orders GROUP BY region;
-- 孫Task(transform_ordersの完了後に実行)
CREATE OR REPLACE TASK notify_completion
WAREHOUSE = ETL_WH
AFTER transform_orders
AS
CALL system$send_email(...);
-- DAG全体の有効化はルートTaskをRESUMEする
ALTER TASK root_etl RESUME;Enterprise Edition以上では、Warehouse指定の代わりにServerless Taskを利用できます。Snowflakeがコンピュートリソースを自動管理するため、Warehouseの起動・サイジング・コスト管理が不要になります。
CREATE OR REPLACE TASK serverless_etl
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
SCHEDULE = '10 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('events_stream')
AS
INSERT INTO analytics.events_processed
SELECT * FROM events_stream;| 比較項目 | Streams + Tasks | Dynamic Tables |
|---|---|---|
| パラダイム | 命令型(How to process) | 宣言型(What to compute) |
| 定義方法 | Stream作成 + Task作成 + DML記述 | SELECT文のみ定義 |
| 更新制御 | スケジュール(CRON/分間隔) | TARGET_LAG(遅延許容値) |
| 柔軟性 | 高(条件分岐・複数テーブル書き込み・エラーハンドリング) | 低(単一SELECT文の制約あり) |
| DAG構築 | タスクツリーで手動構成 | Dynamic Tables間の参照関係で自動構成 |
| 適用場面 | 複雑なCDCロジック・MERGE・外部通知連携 | 単純なSELECT変換の連鎖 |
Data Engineering
問題 1
ソーステーブルの変更分(INSERT・UPDATE・DELETE)を5分ごとに集約テーブルへMERGEしたい。ただしStreamに差分がないときはWarehouseを起動したくない。最も適切な構成はどれか。
正解: B
INSERT・UPDATE・DELETEすべてを追跡するにはStandard Streamが必要です(Append-onlyはINSERTのみ)。WHEN SYSTEM$STREAM_HAS_DATA条件をTaskに付与することで、差分がないときのWarehouse起動を防止できます。Dynamic TablesはMERGEのような手続き的ロジックを直接記述できないため、この要件には不向きです。
Streamを消費(SELECT)した後、データはどうなりますか?
StreamのデータはDMLトランザクション内で消費されると、そのトランザクションがコミットされた時点でStreamのオフセットが前進し、消費済みの変更レコードはStream上から消えます。単にSELECTで参照しただけでは消費されず、INSERT INTO ... SELECT FROM stream やMERGE ... USING streamのようにDMLの一部として使用された場合にのみ消費されます。この仕組みにより、Streamの消費と下流テーブルへの反映がトランザクションとして保証されます。
Serverless TaskとWarehouse Taskの使い分けはどうしますか?
Serverless Taskは、Snowflakeがコンピュートリソースを自動管理するため、ワークロードの変動が大きい場合やWarehouseの管理コストを省きたい場合に適しています。Warehouse Task(WAREHOUSE = wh_name を指定)は既存のWarehouseを使うため、コスト予測が立てやすく複数Taskで同一Warehouseを共有する場合に有効です。Serverless TaskはEnterprise Edition以上で利用可能です。
StreamsとTasksの組み合わせとDynamic Tablesはどのように使い分けますか?
Streams + Tasksは手続き的(命令型)なパイプライン構築で、MERGE・条件分岐・複数テーブルへの分配など柔軟なロジックを記述できます。Dynamic Tablesは宣言型で、SELECT文を定義するだけでSnowflakeが差分更新を自動管理します。パイプラインが単純なSELECT変換の連鎖であればDynamic Tables、複雑なCDCロジックやエラーハンドリングが必要な場合はStreams + Tasksが適しています。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Snowflake資格一覧|全11試験(SnowPro)の難易度・費用
Snowflake認定資格(SnowPro)全11試験の一覧・難易度・費用・出題範囲を徹底解説。...
Snowflake試験の難易度ランキング|全11資格を徹底比較
Snowflake(SnowPro)認定全11試験の難易度をランキング形式で比較。学習時間・合格に必要なスキルから分析。...
Snowflake資格の勉強方法|効率的な学習ルートと合格のコツ
Snowflake認定資格(SnowPro)に最短で合格するための勉強方法。公式リソース・学習スケジュールを徹底ガイド。...
SnowPro Core試験完全解説|出題範囲・問題例・合格戦略
SnowPro Core Certification(COF-C03)を徹底解説。出題範囲・100問の試験形式・合格ライ...
SnowPro Platform Associate完全解説|入門試験の攻略
SnowPro Associate: Platform Certification(SOL-C01)を徹底解説。最も簡単...