AWS Data Wranglerを使って、簡単にETL処理を実現する
チュートリアル内容
これまで
- Amazon Athena, Amazon Redshiftからデータを取得しETL(*)処理を行う際、PyAthenaやboto3、Pandasなどを利用することが多い。
- 本来実施したいETLコーディングまでに接続設定を書いたり、各種コーディングが必要
ブレイクスルー
- Data Wraglerを利用することで、ETLの処理の記述内容に集中することができる。
- AthenaやS3上のCSVから数行でPandasを利用できる
Data Wraglerの特徴
- インスタンスに対してpipでインストールできる
- Lambda Layerとしての利用やGlue上でeggファイルをアップロードして利用できる。
チュートリアルの実行
Athena
1-4.サービス一覧から“Athena”を選択します。
1-5.下記クエリを実行し、Athenaのデータベースとテーブルを作成します。
データベースの作成
CREATE DATABASE [YOUR DATABASE NAME];
しかし、Athenaでエラーがでる
原因はデータベース名にハイフンをつけたから
Athenaの外部テーブル作成でエラー ハイフンに注意! | knowledge capsule
アンダーバーに変えて無事作成
テーブルの作成
"Only one sql allowed" エラーが発生 最後の2行のstatementが改行されていたのが原因。 Format queryボタンをクリックすると整列されて問題解決
確認のクエリ
select count(*) from green_tripdata;
SageMaker Notebookの起動
新しいノートブックの作成
- IAMロールのアタッチ:S3, Athena フルアクセス
ノートブック実行
・conda_python3カーネルで新規ファイル作成
・Data Wranglerのインストール
!pip install awswrangler
・pythonからAthenaでクエリを実行するコードをノートブックで実行
import pandas import awswrangler session = awswrangler.Session() df = session.pandas.read_sql_athena( sql="select * from green_tripdata", database="[YOUR DATABASE NAME]" ) print(df)
ETLの実行
4-1.“trip_distance”が0のデータは分析対象外とみなし、行の削除処理を行います。
## trip_distanceが0の値を抽出、件数確認 rows_drop = df.index[df["trip_distance"] == 0.00] print(df.loc[rows_drop].count()) ## trip_distanceが0の値を削除、件数確認 df_drop = df.drop(rows_drop) print(df_drop) df_lens = df_drop.count() print(df_lens)
さらにデータカラムの書き換え(省略)
CSVファイルをS3に出力
## trip_distanceが0の値を抽出、件数確認 rows_drop = df.index[df["trip_distance"] == 0.00] print(df.loc[rows_drop].count()) ## trip_distanceが0の値を削除、件数確認 df_drop = df.drop(rows_drop) print(df_drop) df_lens = df_drop.count() print(df_lens)
お気持ち
- S3, Athenaとの連携をSageMaker上で行うことができる
- AWS Data Wranglerを使ってクエリを発行、pandas dfで取得
- そのままPythonで処理できるところがメリット
疑問・解決していないこと
- 各サービス(S3, Athena, SageMaker)が独立していて全体のアーキテクチャが見えにくい。一括して管理できるような仕組みはない?
- Lambda layer としての利用やGlue上での利用の記載はない