Contents
AWS GlueとPythonでデータパイプラインを構築する基礎知識
AWS Glueは、クラウド上で自動化されたETL(抽出・変換・読み込み)処理を実行するサーバーレスなサービスです。Pythonベースのスクリプトと連携することで、従来のETLツールよりも柔軟性とコスト効率に優れたデータパイプライン構築が可能になります。特に、S3との連携やPySparkを活用した並列処理はAWS Glueの強みであり、本記事ではこれらを中心に実践的な手順を解説します。
AWS Glueの役割と特徴
AWS Glueはデータウェアハウス構築やリアルタイム分析をサポートするためのETLサービスとして設計されています。主な特徴は以下の通りです。
- 自動メタデータ生成:S3やRDSなどのデータソースからカタログ情報を自動で収集
- PySparkとの連携:PythonスクリプトでDataFrame APIを活用した柔軟な変換処理が可能
- サーバーレス構成:インフラ管理不要で、ジョブ実行時にリソースを動的に割り当て
データパイプラインアーキテクチャの概要
AWS Glueベースのデータパイプラインは、S3に格納された原始データを入力として受け取り、変換処理を行った後、加工後のデータをS3やRedshiftなどに出力する構成が一般的です。このアーキテクチャでは以下のような要素が関与します(図略)。
| 項目 | 説明 |
|---|---|
| 入力ソース | S3、RDS、DynamoDBなどのストレージ |
| 変換処理 | PySparkやAWS GlueのDataFrame APIを使用した処理 |
| 出力先 | S3、Redshift、Snowflakeなど |
AWS Glueジョブの設定手順
データパイプライン構築の第一歩は、AWS Glueジョブを正しく設定することです。ここではコンソールとCLIでの基本的な作業フローを解説します。
Glueカタログの作成
Glueカタログは、ETL処理で参照するデータソース(S3やRDS)のメタ情報を一元管理するための機能です。以下が手順です。
- AWSコンソールから「Glue」サービスを開き、「Crawlers(クローラー)」を選択
- 「Create crawler(クローラー作成)」をクリックし、S3バケットやDynamoDBテーブルを指定
- クローラーの実行後、自動生成されたデータベースとテーブルがカタログに登録される
注意: データソースが頻繁に更新される場合は、スケジュール付きクローラーを作成して定期的にメタデータを更新する必要があります。
スクリプトのアップロードと実行設定
AWS Glueジョブは、PythonまたはScalaスクリプトで構築します。以下が基本的な手順です。
- スクリプトの作成:PySparkを使用したETL処理を記載する(例:
glue_script.py) - AWS Glueコンソールで「Jobs」を選択し、「Create job(ジョブ作成)」をクリック
- スクリプトファイルをアップロードし、実行環境(PythonバージョンやGlueバージョン)を指定
以下は、シンプルなスクリプト例です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glue_context = GlueContext(sc) spark = glue_context.spark_session # S3からデータ読み込み df = spark.read.format("csv").option("header", "true").load("s3://your-bucket/input/") # 簡単な変換処理(例: 列名の置き換え) df_renamed = df.withColumnRenamed("old_col_name", "new_col_name") # 出力先に保存 df_renamed.write.format("parquet").mode("overwrite").save("s3://your-bucket/output/") |
ポイント: 実行環境のPythonバージョン(例: Python 3.9)や、依存ライブラリ(pandasなど)を明示的に指定する必要があります。
PythonスクリプトによるETL処理の実装
AWS Glueジョブで実行可能なPythonコードは、PySparkとDataFrame APIを活用することで、効率的なデータ変換が可能です。以下に具体的な処理例を示します。
PySparkとの連携
AWS GlueではバックエンドとしてApache Sparkを使用するため、PySparkの知識が必須です。代表的な処理ステップは以下の通りです。
- DataFrameの読み込み
- データ変換(フィルタリング・結合・集計)
- 出力形式に合わせた保存
以下は、CSVをParquet形式に変換する例です。
|
1 2 3 4 5 6 7 8 9 |
# S3からCSV読込 df = spark.read.option("header", "true").csv("s3://your-bucket/input/*.csv") # フィルタリング処理(例: 年齢が20歳以上のレコード) filtered_df = df.filter(df.age >= 20) # 出力先に保存(Parquet形式でパーティショニングあり) filtered_df.write.partitionBy("gender").format("parquet").mode("overwrite").save("s3://your-bucket/output/") |
注意点: パーティショニングやファイル形式の選定は、クエリ性能に大きく影響するため慎重に行う必要があります。
データ変換ロジックのサンプルコード
以下は、複雑なデータ変換を含む例です。UDF(ユーザー定義関数)や型チェックが含まれています。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
from pyspark.sql.functions import udf, col # 型チェッカーUDFの定義 def check_data_type(value): try: return float(value) except ValueError: return None udf_check = udf(check_data_type) # カラム変換処理(例: "price"カラムの値を浮動小数点に変換) converted_df = df.withColumn("price", udf_check(col("price"))).filter(col("price").isNotNull()) |
警告: UDFはSpark Executorに負荷をかけるため、大量データ処理には避けた方が良いです。可能であれば、ビルトイン関数やVectorized UDFを使用してください。
S3との連携方法とデータフロー設計
AWS GlueはS3との連携が不可欠です。入力/出力のバケット指定や、性能を高めるパーティショニング・ファイル形式の選定が重要です。
入力/出力バケットの指定
Glueジョブ実行時にS3パスを指定する必要があります。以下が一般的な構文です。
|
1 2 3 |
input_path = "s3://your-bucket/input/" output_path = "s3://your-bucket/output/" |
ポイント: 入力ファイルは通配子(例:
*.csv)で一括読み込み可能ですが、大規模データではS3 Prefixを指定して部分読み込みする方が効率的です。
S3バケット命名規則とセキュリティ設定の例:
- 命名規則:
company-name-environment-data-type(例:acme-prod-sales) - セキュリティ: IAMロールでアクセス制限を設定し、データ暗号化(SSE-KMSなど)を有効化
パーティショニングとファイル形式の選定
以下が性能向上に役立つ設定の具体例です(表)。
|
1 2 3 4 5 6 |
| パラメータ | 推奨値 | 補足 | |----------|--------|------| | **ファイル形式** | Parquet/ORC | 高圧縮とクエリ性能を確保 | | **パーティショニング** | 年齢・性別など | 並列処理の効率化に役立つ | | **圧縮設定** | GZIP / Snappy | データ量削減と読み込み速度向上 | |
例: 性別や年齢などのフィールドをパーティションキーとして指定すると、クエリ時に不要なデータのスキャンが避けられ、処理時間を短縮できます。
データ変換におけるベストプラクティス
ETL処理においては、データ品質管理とパフォーマンス最適化が不可欠です。以下に具体的な実装例を示します。
型チェックとデータ品質管理
入力データの型不一致や異常値を排除するには、DataFrame APIでフィルタリング処理を行うことが推奨されます。
|
1 2 3 |
# 数値カラムがNaNの場合にフィルタリング cleaned_df = df.filter(col("price").isNotNull() & (col("price") > 0)) |
注意: データ品質管理のため、変換後のデータを別バケットに保存してリトライ処理や監査用として活用するのも有効です。
並列処理の最適化方法
PySparkの特性を活かした並列処理は、以下のように設定することで実現できます。
- パーティション数の調整:
repartition()やcoalesce()でデータの分割・統合を行う - キャッシュ活用: 大規模なDataFrameが複数回使用される場合は
cache()を適用
|
1 2 3 |
# パーティショニング数を指定(例: 100パーティション) replicated_df = df.repartition(100, "user_id") |
AWS GlueとLambdaの統合ケース
Lambda関数とGlueジョブを連携することで、イベント駆動型処理が可能になります。特にS3イベントやCloudWatchアラームによるトリガー設定は、リアルタイム分析に有効です。
イベント駆動型処理の構成
Lambda関数を介してGlueジョブを起動する際には、以下のようなフローが典型的です。
- S3イベント発生(例: 新しいCSVファイルのアップロード)
- Lambdaトリガー実行 → GlueジョブをAPI経由で起動
- 処理結果をS3に保存し、次のステップへ
トリガー設定の具体例
AWSコンソール上でLambda関数にトリガーを追加する手順は以下の通りです。
- Lambdaコンソール → 関数を選択 → 「トリガー」タブを開く
- 「S3」を選択し、対象バケットとイベントタイプ(例:
s3:ObjectCreated:*)を指定 - Glueジョブの起動用API(Glue API or AWS CLI)をLambda関数に記述
以下はLambda関数でGlueジョブを実行するPythonコードの一部です。
|
1 2 3 4 5 6 7 |
import boto3 def lambda_handler(event, context): client = boto3.client('glue') response = client.start_job_run(JobName='your-glue-job-name') print(response) |
ポイント: Lambda関数は短期間で処理を終了する必要があるため、Glueジョブの実行に時間がかかる場合は別途ステータスチェックを行う必要があります。
まとめ
- AWS GlueではPythonスクリプトとPySparkを組み合わせてETL処理が可能
- S3との連携やデータ変換におけるベストプラクティスの確認が重要
- Lambdaと連携することでイベント駆動型処理が実現可能
記事内のサンプルコードをベースに、自社環境でパイプライン構築を開始してみましょう。