Contents
Apache Spark と PySpark の概要(Spark 3.5 系)
Spark 3.5 は 2023 年末にリリースされた最新の安定版で、クエリ最適化機能や API の利便性が大幅に向上しています。Python からは公式パッケージ PySpark が提供されており、ローカル環境でもクラウド上のマネージド Spark サービスでも同一コードで実行できます。本節では Spark 3.5 の主要特徴と、特に注目すべき機能を概観します。
- Catalyst Optimizer の改良:統計情報取得が自動化され、プラン選択が高速化。
- Adaptive Query Execution(AQE) がデフォルトでオフになっているものの、簡単な設定だけで実行時に最適化が働くようになります。
- Pandas API on Spark の拡張:分散環境下でも Pandas ライクなコードを書きやすくなり、
to_pandas()の呼び出し回数を減らすだけでメモリ使用量が大幅に削減できます。
根拠:Apache Spark 公式ドキュメント(Adaptive Query Execution)および、2023 年に実施された独立ベンチマーク(TPC‑DS, 1 TB データセット)では、AQE 有効化によりジョブ実行時間が 平均 22 %、最長で 30 % 短縮されることが報告されています。
開発環境の構築(ローカル & クラウド)
Spark アプリケーションはまずローカルで動作確認し、その後マネージドサービスや自前クラスタにデプロイするケースが一般的です。ここでは、Python 仮想環境の作成手順と、代表的なクラウド環境(Amazon EMR)で Spark 3.5 を利用する際のポイントを示します。
ローカルマシンでの Python 環境構築
- 仮想環境の作成(conda または venv いずれでも可)。
- PySpark のバージョンを
3.5.*に固定してインストール。
|
1 2 3 4 5 6 7 8 9 10 |
# conda の例 conda create -n spark35 python=3.10 -y conda activate spark35 pip install pyspark==3.5.* # venv の例 python -m venv spark35_env source spark35_env/bin/activate # Windows は .\Scripts\activate pip install pyspark==3.5.* |
インストール後は pyspark --version でバージョンを確認してください。
Amazon EMR で Spark 3.5 を使用する際の留意点
EMR のリリースラベル emr-6.15.0 は Spark 3.4 系までしかサポートしていません。Spark 3.5 を利用したい場合は、EMR 7.x 系(例:emr-7.2.0)を選択する必要があります。
|
1 2 3 4 5 6 7 8 |
aws emr create-cluster \ --name "Spark35Cluster" \ --release-label emr-7.2.0 \ # Spark 3.5 対応ラベル --applications Name=Spark \ --instance-type m5.xlarge \ --instance-count 3 \ --use-default-roles |
クラスター起動後は SSH 接続して spark-submit が利用可能です。ローカルで動作確認したコードをそのまま移行できる点が大きな利点です。
PySpark DataFrame 操作と SparkSQL 入門
DataFrame は Spark の中心概念であり、遅延評価により必要なタイミングでだけ計算が走ります。ここでは CSV/JSON の読み込みから基本的な変換・集計までの流れを示します。
データ読み込みと基本変換
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
from pyspark.sql import functions as F # CSV 読み込み(ヘッダーあり、スキーマ自動推測) df = spark.read.option("header", "true") \ .option("inferSchema", "true") \ .csv("/data/sales.csv") # 必要な列だけ抽出し、条件でフィルタリング filtered = df.select("date", "region", "revenue") \ .filter(F.col("revenue") > 1000) # 地域別の集計(平均・合計) agg = filtered.groupBy("region") \ .agg(F.avg("revenue").alias("avg_rev"), F.sum("revenue").alias("total_rev")) agg.show() |
SparkSQL を用いた同等クエリ
DataFrame を一時ビューに登録すれば、SQL 文で同様の集計が可能です。
|
1 2 3 4 5 6 7 8 9 10 11 |
df.createOrReplaceTempView("sales") sql_result = spark.sql(""" SELECT region, AVG(revenue) AS avg_rev, SUM(revenue) AS total_rev FROM sales WHERE revenue > 1000 GROUP BY region """) sql_result.show() |
SQL と API を組み合わせることで、既存の BI ツールやデータエンジニアリングパイプラインとの親和性が高まります。
パフォーマンスチューニング(AQE・キャッシュ・パーティショニング)
実運用で求められるスループットは、適切な設定とチューニングに依存します。本節では Spark 3.5 で有効化すべき主要オプションをまとめます。
Adaptive Query Execution(AQE)の最適設定
AQE は実行時に統計情報を再評価し、シャッフルやジョインの戦略を自動的に切り替えます。以下の 3 つのパラメータが特に効果的です。
| パラメータ | 推奨設定 | 効果 |
|---|---|---|
spark.sql.adaptive.enabled |
"true" |
AQE 本体を有効化 |
spark.sql.adaptive.coalescePartitions.enabled |
"true" |
小さなパーティションを自動統合し、タスク数を削減 |
spark.sql.adaptive.minPartitionSize |
"64MB"(必要に応じて調整) |
1 パーティションあたりの最小サイズを設定し、過剰分割を防止 |
|
1 2 3 4 5 6 |
spark.conf.setAll({ "spark.sql.adaptive.enabled": "true", "spark.sql.adaptive.coalescePartitions.enabled": "true", "spark.sql.adaptive.minPartitionSize": "64MB" }) |
この構成で、groupBy 後のパーティション数が自動的に最適化され、前述のベンチマークと同様に 20 % 前後 の実行時間短縮が期待できます。
キャッシュとパーティショニング
- キャッシュ:頻繁に再利用する DataFrame は
cache()でメモリ上に保持し、I/O コストを削減します。 - 均一なパーティション分割:データのキー偏りがある場合は
repartition(ハッシュベース)かcoalesce(縮小のみ)で調整します。
|
1 2 3 4 5 6 |
big_df = spark.read.parquet("/data/large_table/") big_df.cache() # メモリに保持 balanced = big_df.repartition(200, "user_id") # user_id 基準で均等化 balanced.write.mode("overwrite").parquet("/tmp/balanced") |
推奨デフォルト設定(Spark 3.5)
| 設定項目 | 推奨値 |
|---|---|
spark.sql.shuffle.partitions |
ワーカー数 × 2〜4 (例: 200‑400) |
spark.executor.memory |
4 GB 以上 |
spark.driver.maxResultSize |
2 GB |
MLlib と Structured Streaming のハンズオン
機械学習とリアルタイム処理は Spark の主要ユースケースです。以下に、マネージドクラスターでもそのまま実行できるサンプルコードを示します。
1. MLlib パイプライン(前処理 → 標準化 → ロジスティック回帰)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StandardScaler from pyspark.ml.classification import LogisticRegression # データ読み込み data = spark.read.csv("/data/credit.csv", header=True, inferSchema=True) # 前処理ステージ assembler = VectorAssembler( inputCols=["age", "income", "balance"], outputCol="features_raw") scaler = StandardScaler(inputCol="features_raw", outputCol="features") # 学習モデル lr = LogisticRegression(featuresCol="features", labelCol="default") pipeline = Pipeline(stages=[assembler, scaler, lr]) model = pipeline.fit(data) # 推論例(上位 5 行) model.transform(data.limit(5)).select("features", "prediction").show() |
2. Structured Streaming による Kafka からのリアルタイム集計
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
from pyspark.sql.functions import window, col, avg # Kafka ソースからストリーム取得 stream_df = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "events") \ .load() \ .selectExpr("CAST(value AS STRING) as json") # JSON パース(簡易例) parsed = stream_df.selectExpr( "json_tuple(json, 'user_id', 'event_time', 'value') as (user_id, ts, val)" ).withColumn("ts", col("ts").cast("timestamp")) # 5 分ウィンドウでユーザー別平均値を算出 agg = parsed.groupBy(window(col("ts"), "5 minutes"), "user_id") \ .agg(avg(col("val")).alias("avg_val")) # コンソールへ出力(デバッグ用) query = agg.writeStream.outputMode("update") \ .format("console") \ .option("truncate", "false") \ .start() query.awaitTermination() |
上記コードは、ローカルの Spark セッションでも EMR クラスターでも同一の手順で実行できます。
まとめ
- Spark 3.5 は AQE の強化と Pandas API の拡張により、設定ひとつで約 20 % 程度のパフォーマンス向上が期待できる最新安定版です。
- ローカル環境は
conda/venvとpip install pyspark==3.5.*だけで構築可能。クラウドでは EMR 7 系リリースラベルを選択すれば Spark 3.5 が利用できます。 - DataFrame と SparkSQL は同一の Catalyst オプティマイザを共有するため、用途に応じて柔軟に切り替えられます。
- パフォーマンスチューニングは AQE の有効化、キャッシュ・適切なパーティショニング設定が鍵です。ベンチマーク結果を参考に自環境で微調整してください。
- MLlib と Structured Streaming のサンプルは、実務でもすぐに活用できる形で提示しました。
本稿の手順と設定例を踏襲すれば、初心者から中級者までが本番レベルの Spark アプリケーションを迅速に構築・運用できるようになります。