AWS GlueでネストされたJSONファイルをCSVファイルやParquetに変換する
はじめに
AWS GlueのRelationalizeというTransformを利用して、ネストされたJSONをCSVファイルやParquetに変換する方法をご紹介します。CSV形式に変換することでリレーショナルデータベースに簡単にインポートできます。また、Parquetフォーマットに変換することでAthena、Redshift Spectrum、EMRからより高速にクエリできるようになります。
Relationalizeとは
Relationalizeは、AWS Glueが提供するRelationalizeというTransformで、DynamicFrameをリレーショナル(行と列)形式に変換します。データのスキーマに基づいて、ネストした構造化データをフラットな構造化データに変換してDynamicFrameを作成します。出力は、複数の表にデータを書き込むことができる、DynamicFrameのコレクションです。
サンプル1:ネストされたJSON
変換したいネストしたJSONのデータは以下のとおりです。なお、AWS GlueのSparkは、JSONをminify形式、prettify形式の何れの形式でも変換可能です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | { "player" : { "username" : "user1" , "characteristics" : { "race" : "Human" , "class" : "Warlock" , "subclass" : "Dawnblade" , "power" : 300, "playercountry" : "USA" }, "arsenal" : { "kinetic" : { "name" : "Sweet Business" , "type" : "Auto Rifle" , "power" : 300, "element" : "Kinetic" }, "energy" : { "name" : "MIDA Mini-Tool" , "type" : "Submachine Gun" , "power" : 300, "element" : "Solar" }, "power" : { "name" : "Play of the Game" , "type" : "Grenade Launcher" , "power" : 300, "element" : "Arc" } }, "armor" : { "head" : "Eye of Another World" , "arms" : "Philomath Gloves" , "chest" : "Philomath Robes" , "leg" : "Philomath Boots" , "classitem" : "Philomath Bond" }, "location" : { "map" : "Titan" , "waypoint" : "The Rig" } } } |
サンプル2:フラット化されたJSON
以下のような、フラットなJSONに変換します。最終的には、この形式をCSVやParquetにファイル出力します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | { "player.username" : "user1" , "player.characteristics.race" : "Human" , "player.characteristics.class" : "Warlock" , "player.characteristics.subclass" : "Dawnblade" , "player.characteristics.power" : 300, "player.characteristics.playercountry" : "USA" , "player.arsenal.kinetic.name" : "Sweet Business" , "player.arsenal.kinetic.type" : "Auto Rifle" , "player.arsenal.kinetic.power" : 300, "player.arsenal.kinetic.element" : "Kinetic" , "player.arsenal.energy.name" : "MIDA Mini-Tool" , "player.arsenal.energy.type" : "Submachine Gun" , "player.arsenal.energy.power" : 300, "player.arsenal.energy.element" : "Solar" , "player.arsenal.power.name" : "Play of the Game" , "player.arsenal.power.type" : "Grenade Launcher" , "player.arsenal.power.power" : 300, "player.arsenal.power.element" : "Arc" , "player.armor.head" : "Eye of Another World" , "player.armor.arms" : "Philomath Gloves" , "player.armor.chest" : "Philomath Robes" , "player.armor.leg" : "Philomath Boots" , "player.armor.classitem" : "Philomath Bond" , "player.location.map" : "Titan" , "player.location.waypoint" : "The Rig" } |
JSONファイルからDynamicFrameを作成する方法
ファイルから直接DynamicFrameを生成する
ネストしていないキーバリュー形式のJSONファイルは、sparkSession.read.json(<file_path>)
で、DataFrameに変換できます。しかし、ネストしたJSONファイルは、この方式ではネストしたJSON形式を認識できません。
1 2 | datasource0 = DynamicFrame.fromDF(_datasource0, glueContext, 'datasource0' ) |
GlueデータカタログのテーブルからDynamicFrameを生成する
AWS GlueのコンソールからデータソースにGlueデータカタログのテーブル、データターゲットにS3(JSON)を指定すると、ApplyMapping.apply()にカラムの対応付けしたmappings引数を指定したコードが自動生成されます。
ApplyMapping.apply()でDynamicFrameを生成する
GlueデータカタログのテーブルからDynamicFrameを生成して、ApplyMapping.apply()で、フラットなJSONのDynamicFrameに変換します。フラットなJSONのDynamicFrameに変換するためのルールは、AWS Glueのコンソールのmappings引数で指定したキーと値に従います。mappings引数は、Glueが自動生成していますが、手で書くとなると辛い作業です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | applymapping1 = ApplyMapping. apply ( frame = datasource_df, mappings = [ ( "player.username" , "string" , "`player.username`" , "string" ), ( "player.characteristics.race" , "string" , "`player.characteristics.race`" , "string" ), ( "player.characteristics.class" , "string" , "`player.characteristics.class`" , "string" ), ( "player.characteristics.subclass" , "string" , "`player.characteristics.subclass`" , "string" ), ( "player.characteristics.power" , "int" , "`player.characteristics.power`" , "int" ), ( "player.characteristics.playercountry" , "string" , "`player.characteristics.playercountry`" , "string" ), ( "player.arsenal.kinetic.name" , "string" , "`player.arsenal.kinetic.name`" , "string" ), ( "player.arsenal.kinetic.type" , "string" , "`player.arsenal.kinetic.type`" , "string" ), ( "player.arsenal.kinetic.power" , "int" , "`player.arsenal.kinetic.power`" , "int" ), ( "player.arsenal.kinetic.element" , "string" , "`player.arsenal.kinetic.element`" , "string" ), ( "player.arsenal.energy.name" , "string" , "`player.arsenal.energy.name`" , "string" ), ( "player.arsenal.energy.type" , "string" , "`player.arsenal.energy.type`" , "string" ), ( "player.arsenal.energy.power" , "int" , "`player.arsenal.energy.power`" , "int" ), ( "player.arsenal.energy.element" , "string" , "`player.arsenal.energy.element`" , "string" ), ( "player.arsenal.power.name" , "string" , "`player.arsenal.power.name`" , "string" ), ( "player.arsenal.power.type" , "string" , "`player.arsenal.power.type`" , "string" ), ( "player.arsenal.power.power" , "int" , "`player.arsenal.power.power`" , "int" ), ( "player.arsenal.power.element" , "string" , "`player.arsenal.power.element`" , "string" ), ( "player.armor.head" , "string" , "`player.armor.head`" , "string" ), ( "player.armor.arms" , "string" , "`player.armor.arms`" , "string" ), ( "player.armor.chest" , "string" , "`player.armor.chest`" , "string" ), ( "player.armor.leg" , "string" , "`player.armor.leg`" , "string" ), ( "player.armor.classitem" , "string" , "`player.armor.classitem`" , "string" ), ( "player.location.map" , "string" , "`player.location.map`" , "string" ), ( "player.location.waypoint" , "string" , "`player.location.waypoint`" , "string" ) ], transformation_ctx = "applymapping1" ) |
Relationalize.apply()でDynamicFrameを生成する
GlueデータカタログのテーブルからDynamicFrameを生成して、Relationalize.apply()で、フラットなJSONのDynamicFrameCollectionに変換します。フラットなJSONのDataFrameに変換するためのルールは、ネストしたJSONの項目をピリオドで区切ったキーと値に従います。
1 2 3 4 5 6 7 | dfc_root_table_name = "root" # default value is "roottable" relationalize1 = Relationalize. apply ( frame = datasource_df, name = dfc_root_table_name, transformation_ctx = "relationalize1" ) result_data = relationalize1.select(dfc_root_table_name) |
ApplyMapping.apply()は、入出力のキーの対応を引数で渡す必要がありますが、Relationalize.apply()はネストしたJSONの項目をピリオドで区切った文字をキーにするので、コードが簡潔になりました。このコードは変換の設定が不要なので、ソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに変換する機能を共通化できます。よって、以降ではこのRelationalizeを用いた方法を解説します。
Relationalizeを用いた変換コードの解説
ネストされたJSONからCSVファイルに変換する
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job # Begin variables to customize with your information glue_source_database = "default" glue_source_table = "players" dfc_root_table_name = "root" # default value is "roottable" # End variables to customize with your information args = getResolvedOptions(sys.argv, [ 'JOB_NAME' ]) sc = SparkContext() spark = glueContext.spark_session job = Job(glueContext) job.init(args[ 'JOB_NAME' ], args) # glueContextで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0" ) # Glueのフレームワークで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する relationalize1 = Relationalize. apply (frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "relationalize1" ) result_data = relationalize1.select(dfc_root_table_name) # Write CSV datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3" , connection_options = { "path" : glue_relationalize_output_s3_path}, format = "csv" , transformation_ctx = "datasink2" ) job.commit() |
ネストされたJSONからParquetファイルに変換する
上記のコードの29〜31行目を下記に置き換えることで、Parquetフォーマットに変換できます。
1 2 3 | # Write Parquet datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3" , connection_options = { "path" : glue_relationalize_output_s3_path}, format = "parquet" , transformation_ctx = "datasink2" ) |
最後に
Relationalizeを用いて、ネストされたJSONファイルをCSVファイルやParquetに変換する方法をご紹介しました。AWS GlueのRelationalizeはソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに簡単に変換できます。JSONファイルに対してより大きなデータを効率よくクエリするには、構造化データに変換することでパフォーマンスやスキャン性能を改善します。さらにスキャンのサイズを小さくすることで、Redshift SpectrumやAthenaにおいては利用費の削減できますので、蓄積データについては変換することをご検討してください。