Snowflake Streams & Tasks: Complete Guide

2026-03-21
Updated: 2026-03-27
NicheeLab Editorial Team

Streams and Tasks are the core building blocks for native data pipelines in Snowflake. Streams capture row-level changes (INSERT, UPDATE, DELETE) on a source table, and Tasks run the downstream processing on a schedule. This is a frequent topic in the Data Engineering / Data Pipeline domain of the SnowPro exams.

Streams (Change Data Capture)

A Stream is an object that tracks DML changes against a source table. It maintains an internal offset and surfaces the change records between the last consumed position and the current state. Each row is enriched with three metadata columns.

Metadata column | Type | Meaning
METADATA$ACTION | VARCHAR | INSERT or DELETE
METADATA$ISUPDATE | BOOLEAN | TRUE for UPDATEs (recorded as a DELETE+INSERT pair)
METADATA$ROW_ID | VARCHAR | Unique row identifier

-- Standard Stream (default: tracks all INSERT/UPDATE/DELETE)
CREATE OR REPLACE STREAM orders_stream
  ON TABLE raw.orders;

-- Append-only Stream (tracks INSERTs only; ignores UPDATE and DELETE)
CREATE OR REPLACE STREAM orders_append_stream
  ON TABLE raw.orders
  APPEND_ONLY = TRUE;

-- Inspect the Stream's change set
SELECT * FROM orders_stream;
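
To see how the metadata columns combine, a query along these lines labels each change record. It is a sketch that assumes raw.orders has an order_id column (as the MERGE examples in this guide do); change_kind is an illustrative alias, not a Snowflake column.

```sql
-- Classify each change record currently visible in the Stream
SELECT
  order_id,
  METADATA$ACTION,
  METADATA$ISUPDATE,
  CASE
    WHEN METADATA$ACTION = 'INSERT' AND NOT METADATA$ISUPDATE THEN 'new row'
    WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE     THEN 'update (new values)'
    WHEN METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE     THEN 'update (old values)'
    ELSE 'deleted row'
  END AS change_kind
FROM orders_stream;
```

Because this is a plain SELECT, it does not advance the Stream's offset.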

Stream Types Compared

Stream type | Tracked changes | Use case
Standard | INSERT / UPDATE / DELETE | Full CDC pipelines
Append-only | INSERTs only | Incremental loads for logs and event data
Insert-only | INSERTs only (for external tables) | Tracking new file arrivals against external tables
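
The Insert-only variant is not shown in the examples above. A minimal sketch follows, assuming a hypothetical external table raw.ext_events already exists; both object names are illustrative.

```sql
-- Insert-only Stream: tracks rows added from newly registered files on an external table
CREATE OR REPLACE STREAM ext_events_stream
  ON EXTERNAL TABLE raw.ext_events
  INSERT_ONLY = TRUE;
```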

Time-Travel Style Diffs with the CHANGES Clause

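The CHANGES clause lets you query a table's change history without creating a Stream object, provided change tracking is enabled on the table (via ALTER TABLE ... SET CHANGE_TRACKING = TRUE, or implicitly by a Stream created on it earlier). Unlike a Stream, it does not consume an offset, so you can query the same diff as many times as you want.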

-- Fetch changes since a specific timestamp
SELECT *
FROM raw.orders
  CHANGES (INFORMATION => DEFAULT)
  AT (TIMESTAMP => '2026-03-27 06:00:00'::TIMESTAMP_NTZ);

-- Fetch the diff relative to a Statement ID
SELECT *
FROM raw.orders
  CHANGES (INFORMATION => APPEND_ONLY)
  BEFORE (STATEMENT => '01b1a2b3-0601-1234-0000-abcdef012345');
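
As a sketch of the full workflow, the statements below enable change tracking and then read a bounded window of changes using the optional END clause. The timestamps are placeholders; they would need to fall after change tracking was enabled and inside the table's retention period.

```sql
-- Enable change tracking so CHANGES works without a Stream on the table
ALTER TABLE raw.orders SET CHANGE_TRACKING = TRUE;

-- Changes between two points in time (placeholder timestamps)
SELECT *
FROM raw.orders
  CHANGES (INFORMATION => DEFAULT)
  AT (TIMESTAMP => '2026-03-27 06:00:00'::TIMESTAMP_NTZ)
  END (TIMESTAMP => '2026-03-27 12:00:00'::TIMESTAMP_NTZ);
```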

Tasks (Scheduled Execution)

A Task is an object that executes a SQL statement or stored procedure automatically on a defined schedule. It supports both flexible CRON expressions and simple minute-based intervals.

-- CRON-driven Task (daily at 06:00 JST)
CREATE OR REPLACE TASK daily_merge_task
  WAREHOUSE = ETL_WH
  SCHEDULE = 'USING CRON 0 6 * * * Asia/Tokyo'
  COMMENT = 'Daily incremental merge job'
AS
MERGE INTO mart.orders t
USING (
  -- An UPDATE appears as a DELETE + INSERT pair; drop the DELETE half so
  -- each target row matches at most one source row
  SELECT * FROM orders_stream
  WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) s ON t.order_id = s.order_id
-- True deletes
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
-- Updates (the INSERT half of the pair carries the new values)
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
  UPDATE SET t.amount = s.amount, t.updated_at = CURRENT_TIMESTAMP()
-- Brand-new rows
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
  INSERT (order_id, amount, created_at)
  VALUES (s.order_id, s.amount, CURRENT_TIMESTAMP());

-- Activate the Task (newly created Tasks start in SUSPENDED state)
ALTER TASK daily_merge_task RESUME;
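
When testing a new Task it is often easier to trigger a run by hand than to wait for the schedule. A short sketch, using the daily_merge_task defined above:

```sql
-- Trigger a single run immediately, independent of the schedule
EXECUTE TASK daily_merge_task;

-- Check the Task's current state (the state column shows started or suspended)
SHOW TASKS LIKE 'daily_merge_task';

-- Stop scheduled runs when the Task should no longer execute
ALTER TASK daily_merge_task SUSPEND;
```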

Conditional Execution with SYSTEM$STREAM_HAS_DATA

Adding WHEN SYSTEM$STREAM_HAS_DATA('stream_name') to a Task definition makes the Task run only when there is unconsumed data in the Stream. This prevents wasted Warehouse startups when no diff is available.

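CREATE OR REPLACE TASK conditional_merge
  WAREHOUSE = ETL_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  MERGE INTO mart.orders t
  USING (
    -- Skip the DELETE half of each UPDATE pair so one source row matches each target row
    SELECT * FROM orders_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
  ) s
  ON t.order_id = s.order_id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
  WHEN MATCHED THEN UPDATE SET t.amount = s.amount
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (order_id, amount) VALUES (s.order_id, s.amount);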
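
The same function can be called ad hoc to check a Stream, and runs whose WHEN condition evaluated to false can be looked up afterwards. The sketch below assumes the conditional_merge Task above lives in the current database and schema.

```sql
-- Returns TRUE when the Stream holds unconsumed change records
SELECT SYSTEM$STREAM_HAS_DATA('orders_stream') AS has_changes;

-- Recent scheduled runs that were skipped rather than executed
SELECT name, state, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'CONDITIONAL_MERGE'))
WHERE state = 'SKIPPED'
ORDER BY scheduled_time DESC;
```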

Task Trees (DAGs)

Tasks can be wired into parent-child relationships to form a DAG (directed acyclic graph). The root Task fires on its schedule, and child Tasks run in order after their parent completes.

-- Root Task (has a schedule)
CREATE OR REPLACE TASK root_etl
  WAREHOUSE = ETL_WH
  SCHEDULE = '30 MINUTE'
AS SELECT 1;

-- Child Task (runs after the root completes)
CREATE OR REPLACE TASK transform_orders
  WAREHOUSE = ETL_WH
  AFTER root_etl
AS
  -- OVERWRITE replaces the aggregate on each run instead of appending duplicate totals
  INSERT OVERWRITE INTO mart.orders_agg
  SELECT region, SUM(amount) FROM mart.orders GROUP BY region;

-- Grandchild Task (runs after transform_orders completes)
CREATE OR REPLACE TASK notify_completion
  WAREHOUSE = ETL_WH
  AFTER transform_orders
AS
  CALL system$send_email(...);

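-- Activate the entire DAG: child Tasks are also created SUSPENDED, so resuming
-- only the root is not enough. This call resumes the root and all its dependents.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('root_etl');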
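
For day-to-day maintenance of a DAG, a workflow like the following can help. It is a sketch that assumes the three Tasks above exist in the current schema; use a fully qualified Task name otherwise.

```sql
-- List every Task downstream of the root, with its state and predecessors
SELECT name, state, predecessors
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
  TASK_NAME => 'root_etl',
  RECURSIVE => TRUE));

-- Suspend the root before altering any Task in the graph
ALTER TASK root_etl SUSPEND;

-- ...modify child Tasks here...

-- Resume the root and all of its dependents in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('root_etl');
```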

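Serverless Tasks

Serverless Tasks run on Snowflake-managed compute instead of a user-specified Warehouse: omit the WAREHOUSE parameter and, optionally, set USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE as a starting size. Snowflake then sizes and scales compute automatically, removing the need to start, size, or manage a dedicated Warehouse. The role that owns the Task needs the EXECUTE MANAGED TASK privilege.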

CREATE OR REPLACE TASK serverless_etl
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
  SCHEDULE = '10 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('events_stream')
AS
  INSERT INTO analytics.events_processed
  SELECT * FROM events_stream;
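
Creating and running a Serverless Task depends on account-level privileges. A minimal sketch of the grants, assuming a hypothetical role named etl_role owns the Task:

```sql
-- Account-level privileges for the role that owns the Task (etl_role is illustrative)
GRANT EXECUTE TASK ON ACCOUNT TO ROLE etl_role;          -- needed to run any Task
GRANT EXECUTE MANAGED TASK ON ACCOUNT TO ROLE etl_role;  -- needed for Serverless Tasks
```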

Comparison with Dynamic Tables

Dimension | Streams + Tasks | Dynamic Tables
Paradigm | Imperative (how to process) | Declarative (what to compute)
How you define it | Create a Stream + create a Task + write DML | Define a SELECT statement only
Refresh control | Schedule (CRON / minute intervals) | TARGET_LAG (allowed staleness)
Flexibility | High (conditional logic, fan-out writes, error handling) | Low (constrained to a single SELECT)
DAG construction | Wired manually as a task tree | Inferred automatically from references between Dynamic Tables
Best fit | Complex CDC logic, MERGE, external notification integration | Chains of simple SELECT transformations
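
For contrast, the regional aggregation from the Task tree example could be expressed declaratively roughly as follows. This is a sketch: mart.orders_agg_dt is a hypothetical name, and it assumes mart.orders has region and amount columns.

```sql
-- Declarative equivalent: Snowflake keeps the result within 5 minutes of the source
CREATE OR REPLACE DYNAMIC TABLE mart.orders_agg_dt
  TARGET_LAG = '5 minutes'
  WAREHOUSE = ETL_WH
AS
  SELECT region, SUM(amount) AS total_amount
  FROM mart.orders
  GROUP BY region;
```

No Stream, Task, or schedule is defined here; the refresh cadence follows from TARGET_LAG.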

Monitoring and Operations

  • TASK_HISTORY: Use the INFORMATION_SCHEMA.TASK_HISTORY table function to review Task execution history and error details.
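  • SYSTEM$STREAM_HAS_DATA: A function that returns a BOOLEAN indicating whether a Stream has unconsumed data. Useful inside a Task's WHEN clause and for ad-hoc checks.
  • Stale Stream: A Stream becomes "stale" and can no longer be consumed once its offset falls outside the source table's data retention period (DATA_RETENTION_TIME_IN_DAYS). If that period is shorter than 14 days, Snowflake temporarily extends it for an unconsumed Stream, up to MAX_DATA_EXTENSION_TIME_IN_DAYS (14 days by default). Consume regularly to avoid this.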
  • Privilege management: A Task runs with the privileges of its owning role, so granting EXECUTE TASK and designing roles around least privilege is critical.
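
The checks above can be sketched as two routine queries. The 24-hour window and the 100-row limit are arbitrary choices for illustration.

```sql
-- Failed Task runs in the last 24 hours
SELECT name, state, error_code, error_message, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
  RESULT_LIMIT => 100,
  ERROR_ONLY => TRUE))
ORDER BY scheduled_time DESC;

-- The stale and stale_after columns show whether and when a Stream goes stale
SHOW STREAMS LIKE 'orders_stream';
```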

Check Your Understanding

Data Engineering

Question 1

You want to MERGE source-table changes (INSERT, UPDATE, DELETE) into an aggregation table every 5 minutes, but you do not want the Warehouse to start when the Stream has no diff. Which configuration is most appropriate?

  1. Create a Dynamic Table with TARGET_LAG = '5 minutes'
  2. Create a Standard Stream and define a Task with a WHEN SYSTEM$STREAM_HAS_DATA condition and a MERGE statement
  3. Create an Append-only Stream and run INSERT INTO from a 5-minute Task
  4. Skip Streams and rely solely on a Task SCHEDULE to run a full table scan each time

Correct answer: 2

Tracking all of INSERT, UPDATE, and DELETE requires a Standard Stream (Append-only handles INSERTs only). Adding a WHEN SYSTEM$STREAM_HAS_DATA condition to the Task prevents the Warehouse from starting when there is no diff. Dynamic Tables cannot express procedural logic like MERGE directly, so they are not a good fit for this requirement.

Frequently Asked Questions

What happens to Stream data after it is consumed (SELECTed)?

When Stream data is consumed inside a DML transaction, the Stream's offset advances once that transaction commits, and the consumed change records disappear from the Stream. A plain SELECT does not consume the Stream; consumption only happens when the Stream is used as part of a DML statement such as INSERT INTO ... SELECT FROM stream or MERGE ... USING stream. This guarantees that Stream consumption and downstream table updates are atomic within a single transaction.
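
The transactional behavior described above also allows one set of change records to feed several targets. A sketch follows, in which mart.orders_history and mart.new_order_ids are hypothetical tables introduced only for illustration:

```sql
BEGIN;

-- Both statements read the same set of change records
INSERT INTO mart.orders_history (order_id, amount, action)
  SELECT order_id, amount, METADATA$ACTION FROM orders_stream;

INSERT INTO mart.new_order_ids (order_id)
  SELECT order_id FROM orders_stream
  WHERE METADATA$ACTION = 'INSERT' AND NOT METADATA$ISUPDATE;

-- The Stream offset advances only when the transaction commits
COMMIT;
```

If the transaction rolls back, the offset stays where it was and the same change records remain available for the next attempt.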

When should I use a Serverless Task vs. a Warehouse Task?

Serverless Tasks let Snowflake manage compute automatically, so they fit workloads with high variability or cases where you want to avoid Warehouse management overhead. Warehouse Tasks (specified via WAREHOUSE = wh_name) reuse an existing Warehouse, which makes costs more predictable and is useful when multiple Tasks share the same Warehouse. Serverless Tasks additionally require the EXECUTE MANAGED TASK privilege on the role that owns the Task.

How do I choose between Streams + Tasks and Dynamic Tables?

Streams + Tasks is an imperative approach to building pipelines, letting you write flexible logic such as MERGE, conditional branching, and fan-out writes to multiple tables. Dynamic Tables are declarative: you just define a SELECT statement and Snowflake manages incremental refresh automatically. Use Dynamic Tables when the pipeline is a chain of simple SELECT transformations, and use Streams + Tasks when you need complex CDC logic or custom error handling.


