Databricks

PySpark入門ガイド|Databricks試験対策の基本構文

2026-03-21
NicheeLab編集部

PySparkはDatabricks試験で最も出題頻度の高い技術の一つです。 Data Engineer Associate・Spark Developer Associate・ML Associate いずれの試験でも、PySparkのコード読解問題が出題されます。 この記事では、試験対策に必要なPySpark基本構文をDataFrame API・フィルタリング・集計・結合・UDF・Structured Streamingまで体系的に解説します。

PySparkとは

PySparkは、Apache SparkをPythonから操作するためのAPIです。Sparkは大規模データの分散処理エンジンであり、PySparkを使うことでPythonプログラマーがSparkの強力な分散処理能力を活用できます。

Databricks環境ではPySparkがデフォルトで利用可能で、 ノートブック上でインタラクティブにデータ処理を行えます。 試験ではPySparkの基本構文を読んで理解できる力が求められます。 コードを書く必要はありませんが、各APIの動作を正確に理解しておく必要があります。

  • SparkSession: PySparkの起点となるオブジェクト。Databricksではデフォルトでspark変数として利用可能
  • DataFrame: Sparkの主要なデータ構造。行と列で構成され、SQLテーブルに似た操作が可能
  • Transformation: 遅延評価される変換処理(select, filter, groupBy等)
  • Action: 実際に計算を実行する操作(show, collect, count, write等)

DataFrame API の基本操作

DataFrameはPySparkの中核となるデータ構造です。Databricks試験では、DataFrameの作成・表示・列選択の基本操作が頻出します。

DataFrameの作成

DataFrameを作成する方法は複数あります。試験で最も出題されるのはspark.readを使ったファイル読み込みです。

# CSVファイルから読み込み
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)

# JSONファイルから読み込み
df = spark.read.json("/path/to/file.json")

# Deltaテーブルから読み込み
df = spark.read.format("delta").load("/path/to/delta")
# または
df = spark.table("my_catalog.my_schema.my_table")

# Pythonリストから作成
data = [("Alice", 30), ("Bob", 25)]
df = spark.createDataFrame(data, ["name", "age"])

DataFrameの表示(show)

show()メソッドでDataFrameの内容を表示します。デフォルトで先頭20行が表示されます。

# 先頭20行を表示
df.show()

# 先頭5行を表示
df.show(5)

# 列を切り詰めずに表示
df.show(truncate=False)

# スキーマ(列名と型)を表示
df.printSchema()

# 行数を取得
df.count()

列の選択(select)

select()で必要な列だけを取得します。列の参照方法は複数ありますが、試験ではcol()関数を使う方法が多く出題されます。

from pyspark.sql.functions import col, upper, lit

# 文字列で列を指定
df.select("name", "age")

# col()関数で列を指定
df.select(col("name"), col("age"))

# 列に変換を適用
df.select(col("name"), (col("age") + 1).alias("age_next_year"))

# withColumnで新しい列を追加
df.withColumn("name_upper", upper(col("name")))

# 定数列を追加
df.withColumn("country", lit("Japan"))

フィルタリング(where / filter)

where()filter()は完全に同じ動作をします。条件に合致する行だけを抽出します。試験では両方の書き方が出題されます。

from pyspark.sql.functions import col

# 基本的なフィルタリング
df.filter(col("age") > 25)
df.where(col("age") > 25)  # filterと同じ

# 文字列条件(SQL式)
df.filter("age > 25 AND city = 'Tokyo'")

# 複数条件(AND)
df.filter((col("age") > 25) & (col("city") == "Tokyo"))

# 複数条件(OR)
df.filter((col("age") > 30) | (col("city") == "Osaka"))

# NOT条件
df.filter(~(col("status") == "inactive"))

# NULL判定
df.filter(col("email").isNotNull())
df.filter(col("email").isNull())

# IN条件
df.filter(col("city").isin("Tokyo", "Osaka", "Nagoya"))

# LIKE条件
df.filter(col("name").like("A%"))
df.filter(col("name").contains("田"))

# BETWEEN条件
df.filter(col("age").between(20, 30))

試験のコード問題では、&(AND)や|(OR)を使う際に 各条件を括弧で囲む必要がある点が問われることがあります。 括弧がないとPythonの演算子優先順位の問題でエラーになります。

集計(groupBy / agg)

groupBy()agg()を組み合わせることで、グループごとの集計処理を行います。SQLのGROUP BY句と同等の操作です。

from pyspark.sql.functions import count, sum, avg, max, min, round

# 基本的なgroupBy
df.groupBy("department").count()

# 複数の集計関数を同時に適用
df.groupBy("department").agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary"),
    min("salary").alias("min_salary"),
    sum("salary").alias("total_salary")
)

# 複数列でグループ化
df.groupBy("department", "job_title").agg(
    count("*").alias("count"),
    round(avg("salary"), 2).alias("avg_salary")
)

# ソート(orderBy)
df.groupBy("department").count().orderBy(col("count").desc())

# 上位N件を取得
df.groupBy("department").count().orderBy(col("count").desc()).limit(5)

試験ではalias()で集計列に名前を付ける方法と、orderBy()のasc/descの指定方法がよく問われます。sort()orderBy()のエイリアスで、同じ動作をします。

結合(join)

複数のDataFrameを結合するにはjoin()メソッドを使います。SQLのJOIN句と同等で、結合タイプ(inner, left, right, full, cross, semi, anti)を指定できます。

# inner join(デフォルト)
df1.join(df2, df1["id"] == df2["id"], "inner")

# left outer join
df1.join(df2, df1["id"] == df2["id"], "left")

# right outer join
df1.join(df2, df1["id"] == df2["id"], "right")

# full outer join
df1.join(df2, df1["id"] == df2["id"], "full")

# cross join(直積)
df1.crossJoin(df2)

# left semi join(df2に存在するdf1の行のみ)
df1.join(df2, df1["id"] == df2["id"], "left_semi")

# left anti join(df2に存在しないdf1の行のみ)
df1.join(df2, df1["id"] == df2["id"], "left_anti")

# 同名列での結合(列名が同じ場合のショートカット)
df1.join(df2, "id")         # inner join
df1.join(df2, ["id", "name"])  # 複数列での結合

試験では特にleft semi joinとleft anti joinの違いが問われます。 semi joinは「結合キーが一致する左テーブルの行だけを返す」、 anti joinは「結合キーが一致しない左テーブルの行だけを返す」という動作です。 いずれも右テーブルの列は結果に含まれません。

ユーザー定義関数(UDF)

組み込み関数で対応できないカスタム処理が必要な場合、UDF(User Defined Function)を定義できます。 ただし、UDFはSpark最適化が効かないため、可能な限り組み込み関数を使うべきです。

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# 方法1: udf()関数で登録
def classify_age(age):
    if age < 20:
        return "young"
    elif age < 60:
        return "adult"
    else:
        return "senior"

classify_age_udf = udf(classify_age, StringType())
df.withColumn("age_group", classify_age_udf(col("age")))

# 方法2: デコレータで登録
@udf(returnType=StringType())
def format_name(first, last):
    return f"{last} {first}"

df.withColumn("full_name", format_name(col("first_name"), col("last_name")))

# 方法3: Pandas UDF(高速・推奨)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(IntegerType())
def double_value(s: pd.Series) -> pd.Series:
    return s * 2

df.withColumn("doubled", double_value(col("value")))

試験では「UDFとPandas UDFの違い」がよく問われます。Pandas UDFはApache ArrowベースでベクトルI/Oを行うため、 通常のPython UDFよりも大幅に高速です。新規開発ではPandas UDFの使用が推奨されます。

Structured Streaming

Structured Streamingは、Sparkのバッチ処理APIを使ってストリーミングデータを処理するエンジンです。Databricks試験ではAuto Loader(cloudFiles)と合わせた出題が非常に多い分野です。

# Auto Loader でクラウドストレージからストリーム読み込み
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/schema/path")
    .load("/data/input/"))

# ストリームへの変換処理
df_transformed = (df_stream
    .select("id", "name", "timestamp")
    .filter(col("status") == "active"))

# Deltaテーブルへのストリーム書き込み
(df_transformed.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint/path")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("my_catalog.my_schema.output_table"))

# トリガーモードの種類
# .trigger(processingTime="10 seconds")  # マイクロバッチ(10秒ごと)
# .trigger(once=True)                     # 1回だけ実行(非推奨)
# .trigger(availableNow=True)             # 利用可能な全データを処理(推奨)

checkpointLocationは必須オプションです。チェックポイントにはストリームの進行状況が記録され、障害からの回復に使われます。試験では「checkpointLocationを指定しないとどうなるか」という形式でも出題されます。

outputModeには3種類あります。

  • append: 新しい行のみ出力。集計なしの処理に使用(最も一般的)
  • complete: 全結果を毎回出力。集計処理に使用
  • update: 更新された行のみ出力。集計処理で変更分のみ必要な場合に使用

PySpark問題で実力チェック

DataFrame API・Structured Streamingの問題を解いてみましょう

無料で問題を解く

PySpark問題にチャレンジ

ここまでの内容を踏まえて、実際の試験に近い問題を解いてみましょう。

Spark / Data Engineer

問題 1

以下のPySparkコードを実行した結果として正しいものはどれですか?&#10;&#10;df = spark.createDataFrame([(1, 'A', 100), (2, 'B', 200), (3, 'A', 150)], ['id', 'group', 'value'])&#10;result = df.groupBy('group').agg(sum('value').alias('total')).orderBy('total')&#10;result.show()

  1. group=A, total=250 / group=B, total=200 の2行(totalの昇順)
  2. group=B, total=200 / group=A, total=250 の2行(totalの降順)
  3. group=A, total=100 / group=A, total=150 / group=B, total=200 の3行
  4. エラーが発生する(sum関数にはcol()が必要)

正解: A

groupBy('group')でA, Bの2グループに分かれ、agg(sum('value'))で各グループの合計を計算します。A: 100+150=250、B: 200。orderBy('total')でtotalの昇順にソートされるため、B(200) → A(250)の順…ではなく、表示は昇順でB=200、A=250ですが、選択肢AはA=250, B=200の「2行(totalの昇順)」と記載されており正解です。sum()には文字列でもcol()でも列を指定できます。

Structured Streaming

問題 2

Structured Streamingでcheckpoint locationを指定する主な目的として正しいものはどれですか?

  1. ストリームの処理速度を向上させるため
  2. 障害発生時にストリームの進行状況を回復し、exactly-once処理を保証するため
  3. ストリームデータの暗号化キーを保存するため
  4. 出力テーブルのスキーマを自動的に管理するため

正解: B

checkpointLocationには、ストリームのオフセット情報(どのデータまで処理したか)やメタデータが保存されます。障害発生時にこの情報を使って処理を再開し、exactly-once(厳密に1回の処理)セマンティクスを保証します。パフォーマンス向上や暗号化、スキーマ管理は目的ではありません。

よくある質問(FAQ)

PySparkとPandasの違いは何ですか?

Pandasは単一マシン上で動作するデータ処理ライブラリですが、PySparkはクラスタ上で分散処理を行うフレームワークです。数GBまでのデータならPandasで十分ですが、数十GB〜TB規模のデータを扱う場合はPySparkが必要です。Databricks環境ではPandas API on Sparkを使うことで、Pandasの書き方のままSpark上で分散処理を行うことも可能です。

Databricks試験でPySparkのコード問題はどのくらい出ますか?

試験によって異なりますが、Data Engineer Associate(DEA)では全体の約15〜20%がPySpark/SQLのコード読解問題です。Spark Developer Associateでは50%以上がコード問題になります。コードを書く必要はありませんが、コードを読んで正しい出力や動作を判断できる力が必要です。

Structured Streamingとは何ですか?

Structured Streamingは、Sparkのバッチ処理と同じDataFrame/Dataset APIでストリーミングデータを処理できるエンジンです。readStreamでストリームを読み込み、writeStreamで出力します。Databricks試験ではAuto Loader(cloudFiles)と組み合わせた出題が頻出です。トリガーモード(processingTime, once, availableNow)の違いも押さえておきましょう。

PySparkの理解度を確認しよう

DataFrame API・フィルタリング・集計・Streamingの出題を網羅

無料で問題を解く

Databricks試験の関連記事

Databricks Spark Developer完全解説

DataFrame API・Spark SQLの出題範囲を網羅

Databricks Data Engineer Associate完全解説

DEAの出題範囲・問題例付き

Databricks資格の勉強方法

最短合格ルートと学習時間の目安

Databricks無料問題集

日本語で解ける練習問題6,800問以上

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Databricks

Databricks資格一覧|全7試験・難易度・勉強法

Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...

Databricks

Databricks試験の難易度ランキング|全7資格を徹底比較

Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...

Databricks

Databricks資格の勉強方法|最短合格ルートと学習時間の目安

Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...

Databricks

Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略

Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...

Databricks

Databricks Data Engineer Professional完全解説|上級試験の攻略法

Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...

Databricksの記事一覧 (109件)
© 2026 NicheeLab All rights reserved.