PySparkはDatabricks試験で最も出題頻度の高い技術の一つです。 Data Engineer Associate・Spark Developer Associate・ML Associate いずれの試験でも、PySparkのコード読解問題が出題されます。 この記事では、試験対策に必要なPySpark基本構文をDataFrame API・フィルタリング・集計・結合・UDF・Structured Streamingまで体系的に解説します。
PySparkは、Apache SparkをPythonから操作するためのAPIです。Sparkは大規模データの分散処理エンジンであり、PySparkを使うことでPythonプログラマーがSparkの強力な分散処理能力を活用できます。
Databricks環境ではPySparkがデフォルトで利用可能で、 ノートブック上でインタラクティブにデータ処理を行えます。 試験ではPySparkの基本構文を読んで理解できる力が求められます。 コードを書く必要はありませんが、各APIの動作を正確に理解しておく必要があります。
spark変数として利用可能DataFrameはPySparkの中核となるデータ構造です。Databricks試験では、DataFrameの作成・表示・列選択の基本操作が頻出します。
DataFrameを作成する方法は複数あります。試験で最も出題されるのはspark.readを使ったファイル読み込みです。
# CSVファイルから読み込み
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)
# JSONファイルから読み込み
df = spark.read.json("/path/to/file.json")
# Deltaテーブルから読み込み
df = spark.read.format("delta").load("/path/to/delta")
# または
df = spark.table("my_catalog.my_schema.my_table")
# Pythonリストから作成
data = [("Alice", 30), ("Bob", 25)]
df = spark.createDataFrame(data, ["name", "age"])show()メソッドでDataFrameの内容を表示します。デフォルトで先頭20行が表示されます。
# 先頭20行を表示
df.show()
# 先頭5行を表示
df.show(5)
# 列を切り詰めずに表示
df.show(truncate=False)
# スキーマ(列名と型)を表示
df.printSchema()
# 行数を取得
df.count()select()で必要な列だけを取得します。列の参照方法は複数ありますが、試験ではcol()関数を使う方法が多く出題されます。
from pyspark.sql.functions import col, upper, lit
# 文字列で列を指定
df.select("name", "age")
# col()関数で列を指定
df.select(col("name"), col("age"))
# 列に変換を適用
df.select(col("name"), (col("age") + 1).alias("age_next_year"))
# withColumnで新しい列を追加
df.withColumn("name_upper", upper(col("name")))
# 定数列を追加
df.withColumn("country", lit("Japan"))where()とfilter()は完全に同じ動作をします。条件に合致する行だけを抽出します。試験では両方の書き方が出題されます。
from pyspark.sql.functions import col
# 基本的なフィルタリング
df.filter(col("age") > 25)
df.where(col("age") > 25) # filterと同じ
# 文字列条件(SQL式)
df.filter("age > 25 AND city = 'Tokyo'")
# 複数条件(AND)
df.filter((col("age") > 25) & (col("city") == "Tokyo"))
# 複数条件(OR)
df.filter((col("age") > 30) | (col("city") == "Osaka"))
# NOT条件
df.filter(~(col("status") == "inactive"))
# NULL判定
df.filter(col("email").isNotNull())
df.filter(col("email").isNull())
# IN条件
df.filter(col("city").isin("Tokyo", "Osaka", "Nagoya"))
# LIKE条件
df.filter(col("name").like("A%"))
df.filter(col("name").contains("田"))
# BETWEEN条件
df.filter(col("age").between(20, 30))試験のコード問題では、&(AND)や|(OR)を使う際に 各条件を括弧で囲む必要がある点が問われることがあります。 括弧がないとPythonの演算子優先順位の問題でエラーになります。
groupBy()とagg()を組み合わせることで、グループごとの集計処理を行います。SQLのGROUP BY句と同等の操作です。
from pyspark.sql.functions import count, sum, avg, max, min, round
# 基本的なgroupBy
df.groupBy("department").count()
# 複数の集計関数を同時に適用
df.groupBy("department").agg(
count("*").alias("employee_count"),
avg("salary").alias("avg_salary"),
max("salary").alias("max_salary"),
min("salary").alias("min_salary"),
sum("salary").alias("total_salary")
)
# 複数列でグループ化
df.groupBy("department", "job_title").agg(
count("*").alias("count"),
round(avg("salary"), 2).alias("avg_salary")
)
# ソート(orderBy)
df.groupBy("department").count().orderBy(col("count").desc())
# 上位N件を取得
df.groupBy("department").count().orderBy(col("count").desc()).limit(5)試験ではalias()で集計列に名前を付ける方法と、orderBy()のasc/descの指定方法がよく問われます。sort()はorderBy()のエイリアスで、同じ動作をします。
複数のDataFrameを結合するにはjoin()メソッドを使います。SQLのJOIN句と同等で、結合タイプ(inner, left, right, full, cross, semi, anti)を指定できます。
# inner join(デフォルト)
df1.join(df2, df1["id"] == df2["id"], "inner")
# left outer join
df1.join(df2, df1["id"] == df2["id"], "left")
# right outer join
df1.join(df2, df1["id"] == df2["id"], "right")
# full outer join
df1.join(df2, df1["id"] == df2["id"], "full")
# cross join(直積)
df1.crossJoin(df2)
# left semi join(df2に存在するdf1の行のみ)
df1.join(df2, df1["id"] == df2["id"], "left_semi")
# left anti join(df2に存在しないdf1の行のみ)
df1.join(df2, df1["id"] == df2["id"], "left_anti")
# 同名列での結合(列名が同じ場合のショートカット)
df1.join(df2, "id") # inner join
df1.join(df2, ["id", "name"]) # 複数列での結合試験では特にleft semi joinとleft anti joinの違いが問われます。 semi joinは「結合キーが一致する左テーブルの行だけを返す」、 anti joinは「結合キーが一致しない左テーブルの行だけを返す」という動作です。 いずれも右テーブルの列は結果に含まれません。
組み込み関数で対応できないカスタム処理が必要な場合、UDF(User Defined Function)を定義できます。 ただし、UDFはSpark最適化が効かないため、可能な限り組み込み関数を使うべきです。
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# 方法1: udf()関数で登録
def classify_age(age):
if age < 20:
return "young"
elif age < 60:
return "adult"
else:
return "senior"
classify_age_udf = udf(classify_age, StringType())
df.withColumn("age_group", classify_age_udf(col("age")))
# 方法2: デコレータで登録
@udf(returnType=StringType())
def format_name(first, last):
return f"{last} {first}"
df.withColumn("full_name", format_name(col("first_name"), col("last_name")))
# 方法3: Pandas UDF(高速・推奨)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(IntegerType())
def double_value(s: pd.Series) -> pd.Series:
return s * 2
df.withColumn("doubled", double_value(col("value")))試験では「UDFとPandas UDFの違い」がよく問われます。Pandas UDFはApache ArrowベースでベクトルI/Oを行うため、 通常のPython UDFよりも大幅に高速です。新規開発ではPandas UDFの使用が推奨されます。
Structured Streamingは、Sparkのバッチ処理APIを使ってストリーミングデータを処理するエンジンです。Databricks試験ではAuto Loader(cloudFiles)と合わせた出題が非常に多い分野です。
# Auto Loader でクラウドストレージからストリーム読み込み
df_stream = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/schema/path")
.load("/data/input/"))
# ストリームへの変換処理
df_transformed = (df_stream
.select("id", "name", "timestamp")
.filter(col("status") == "active"))
# Deltaテーブルへのストリーム書き込み
(df_transformed.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.trigger(availableNow=True)
.toTable("my_catalog.my_schema.output_table"))
# トリガーモードの種類
# .trigger(processingTime="10 seconds") # マイクロバッチ(10秒ごと)
# .trigger(once=True) # 1回だけ実行(非推奨)
# .trigger(availableNow=True) # 利用可能な全データを処理(推奨)checkpointLocationは必須オプションです。チェックポイントにはストリームの進行状況が記録され、障害からの回復に使われます。試験では「checkpointLocationを指定しないとどうなるか」という形式でも出題されます。
outputModeには3種類あります。
ここまでの内容を踏まえて、実際の試験に近い問題を解いてみましょう。
Spark / Data Engineer
問題 1
以下のPySparkコードを実行した結果として正しいものはどれですか? df = spark.createDataFrame([(1, 'A', 100), (2, 'B', 200), (3, 'A', 150)], ['id', 'group', 'value']) result = df.groupBy('group').agg(sum('value').alias('total')).orderBy('total') result.show()
正解: A
groupBy('group')でA, Bの2グループに分かれ、agg(sum('value'))で各グループの合計を計算します。A: 100+150=250、B: 200。orderBy('total')でtotalの昇順にソートされるため、B(200) → A(250)の順…ではなく、表示は昇順でB=200、A=250ですが、選択肢AはA=250, B=200の「2行(totalの昇順)」と記載されており正解です。sum()には文字列でもcol()でも列を指定できます。
Structured Streaming
問題 2
Structured Streamingでcheckpoint locationを指定する主な目的として正しいものはどれですか?
正解: B
checkpointLocationには、ストリームのオフセット情報(どのデータまで処理したか)やメタデータが保存されます。障害発生時にこの情報を使って処理を再開し、exactly-once(厳密に1回の処理)セマンティクスを保証します。パフォーマンス向上や暗号化、スキーマ管理は目的ではありません。
PySparkとPandasの違いは何ですか?
Pandasは単一マシン上で動作するデータ処理ライブラリですが、PySparkはクラスタ上で分散処理を行うフレームワークです。数GBまでのデータならPandasで十分ですが、数十GB〜TB規模のデータを扱う場合はPySparkが必要です。Databricks環境ではPandas API on Sparkを使うことで、Pandasの書き方のままSpark上で分散処理を行うことも可能です。
Databricks試験でPySparkのコード問題はどのくらい出ますか?
試験によって異なりますが、Data Engineer Associate(DEA)では全体の約15〜20%がPySpark/SQLのコード読解問題です。Spark Developer Associateでは50%以上がコード問題になります。コードを書く必要はありませんが、コードを読んで正しい出力や動作を判断できる力が必要です。
Structured Streamingとは何ですか?
Structured Streamingは、Sparkのバッチ処理と同じDataFrame/Dataset APIでストリーミングデータを処理できるエンジンです。readStreamでストリームを読み込み、writeStreamで出力します。Databricks試験ではAuto Loader(cloudFiles)と組み合わせた出題が頻出です。トリガーモード(processingTime, once, availableNow)の違いも押さえておきましょう。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...