増分更新は最小のデータだけを処理できる一方で、設計を誤るとフルスキャンや小ファイル問題により遅くなります。
本稿は公式ドキュメントの挙動に沿って、戦略選択、ウォーターマーク、MERGE/INSERT_OVERWRITE、遅延到着・削除、運用チューニングまでを一気通貫で最適化します。
dbt の incremental は materialized='incremental' により、既存テーブルを保持しつつ差分のみを書き足す仕組みです。戦略は概ね append、merge、insert_overwrite の3系統で、アダプタごとに対応が異なります。Snowflake では merge が主流、Databricks(Delta) では merge もしくは partition 指定の insert_overwrite が実務で多用されます。
選択基準は「キー同一行の上書きが必要か」「パーティション単位での再生成が有利か」「ソースの遅延到着がどの程度あるか」です。unique_key を起点に同一行を更新できるなら merge、日次など明確なパーティション境界があり再計算が軽いなら insert_overwrite が有効です。
| 戦略 | 主な対応エンジン | 典型ユースケース | リスク・注意点 |
|---|---|---|---|
| append | Snowflake, Databricks(Delta) ほか | 追記専用イベントログ | 重複・上書き不可。遅延到着や再計算に弱い |
| merge | Snowflake, Databricks(Delta) | キー単位のアップサート、緩やかな遅延到着 | unique_key と結合条件の設計が肝。過大なステージ入力は遅い |
| insert_overwrite | Databricks(Delta) 等の Spark 系 | 日付パーティションの再作成・バックフィル | パーティション選択が粗いと書き込み過多。小ファイル増に注意 |
増分パイプラインの概念
最小構成の増分モデル骨子
{{ config(
materialized='incremental',
incremental_strategy='merge', -- Snowflake/Delta で一般的
unique_key='id', -- アップサートのキー
on_schema_change='append_new_columns' -- アダプタ対応状況に依存
) }}
with src as (
select *
from {{ source('app', 'events') }}
{% if is_incremental() %}
-- ウォーターマークで入力を絞る(例: updated_at)
where updated_at >= dateadd(day, -1, (select coalesce(max(updated_at), '1900-01-01') from {{ this }}))
{% endif %}
)
select
id,
event_type,
payload,
updated_at
from src;is_incremental() で差分条件を切り替える際、最大値参照だけに依存すると対象テーブルの集計がボトルネックになる場合があります。理想はソース側の更新時刻・増分キーをそのまま条件化し、必要に応じて「フェンス幅」を設けて遅延到着を取り込むことです。
ウォーターマーク列は単調増加・非NULL・タイムゾーン一貫が前提です。取り込み範囲は可変パラメータ化し、バックフィル時に上書きできるようにします。
可変ウォーターマークの実装例(Jinja + 変数)
{% set lookback_hours = var('wm_lookback_hours', 24) %}
{% set lower_bound_expr %}dateadd(hour, -{{ lookback_hours }}, {{ run_started_at }}){% endset %}
{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id') }}
with src as (
select *
from {{ source('crm', 'customers') }}
{% if is_incremental() %}
where updated_at >= {{ lower_bound_expr }}
{% endif %}
)
,
dedup as (
-- 重複排除: ソース内で同一キー最新行を選ぶ
select asof.*
from (
select *, row_number() over(partition by id order by updated_at desc) as rn
from src
) asof
where rn = 1
)
select * from dedup;MERGE はアップサートの柔軟性が高い反面、結合対象の行数が多いと急激に遅くなります。最適化の第一歩は、is_incremental() 側で十分に入力を絞り、merge の結合キー(unique_key)を明確にすることです。ステージ側でキーごとに最新行へ集約してから MERGE するだけでも大きく効きます。
Snowflake ではクラスタリングキーの設計がスキャン効率に影響します。ALTER TABLE ... CLUSTER BY でキーを設定し、検索効率を高めます。Databricks(Delta) では OPTIMIZE と ZORDER BY によりデータスキップ性を向上させ、MERGE 後の断片化を抑えます。これらはコストを伴うため、post-hook で頻度を制御すると安定します。
MERGE 最適化の実例(クラスタリング/OPTIMIZE を post-hook で)
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['account_id', 'effective_date'],
post_hook=[
{% raw %}
"{% if target.type == 'snowflake' %}"
"alter table {{ this }} cluster by (account_id, effective_date);",
"{% endif %}"
,
"{% if target.type in ['databricks','spark'] %}"
"optimize {{ this }} zorder by (account_id);",
"{% endif %}"
{% endraw %}
]
) }}
with staged as (
select *, row_number() over(partition by account_id, effective_date order by updated_at desc) as rn
from {{ ref('stg_accounts_daily') }}
{% if is_incremental() %}
where updated_at >= dateadd(day, -2, {{ run_started_at }})
{% endif %}
)
select account_id, effective_date, balance, updated_at
from staged
where rn = 1;Spark/Databricks では partition_by を指定した incremental_strategy='insert_overwrite' が使えます。これにより影響のあるパーティションのみ入れ替えられるため、日次や月次の再計算が高速になります。パーティションキーはフィルタで頻用し、カーディナリティがほどよい列(日付など)を選びます。
一方で細かすぎるパーティションは小ファイルを増やし、読み取り・メタデータ管理が遅くなります。定期的な OPTIMIZE と適切なファイルサイズに落ち着くようバッチの粒度を調整してください。Snowflake での insert_overwrite は一般的でないため、Snowflake では merge を優先します。
Delta での INSERT_OVERWRITE 例(日次パーティション)
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by=['dt']
) }}
with base as (
select *, to_date(event_time) as dt
from {{ source('app', 'clicks') }}
{% if is_incremental() %}
where event_time >= dateadd(day, -3, {{ run_started_at }})
{% endif %}
)
select
user_id,
url,
event_time,
to_date(event_time) as dt
from base
-- insert_overwrite は上記 partition_by=['dt'] に基づき、影響日のパーティションのみ入替遅延到着は「読み取り範囲を広め + 下流で最新行に正規化」で吸収します。行の削除は、ソースに削除フラグ(トゥームストーン)がある場合は MERGE の WHEN MATCHED AND deleted=1 THEN DELETE で対応可能です。ソースから物理削除されてしまう場合は、定期的なフル比較モデルやスナップショットで整合性を担保します。
品質担保には、unique や not_null の基本テストに加え、増分特有のリージョン(直近N日など)だけを検査する選択的テストが効率的です。モデルに lookback を組み込む際は、テストも同じウィンドウを参照できるよう vars を共用します。
削除フラグ対応の MERGE 条件とテスト定義
{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id') }}
with src as (
select * from {{ source('erp', 'orders_cdc') }}
{% if is_incremental() %}
where updated_at >= dateadd(day, -2, {{ run_started_at }})
{% endif %}
), latest as (
select *, row_number() over(partition by id order by updated_at desc) rn
from src
)
select id, status, amount, updated_at, deleted
from latest
where rn = 1
-- dbt はアダプタの既定 MERGE 文を生成。deleted=1 の行は WHEN MATCHED AND deleted=1 THEN DELETE を条件に含めるカスタム化も可能(adapter/バージョンに依存)。
-- tests/schema.yml の一例
-- version: 2
-- models:
-- - name: fct_orders
-- tests:
-- - unique:
-- column_name: id
-- - not_null:
-- column_name: id
-- - relationships:
-- to: ref('dim_customers')
-- field: customer_id高速化はクエリ最適化だけでなく、どのモデルをいつ実行するかの計画で決まります。state:modified+ やタグを使って真に変更の影響が及ぶモデルだけを選択し、並列度(threads)を環境の制約に合わせて調整します。大規模 MERGE を同時多発させないのが安定運用のコツです。
Databricks(Delta) では大きなバックフィル後のみ OPTIMIZE/VACUUM を実施するなど、メンテナンスの頻度を下げるとコストと時間を節約できます。Snowflake ではクラスタリングキー設定を見直し、不要なリクラスタリングを避けます。
実行選択とスケジューリング例
# 変更差分のみビルド(本番を参照してテスト環境で検証)
dbt build -s state:modified+ --defer --state target/artifacts --target prod
# 重い増分タグを夜間バッチに限定
dbt run -s tag:heavy_incremental --threads 2
# selectors.yml の例
# selectors:
# - name: modified_plus_heavy
# definition:
# union:
# - method: state
# value: modified+
# - method: tag
# value: heavy_incrementalAnalytics Engineer
問題 1
Databricks(Delta) で日次パーティションの売上テーブルを再計算するタスクが頻繁に発生します。遅延到着は最大48時間、同一キーの上書きは不要です。最も効率的な戦略はどれか。
正解: A
日次パーティションが明確で上書き要件がないため、影響日だけを入れ替える insert_overwrite が最も効率的です。遅延到着を考慮して 2〜3 日の範囲を読み、該当パーティションのみを置換します。merge 全期間やフルリビルドは不要な計算が多く、append は整合性を保てません。
unique_key は必ず必要ですか?
merge 戦略では実質必須です。キーなしでは同一行の突き合わせができず、重複や更新漏れが発生します。append や特定の insert_overwrite では不要ですが、整合性要件に応じてキー管理を検討してください。
Snowflake で insert_overwrite を使えますか?
一般的には merge が推奨です。Spark 系のような partition 指定の insert_overwrite は Snowflake では一般的でなく、再計算はモデル分割や別テーブルでの再生成で対応するのが現実的です。
OPTIMIZE や ZORDER は常に毎回実行すべきですか?
いいえ。Databricks(Delta) の OPTIMIZE/ZORDER はコストがかかります。大きなバックフィルや多数の小ファイルが生じた直後など、必要時だけ post-hook で条件実行するのが実務的です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
dbt Model の基礎: SQL で定義する変換の最小単位
Analytics Engineer 向けに、dbt Model の定義、マテリアライゼーション、依存関係、インクリメン...
dbt Analytics Engineer 試験ガイド: 出題範囲・配点・申込の実務視点
dbt Analytics Engineer 認定の出題範囲、配点の考え方、申込から受験までの流れを、公式ドキュメントの...
dbt Cloud と dbt Core の違いと選び方:Analytics Engineer 試験に効く要点
dbt Cloud と dbt Core の機能差を、実務と資格対策の両面から整理。スケジューリング、IDE、RBAC、...
dbt プロジェクト構造ガイド: models / seeds / macros の実務レイアウト
Analytics Engineer 向けに、dbt プロジェクトのディレクトリ構造と命名規約、dbt_project....
dbt_project.yml の読み方:主要設定と命名を最短で掴む
dbt_project.yml の必須キー、命名解決(database.schema.identifier)、設定優先度...