pandas API on Spark(pyspark.pandas)は、 使い慣れたPandasの構文をそのままApache Sparkの分散処理エンジン上で実行できるAPIです。 シングルノードのPandasでは扱えない大規模データセットを、 コードの書き換えを最小限にして処理できます。
この記事では、Koalasからの統合経緯、Pandasとの比較、DataFrame相互変換のコード例、分散環境での制約と非対応API、Pandas UDFとの違い、Databricks試験での出題ポイントまでを網羅的に解説します。
pandas API on Sparkは、Apache Spark 3.2で正式にSpark本体に統合されたAPIで、pyspark.pandasモジュールとして提供されています。 前身はDatabricksが開発したオープンソースプロジェクト「Koalas」で、 PandasユーザーがSparkの分散処理を透過的に利用できるよう設計されました。
最大の特徴は、PandasのDataFrameやSeriesと同じAPIを提供しつつ、内部的にはSpark DataFrameとして分散処理される点です。これにより数百GB〜数TB規模のデータを、Pandasの直感的なコードで処理可能です。
Koalasは2019年にDatabricksがOSSとしてリリースしました。 「PandasコードをそのままSparkで動かしたい」というニーズに応え、 PandasのAPI互換レイヤーをSpark上に実装したものです。 Spark 3.2(2022年1月リリース)でApache Spark本体に統合され、pyspark.pandasとして標準モジュールになりました。 現在はpip install koalasは不要で、 PySpark 3.2以降であれば追加インストールなしで利用可能です。
pandas API on Sparkのインポートは以下の1行で完了します。 慣例としてpsというエイリアスが使われます。
import pyspark.pandas as ps
# CSVファイルの読み込み(Pandasと同じ構文)
df = ps.read_csv("/mnt/data/sales.csv")
# Parquetファイルの読み込み
df = ps.read_parquet("/mnt/data/sales.parquet")
# Deltaテーブルの読み込み(Databricks固有)
df = ps.read_delta("/mnt/data/delta_table")
# 基本操作
df.head(10)
df.describe()
df.shape
df.dtypesネイティブPandasとpandas API on Sparkの主な違いを表にまとめます。試験ではこの違いが正確に理解できているかが問われます。
| 比較項目 | Pandas | pandas API on Spark |
|---|---|---|
| 実行エンジン | シングルノード(CPython) | Apache Spark分散処理 |
| データサイズ上限 | メモリに収まるサイズ | クラスタのストレージ容量 |
| API互換性 | 基準(100%) | 約80〜85%カバー |
| 遅延評価 | 即時評価(Eager) | 遅延評価(Lazy)+ 一部即時 |
| 行の順序保証 | 保証あり | 保証なし(分散のため) |
| インデックス | 完全サポート | 一部制限あり(iloc非効率) |
| インポート | import pandas as pd | import pyspark.pandas as ps |
| 実行環境 | ローカルPython | Sparkクラスタ上 |
Pandas DataFrame・Spark DataFrame・pyspark.pandas DataFrameの3種類のDataFrameを相互に変換する方法は試験頻出トピックです。各変換メソッドの違いを正確に覚えましょう。
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# ---- 1. Pandas → pyspark.pandas ----
pandas_df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
ps_df = ps.from_pandas(pandas_df)
# ---- 2. pyspark.pandas → Pandas ----
# 注意: データ全体がドライバに集約される
pandas_df2 = ps_df.to_pandas()
# ---- 3. Spark DataFrame → pyspark.pandas ----
spark_df = spark.createDataFrame(
[("Alice", 30), ("Bob", 25)], ["name", "age"]
)
ps_df2 = spark_df.to_pandas_on_spark()
# 別の方法
ps_df2 = ps.DataFrame(spark_df)
# ---- 4. pyspark.pandas → Spark DataFrame ----
spark_df2 = ps_df.to_spark()
# ---- 5. Spark DataFrame → Pandas(参考)----
# 注意: データ全体がドライバに集約される
pandas_df3 = spark_df.toPandas()
# ---- 6. Pandas → Spark DataFrame(参考)----
spark_df3 = spark.createDataFrame(pandas_df)重要:toPandas()とto_pandas()は データ全体をドライバノードのメモリに集約します。 大規模データセットではOutOfMemoryErrorのリスクがあるため、to_spark()でSpark DataFrameに変換して処理するのが安全です。
pandas API on Sparkは約80〜85%のPandas APIをカバーしていますが、分散環境の特性上、以下のAPIや動作は制限されています。
| カテゴリ | 制限されるAPI・動作 | 理由 |
|---|---|---|
| インデックス操作 | iloc(位置ベースアクセス) | シャッフル発生で非効率 |
| Window系 | ewm(指数加重移動平均) | 行の順序依存で分散不適合 |
| 文字列操作 | str.extractall(正規表現全マッチ) | 複雑な状態管理が必要 |
| 結合 | DataFrame.append(非推奨) | Pandas本体でも非推奨 |
| ソート | 行順序の暗黙保証 | 分散パーティションのため非保証 |
| プロット | plot()系メソッド | データ集約が必要で非効率 |
| マルチインデックス | 一部のMultiIndex操作 | 分散環境での実装が複雑 |
to_pandas()でローカルに集約するapply()よりも組み込み集計関数(sum・mean・count)を優先するto_spark()で変換してから処理するsort_values()の結果は後続処理で順序が崩れる可能性があるため、 最終出力の直前でソートする「Pandas API on Spark」と「Pandas UDF」は名前が似ているため混同されやすいですが、目的と使い方が異なります。
| 比較項目 | pandas API on Spark | Pandas UDF(pandas_udf) |
|---|---|---|
| 目的 | Pandas構文で分散処理を記述 | Spark DataFrameにPandas関数を適用 |
| 入力 | pyspark.pandas DataFrame | Spark DataFrameのカラム/グループ |
| 出力 | pyspark.pandas DataFrame | Spark DataFrameのカラム |
| 内部処理 | Sparkプランに変換 | Apache Arrow経由でバッチ転送 |
| 使い方 | ps.read_csv()等から全操作 | @pandas_udfデコレータで関数定義 |
| 速度 | Spark APIと同等 | 通常Python UDFの数十〜数百倍 |
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
# Scalar Pandas UDF: カラム単位でPandas関数を適用
@pandas_udf(DoubleType())
def multiply_by_two(s: pd.Series) -> pd.Series:
return s * 2
df = spark.table("sales")
result = df.withColumn("doubled", multiply_by_two(df["amount"]))
# Grouped Map: グループ単位でPandas DataFrameを受け取り処理
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
pdf["normalized"] = (
(pdf["value"] - pdf["value"].mean()) / pdf["value"].std()
)
return pdf
df.groupby("department").applyInPandas(normalize, schema=output_schema)pandas API on SparkはSpark Developer AssociateとData Engineer Associate(DEA)の両試験で出題されます。 特に以下の3点が重要です。
import pyspark.pandas as psが正しいインポート文です。import koalas as ks(旧Koalas)やfrom pyspark import pandasは試験の誤答選択肢として出題されます。
to_pandas_on_spark()(Spark→pyspark.pandas)とtoPandas()(Spark→ネイティブPandas)の違いは頻出です。 前者は分散状態を維持し、後者はドライバにデータを集約します。
「行の順序が保証されない」「ilocが非効率」「一部のPandas関数が未サポート」といった制約が正誤問題で問われます。特にネイティブPandasとの動作の違いについて正確に理解しておく必要があります。
Spark Developer / DEA
問題 1
次のコードを実行した場合の動作について、正しい説明はどれですか? import pyspark.pandas as ps ps_df = ps.read_csv("/data/large_dataset.csv") result = ps_df.groupby("region").agg({"sales": "sum", "quantity": "mean"}) final = result.to_pandas()
正解: B
pyspark.pandasのread_csvは内部的にSparkのデータソースAPIを使用してデータを分散読み込みします(Aは誤り)。groupbyとaggもSpark DataFrameの操作に変換されて分散実行されます(Cは誤り)。to_pandas()はpyspark.pandas DataFrameをネイティブPandas DataFrameに変換し、その際にデータがドライバノードに集約されます。この例ではgroupby集計後の結果(リージョン数分の行数)のみが集約されるため、通常は安全に実行できます。Dはto_pandas_on_spark()の説明であり、to_pandas()はネイティブPandasに変換するため分散状態は維持されません。
Pandas API on SparkとネイティブPandasの違いは何ですか?
構文はほぼ同一ですが、実行エンジンが根本的に異なります。ネイティブPandasはシングルノードのメモリ上でデータを処理しますが、Pandas API on SparkはApache Sparkの分散処理エンジンで動作します。そのため数百GB〜数TB規模のデータをPandasの構文で処理可能です。ただし、行の順序が保証されない、iloc/locによるランダムアクセスが非効率、一部のPandas関数(cummax、ewmなど)が未実装といった分散環境固有の制約があります。
KoalasとPandas API on Sparkの関係は?
KoalasはPandas API on Sparkの前身プロジェクトです。DatabricksがOSSとして開発し、Apache Spark 3.2で本体に統合されました。現在はpyspark.pandasモジュールとして標準提供されており、Koalasをpipでインストールする必要はありません。KoalasのAPIはpyspark.pandasにほぼ完全に移行されています。移行ガイドはSpark公式ドキュメントに記載されています。
Pandas UDFとPandas API on Sparkの違いは何ですか?
両者は名前が似ていますが、目的と使い方が異なります。Pandas API on Spark(pyspark.pandas)はPandasの構文でSparkの分散処理を行うAPI全体であり、DataFrameの作成・変換・集計をPandas風に記述できます。一方、Pandas UDF(pandas_udf)はSpark DataFrameのカラムやグループに対してPandas関数を適用するための仕組みで、Apache Arrowによるベクトル化データ転送で通常のPython UDFより数十倍高速に動作します。試験ではこの違いが問われることがあります。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...