dbt

dbt Incremental の最適化: 高速な増分ジョブ設計

2026-04-19
NicheeLab編集部

増分更新は最小のデータだけを処理できる一方で、設計を誤るとフルスキャンや小ファイル問題により遅くなります。

本稿は公式ドキュメントの挙動に沿って、戦略選択、ウォーターマーク、MERGE/INSERT_OVERWRITE、遅延到着・削除、運用チューニングまでを一気通貫で最適化します。

増分モデルの基本と戦略選択

dbt の incremental は materialized='incremental' により、既存テーブルを保持しつつ差分のみを書き足す仕組みです。戦略は概ね append、merge、insert_overwrite の3系統で、アダプタごとに対応が異なります。Snowflake では merge が主流、Databricks(Delta) では merge もしくは partition 指定の insert_overwrite が実務で多用されます。

選択基準は「キー同一行の上書きが必要か」「パーティション単位での再生成が有利か」「ソースの遅延到着がどの程度あるか」です。unique_key を起点に同一行を更新できるなら merge、日次など明確なパーティション境界があり再計算が軽いなら insert_overwrite が有効です。

  • append: 単純追記。重複排除がいらないログ系のみで有効。
  • merge: unique_key で更新/挿入を制御。Snowflake・Delta の王道。
  • insert_overwrite: 指定パーティションだけ入れ替え。日付粒度の再計算に強い。
戦略主な対応エンジン典型ユースケースリスク・注意点
appendSnowflake, Databricks(Delta) ほか追記専用イベントログ重複・上書き不可。遅延到着や再計算に弱い
mergeSnowflake, Databricks(Delta)キー単位のアップサート、緩やかな遅延到着unique_key と結合条件の設計が肝。過大なステージ入力は遅い
insert_overwriteDatabricks(Delta) 等の Spark 系日付パーティションの再作成・バックフィルパーティション選択が粗いと書き込み過多。小ファイル増に注意

増分パイプラインの概念

Sourceraw/CDCStagingfilteredIncremental Modelmerge/overwriteAnalytics Tablespartitioned/clusteredraw/CDC → staging でフィルタ → 増分モデルで merge/overwrite → パーティション/クラスタ化された分析テーブル

最小構成の増分モデル骨子

{{ config(
  materialized='incremental',
  incremental_strategy='merge',  -- Snowflake/Delta で一般的
  unique_key='id',               -- アップサートのキー
  on_schema_change='append_new_columns'  -- アダプタ対応状況に依存
) }}

with src as (
  select *
  from {{ source('app', 'events') }}
  {% if is_incremental() %}
    -- ウォーターマークで入力を絞る(例: updated_at)
    where updated_at >= dateadd(day, -1, (select coalesce(max(updated_at), '1900-01-01') from {{ this }}))
  {% endif %}
)

select
  id,
  event_type,
  payload,
  updated_at
from src;

is_incremental とウォーターマーク設計

is_incremental() で差分条件を切り替える際、最大値参照だけに依存すると対象テーブルの集計がボトルネックになる場合があります。理想はソース側の更新時刻・増分キーをそのまま条件化し、必要に応じて「フェンス幅」を設けて遅延到着を取り込むことです。

ウォーターマーク列は単調増加・非NULL・タイムゾーン一貫が前提です。取り込み範囲は可変パラメータ化し、バックフィル時に上書きできるようにします。

  • 基本: ソースの updated_at >= run_started_at - lookback の形で制御
  • 遅延到着対策: 少し広めに読む(例: 24〜48時間) + 重複排除は下流で
  • 最大値参照の最適化: 参照対象を小さな要約テーブルに切替える

可変ウォーターマークの実装例(Jinja + 変数)

{% set lookback_hours = var('wm_lookback_hours', 24) %}
{% set lower_bound_expr %}dateadd(hour, -{{ lookback_hours }}, {{ run_started_at }}){% endset %}

{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id') }}

with src as (
  select *
  from {{ source('crm', 'customers') }}
  {% if is_incremental() %}
    where updated_at >= {{ lower_bound_expr }}
  {% endif %}
)
,
dedup as (
  -- 重複排除: ソース内で同一キー最新行を選ぶ
  select asof.*
  from (
    select *, row_number() over(partition by id order by updated_at desc) as rn
    from src
  ) asof
  where rn = 1
)
select * from dedup;

MERGE 戦略の最適化(Snowflake / Databricks)

MERGE はアップサートの柔軟性が高い反面、結合対象の行数が多いと急激に遅くなります。最適化の第一歩は、is_incremental() 側で十分に入力を絞り、merge の結合キー(unique_key)を明確にすることです。ステージ側でキーごとに最新行へ集約してから MERGE するだけでも大きく効きます。

Snowflake ではクラスタリングキーの設計がスキャン効率に影響します。ALTER TABLE ... CLUSTER BY でキーを設定し、検索効率を高めます。Databricks(Delta) では OPTIMIZE と ZORDER BY によりデータスキップ性を向上させ、MERGE 後の断片化を抑えます。これらはコストを伴うため、post-hook で頻度を制御すると安定します。

  • MERGE 入力は「必要キーの最新レコード」に集約してから投入
  • Snowflake: CLUSTER BY のキーは join/filter に使う列を優先
  • Databricks(Delta): 大量更新の後だけ OPTIMIZE/ZORDER を実行

MERGE 最適化の実例(クラスタリング/OPTIMIZE を post-hook で)

{{ config(
  materialized='incremental',
  incremental_strategy='merge',
  unique_key=['account_id', 'effective_date'],
  post_hook=[
    {% raw %}
    "{% if target.type == 'snowflake' %}"
    "alter table {{ this }} cluster by (account_id, effective_date);",
    "{% endif %}"
    ,
    "{% if target.type in ['databricks','spark'] %}"
    "optimize {{ this }} zorder by (account_id);",
    "{% endif %}"
    {% endraw %}
  ]
) }}

with staged as (
  select *, row_number() over(partition by account_id, effective_date order by updated_at desc) as rn
  from {{ ref('stg_accounts_daily') }}
  {% if is_incremental() %}
    where updated_at >= dateadd(day, -2, {{ run_started_at }})
  {% endif %}
)
select account_id, effective_date, balance, updated_at
from staged
where rn = 1;

INSERT_OVERWRITE とパーティション運用(Databricks Delta)

Spark/Databricks では partition_by を指定した incremental_strategy='insert_overwrite' が使えます。これにより影響のあるパーティションのみ入れ替えられるため、日次や月次の再計算が高速になります。パーティションキーはフィルタで頻用し、カーディナリティがほどよい列(日付など)を選びます。

一方で細かすぎるパーティションは小ファイルを増やし、読み取り・メタデータ管理が遅くなります。定期的な OPTIMIZE と適切なファイルサイズに落ち着くようバッチの粒度を調整してください。Snowflake での insert_overwrite は一般的でないため、Snowflake では merge を優先します。

  • 日次パーティション + insert_overwrite はバックフィルに強い
  • 小ファイル対策: 書き込み並列と OPTIMIZE の頻度を制御
  • Snowflake は merge を基本とし、再計算はモデル分割で対処

Delta での INSERT_OVERWRITE 例(日次パーティション)

{{ config(
  materialized='incremental',
  incremental_strategy='insert_overwrite',
  partition_by=['dt']
) }}

with base as (
  select *, to_date(event_time) as dt
  from {{ source('app', 'clicks') }}
  {% if is_incremental() %}
    where event_time >= dateadd(day, -3, {{ run_started_at }})
  {% endif %}
)

select
  user_id,
  url,
  event_time,
  to_date(event_time) as dt
from base
-- insert_overwrite は上記 partition_by=['dt'] に基づき、影響日のパーティションのみ入替

遅延到着・削除の扱いとテスト戦略

遅延到着は「読み取り範囲を広め + 下流で最新行に正規化」で吸収します。行の削除は、ソースに削除フラグ(トゥームストーン)がある場合は MERGE の WHEN MATCHED AND deleted=1 THEN DELETE で対応可能です。ソースから物理削除されてしまう場合は、定期的なフル比較モデルやスナップショットで整合性を担保します。

品質担保には、unique や not_null の基本テストに加え、増分特有のリージョン(直近N日など)だけを検査する選択的テストが効率的です。モデルに lookback を組み込む際は、テストも同じウィンドウを参照できるよう vars を共用します。

  • 削除フラグがある場合: MERGE の WHEN MATCHED ... THEN DELETE を活用
  • 物理削除のみの場合: スナップショットや別モデルでの整合性チェックを検討
  • テストは直近パーティションに集中させ、高頻度で回す

削除フラグ対応の MERGE 条件とテスト定義

{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id') }}

with src as (
  select * from {{ source('erp', 'orders_cdc') }}
  {% if is_incremental() %}
    where updated_at >= dateadd(day, -2, {{ run_started_at }})
  {% endif %}
), latest as (
  select *, row_number() over(partition by id order by updated_at desc) rn
  from src
)
select id, status, amount, updated_at, deleted
from latest
where rn = 1

-- dbt はアダプタの既定 MERGE 文を生成。deleted=1 の行は WHEN MATCHED AND deleted=1 THEN DELETE を条件に含めるカスタム化も可能(adapter/バージョンに依存)。

-- tests/schema.yml の一例
-- version: 2
-- models:
--   - name: fct_orders
--     tests:
--       - unique:
--           column_name: id
--       - not_null:
--           column_name: id
--       - relationships:
--           to: ref('dim_customers')
--           field: customer_id

実行計画・運用チューニング

高速化はクエリ最適化だけでなく、どのモデルをいつ実行するかの計画で決まります。state:modified+ やタグを使って真に変更の影響が及ぶモデルだけを選択し、並列度(threads)を環境の制約に合わせて調整します。大規模 MERGE を同時多発させないのが安定運用のコツです。

Databricks(Delta) では大きなバックフィル後のみ OPTIMIZE/VACUUM を実施するなど、メンテナンスの頻度を下げるとコストと時間を節約できます。Snowflake ではクラスタリングキー設定を見直し、不要なリクラスタリングを避けます。

  • モデル選択: dbt build -s state:modified+ で影響範囲のみ再実行
  • 並列度: 重い MERGE は同一ウェアハウスで直列化して競合を回避
  • メンテナンス: Delta は必要時のみ OPTIMIZE/VACUUM、Snowflake は適切な CLUSTER BY

実行選択とスケジューリング例

# 変更差分のみビルド(本番を参照してテスト環境で検証)
dbt build -s state:modified+ --defer --state target/artifacts --target prod

# 重い増分タグを夜間バッチに限定
dbt run -s tag:heavy_incremental --threads 2

# selectors.yml の例
# selectors:
#   - name: modified_plus_heavy
#     definition:
#       union:
#         - method: state
#           value: modified+
#         - method: tag
#           value: heavy_incremental

問題で確認

Analytics Engineer

問題 1

Databricks(Delta) で日次パーティションの売上テーブルを再計算するタスクが頻繁に発生します。遅延到着は最大48時間、同一キーの上書きは不要です。最も効率的な戦略はどれか。

  1. incremental_strategy='insert_overwrite' と partition_by=['sale_date'] を用い、is_incremental() で直近2〜3日のみを対象にする
  2. incremental_strategy='merge' を用い、全期間を入力にして毎回アップサートする
  3. materialized='table' にして毎回フルリビルドする
  4. append 戦略でひたすら追記し、誤差はダッシュボード側でフィルタする

正解: A

日次パーティションが明確で上書き要件がないため、影響日だけを入れ替える insert_overwrite が最も効率的です。遅延到着を考慮して 2〜3 日の範囲を読み、該当パーティションのみを置換します。merge 全期間やフルリビルドは不要な計算が多く、append は整合性を保てません。

よくある質問

unique_key は必ず必要ですか?

merge 戦略では実質必須です。キーなしでは同一行の突き合わせができず、重複や更新漏れが発生します。append や特定の insert_overwrite では不要ですが、整合性要件に応じてキー管理を検討してください。

Snowflake で insert_overwrite を使えますか?

一般的には merge が推奨です。Spark 系のような partition 指定の insert_overwrite は Snowflake では一般的でなく、再計算はモデル分割や別テーブルでの再生成で対応するのが現実的です。

OPTIMIZE や ZORDER は常に毎回実行すべきですか?

いいえ。Databricks(Delta) の OPTIMIZE/ZORDER はコストがかかります。大きなバックフィルや多数の小ファイルが生じた直後など、必要時だけ post-hook で条件実行するのが実務的です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
dbt

dbt Model の基礎: SQL で定義する変換の最小単位

Analytics Engineer 向けに、dbt Model の定義、マテリアライゼーション、依存関係、インクリメン...

dbt

dbt Analytics Engineer 試験ガイド: 出題範囲・配点・申込の実務視点

dbt Analytics Engineer 認定の出題範囲、配点の考え方、申込から受験までの流れを、公式ドキュメントの...

dbt

dbt Cloud と dbt Core の違いと選び方:Analytics Engineer 試験に効く要点

dbt Cloud と dbt Core の機能差を、実務と資格対策の両面から整理。スケジューリング、IDE、RBAC、...

dbt

dbt プロジェクト構造ガイド: models / seeds / macros の実務レイアウト

Analytics Engineer 向けに、dbt プロジェクトのディレクトリ構造と命名規約、dbt_project....

dbt

dbt_project.yml の読み方:主要設定と命名を最短で掴む

dbt_project.yml の必須キー、命名解決(database.schema.identifier)、設定優先度...

dbtの記事一覧 (100件)
© 2026 NicheeLab All rights reserved.