Contents
- 1 導入 — BigQuery Python API 入門 手順の狙い
- 2 対象読者・前提スキル(BigQuery と Python)
- 3 開発環境の準備と認証(BigQuery Python API)
- 4 実践ハンズオン(最短コマンド順・期待出力)
- 5 サンプルコード集(主要操作別)
- 5.1 クライアント初期化(ADC / Secret Manager 経由)
- 5.2 クエリ実行(同期/非同期)と dry-run
- 5.3 非同期ジョブの追跡(待機と再試行)
- 5.4 to_dataframe(BigQuery Storage API 併用)
- 5.5 GCS → BigQuery ロード(load_table_from_uri)
- 5.6 DataFrame → BigQuery(小〜中バッチ)
- 5.7 BigQuery → GCS(抽出 / Parquet エクスポート)
- 5.8 ストリーミング挿入と冪等化(insert_rows_json)
- 5.9 ジョブ統計の取得(ジョブ種別ごとの注意)
- 5.10 再試行(指数バックオフ)
- 5.11 ユニットテスト(モックの例)
- 6 運用・監視・コスト・セキュリティ(運用向け注意点)
- 7 まとめ
導入 — BigQuery Python API 入門 手順の狙い
BigQuery Python API 入門 手順を短くまとめた実務向けガイドです。
BigQuery Python API 入門 手順として、認証からクエリ、ロード、運用までの最短フローと期待出力を示します。
サンプルコードとチェックリストでローカル → CI → 本番の移行を想定して検証できます。
対象読者・前提スキル(BigQuery と Python)
想定読者は初心者〜中級者を主に想定します。初心者は「クエリ実行〜簡単なロード」、中級者は「CI/運用・最適化」、上級者は「スロット運用や高スループット設計」を参照してください。
前提として Python 3.8 以上、SQL の基本、GCP プロジェクトと請求(Billing)の理解があるとスムーズに進められます。
開発環境の準備と認証(BigQuery Python API)
ここでは必須パッケージ、BigQuery Storage API の前提、認証方式の使い分け、CI での鍵管理パターンをまとめます。バージョンや依存関係を明記し、環境差で動かないリスクを下げます。
必須パッケージと推奨バージョン
以下は本稿で想定する最小バージョンの例です。環境によっては最新版の方が安全です。BigQuery Storage API を使う場合は pyarrow と pandas の互換性に注意してください。
|
1 2 3 4 5 6 7 8 9 10 |
python -m venv .venv source .venv/bin/activate pip install --upgrade pip pip install "google-cloud-bigquery>=3.0.0" \ "google-cloud-bigquery-storage>=2.0.0" \ "google-cloud-storage>=2.0.0" \ "google-cloud-secret-manager>=2.0.0" \ "pandas>=1.3.0" "pyarrow>=8.0.0" |
- 補足: to_dataframe を使うには google-cloud-bigquery-storage と pyarrow が必要です。バージョン差でクライアント名やメソッドの有無があるため、公式ライブラリドキュメントで使用ライブラリの互換性を確認してください。
ADC(Application Default Credentials)
ADC はローカル開発や Cloud Shell で便利です。以下のコマンドでローカルに ADC を設定します。
|
1 2 |
gcloud auth application-default login |
Python 側は基本的に次のように書きます。
|
1 2 3 |
from google.cloud import bigquery client = bigquery.Client() # ADC からプロジェクトと認証を自動取得 |
サービスアカウント鍵(JSON) — 最終手段
CI や外部環境で鍵を使う場合は、鍵の保管とアクセス制御に注意してください。鍵ファイルを直接リポジトリに置かないでください。
|
1 2 3 4 5 6 |
from google.cloud import bigquery from google.oauth2 import service_account credentials = service_account.Credentials.from_service_account_file("path/to/sa.json") client = bigquery.Client(project="my-project", credentials=credentials) |
推奨: 鍵は Secret Manager + KMS で管理し、短期的にローカルで展開する場合のみ取り出す運用にしてください。
Workload Identity / OIDC Federation(推奨:鍵レス運用)
GKE、Cloud Run、Cloud Functions、GitHub Actions などでは Workload Identity や OIDC Federation を使って鍵レスに認証することを推奨します。これにより長期鍵の取り扱いリスクを減らせます。設定手順やロール付与は公式ドキュメントを参照してください。
CI での秘密管理(Secret Manager + KMS のサンプル)
Secret Manager に保存したサービスアカウント JSON を CI から参照して一時的に credentials を作る例です。Secret Manager と KMS を組み合わせると安全性が高まります。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
from google.cloud import secretmanager from google.oauth2 import service_account import json from google.cloud import bigquery def client_from_secret(project_id: str, secret_id: str): sm = secretmanager.SecretManagerServiceClient() name = f"projects/{project_id}/secrets/{secret_id}/versions/latest" payload = sm.access_secret_version(name=name).payload.data.decode("utf-8") info = json.loads(payload) creds = service_account.Credentials.from_service_account_info(info) return bigquery.Client(project=project_id, credentials=creds) |
- 推奨権限: Secret Manager のアクセスは最小権限で付与し、ログを監査可能にします。
実践ハンズオン(最短コマンド順・期待出力)
代表的な作業フローを最短手順で示します。各手順に期待される出力例と、問題が起きたときにまず確認すべき点を添えます。
事前チェックリスト(ダイジェスト)
以下を事前に確認してください。簡潔なチェックでトラブルの多くは解消します。
- Billing が有効であること
- 必要な API(BigQuery, Cloud Storage, Secret Manager 等)が有効化済みであること
- 該当プロジェクトと dataset のロケーション設定(下で詳細)を確認
- SA に必要最小限の IAM 権限が割り当てられていること
最短手順(CLI と Python の組合せ)
まずは CLI で基本環境を整え、その後 Python でロード→クエリ→取得を実行します。期待出力例を併記します。
1) API を有効化(CLI)
|
1 2 |
gcloud services enable bigquery.googleapis.com storage.googleapis.com secretmanager.googleapis.com |
期待出力例(概略):
- "Operation completed" や "Service [bigquery.googleapis.com] enabled" 等
2) データセット作成(CLI)
|
1 2 |
bq --location=asia-northeast1 mk --dataset PROJECT_ID:my_dataset |
期待出力例:
- "Dataset 'PROJECT_ID:my_dataset' successfully created."
3) サンプル CSV を GCS にアップロード(CLI)
|
1 2 |
gsutil cp sample.csv gs://BUCKET_NAME/path/sample.csv |
期待出力例:
- "Copying file://sample.csv [Content-Type=text/csv]..."
4) GCS から BigQuery へロード(Python)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
from google.cloud import bigquery client = bigquery.Client() table_id = "PROJECT_ID.my_dataset.raw_events" job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, schema=[ bigquery.SchemaField("user_id", "STRING"), bigquery.SchemaField("event_ts", "TIMESTAMP"), bigquery.SchemaField("value", "FLOAT"), ], write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, ) uri = "gs://BUCKET_NAME/path/sample.csv" load_job = client.load_table_from_uri(uri, table_id, job_config=job_config) load_job.result() print("Loaded rows (approx):", getattr(load_job, "output_rows", "<unknown>")) |
期待出力例:
- Loaded rows (approx): 12345
5) dry-run でクエリコストを確認(Python)
|
1 2 3 4 5 6 7 8 9 |
sql = "SELECT user_id, COUNT(*) AS cnt FROM `PROJECT_ID.my_dataset.raw_events` WHERE DATE(event_ts)=@d GROUP BY user_id" job_config = bigquery.QueryJobConfig( query_parameters=[bigquery.ScalarQueryParameter("d", "DATE", "2025-12-01")], dry_run=True, use_query_cache=False ) dry_job = client.query(sql, job_config=job_config) print("Bytes to be processed (estimate):", getattr(dry_job, "total_bytes_processed", "<estimate not available>")) |
期待出力例:
- Bytes to be processed (estimate): 123456789
6) クエリ実行→DataFrame 取得(Python)
|
1 2 3 4 5 6 7 |
from google.cloud import bigquery_storage_v1 bqstorage = bigquery_storage_v1.BigQueryReadClient() query_job = client.query(sql, job_config=bigquery.QueryJobConfig(query_parameters=[...])) df = query_job.result().to_dataframe(bqstorage_client=bqstorage) print("DataFrame shape:", df.shape) |
期待出力例:
- DataFrame shape: (1000, 2)
7) ストリーミング挿入(簡易、Python)
|
1 2 3 4 5 6 |
import uuid rows = [{"user_id": "u1", "event_ts": "2025-12-01T12:00:00Z", "value": 1.2}] row_ids = [str(uuid.uuid4()) for _ in rows] errors = client.insert_rows_json("PROJECT_ID.my_dataset.streaming_table", rows, row_ids=row_ids) print("Insert errors:", errors) |
期待出力例:
-
Insert errors: []
-
トラブル時は Cloud Logging と job.errors、dataset/bucket ロケーションをまず確認してください。
サンプルコード集(主要操作別)
ここでは実務でよく使うスニペットを、用途別に整理して示します。各スニペットは環境に合わせて PROJECT_ID 等を置き換えてください。
クライアント初期化(ADC / Secret Manager 経由)
初期化の代表例と、BigQuery Storage API クライアントの生成例を示します。
|
1 2 3 4 5 6 7 8 9 10 |
from google.cloud import bigquery from google.cloud import bigquery_storage_v1 # ADC を使う標準パターン bq = bigquery.Client() bqstorage = bigquery_storage_v1.BigQueryReadClient() # 明示的な認証を使う(Secret Manager 経由で取得した場合) # client = client_from_secret(project_id="PROJECT_ID", secret_id="SECRET_ID") |
- 注意: bigquery_storage_v1 のクラス名や引数はライブラリバージョンにより差異があります。公式ドキュメントで使用バージョンに合う記述を確認してください。
クエリ実行(同期/非同期)と dry-run
同期実行と dry-run の例を示します。dry-run は概算バイト数を出すための手段です。
|
1 2 3 4 5 6 7 8 9 10 |
# 同期クエリ job = bq.query("SELECT COUNT(*) as cnt FROM `PROJECT_ID.my_dataset.table`") for row in job.result(): print(row.cnt) # dry-run cfg = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) dry = bq.query("SELECT * FROM `PROJECT_ID.my_dataset.table`", job_config=cfg) print("Estimate bytes:", getattr(dry, "total_bytes_processed", "<estimate unavailable>")) |
- 注意点: dry-run の見積りは概算です。課金は実行時のオンデマンドまたはフラットレートに基づきます。
非同期ジョブの追跡(待機と再試行)
ジョブを投げて別処理で状態確認する場合の簡易ユーティリティです。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import time from google.api_core import exceptions as gcp_exceptions def wait_job_done(client, job, poll_interval=5, timeout=600): start = time.time() while True: job = client.get_job(job.job_id) if job.state == "DONE": if job.errors: raise RuntimeError(job.errors) return job if time.time() - start > timeout: raise TimeoutError("Job did not finish in time") time.sleep(poll_interval) |
to_dataframe(BigQuery Storage API 併用)
大量データを効率的に DataFrame に取り込む方法。必要なライブラリが揃っていることを確認してください。
|
1 2 3 4 |
# query_job.result().to_dataframe の形式が確実に動作するバージョンを使用すること query_job = bq.query("SELECT * FROM `PROJECT_ID.my_dataset.small_table` LIMIT 10000") df = query_job.result().to_dataframe(bqstorage_client=bqstorage) |
- 重要: pyarrow と pandas のバージョン依存があります。to_dataframe が失敗する場合はライブラリのバージョンを確認してください。
GCS → BigQuery ロード(load_table_from_uri)
Parquet/Avro を推奨。CSV の場合はヘッダやスキップ行を明示してください。
|
1 2 3 4 5 6 7 8 9 |
job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, write_disposition=bigquery.WriteDisposition.WRITE_APPEND, ) uri = "gs://BUCKET_NAME/path/*.parquet" load_job = bq.load_table_from_uri(uri, "PROJECT_ID.my_dataset.table_parquet", job_config=job_config) load_job.result() print("Loaded rows:", getattr(load_job, "output_rows", "<unknown>")) |
- 注意: load_job.output_rows が None になるケースやライブラリ差があるため、必要に応じて job._properties の statistics を参照する実装(下記参照)で補完してください。
DataFrame → BigQuery(小〜中バッチ)
pandas DataFrame から直接ロードする簡易例です。大規模データは一旦 GCS に吐いてロードを推奨します。
|
1 2 3 |
job = bq.load_table_from_dataframe(df, "PROJECT_ID.my_dataset.to_table", job_config=bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)) job.result() |
BigQuery → GCS(抽出 / Parquet エクスポート)
|
1 2 3 4 5 6 7 |
extract_job = bq.extract_table( "PROJECT_ID.my_dataset.agg_table", "gs://BUCKET_NAME/export/agg_table-*.parquet", job_config=bigquery.ExtractJobConfig(destination_format=bigquery.DestinationFormat.PARQUET) ) extract_job.result() |
ストリーミング挿入と冪等化(insert_rows_json)
低遅延用途では insert_rows_json を使い、insertId を付与して冪等化します。
|
1 2 3 4 5 6 7 |
import uuid rows = [{"user_id": "u1", "event_ts": "2025-12-01T12:00:00Z", "value": 1.2}] row_ids = [str(uuid.uuid4()) for _ in rows] errors = bq.insert_rows_json("PROJECT_ID.my_dataset.streaming_table", rows, row_ids=row_ids) if errors: print("Insert errors:", errors) |
- 注意点: ストリーミング挿入はストリーミングコストが発生します。高スループットはバッチに切り替えてコストと安定性を確保してください。
ジョブ統計の取得(ジョブ種別ごとの注意)
ジョブ種別(Query / Load / Extract 等)により統計情報の場所が異なります。以下は互換性を考慮した取得例です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
from google.cloud.bigquery.job import QueryJob, LoadJob def extract_job_stats(job): # 完了を待つ(必要なら) if hasattr(job, "result"): job.result() stats = {} if isinstance(job, QueryJob): stats["bytes_processed"] = getattr(job, "total_bytes_processed", None) or job._properties.get("statistics", {}).get("totalBytesProcessed") try: rows = job.result() stats["result_rows"] = getattr(rows, "total_rows", None) except Exception: stats["result_rows"] = None elif isinstance(job, LoadJob): stats["output_rows"] = getattr(job, "output_rows", None) or job._properties.get("statistics", {}).get("load", {}).get("outputRows") else: stats["statistics"] = getattr(job, "statistics", None) or job._properties.get("statistics") return stats |
- 補足: job._properties は内部データで変化する可能性があります。公式ライブラリのバージョン差を確認し、堅牢にアクセスする実装を検討してください。
再試行(指数バックオフ)
一時的なネットワークエラーや 429/503 に対する簡易リトライの例です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import random, time from google.api_core import exceptions as gcp_exceptions def retry_with_backoff(func, max_attempts=5): backoff = 1.0 for attempt in range(max_attempts): try: return func() except (gcp_exceptions.ServiceUnavailable, gcp_exceptions.DeadlineExceeded, gcp_exceptions.TooManyRequests) as e: sleep = backoff + random.random() time.sleep(sleep) backoff *= 2 raise |
ユニットテスト(モックの例)
外部 API 呼び出しをモックしてロジック部分のみをテストする簡易例です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
from unittest import mock import my_module def test_run_query(): with mock.patch("google.cloud.bigquery.Client") as MockClient: mock_client = MockClient.return_value mock_job = mock.Mock() mock_job.result.return_value = [{"user_id": "u1", "cnt": 3}] mock_client.query.return_value = mock_job res = my_module.run_query("SELECT ...") assert res[0]["user_id"] == "u1" |
運用・監視・コスト・セキュリティ(運用向け注意点)
運用では「ジョブの可視化」「異常の自動検知」「最小権限の認証設計」が重要です。ここでは監視指標・コスト比較・CI 設計・セキュリティのベストプラクティスをまとめます。
ジョブ監視とアラート設計
まずはジョブの状態をログ化し、重要指標でアラートを作ります。監視ポイント例は以下です。
- 失敗ジョブ数(短時間内で閾値を超えたらアラート)
- 処理バイト量(突発的な増加を検知)
- スロット使用率(フラットレート運用時)
- ストリーミングバッファサイズ(遅延時の指標)
Cloud Logging のジョブログや Cloud Monitoring のカスタムメトリクスを用いて通知を行ってください。
コスト最適化(dry-run、オンデマンド、フラットレート、ストリーミング)
用途ごとのコスト特性を簡潔に比較します。詳しい料金は公式ページを参照してください(下部にリンク)。
| 種類 | 課金単位 | 向き | 注意点 |
|---|---|---|---|
| オンデマンド | 処理バイト数(TB) | 不定期クエリ、小〜中規模 | 実行ごとに料金変動 |
| フラットレート(スロット) | 固定料金(月額) | 定常的な大量クエリ | スロット設計が必要 |
| ストリーミング挿入 | 挿入量・操作 | 低遅延挿入 | ストリーミング費用が発生、バッファ特性に注意 |
- 補足: クエリ実行前に dry-run でスキャン量を確認し、オンデマンドとフラットレートのどちらが有利か検討してください。公式料金ページ: https://cloud.google.com/bigquery/pricing
セキュリティと IAM 設計(最小権限の例)
最小権限でロールを割り当てるとリスクを下げられます。代表的な役割例(用途に応じてリソース単位で付与):
- ジョブ実行(クエリ/ロード): roles/bigquery.jobUser
- データ読み取り: roles/bigquery.dataViewer(データセット単位で付与)
- データ書き込み: roles/bigquery.dataEditor(テーブル/データセット単位)
- GCS からの読み取り: roles/storage.objectViewer(対象バケット)
-
Secret Manager 参照: roles/secretmanager.secretAccessor
-
推奨: 管理者ロールは必要最小限に限定し、可能な限りリソース(データセットやバケット)単位でアクセスを付与してください。
CI/CD と Workload Identity(鍵レス運用)
GitHub Actions 等では OIDC を使って短期トークンで GCP リソースにアクセスする設定を推奨します。大まかなステップは次の通りです。
- Workload Identity Pool を作成し、プロバイダを設定
- GitHub Actions から OIDC トークンでフェデレーションし、サービスアカウントを担わせる
- CI 側では長期鍵を持たない
(概念例、プレースホルダを使った設定例)
|
1 2 3 4 5 |
- uses: google-github-actions/auth@v1 with: workload_identity_provider: "projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL/providers/PROVIDER" service_account: "SERVICE_ACCOUNT_RESOURCE" # ここを自分の環境に置き換える |
トラブルシューティングチェックリスト(即効確認)
まずは以下を順に確認してください。多くの問題はこれで解決します。
- Billing が有効かどうか
- 必要な API が有効化されているか
- プロジェクト/データセット/バケットのロケーションが互換性を満たしているか
- サービスアカウントに必要最小限の権限が付与されているか
- job.errors や Cloud Logging にエラーメッセージが出ていないか
-
クォータやレート制限(429 等)に達していないか
-
補足: データセットと GCS バケットの「ロケーション」については例外やマルチリージョンの取り扱いがあるため、実運用では公式の「Dataset location」ドキュメントを参照して確認してください(リンクは参考情報に掲載)。
まとめ
要点を短くまとめます。認証・クライアント初期化、クエリ、ロード、ストリーミング、運用の主要フローを実行し、期待出力を確認してから本番運用に移行してください。
下に主要なチェックポイントを列挙します。
- 事前に Billing と API の有効化、ロケーション整合を確認すること。
- 認証は鍵レス(Workload Identity)を第一選択にし、必要に応じて Secret Manager/KMS を用いること。
- dry-run でクエリのスキャン量を確認し、オンデマンドとフラットレートの選択を検討すること。
- to_dataframe を使う場合は BigQuery Storage API(google-cloud-bigquery-storage)と pyarrow の互換性を確認すること。
- ジョブ統計や output_rows の取り方はジョブ種別とライブラリバージョンで差があるため、堅牢に取得する実装を用意すること。
参考情報
以下は該当機能や制約を確認するための公式ドキュメントです。実運用前に必ず最新の公式ドキュメントを参照してください。
- BigQuery のデータセットとリージョン(ロケーション): https://cloud.google.com/bigquery/docs/dataset-locations
- BigQuery Storage API(Python ライブラリ): https://googleapis.dev/python/bigquery-storage/latest/index.html
- BigQuery Python クライアント ライブラリ: https://googleapis.dev/python/bigquery/latest/index.html
- BigQuery 料金(Pricing): https://cloud.google.com/bigquery/pricing
- Workload Identity Federation(OIDC): https://cloud.google.com/iam/docs/workload-identity-federation
以上を基に、まずはテストプロジェクトでサンプルを実行し、ログとジョブ統計を確認してから本番適用してください。