AWS Data Wranglerを使って、簡単にETL処理を実現する

チュートリアル内容

aws.amazon.com


これまで

  • Amazon Athena, Amazon Redshiftからデータを取得しETL(*)処理を行う際、PyAthenaやboto3、Pandasなどを利用することが多い。
    • 本来実施したいETLコーディングまでに接続設定を書いたり、各種コーディングが必要

ブレイクスルー

  • Data Wraglerを利用することで、ETLの処理の記述内容に集中することができる。
  • AthenaやS3上のCSVから数行でPandasを利用できる

Data Wraglerの特徴

  • インスタンスに対してpipでインストールできる
  • Lambda Layerとしての利用やGlue上でeggファイルをアップロードして利用できる。

チュートリアルの実行

Athena

1-4.サービス一覧から“Athena”を選択します。

1-5.下記クエリを実行し、Athenaのデータベースとテーブルを作成します。

データベースの作成

CREATE DATABASE [YOUR DATABASE NAME]; 

しかし、Athenaでエラーがでる f:id:triwave33:20191002000144p:plain

原因はデータベース名にハイフンをつけたから

Athenaの外部テーブル作成でエラー ハイフンに注意! | knowledge capsule

アンダーバーに変えて無事作成

テーブルの作成

"Only one sql allowed" エラーが発生 最後の2行のstatementが改行されていたのが原因。 Format queryボタンをクリックすると整列されて問題解決

確認のクエリ

select count(*) from green_tripdata; 

SageMaker Notebookの起動

新しいノートブックの作成

  • IAMロールのアタッチ:S3, Athena フルアクセス

ノートブック実行

・conda_python3カーネルで新規ファイル作成

・Data Wranglerのインストール

!pip install awswrangler

pythonからAthenaでクエリを実行するコードをノートブックで実行

import pandas
import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from green_tripdata",
    database="[YOUR DATABASE NAME]"
)

print(df)
ETLの実行

4-1.“trip_distance”が0のデータは分析対象外とみなし、行の削除処理を行います。

## trip_distanceが0の値を抽出、件数確認
rows_drop = df.index[df["trip_distance"] == 0.00]

print(df.loc[rows_drop].count())

## trip_distanceが0の値を削除、件数確認
df_drop = df.drop(rows_drop)
print(df_drop)

df_lens = df_drop.count()
print(df_lens)
さらにデータカラムの書き換え(省略)
CSVファイルをS3に出力
## trip_distanceが0の値を抽出、件数確認
rows_drop = df.index[df["trip_distance"] == 0.00]

print(df.loc[rows_drop].count())

## trip_distanceが0の値を削除、件数確認
df_drop = df.drop(rows_drop)
print(df_drop)

df_lens = df_drop.count()
print(df_lens)

お気持ち

  • S3, Athenaとの連携をSageMaker上で行うことができる
  • AWS Data Wranglerを使ってクエリを発行、pandas dfで取得
  • そのままPythonで処理できるところがメリット

疑問・解決していないこと

  • 各サービス(S3, Athena, SageMaker)が独立していて全体のアーキテクチャが見えにくい。一括して管理できるような仕組みはない?
  • Lambda layer としての利用やGlue上での利用の記載はない