Contents
1. 統合オプションの全体像
BigQuery と Spark を結びつける方法は大きく分けて マネージド型 と セルフホスト型 の二種類があります。選択肢ごとの特徴と想定ユースケースを以下に整理しました。
ポイント:データ転送コスト・レイテンシ・運用負荷の観点から、まずは「マネージドか自己管理か」を決めることが設計の出発点です。
| オプション | 提供形態 | 主なメリット | 想定ユースケース |
|---|---|---|---|
| Spark‑BigQuery コネクタ | ライブラリ(JAR) | Storage API 直結で高速、フィルタ・投影プッシュダウン対応 | Dataproc / Serverless Spark のバッチジョブ |
| Dataproc | フルマネージド Hadoop/Spark クラスタ | GCP リソースと同一リージョンに配置でき、IAM で細かく制御可能 | 大規模 ETL、機械学習前処理 |
| Serverless Spark (Dataproc Jobs API) | 完全サーバーレス(ジョブ単位) | インフラ管理不要、オンデマンド課金 | 短時間の集計・レポート生成 |
| Databricks on GCP | SaaS 型 Spark プラットフォーム | 高度なノートブック環境と UI 統合、Delta Lake 対応 | データサイエンス/ML パイプライン |
| AWS Glue (Spark) | クロスクラウド(Glue 4.0) | マルチクラウドでのデータ連携が可能 | 既に AWS 環境を持つ組織のハイブリッド構成 |
公式情報は以下を参照してください。
- BigQuery と Spark の接続方法 – https://cloud.google.com/bigquery/docs/connect-to-spark?hl=ja
- Dataproc ドキュメント – https://cloud.google.com/dataproc/docs
2. 前提条件と IAM ロール設定
2‑1. 必要な前提条件(導入文)
BigQuery と Spark を安全に連携させるためには、まず Storage API の有効化 と サービスアカウントへの最小権限付与 が必須です。これが整っていないと認証エラーやデータ転送失敗が頻発します。
必要な作業一覧
-
BigQuery Storage API の有効化
Cloud Console → 「API とサービス」→「ライブラリ」で BigQuery Storage API を有効にします。 -
サービスアカウントの作成(例:
spark-bq-sa)
bash
gcloud iam service-accounts create spark-bq-sa \
--display-name "Spark‑BigQuery Service Account" -
最小権限ロールの付与
| ロール | 用途 |
|---|---|
roles/bigquery.readSessionUser |
Storage API で読み取りセッションを作成 |
roles/bigquery.dataEditor(書き込みが必要な場合) |
テーブルへの INSERT / UPDATE |
roles/storage.objectViewer |
一時 GCS バケットへのオブジェクト閲覧 |
bash
PROJECT_ID=$(gcloud config get-value project)
SA_EMAIL="spark-bq-sa@$PROJECT_ID.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/bigquery.readSessionUser"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/bigquery.dataEditor"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/storage.objectViewer"
2‑2. 認証方式の選択肢(導入文)
運用フローに合わせて ADC、JSON キー、Secret Manager のいずれかを選びます。以下はそれぞれの設定手順です。
| 方法 | 設定手順 | 主な利用シーン |
|---|---|---|
| Application Default Credentials (ADC) | gcloud auth application-default login を実行し、ローカル環境に認証情報を保存。Dataproc クラスタはデフォルト SA が自動使用される。 |
開発・テスト環境 |
| JSON キー | サービスアカウントのキーを生成し、環境変数 GOOGLE_APPLICATION_CREDENTIALS にパスを設定。 |
CI/CD パイプラインや外部サーバ |
| Secret Manager | キーを Secret Manager に格納し、Dataproc 起動時に --properties=spark:spark.hadoop.google.cloud.auth.service.account.enable=true,spark:spark.hadoop.google.cloud.auth.service.account.json.keyfile=projects/PROJECT_ID/secrets/BQ_KEY/versions/latest を指定。 |
高セキュリティ要件の環境 |
3. Spark 環境の構築手順
3‑1. Dataproc クラスタ作成(導入文)
Dataproc は GCP のマネージド Hadoop/Spark 基盤です。ここでは 単一ノードクラスタ をテスト用に作成し、実運用時は必要に応じてスケールアウトできる構成へ拡張します。
コマンド例(gcloud CLI)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
REGION=asia-northeast1 # 必ずジョブ実行先と同一リージョンを指定 CLUSTER_NAME=spark-bq-demo PROJECT_ID=$(gcloud config get-value project) # Spark‑BigQuery コネクタ JAR をクラスタ起動時に自動ロードする例 CONNECTOR_JAR=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar gcloud dataproc clusters create $CLUSTER_NAME \ --region=$REGION \ --project=$PROJECT_ID \ --image-version=2.1-debian10 \ --single-node \ --master-machine-type=n1-standard-4 \ --properties=spark:spark.jars=$CONNECTOR_JAR |
ポイント
--propertiesの記法はkey=value形式で、^#^といったプレースホルダーは使用しません。- JAR のバージョンは 2024 年時点の安定版(0.28.0) を利用しています。将来のリリース情報は公式リポジトリで随時確認してください。
3‑2. ローカル / Google Colab 環境(導入文)
ローカルマシンや Colab 上でも同じコネクタを使えば、デバッグやプロトタイピングが高速化します。以下は PySpark を用いたセットアップ例です。
Colab でのインストール手順
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# 1. 必要なパッケージをインストール !apt-get install -y openjdk-11-jdk-headless > /dev/null !wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz !tar xf spark-3.5.0-bin-hadoop3.tgz import os, sys os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64" sys.path.append("/content/spark-3.5.0-bin-hadoop3/python") # 2. SparkSession にコネクタ JAR を追加 connector_jar = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar" from pyspark.sql import SparkSession spark = (SparkSession.builder .appName("colab-bq") .config("spark.jars", connector_jar) .getOrCreate()) |
注意:Colab では GCS バケットへのアクセスに
gcloud auth loginが必要です。認証が完了したら上記コードで接続できます。
4. コネクタ JAR の取得とバージョン選定
4‑1. 現行の安定版(導入文)
Spark‑BigQuery コネクタは Maven Central と Google が公開する GCS バケット の二か所から取得可能です。2024 年 10 月時点での最新安定版は 0.28.0 です。
| 入手方法 | 取得例 |
|---|---|
| Maven / sbt | libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.28.0" |
| GCS 公開バケット | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar |
| 直接ダウンロード | https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.28.0/ |
4‑2. バージョン互換表(導入文)
| Spark バージョン | Scala バージョン | 推奨コネクタ |
|---|---|---|
| 3.5.x | 2.12 | 0.28.0 |
| 3.4.x | 2.12 | 0.27.1 |
| 3.3.x 以下 | 2.12 / 2.11 | 0.26.0(サポートは限定的) |
ベストプラクティス:使用中の Spark バージョンと Scala バージョンが一致していることを必ず確認し、公式リリースノートで互換性情報をチェックしてください。
5. データ入出力実装ガイド
5‑1. 認証方式別コードサンプル(導入文)
以下は ADC、JSON キー、Secret Manager の3種類の認証方法に対する PySpark と Scala の最小構成です。どれも spark-bigquery-with-dependencies がクラスパスに含まれている前提です。
ADC(Application Default Credentials)
PySpark
|
1 2 3 4 5 6 7 8 9 10 11 |
from pyspark.sql import SparkSession spark = (SparkSession.builder .appName("BQReadADC") .getOrCreate()) df = spark.read.format("bigquery") \ .option("table", "myproject.my_dataset.my_table") \ .load() df.show() |
Scala
|
1 2 3 4 5 6 7 8 9 10 11 |
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("BQReadADC") .getOrCreate() val df = spark.read.format("bigquery") .option("table", "myproject.my_dataset.my_table") .load() df.show() |
JSON キー
|
1 2 3 4 5 6 7 |
spark = (SparkSession.builder .appName("BQReadKey") .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/path/to/key.json") .getOrCreate()) |
|
1 2 3 4 5 6 7 |
val spark = SparkSession.builder() .appName("BQReadKey") .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/path/to/key.json") .getOrCreate() |
Secret Manager
|
1 2 3 4 5 6 7 |
spark = (SparkSession.builder .appName("BQReadSecret") .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "projects/PROJECT_ID/secrets/BQ_KEY/versions/latest") .getOrCreate()) |
ポイント:認証情報はコードにハードコーディングせず、環境変数や Secret Manager を活用して安全に管理してください。
5‑2. BigQuery から DataFrame へのロード(導入文)
filterPushdown と maxParallelism を適切に設定すると、転送データ量が大幅に削減されます。以下は PySpark の例です。
|
1 2 3 4 5 6 7 |
df = (spark.read.format("bigquery") .option("table", "myproject.sales.transactions") .option("filterPushdown", "true") # BQ 側で WHERE を実行 .option("maxParallelism", "2000") # 同時スキャン数上限 .load()) df.printSchema() |
5‑3. DataFrame → BigQuery 書き込み(導入文)
書き込み時は temporary GCS バケット が必須です。バケットとテーブルは同一リージョンに配置してください。
|
1 2 3 4 5 |
(df.write.format("bigquery") .option("temporaryGcsBucket", "my-bq-temp-bucket") # 同一リージョン .mode("append") # WRITE_APPEND 相当 .save("myproject.analytics.events")) |
| 書き込みパラメータ | 推奨設定例 |
|---|---|
temporaryGcsBucket |
my-bq-temp-bucket(リージョン一致) |
writeMethod |
"direct"(デフォルト、バッチ書き込みに最適) |
batchSizeBytes |
100 MiB(必要に応じて調整) |
createDisposition / writeDisposition |
CREATE_IF_NEEDED / WRITE_APPEND |
6. パフォーマンスチューニングとリージョン考慮点
6‑1. パーティション設定とプッシュダウン活用(導入文)
大規模テーブルを扱う場合は Spark のパーティション数 と BigQuery 側のフィルタプッシュダウン を組み合わせることで、スループットが向上します。
|
1 2 3 4 5 6 7 8 9 10 11 |
// 例:1 TB テーブル → 128 MiB/パーティション が目安 val targetSizeGB = 1024 val partitions = (targetSizeGB * 1024) / 128 spark.conf.set("spark.sql.shuffle.partitions", partitions) // フィルタプッシュダウン有効化(PySpark でも同様) df.filter($"event_date" >= "2024-01-01") .read.format("bigquery") .option("filterPushdown", "true") .load() |
6‑2. バッチ vs ストリーミング選択指針(導入文)
処理形態は 遅延要件 と コスト感覚 に応じて決めます。
| 項目 | バッチ処理 | ストリーミング |
|---|---|---|
| 主な遅延 | 数分〜数時間 | 秒単位 |
| コスト | 大量データは一括書き込みで低コスト | 小さなウィンドウごとに GCS 書き込みが発生しやすく若干高め |
| 実装難易度 | write.save() だけで完結 |
foreachBatch 等の追加ロジック必要 |
| 推奨シーン | 日次集計・ETL パイプライン | ダッシュボード更新・リアルタイム監視 |
6‑3. リージョン統一の重要性(導入文)
「リージョン不一致」エラーは最も頻出する障害 のひとつです。以下を必ず守ってください。
- BigQuery テーブル ↔ Dataproc クラスタ ↔ GCS バケット はすべて同一リージョン(例:
asia-northeast1)。 - マルチリージョンテーブル(US、EU)にアクセスする場合は、対象リージョンのいずれか 単一リージョン にクラスタを配置します。
- バケット作成時に
--location=REGIONを明示し、temporaryGcsBucketのロケーションと完全一致させます。
|
1 2 |
gsutil mb -l asia-northeast1 gs://my-bq-temp-bucket/ |
7. デバッグ・よくあるエラー対処法
| エラー種別 | 代表的メッセージ | 主な原因 | 推奨対策 |
|---|---|---|---|
| 認証エラー | Unable to locate Application Default Credentials |
ADC が設定されていない、または権限不足 | gcloud auth application-default login を実行し、サービスアカウントに bigquery.readSessionUser を付与 |
| 権限エラー | 403 Forbidden: Access Denied |
必要ロールが欠如 | IAM コンソールで対象 SA に roles/bigquery.dataEditor 等を追加 |
| スキーマ不整合 | Cannot resolve column name … |
DataFrame の列名・型が BQ と相違 | printSchema() で確認し、withColumnRenamed や cast で合わせる |
| リージョン不一致 | Temporary GCS bucket must be in the same location as the destination table |
バケットとテーブルのロケーションが異なる | バケット作成時に正しいリージョンを指定し、temporaryGcsBucket に同一バケットを使用 |
| ストレージ API レートリミット | ResourceExhausted: Rate limit exceeded |
同時スキャン数が上限超過 | maxParallelism を減らすか、ジョブ実行時間帯をずらす |
| 書き込みモードエラー | WRITE_TRUNCATE is not allowed for existing table |
テーブルが外部テーブル等で上書き不可 | WRITE_APPEND または新規テーブル作成オプションに切り替える |
デバッグのコツ:エラーメッセージ全文を検索し、公式ガイドや Stack Overflow の事例と照らし合わせると解決までの時間が短縮できます。
8. 実践サンプルユースケースと次のアクション
8‑1. ユースケース① :GCS → Spark 変換 → BigQuery ロード(導入文)
以下は Parquet データを GCS から取得し、簡単な前処理後に BigQuery に書き込むフローです。CI/CD パイプラインで自動実行できます。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
from pyspark.sql import SparkSession spark = (SparkSession.builder.appName("ETL_Pipeline") .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar") .getOrCreate()) # 1. GCS 上の Parquet を読み込む raw = spark.read.parquet("gs://my-data-bucket/raw/events_2024-*.parquet") # 2. 必要な変換(例:日付抽出、不要列削除) transformed = (raw.withColumn("event_date", raw["timestamp"].cast("date")) .drop("raw_payload")) # 3. BigQuery に書き込む (transformed.write.format("bigquery") .option("temporaryGcsBucket", "my-bq-temp-bucket") .mode("append") .save("myproject.analytics.events_daily")) |
チェックリスト
- ☐
temporaryGcsBucketがテーブルと同一リージョンか - ☐ サービスアカウントに
bigquery.dataEditorとstorage.objectCreatorが付与されているか - ☐ コネクタ JAR のバージョンが Spark/Scala バージョンと一致しているか
8‑2. ユースケース② :Spark 集計 → Data Studio 用テーブル作成(導入文)
Spark 上で月次売上を集計し、Data Studio が直接参照できるテーブルに保存する例です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ val spark = SparkSession.builder() .appName("MonthlySales") .getOrCreate() // BigQuery から取込む val srcDf = spark.read.format("bigquery") .option("table", "myproject.sales.transactions") .load() // 月次サマリを作成 val summary = srcDf.groupBy(col("region"), month(col("sale_timestamp")).as("month")) .agg(sum(col("amount")).as("monthly_sales")) // 書き出し(Data Studio 用テーブル) summary.write.format("bigquery") .option("temporaryGcsBucket", "my-bq-temp-bucket") .mode("overwrite") // 上書きで最新サマリを保持 .save("myproject.reporting.monthly_sales") |
ポイント
mode("overwrite")はテーブルが既に存在する場合に WRITE_TRUNCATE 相当です。- Data Studio のレポートは自動的に更新されるので、定期ジョブ(例:毎月 1 日実行)を Dataproc スケジューラで設定すると運用コストが削減できます。
8‑3. 次のアクション
| アクション | 内容 |
|---|---|
| ① 環境構築 | 上記 Dataproc 作成スクリプトを実行し、接続テスト (spark.read.format("bigquery")...) を確認 |
| ② IAM 設定の見直し | 必要最小権限だけが付与されているか IAM ポリシーをレビュー |
| ③ バージョン固定 | requirements.txt(Python)や pom.xml(Scala)にコネクタバージョン(0.28.0)を明示 |
| ④ パフォーマンス測定 | 本番規模のデータで shuffle partitions と maxParallelism をチューニングし、実行時間とコストを比較 |
| ⑤ CI/CD 統合 | Cloud Build / GitHub Actions から Dataproc ジョブを自動起動し、エラーハンドリングを実装 |
まとめ
- 統合オプションは「マネージド vs 自己管理」「バッチ vs ストリーミング」の観点で選定。GCP 内完結なら Dataproc または Serverless Spark が最もシンプル。
- 前提条件として Storage API の有効化と、サービスアカウントに
bigquery.readSessionUserなどの最小権限を付与することが必須。 - 環境構築は gcloud CLI の
--properties=spark:spark.jars=…を利用し、ローカルや Colab でも同一 JAR をロードすればデバッグが容易になる。 - コネクタ取得は Maven Central または GCS 公開バケットから最新安定版(0.28.0)を使用し、Spark/Scala バージョンと整合性を確認。
- 入出力実装では ADC が最も手軽で安全。
temporaryGcsBucketは必ず同一リージョンに作成し、filterPushdownとmaxParallelismで転送量削減を図る。 - パフォーマンスチューニングはパーティション数とプッシュダウン設定が鍵。バッチ・ストリーミングの選択は遅延要件とコスト感覚で決め、リージョン統一を徹底する。
- デバッグはエラーメッセージ全文検索と IAM/リージョン設定の見直しが効果的。
このガイドに沿って構築・テストを行えば、BigQuery と Spark のシームレスな連携 が実現でき、データパイプラインの信頼性とスループットが大幅に向上します。ぜひ本番環境で試し、必要に応じて各項目を自社要件に合わせてカスタマイズしてください。