Contents
Databricks 環境のセットアップと最新 Runtime の選択(2026/05 更新)
Databricks を本格的に利用するには、まず対象クラウド上にワークスペースを作成し、その上で推奨される Runtime バージョンを選択します。Runtime 13.3 LTS は 2026 年 5 月時点の最新長期サポート版であり、コスト・パフォーマンス・セキュリティの観点から最も安定した基盤となります。本節では GCP と AWS の両方に対応したクラスター作成手順と、Runtime のバージョン確認方法を解説します。
クラウド別クラスター作成手順
以下は UI 操作と CLI(databricks-cli)の両方で実行できる概要です。どちらの方法でも 「spark_version」 に 13.3.x-scala2.12 を指定すれば、最新 Runtime が利用できます。
| 手順 | GCP の場合 | AWS の場合 |
|---|---|---|
| 1️⃣ ワークスペース作成 | Google Cloud Console → Marketplace → 「Databricks」→ プロジェクト・リージョンを選択 | AWS Management Console → Service Catalog → 「Databricks」製品を起動 → VPC とサブネットを設定 |
| 2️⃣ クラスター画面へ遷移 | Databricks UI の左側メニュー Clusters > Create Cluster をクリック | 同様に Databricks UI からクラスター作成画面へ |
| 3️⃣ 設定入力 | 必要項目(クラスター名、ノードタイプ、ワーカー数など)を入力し Create | 必要項目を入力し Create |
CLI で自動化する例
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# 1. Databricks CLI の認証情報設定 databricks configure --token # 2. GCP 用クラスター作成(AWS でも同様にパラメータだけ変える) databricks clusters create \ --json '{ "cluster_name": "spark-pipeline-gcp", "spark_version": "13.3.x-scala2.12", "node_type_id": "n2-standard-8", "num_workers": 4, "autotermination_minutes": 30, "enable_elastic_disk": true }' |
ポイント
-spark_versionに「13.3.x‑scala2.12」を指定すると、Runtime 13.3 LTS が自動的に選択されます。
-enable_elastic_diskを有効化すると、ディスク容量が必要に応じて拡張され、ジョブ失敗リスクを低減できます。
Runtime バージョンの確認方法と推奨設定
Databricks UI の Compute > Cluster Details に表示される「Spark version」欄で実際にデプロイされたバージョンを確認してください。公式ドキュメントは「Runtime 13.3 は Apache Spark 4.0 系の機能をベースに構築されていますが、完全な互換性というよりも主要コンポーネントが更新されている」旨を示しています。そのため 「Spark 4.0 相当」 と表現するのは概念的であり、正確には Apache Spark 4.0 の機能セットが組み込まれた Runtime と理解してください。
| 設定項目 | 推奨値/設定例 |
|---|---|
| Spark version | 13.3.x(内部的に Spark 4.0 系) |
| Python runtime | 3.10 |
| ストレージ | DBFS + Cloud Storage(GCS / S3) |
| 自動スケーリング | 有効(最小 2、最大 8 ノード) |
| セキュリティ | IAM ロールで最小権限を付与、dbutils.secrets でシークレット管理 |
結論
最新 Runtime と自動スケーリング設定を組み合わせることで、コスト効率と処理性能のバランスが取れた基盤がすぐに構築できます。
ノートブック構造と SparkSession 初期化のベストプラクティス
データエンジニアはノートブックの可読性と実行安定性を同時に確保したいものです。本節では、チーム全体で共有しやすいテンプレート構造と、パフォーマンスに直結する SparkSession の推奨オプションを紹介します。
推奨ノートブックテンプレート
以下のセル順序は「目的・概要」から「結果確認」まで一貫した流れになるよう設計されています。各セルは 単一責任 を意識し、変更が必要な箇所だけを差分で更新できる点が利点です。
| セル番号 | 内容(簡潔な説明) |
|---|---|
| 1️⃣ | ノートブックメタ情報(目的・対象データ範囲) |
| 2️⃣ | ライブラリインポートと SparkSession 初期化 |
| 3️⃣ | 環境変数、シークレット取得、パラメータ定義 |
| 4️⃣ | データ抽出ロジック(外部ストレージ・DB 接続) |
| 5️⃣ | ビジネスロジック/変換処理(DataFrame API / SQL) |
| 6️⃣ | 書き込み先と成果物登録(Delta Lake 等) |
| 7️⃣ | ログ出力、メトリクス送信、実行結果サマリー |
ポイント
- 各セルの冒頭に簡単なコメントを残すことで、後から参照したときに処理意図がすぐ分かります。
- 「パラメータ定義」セルは Notebook パラメータ(dbutils.widgets)と組み合わせると、ジョブ実行時の再利用性が向上します。
SparkSession のチューニングポイント
Runtime 13.3 では Adaptive Query Execution がデフォルトで有効ですが、プロジェクト固有のワークロードに応じて以下のオプションを明示的に設定すると安心です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
from pyspark.sql import SparkSession from pyspark import StorageLevel spark = (SparkSession.builder .appName("ETL_Pipeline") # 基本的なメモリ・CPU 設定(クラスターサイズに合わせて調整) .config("spark.executor.memory", "8g") .config("spark.driver.memory", "4g") .config("spark.sql.shuffle.partitions", "200") # Adaptive Query Execution の明示的有効化 .config("spark.sql.adaptive.enabled", "true") .config("spark.databricks.io.cache.enabled", "true") # 動的パーティションプルーニングの最適化 .config("spark.databricks.optimizer.dynamicPartitionPruning.enabled", "true") .getOrCreate()) |
| オプション | 効果・推奨設定 |
|---|---|
spark.sql.shuffle.partitions |
デフォルト 200。データ規模が小さい場合は 100、数十億レコード規模では 500〜800 が目安。 |
spark.databricks.io.cache.enabled |
DBFS キャッシュを有効化し、同一ファイルの再読込コストを削減。 |
spark.sql.adaptive.enabled |
クエリ実行時に自動で最適なジョイン方式やパーティション数を選択。 |
spark.databricks.optimizer.dynamicPartitionPruning.enabled |
パーティションプルーニングを強化し、フィルタ条件がないスキャンを回避。 |
結論
テンプレート化されたセル構成と上記オプションの組み合わせで、ノートブックは可読性・保守性ともに高く、実行時のパフォーマンスも安定します。
データソース接続と変換ロジック実装
本番環境では GCS / S3 だけでなく BigQuery や Snowflake といったデータウェアハウスとの連携が求められます。ここでは最新コネクタを用いた読み込み例と、バッチ・ストリーミング双方に対応できる ETL パターンを示します。
Cloud Storage(GCS / S3)からのデータ読込例
以下は Parquet と CSV の代表的なフォーマットです。spark.read.format に加えて option でヘッダーやスキーマ推測を制御できます。
|
1 2 3 4 5 6 7 8 9 10 |
# GCS (Parquet) df_gcs = spark.read.format("parquet") \ .load("gs://my-bucket/raw/events_2026-06-*.parquet") # S3 (CSV) df_s3 = spark.read.format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load("s3a://data-lake/staging/sales_2026/*.csv") |
ポイント
- GCS はgs://、S3 はs3a://プレフィックスを使用する点に注意してください。
- 大規模データの場合はspark.read.format(...).option("maxFilesPerTrigger", 10)で読み込み件数を制御するとメモリ圧迫を防げます。
BigQuery と Snowflake の最新コネクタ使用例
Databricks Runtime 13.3 に同梱されているコネクタは、バージョン管理が自動的に行われるため個別インストールは不要です。ただしプロジェクト固有の認証情報は Secret Scope 経由で安全に取得してください。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# BigQuery (spark-bigquery-connector 0.30.0) df_bq = spark.read.format("bigquery") \ .option("table", "my_project.dataset.sales") \ .load() # Snowflake (spark-snowflake_2.12 2.13.1) sf_options = { "sfURL": "abc123.snowflakecomputing.com", "sfUser": dbutils.secrets.get(scope="snowflake", key="user"), "sfPassword": dbutils.secrets.get(scope="snowflake", key="pwd"), "sfDatabase": "PROD_DB", "sfSchema": "PUBLIC", "sfWarehouse": "COMPUTE_WH" } df_sf = spark.read.format("snowflake").options(**sf_options).load() |
留意点
- Snowflake 接続時はspark.sql.shuffle.partitionsをデータサイズに合わせて増やすと、JOIN 時のスパークシャッフルが高速化します。
- BigQuery はサーバーレスクエリ実行になるため、読み込みフィルタは SQL 側で絞り込む方がネットワークコストを削減できます。
Structured Streaming を用いたリアルタイム ETL パターン
以下のコードは Kafka から増分データを取得し、ウィンドウ集計後に Delta Lake に書き出す典型的なストリーミングパイプラインです。withWatermark と trigger を適切に設定することで遅延データの処理とバッチ感覚のスケジューリングが実現できます。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
from pyspark.sql import functions as F # 1. Kafka ストリーム取得 raw_stream = (spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "kafka-prod:9092") .option("subscribe", "events") .load() .selectExpr("CAST(value AS STRING) as json") .select(F.from_json("json", schema).alias("data")) .select("data.*")) # 2. ウィンドウ集計(5 分単位、遅延許容 10 分) agg_stream = (raw_stream .withWatermark("event_time", "10 minutes") .groupBy( F.window("event_time", "5 minutes"), "user_id" ) .agg(F.sum("amount").alias("total_amount"))) # 3. Delta Lake へ書き込み(append モード、チェックポイント必須) query = (agg_stream.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/checkpoints/agg_events") .trigger(processingTime="5 minutes") .start("/delta/aggregated/events")) |
ベストプラクティス
- チェックポイントは必ず永続的な Cloud Storage に配置し、障害復旧時に再利用できるようにします。
-triggerの間隔はビジネス要件とシステム負荷を踏まえて調整してください(例:1 分→リアルタイム性重視、10 分→コスト削減)。
宣言型パイプライン(Lakeflow)活用の検討
注記
Lakeflow はオープンソースでコミュニティが開発している宣言型 ETL フレームワークです。Databricks が公式にサポートしているわけではありませんが、GitHub 上で公開されており、一部企業で実装例が報告されています。導入時は 自己責任 での評価・テストを推奨します。
Lakeflow の概要と利用上の留意点
- 宣言型アプローチ:処理ステップは Python コードではなく YAML に記述し、可視化とバージョン管理が容易。
- 自動最適化:Lakeflow エンジンが Spark のキャッシュ・パーティショニングを推測し、Runtime 13.3 の Adaptive Query Execution と併用できる。
- テストフレンドリー:ステップごとに入力/期待出力を定義でき、
dbtestと組み合わせた単体テストが可能。 - サポートリスク:公式サポートが無いため、バグ修正や機能追加はコミュニティの貢献度に依存します。プロジェクトで採用する場合は fork して社内保守 を検討してください。
YAML 定義例と実行手順
以下は「抽出 → 集計 → 書き込み」のシンプルなパイプラインです。ファイル名は pipeline.yaml とし、ノートブックから lakeflow.run() で呼び出します。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
name: sales_daily_aggregation description: "日次売上集計パイプライン(Lakeflow デモ)" steps: - name: extract_raw type: spark.read format: delta path: /delta/raw/sales/ output: raw_df - name: aggregate_sales type: spark.sql sql: | SELECT DATE(event_time) AS sales_date, product_id, SUM(amount) AS total_amount FROM ${raw_df} GROUP BY DATE(event_time), product_id input: - raw_df output: agg_df - name: write_agg type: spark.write format: delta mode: overwrite path: /delta/aggregated/daily_sales/ input: - agg_df |
実行手順(ノートブック側)
|
1 2 3 4 5 6 7 8 |
# Lakeflow ライブラリは Runtime 13.3 のクラスターに事前インストール済みと想定 import lakeflow pipeline_path = "/Workspace/Projects/sales_pipeline/pipeline.yaml" result = lakeflow.run(pipeline_path) display(result) # 各ステップの出力 DataFrame が辞書形式で取得できる |
ポイント
-${raw_df}のように変数展開で前段階の DataFrame を参照できます。
-lakeflow.run()は内部的に SparkSession を再利用するため、別途セッション生成は不要です。
テスト・デプロイ・モニタリング、パフォーマンス最適化
本番運用ではコード品質の担保と迅速なリリースが不可欠です。Databricks が提供する Test Framework、Jobs API、外部 CI/CD ツールを組み合わせた実装例と、監視・チューニングのベストプラクティスをまとめます。
Databricks Test Framework と単体テスト例
dbtest ライブラリは DataFrame の等価性検証やスキーマチェックに特化しています。以下は前節で示した aggregate_sales ステップだけを対象としたテストです。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
import dbtest from lakeflow import run def test_aggregate_sales(): # ① テスト用入力データ作成 input_df = spark.createDataFrame([ ("2026-06-01 10:00:00", "P001", 100), ("2026-06-01 12:30:00", "P001", 150) ], ["event_time", "product_id", "amount"]) # ② Lakeflow のステップ限定実行 result = run( "/Workspace/Projects/sales_pipeline/pipeline.yaml", steps=["aggregate_sales"], inputs={"raw_df": input_df} ) # ③ 期待結果作成 expected = spark.createDataFrame([ ("2026-06-01", "P001", 250) ], ["sales_date", "product_id", "total_amount"]) # ④ アサーション dbtest.assert_frame_equal(result["agg_df"], expected) # テスト実行(ノートブック上でも可能) test_aggregate_sales() |
運用ヒント
- ノートブックの最終セルでdbutils.notebook.exit("PASS")とすれば、ジョブステータスにテスト結果を反映できます。
- CI パイプラインからはdatabricks jobs run-nowコマンドで上記ノートブックを呼び出し、終了コードで成功/失敗を判定します。
CI/CD とジョブスケジューリング
1. Jobs API による定期実行設定(例:日次集計)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
curl -X POST https://<databricks-instance>/api/2.0/jobs/create \ -H "Authorization: Bearer $DATABRICKS_TOKEN" \ -d '{ "name": "sales_daily_aggregation", "new_cluster": { "spark_version": "13.3.x-scala2.12", "node_type_id": "i3.xlarge", "num_workers": 4, "autotermination_minutes": 30 }, "notebook_task": { "notebook_path": "/Workspace/Projects/sales_pipeline/run_notebook" }, "schedule": { "quartz_cron_expression":"0 2 * * * ?", "timezone_id":"Asia/Tokyo" } }' |
2. GitHub Actions と連携した CI フロー(抜粋)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
name: Databricks ETL CI on: push: branches: [ main ] jobs: test-and-deploy: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - name: Install Databricks CLI run: pip install databricks-cli - name: Run unit tests on Databricks env: DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }} DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} run: | databricks jobs run-now --job-id 12345 # テストジョブ ID - name: Deploy to prod (if tests pass) if: success() env: DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }} DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} run: | databricks jobs run-now --job-id 67890 # 本番ジョブ ID |
ポイント
- テストジョブと本番ジョブを別々に管理すれば、失敗時のロールバックが容易です。
-databricks-cliの認証は GitHub Secrets に安全に保管してください。
監視・アラート設定とチューニングポイント
Databricks UI の Jobs タブでジョブ実行統計を確認できるほか、Cloud Logging(Stackdriver / CloudWatch)へエクスポートすると外部ダッシュボードとの連携が可能です。
ログエクスポート例(GCP)
|
1 2 3 4 |
gcloud logging sinks create databricks-sink \ "storage.googleapis.com/<bucket>/databricks-logs" \ --log-filter='resource.type="databricks_job" AND severity>=ERROR' |
パフォーマンスチューニングチェックリスト
| 項目 | 推奨設定・留意点 |
|---|---|
| キャッシュ | 再利用頻度が高い中間 DataFrame は df.persist(StorageLevel.MEMORY_AND_DISK) で保持。 |
| パーティショニング | 書き込み先テーブルは日付やキー列で repartition、ファイルサイズを 256 MiB 前後に調整。 |
| Shuffle 最小化 | Join 前に必要カラムだけ select、小規模テーブルは broadcast(df) ヒントでブロードキャスト。 |
| Adaptive Query Execution | デフォルト有効だが spark.sql.adaptive.coalescePartitions.enabled=true でパーティション数自動削減を促進。 |
| Dynamic Partition Pruning | spark.databricks.optimizer.dynamicPartitionPruning.enabled=true を明示的にオンにし、フィルタプッシュダウン効果を最大化。 |
結論
テスト・CI/CD・ジョブスケジューリングと監視・チューニングを一貫して実装すれば、コード変更から本番リリースまでのサイクルが数分に短縮され、運用コストも抑えられます。
まとめ
- Runtime の選択:Databricks Runtime 13.3 LTS は Spark 4.0 系機能を組み込んだ最新長期サポート版であり、自動スケーリングと組み合わせることでコスト最適化が実現できます。
- ノートブックベストプラクティス:統一テンプレートと SparkSession の推奨オプションにより、可読性・保守性・実行性能の三拍子が揃います。
- 最新コネクタ活用:GCS / S3、BigQuery、Snowflake への接続例と Structured Streaming パターンを組み合わせれば、バッチ・リアルタイム双方に対応したデータパイプラインが構築できます。
- 宣言型フレームワーク(Lakeflow):コミュニティベースの YAML 定義はコード量削減とテスト容易性を提供しますが、公式サポートが無い点はリスクとして認識し、社内での評価・保守体制を整えてから本番導入してください。
- テスト・CI/CD・モニタリング:Databricks Test Framework と外部 CI(GitHub Actions)を組み合わせた自動化パイプラインと、Cloud Logging 連携によるアラート設定で運用品質が大幅に向上します。
- パフォーマンス最適化:キャッシュ・パーティショニング・Shuffle 最小化、そして Runtime の Adaptive 機能を意識したチューニングは、スループットとコストの両立に不可欠です。
以上の手順とベストプラクティスに従って実装すれば、2026 年版 Databricks とオープンソースツール群(Lakeflow 等)を活用した本格的なデータパイプラインが短期間で完成し、スケーラビリティと信頼性を兼ね備えたデータ基盤を構築できます。