iTAC_Technical_Documents

アイタックソリューションズ株式会社

ブログ名

【Azure】Azure Databricks を使用してデータ操作を行う

はじめに

こちらの記事は【Azure】環境構築の続きとなります。
Azure Databricks の作成までがまだの方は先にこちらの記事を読んでAzure の環境構築を行って下さい。

本記事では、Azure Databricks を使用してのデータ操作を、「チュートリアル:Azure Databricks を使用してデータの抽出、変換、読み込みを行う」を行いたいと思います。

手順

前の記事で、クラスターの作成まで行っている前提で話を進めます。

  1. Notebookの作成
  2. ADLS Gen 2 アカウント用の既定のサービス プリンシパル資格情報を設定
  3. Azure Data Lake Storage Gen2 アカウントにサンプル データを取り込む
  4. Azure Data Lake Storage Gen2 アカウントからデータを抽出する
  5. Azure Databricks でデータを変換する
  6. Azure SQL Data Warehouse へのデータの読み込み

1.Notebookの作成

左の「Workspace」を選択し、「Users」より自身のアカウントを選択します。
選択すると下図のような表示になりますので、右クリックを行い、「Create」->「Notebook」を選択します。

f:id:shizuuuka0202:20200303001428p:plain

Notebook の名前(任意の名称)を入力し、利用する言語を選択し(ここでは Scala)、先ほど作成したクラスターを選択し、「Create」を選択します。

f:id:shizuuuka0202:20200303001458p:plain

これでNoteBookの作成は完了です。

2.ADLS Gen 2 アカウント用の既定のサービス プリンシパル資格情報を設定

次に、Spark セッションでアクセスされる ADLS Gen 2 アカウント用の既定のサービス プリンシパル資格情報を設定します。

項目
storage-account-name Azure Data Lake Storage Gen2 ストレージアカウント名
app-id AAD にアプリ登録した際のアプリケーションID
password AAD にアプリ登録し、その後作成したクライアントシークレットの値
fileSystemName ファイルシステム
tenant-id AAD にアプリ登録した際のテナントID
val storageAccountName = "<storage-account-name>"
val appID = "<app-id>"
val secret = "<secret>"
val fileSystemName = "<file-system-name>"
val tenantID = "<tenant-id>"

spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

上記コードをNoteBookのコマンドに貼り付けたら「Shift + Enter」でコードの実行を行ってください。
コードを実行すると、下記のように資格情報が表示されます。

f:id:shizuuuka0202:20200303001617p:plain

3.Azure Data Lake Storage Gen2 アカウントにサンプル データを取り込む

次に、Azure Data Lake Storage Gen2 にサンプルデータを取り込みます。
NoteBook の次のセルに以下のコードをコピーしてください。

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

先ほどと同様に、「Shift + Enter」でコードの実行を行います。
これにより、一時的にクラスター内に「small_radio_json.json」というファイルをgithubよりダウンロードします。

続いて、ダウンロードした「small_radio_json.json」を Azure Data Lake Storage Gen2 に格納します。
NoteBook の次のセルに以下のコードをコピーして「Shift + Enter」でコードの実行を行ってください。

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

コードの実行に成功したら、Azure Data Lake Storage Gen2 に「small_radio_json.json」が格納されたことを確認しましょう。

確認には以下の2通りの方法があります。

・Azure Storage Explorerを利用する方法
Azure Storage Explorer を利用して「small_radio_json.json」が格納されたことを確認します。
Azure Storage Explorerをまだダウンロードしていない場合はダウンロードして利用してください。

f:id:shizuuuka0202:20200303001548p:plain

・Azure Portal 画面を利用する方法
Azure Data Lake Storage Gen2 を選択し、左メニューの「Storage Explorer (プレビュー)」を選択し、「ファイルシステム」を開くと、ダウンロードした「small_radio_json.json」が表示されることを確認できます。

f:id:shizuuuka0202:20200303001810p:plain

4.Azure Data Lake Storage Gen2 アカウントからデータを抽出する

次に Azure Data Lake Storage Gen2 からデータを抽出します。
Notebook の次のセルに以下のコードをコピーし、「Shift + Enter」で実行してください。

val df = spark.read.json("abfss://" + file-system-name + "@" + storage-account-name + ".dfs.core.windows.net/small_radio_json.json")

これによりデータを取り込むことができました。
続いて取り込んだデータを表示します。
Notebook の次のセルに以下のコードをコピーし、「Shift + Enter」で実行してください。

df.show()

実行すると、以下のように「small_radio_json.json」のデータが出力されます。

f:id:shizuuuka0202:20200303001911p:plain

5.Azure Databricks でデータを変換する

続いて、Azure Databricks にデータを取り込むことができたので、今度はデータを加工を行いたいと思います。
未加工のサンプル データ ファイル「small_radio_json.json」は、ラジオ局のリスナー情報を収集したものであり、さまざまな列を含んでいます。

このデータを変換して、データセットから特定の列だけを取得します。作成したデータフレームから、firstName、lastName、gender、location、level の各列だけを取得します。

Notebook の次のセルに以下のコードをコピーし、「Shift + Enter」で実行してください。

val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
specificColumnsDf.show()

実行すると、以下のように「small_radio_json.json」の指定した列(firstName、lastName、gender、location、level )のデータのみが出力されていることが確認できます。

f:id:shizuuuka0202:20200303001959p:plain

さらに上記のデータを変換し、level 列の名前を subscription_type に変更します。

Notebook の次のセルに以下のコードをコピーし、「Shift + Enter」で実行してください。

val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
renamedColumnsDF.show()

実行すると、以下のように最後の「level」列の名前が「subscription_type」に変更されたことが確認できます。

f:id:shizuuuka0202:20200303002052p:plain

このように、Azure Databricks に読み込んだデータを変換することができることを確認できました。

6.Azure SQL Data Warehouse へのデータの読み込み

最後に変換したデータを、 Azure SQL Data Warehouse にアップロードしてみたいと思います。
Azure Databricks 用の Azure SQL Data Warehouse コネクタを使用して、データフレームを SQL データ ウェアハウスのテーブルとして直接アップロードします。
前述のように、SQL Data Warehouse コネクタによって、Azure Blob Storage が一時ストレージとして使用され、Azure Databricks と Azure SQL Data Warehouse との間でデータがアップロードされます。
これは、PolyBase という仕組みを利用されています。PolyBaseについては、PolyBaseとはを確認してください。
それにはまず、そのストレージ アカウントに接続するための構成を指定する必要があります。

項目
blob-storage-account-name ストレージアカウント名
blob-container-name コンテナ―名
access-key Azure Blob Storage アカウント画面の左メニューにある「アクセスキー」から取得したキー
val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
val blobContainer = "<blob-container-name>"
val blobAccessKey =  "<access-key>"

上記コードを「Shift + Enter」で実行後、次のアクションより、Azure Databricks と Azure SQL Data Warehouse の間でデータを移動するときに使用する一時フォルダーを指定するためのコードを実行します。

val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"

続いて、Azure Blob Storage のアクセス キーを構成に格納します。
このアクションにより、アクセスキーをプレーンテキストのままノートブックに保持する必要がなくなります。

val acntInfo = "fs.azure.account.key."+ blobStorage
sc.hadoopConfiguration.set(acntInfo, blobAccessKey)

続いてAzure SQL Data Warehouse インスタンスに接続するための値を指定します。
データベースサーバー名には「.database.windows.net」のように完全修飾サーバー名を入力してください。

項目
database-name データウェアハウス名
database-server-name データベースサーバー名
user-name サーバー管理者名
password サーバー管理者のパスワード
//SQL Data Warehouse related settings
val dwDatabase = "<database-name>"
val dwServer = "<database-server-name>"
val dwUser = "<user-name>"
val dwPass = "<password>"
val dwJdbcPort =  "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass

最後に Azure SQL Data Warehouse に「SampleTable」というテーブルを作成し、変換済みのデータフレーム (renamedColumnsDf) をテーブルとして SQL データ ウェアハウスに格納します、
Notebook の次のセルに以下のコードをコピーし、「Shift + Enter」で実行してください。

spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")

renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable").option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()

こちらのサンプルでは「mode」で「"overwrite"」を指定しているため、上書きとなります。
データの追加を行いたい場合は「mode」で「"append"」を指定してください。

"This request is not authorized to perform this operation (この要求には、この操作を実行する権限がありません)"と表示された場合

このサンプルでは 「forward_spark_azure_storage_credentials」 フラグを使用します。 これにより、SQL Data Warehouse は、アクセス キーを使用して BLOB ストレージからのデータにアクセスします。 これは、サポートされている唯一の認証方法です。 Azure Blob Storage が仮想ネットワークを選択するように制限されている場合、SQL Data Warehouse にはアクセス キーではなく、マネージド サービス ID が必要です。

Azure SQL Data Warehouse に SSMS で接続し、「SampleTable」という名前のデータベース内に、「renamedColumnsDf」 データフレームと同じデータが存在していることが確認できるかと思います。

以上で、Azure Data Lake Storage Gen2 に存在するファイルを Azure Databricks で読み取り、変換処理を行い、Azure SQL Data Warehouse に格納するというシナリオが実現できました。


前の記事へ 目次に戻る