Snowflake

Streams & Tasks

2026-03-21
更新: 2026-03-27
NicheeLab編集部

StreamsTasksは、Snowflakeでネイティブなデータパイプラインを構築するための中核機能です。Streamsがテーブルの変更差分(INSERT・UPDATE・DELETE)をキャプチャし、Tasksがその差分処理をスケジュール実行します。SnowPro試験ではData Engineering / Data Pipelineドメインで頻出のトピックです。

Streams(変更データキャプチャ)

Streamは、ソーステーブルに対するDML変更を追跡するオブジェクトです。内部的にオフセットを持ち、前回の消費位置から現在までの差分レコードを提供します。各行には3つのメタデータ列が付与されます。

メタデータ列意味
METADATA$ACTIONVARCHARINSERT または DELETE
METADATA$ISUPDATEBOOLEANUPDATEの場合TRUE(DELETE+INSERTのペアとして記録)
METADATA$ROW_IDVARCHAR行の一意識別子
-- Standard Stream(デフォルト:INSERT/UPDATE/DELETEすべて追跡)
CREATE OR REPLACE STREAM orders_stream
  ON TABLE raw.orders;

-- Append-only Stream(INSERTのみ追跡、UPDATEとDELETEは無視)
CREATE OR REPLACE STREAM orders_append_stream
  ON TABLE raw.orders
  APPEND_ONLY = TRUE;

-- Streamの差分を確認
SELECT * FROM orders_stream;

Stream種類の比較

Stream種類追跡対象ユースケース
StandardINSERT / UPDATE / DELETE完全なCDCパイプライン
Append-onlyINSERTのみログ・イベントデータの増分ロード
Insert-onlyINSERTのみ(外部テーブル用)外部テーブルへの新規ファイル追加の追跡

CHANGES句によるTime Travel的な差分取得

CHANGES句を使うと、Streamオブジェクトを作成せずにテーブルの変更履歴をクエリできます。Streamと異なりオフセットを消費しないため、何度でも同じ差分を参照可能です。

-- 特定時刻以降の変更を取得
SELECT *
FROM raw.orders
  CHANGES (INFORMATION => DEFAULT)
  AT (TIMESTAMP => '2026-03-27 06:00:00'::TIMESTAMP_NTZ);

-- Statement IDを使った差分取得
SELECT *
FROM raw.orders
  CHANGES (INFORMATION => APPEND_ONLY)
  BEFORE (STATEMENT => '01b1a2b3-0601-1234-0000-abcdef012345');

Tasks(スケジュール実行)

Taskは、SQL文やストアドプロシージャを指定スケジュールで自動実行するオブジェクトです。CRON式による柔軟なスケジューリングと、分単位の間隔指定に対応しています。

-- CRON式によるTask(毎日AM6:00 JST)
CREATE OR REPLACE TASK daily_merge_task
  WAREHOUSE = ETL_WH
  SCHEDULE = 'USING CRON 0 6 * * * Asia/Tokyo'
  COMMENT = '日次の増分マージ処理'
AS
MERGE INTO mart.orders t
USING orders_stream s ON t.order_id = s.order_id
WHEN MATCHED AND METADATA$ACTION = 'DELETE' THEN DELETE
WHEN MATCHED AND METADATA$ISUPDATE = TRUE THEN
  UPDATE SET t.amount = s.amount, t.updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND METADATA$ACTION = 'INSERT' THEN
  INSERT (order_id, amount, created_at)
  VALUES (s.order_id, s.amount, CURRENT_TIMESTAMP());

-- Taskの有効化(作成直後はSUSPENDED状態)
ALTER TASK daily_merge_task RESUME;

SYSTEM$STREAM_HAS_DATAによる条件実行

WHEN SYSTEM$STREAM_HAS_DATA('stream_name')をTask定義に追加すると、Streamに未消費データがある場合のみTaskが実行されます。差分がないときの無駄なWarehouse起動を防止できます。

CREATE OR REPLACE TASK conditional_merge
  WAREHOUSE = ETL_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  MERGE INTO mart.orders t USING orders_stream s
  ON t.order_id = s.order_id
  WHEN MATCHED THEN UPDATE SET t.amount = s.amount
  WHEN NOT MATCHED THEN INSERT VALUES (s.order_id, s.amount);

タスクツリー(DAG)

Tasksは親子関係を定義してDAG(有向非巡回グラフ)を構成できます。ルートTaskがスケジュールで起動し、子Taskは親の完了を待って順次実行されます。

-- ルートTask(スケジュールあり)
CREATE OR REPLACE TASK root_etl
  WAREHOUSE = ETL_WH
  SCHEDULE = '30 MINUTE'
AS SELECT 1;

-- 子Task(ルートの完了後に実行)
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = ETL_WH
  AFTER root_etl
AS
  INSERT INTO mart.orders_agg
  SELECT region, SUM(amount) FROM mart.orders GROUP BY region;

-- 孫Task(transform_ordersの完了後に実行)
CREATE OR REPLACE TASK notify_completion
  WAREHOUSE = ETL_WH
  AFTER transform_orders
AS
  CALL system$send_email(...);

-- DAG全体の有効化はルートTaskをRESUMEする
ALTER TASK root_etl RESUME;

Serverless Task

Enterprise Edition以上では、Warehouse指定の代わりにServerless Taskを利用できます。Snowflakeがコンピュートリソースを自動管理するため、Warehouseの起動・サイジング・コスト管理が不要になります。

CREATE OR REPLACE TASK serverless_etl
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
  SCHEDULE = '10 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('events_stream')
AS
  INSERT INTO analytics.events_processed
  SELECT * FROM events_stream;

Dynamic Tablesとの比較

比較項目Streams + TasksDynamic Tables
パラダイム命令型(How to process)宣言型(What to compute)
定義方法Stream作成 + Task作成 + DML記述SELECT文のみ定義
更新制御スケジュール(CRON/分間隔)TARGET_LAG(遅延許容値)
柔軟性高(条件分岐・複数テーブル書き込み・エラーハンドリング)低(単一SELECT文の制約あり)
DAG構築タスクツリーで手動構成Dynamic Tables間の参照関係で自動構成
適用場面複雑なCDCロジック・MERGE・外部通知連携単純なSELECT変換の連鎖

モニタリングと運用

  • TASK_HISTORY:INFORMATION_SCHEMA.TASK_HISTORYテーブル関数でTaskの実行履歴・エラー詳細を確認
  • STREAM_HAS_DATA:Streamに未消費データがあるかをBOOLEANで返す関数。Task内のWHEN句だけでなく、手動確認にも利用可能
  • Stale Stream:Streamの保持期間(DATA_RETENTION_TIME_IN_DAYS + 14日)を超えると「stale」となり、消費不可になる。定期的な消費が必要
  • 権限管理:Taskは所有ロールの権限で実行されるため、EXECUTE TASK権限の付与とロールの最小権限設計が重要

問題で確認

Data Engineering

問題 1

ソーステーブルの変更分(INSERT・UPDATE・DELETE)を5分ごとに集約テーブルへMERGEしたい。ただしStreamに差分がないときはWarehouseを起動したくない。最も適切な構成はどれか。

  1. Dynamic Tableを作成しTARGET_LAG = '5 minutes'を設定する
  2. Standard Streamを作成し、TaskにWHEN SYSTEM$STREAM_HAS_DATA条件とMERGE文を定義する
  3. Append-only Streamを作成し、5分間隔のTaskでINSERT INTOを実行する
  4. Streamを使わず、TaskのSCHEDULEのみで毎回フルテーブルスキャンを実行する

正解: B

INSERT・UPDATE・DELETEすべてを追跡するにはStandard Streamが必要です(Append-onlyはINSERTのみ)。WHEN SYSTEM$STREAM_HAS_DATA条件をTaskに付与することで、差分がないときのWarehouse起動を防止できます。Dynamic TablesはMERGEのような手続き的ロジックを直接記述できないため、この要件には不向きです。

よくある質問

Streamを消費(SELECT)した後、データはどうなりますか?

StreamのデータはDMLトランザクション内で消費されると、そのトランザクションがコミットされた時点でStreamのオフセットが前進し、消費済みの変更レコードはStream上から消えます。単にSELECTで参照しただけでは消費されず、INSERT INTO ... SELECT FROM stream やMERGE ... USING streamのようにDMLの一部として使用された場合にのみ消費されます。この仕組みにより、Streamの消費と下流テーブルへの反映がトランザクションとして保証されます。

Serverless TaskとWarehouse Taskの使い分けはどうしますか?

Serverless Taskは、Snowflakeがコンピュートリソースを自動管理するため、ワークロードの変動が大きい場合やWarehouseの管理コストを省きたい場合に適しています。Warehouse Task(WAREHOUSE = wh_name を指定)は既存のWarehouseを使うため、コスト予測が立てやすく複数Taskで同一Warehouseを共有する場合に有効です。Serverless TaskはEnterprise Edition以上で利用可能です。

StreamsとTasksの組み合わせとDynamic Tablesはどのように使い分けますか?

Streams + Tasksは手続き的(命令型)なパイプライン構築で、MERGE・条件分岐・複数テーブルへの分配など柔軟なロジックを記述できます。Dynamic Tablesは宣言型で、SELECT文を定義するだけでSnowflakeが差分更新を自動管理します。パイプラインが単純なSELECT変換の連鎖であればDynamic Tables、複雑なCDCロジックやエラーハンドリングが必要な場合はStreams + Tasksが適しています。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Snowflake

Snowflake資格一覧|全11試験(SnowPro)の難易度・費用

Snowflake認定資格(SnowPro)全11試験の一覧・難易度・費用・出題範囲を徹底解説。...

Snowflake

Snowflake試験の難易度ランキング|全11資格を徹底比較

Snowflake(SnowPro)認定全11試験の難易度をランキング形式で比較。学習時間・合格に必要なスキルから分析。...

Snowflake

Snowflake資格の勉強方法|効率的な学習ルートと合格のコツ

Snowflake認定資格(SnowPro)に最短で合格するための勉強方法。公式リソース・学習スケジュールを徹底ガイド。...

Snowflake

SnowPro Core試験完全解説|出題範囲・問題例・合格戦略

SnowPro Core Certification(COF-C03)を徹底解説。出題範囲・100問の試験形式・合格ライ...

Snowflake

SnowPro Platform Associate完全解説|入門試験の攻略

SnowPro Associate: Platform Certification(SOL-C01)を徹底解説。最も簡単...

Snowflakeの記事一覧 (102件)
© 2026 NicheeLab All rights reserved.