dbt は実行ごとに run_results.json を生成し、各ノードのステータスや実行時間、アダプタ応答を記録します。これは単なるログではなく、パイプラインのパフォーマンスや品質を定量化できる信頼できるデータ源です。
この記事では、公式ドキュメントの安定した仕様に基づき、run_results.json の構造理解からデータ基盤への取り込み、SLO 監視までを一気通貫で解説します。Analytics Engineer 試験の観点も随所に織り込みます。
run_results.json は dbt の実行結果をまとめたアーティファクトです。dbt Core では通常 target ディレクトリに出力され、dbt Cloud では各ジョブ実行の Artifacts として取得できます。各エントリは実行対象ノードごとの結果を表し、全体のメタデータには dbt バージョン、生成時刻、invocation_id などが含まれます。
代表的なフィールドは次のとおりです。metadata(dbt_schema_version, dbt_version, generated_at, invocation_id 等)、results(unique_id, status, execution_time, timing, adapter_response, message, thread_id 等の配列)、elapsed_time(実行全体の経過時間)。adapter_response の内容はアダプタ依存ですが、rows_affected 等の値が入ることがあります。
| アーティファクト | 主な内容 | 更新タイミング |
|---|---|---|
| run_results.json | 各ノードの実行結果・所要時間・アダプタ応答 | 各コマンド実行後(run, test, seed など) |
| manifest.json | プロジェクトの依存関係・ノード定義・マクロ解決結果 | コンパイル時/実行時 |
| catalog.json | オブジェクトのカラムプロファイル(記述・型など) | catalog コマンド実行時 |
アーティファクトの生成フロー(概要)
dbt CLI/Cloud
|
v
[Compile & Execute]
|
+--> target/manifest.json
|
+--> target/run_results.json <-- 実行結果
|
+--> target/logs/ ...run_results.json の最小例(抜粋)
{
"metadata": {
"dbt_version": "1.x.x",
"generated_at": "2024-04-01T12:34:56Z",
"invocation_id": "e1c2..."
},
"results": [
{
"unique_id": "model.my_project.dim_customers",
"status": "success",
"execution_time": 12.34,
"adapter_response": {"rows_affected": 1234},
"timing": [{"name": "execute", "started_at": "...", "completed_at": "..."}],
"message": null
}
],
"elapsed_time": 45.67
}results[].execution_time はノードの純粋な実行秒数で、ボトルネック特定に有効です。status は success, error, skipped(およびテストでの fail など)が入り、再試行や選択条件の影響を見分けられます。adapter_response はアダプタ依存で、Snowflake や Databricks では rows_affected や実行コードなどが返ることがあります。
実務では、invocation_id をキーに 1 実行をまとまりとして集計し、ノード単位の P95 実行時間、成功率、行数変化(rows_affected)を可視化します。高頻度で遅いノードや、成功率が不安定なノードを抽出して改善順序を決めます。
| ステータス | 意味 | 監視での扱い |
|---|---|---|
| success | ノードが正常終了 | 成功率の分母・分子に計上 |
| error | 実行時エラーで失敗 | 重大インシデント。アラート必須 |
| skipped | 選択外や依存関係で未実行 | 成功率分母から除外(方針により扱い調整) |
1 ノードの実行タイムライン(概念)
[compile]----[execute================]----[post]
^ start ^ end
execution_time = end - startPython: 遅いノード Top5 を抽出(run_results.json を直接解析)
import json, pathlib
p = pathlib.Path('target/run_results.json')
with p.open() as f:
data = json.load(f)
rows = []
for r in data.get('results', []):
rows.append({
'unique_id': r.get('unique_id'),
'status': r.get('status'),
'sec': float(r.get('execution_time', 0.0)),
'rows_affected': (r.get('adapter_response') or {}).get('rows_affected')
})
slow = sorted([x for x in rows if x['status']=='success'], key=lambda x: x['sec'], reverse=True)[:5]
for i, x in enumerate(slow, 1):
print(f"{i}. {x['unique_id']} {x['sec']:.2f}s rows_affected={x['rows_affected']}")dbt test の結果も run_results.json に記録されます。テストの unique_id は一般に test.<package>.... の形式で識別でき、status は success / fail / error が使われます。fail はアサーション違反、error は実行エラーを示します。
失敗件数の正確な数え方はテストの種類やアダプタ実装に依存します。run_results.json はテストごとの合否を示すのに十分ですが、違反行数などの詳細はテスト SQL やログと併用して確認するのが実務的です。
| テスト種別 | 実行単位 | run_results での識別 |
|---|---|---|
| 汎用テスト(generic) | モデル/カラム単位でのアサーション展開 | unique_id が test.<pkg>.<name>.<hash> 形式 |
| 個別テスト(singular) | 任意 SQL によるアサーション | unique_id が test.<pkg>.<file_name>... 形式 |
モデルとテストの関係(概念)
model.my_project.dim_customers
|
+-- test.my_project.not_null_dim_customers_id (success)
+-- test.my_project.unique_dim_customers_id (fail)Python: テスト合格率を計算
import json
with open('target/run_results.json') as f:
data = json.load(f)
results = [r for r in data.get('results', []) if 'test.' in (r.get('unique_id') or '')]
passed = sum(1 for r in results if r.get('status')=='success')
failed = sum(1 for r in results if r.get('status') in ('fail','error'))
rate = (passed / (passed + failed)) * 100 if (passed+failed)>0 else None
print({'tests': len(results), 'passed': passed, 'failed': failed, 'pass_rate_pct': rate})run_results.json は生成ごとに 1 ファイルなので、オブジェクトストレージへ保存し、invocation_id と generated_at をキーにデータウェアハウスへ取り込みます。まずは RAW 層で JSON 全体を 1 カラムに保持し、その後 FLATTEN/EXPLODE で results 配列を正規化して分析用のワイド表を作るのが堅実です。
主キーは invocation_id + unique_id の組み合わせが実務的に扱いやすく、二重取り込み対策になります。行数や型はアダプタ依存なので、数値項目はキャスト時に NULL 許容で設計します。
| 層 | 目的 | 主なカラム/形式 |
|---|---|---|
| RAW(JSON) | 元データ保持・再処理可能性 | artifact VARIANT/STRING, load_ts |
| STG(展開) | results 配列の行展開 | invocation_id, unique_id, status, execution_time, adapter_response |
| DM(分析) | ダッシュボード・SLO 指標 | ノード粒度の P95, 成功率, rows_affected など |
取り込みパイプライン(Snowflake 例)
[Object Storage]
|
COPY INTO (RAW VARIANT)
|
FLATTEN(results)
|
ANALYTICS TABLES (SLO/可視化)Snowflake: JSON の展開と正規化(概念例)
create or replace table raw_run_results(artifact variant);
-- 取り込みは COPY INTO で stage から artifact にロード(省略)
-- 展開(results 配列を 1 行ずつに)
create or replace view stg_run_results as
select
artifact:metadata:invocation_id::string as invocation_id,
artifact:metadata:generated_at::timestamp_tz as generated_at,
r.value:unique_id::string as unique_id,
r.value:status::string as status,
r.value:execution_time::float as execution_time,
r.value:adapter_response as adapter_response
from raw_run_results,
lateral flatten(input => artifact:results) r;
-- 分析用テーブル(例: rows_affected を抽出、NULL 許容)
create or replace view dm_run_results as
select
invocation_id,
unique_id,
status,
execution_time,
try_to_number(adapter_response:rows_affected) as rows_affected,
generated_at
from stg_run_results;SLO の基本は、invocation_id 単位の成功率と、ノード単位の P95 実行時間です。run_results.json はどのノードがどれくらい時間を要したかを安定的に提供するため、スケジュール実行の健全性を定量化できます。周期性のあるワークロードでは、曜日・時間帯ごとに P95 を別管理すると誤検知を避けられます。
回帰検知は、直近 N 回の移動中央値や P95 の閾値超過を監視します。失敗の多いテストは別のダッシュボードで合格率と分けて表示し、原因の切り分け(SQL エラーか、データ品質の失敗か)を明確にします。
| メトリクス | 定義例 | 注意点 |
|---|---|---|
| 成功率 | success / (success + error) | skipped の扱いは分母から外す方針が多い |
| P95 実行時間 | execution_time の 95 パーセンタイル | スケジューラ遅延ではなく DB 実行時間を指標に |
| 行数変化 | rows_affected の推移 | アダプタ依存・NULL ありを許容 |
SLO 監視フロー(概念)
run_results.json -> RAW -> STG -> DM
| |
v v
成功率算出 P95 実行時間
\______________/
|
AlertingDatabricks(PySpark): P95 実行時間を算出
from pyspark.sql import functions as F
from pyspark.sql import types as T
# JSON を読み込み(ファイル配置は環境に合わせて設定)
df_raw = spark.read.json('/mnt/artifacts/run_results/*.json')
# results を explode
results = df_raw.select(
F.col('metadata.invocation_id').alias('invocation_id'),
F.explode_outer('results').alias('r')
)
fact = results.select(
'invocation_id',
F.col('r.unique_id').alias('unique_id'),
F.col('r.status').alias('status'),
F.col('r.execution_time').cast('double').alias('execution_time')
)
p95 = (fact
.where(F.col('status')=='success')
.groupBy('unique_id')
.agg(F.expr('percentile_approx(execution_time, 0.95)').alias('p95_sec')))
p95.orderBy(F.desc('p95_sec')).show(20, truncate=False)Analytics Engineer 試験では、アーティファクトの役割の違いと、パフォーマンス診断に使うべき指標の理解が問われやすいです。遅いモデルの特定は run_results.json の execution_time を使うのが正解で、manifest.json は構造・依存関係の把握が主目的です。
落とし穴として、ephemeral モデルは単体では実行されず、親モデルにインライン展開されるため、run_results.json に独立した結果として現れません。また seed は dbt seed 実行でのみ結果が出ます。rows_affected はアダプタ依存で常に入るとは限らない点にも注意します。
| 誤解しがちな点 | 正しい理解 | 対策 |
|---|---|---|
| manifest.json で実行時間が分かる | 実行時間は run_results.json に記録 | 遅延分析は run_results.json を参照 |
| ephemeral も毎回結果行がある | 親にインライン展開され独立結果は出ない | ソース SQL の複雑化に注意し、親モデルを監視 |
| rows_affected は常に取得できる | アダプタ依存で NULL の場合あり | NULL 安全なスキーマと可視化 |
ノード種別と run_results への出現
model (materialized) -> 出現
seed -> seed 実行時に出現
snapshot -> 出現
test -> 出現(success/fail/error)
ephemeral -> 独立しては出現しないBash: 実行後に run_results.json を保存・転送する(概念)
set -euo pipefail
run_ts=$(date -u +%Y%m%dT%H%M%SZ)
dbt run --select state:modified || true
# target/run_results.json を保存
mkdir -p artifacts/$run_ts
cp target/run_results.json artifacts/$run_ts/
# 例: AWS S3 へアップロード(環境に合わせて)
# aws s3 cp artifacts/$run_ts/run_results.json s3://my-bucket/dbt/run_results/$run_ts.jsonAnalytics Engineer
問題 1
dbt 実行の中で最も時間がかかったモデルを特定したい。最も適切なデータ源とフィールドの組み合わせはどれか。
正解: A
実行時間は run_results.json の各結果エントリに execution_time として記録される。manifest.json や catalog.json は構造やメタデータであり、実行時間比較には適さない。
run_results.json はどこで入手できますか?
dbt Core ではプロジェクトの target/run_results.json に生成されます。dbt Cloud ではジョブ実行の Artifacts からダウンロードできます。
ephemeral モデルは run_results.json に記録されますか?
ephemeral は単体で実行されず親モデルにインライン展開されるため、独立した結果エントリとしては現れません。
rows_affected は常に取得できますか?
いいえ。adapter_response の内容はアダプタ依存です。多くのアダプタで rows_affected が入ることがありますが、NULL の場合もあるため、解析時は NULL 許容で扱ってください。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
dbt Model の基礎: SQL で定義する変換の最小単位
Analytics Engineer 向けに、dbt Model の定義、マテリアライゼーション、依存関係、インクリメン...
dbt Analytics Engineer 試験ガイド: 出題範囲・配点・申込の実務視点
dbt Analytics Engineer 認定の出題範囲、配点の考え方、申込から受験までの流れを、公式ドキュメントの...
dbt Cloud と dbt Core の違いと選び方:Analytics Engineer 試験に効く要点
dbt Cloud と dbt Core の機能差を、実務と資格対策の両面から整理。スケジューリング、IDE、RBAC、...
dbt プロジェクト構造ガイド: models / seeds / macros の実務レイアウト
Analytics Engineer 向けに、dbt プロジェクトのディレクトリ構造と命名規約、dbt_project....
dbt_project.yml の読み方:主要設定と命名を最短で掴む
dbt_project.yml の必須キー、命名解決(database.schema.identifier)、設定優先度...