AWS GlueでネストされたJSONファイルをCSVファイルやParquetに変換する

クラウドでコスト削減はこちら

はじめに

AWS GlueのRelationalizeというTransformを利用して、ネストされたJSONをCSVファイルやParquetに変換する方法をご紹介します。CSV形式に変換することでリレーショナルデータベースに簡単にインポートできます。また、Parquetフォーマットに変換することでAthena、Redshift Spectrum、EMRからより高速にクエリできるようになります。

Relationalizeとは

Relationalizeは、AWS Glueが提供するRelationalizeというTransformで、DynamicFrameをリレーショナル(行と列)形式に変換します。データのスキーマに基づいて、ネストした構造化データをフラットな構造化データに変換してDynamicFrameを作成します。出力は、複数の表にデータを書き込むことができる、DynamicFrameのコレクションです。

サンプル1:ネストされたJSON

変換したいネストしたJSONのデータは以下のとおりです。なお、AWS GlueのSparkは、JSONをminify形式、prettify形式の何れの形式でも変換可能です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
    "player": {
        "username": "user1",
        "characteristics": {
            "race": "Human",
            "class": "Warlock",
            "subclass": "Dawnblade",
            "power": 300,
            "playercountry": "USA"
        },
        "arsenal": {
            "kinetic": {
                "name": "Sweet Business",
                "type": "Auto Rifle",
                "power": 300,
                "element": "Kinetic"
            },
            "energy": {
                "name": "MIDA Mini-Tool",
                "type": "Submachine Gun",
                "power": 300,
                "element": "Solar"
            },
            "power": {
                "name": "Play of the Game",
                "type": "Grenade Launcher",
                "power": 300,
                "element": "Arc"
            }
        },
        "armor": {
            "head": "Eye of Another World",
            "arms": "Philomath Gloves",
            "chest": "Philomath Robes",
            "leg": "Philomath Boots",
            "classitem": "Philomath Bond"
        },
        "location": {
            "map": "Titan",
            "waypoint": "The Rig"
        }
    }
}

サンプル2:フラット化されたJSON

以下のような、フラットなJSONに変換します。最終的には、この形式をCSVやParquetにファイル出力します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
    "player.username": "user1",
    "player.characteristics.race": "Human",
    "player.characteristics.class": "Warlock",
    "player.characteristics.subclass": "Dawnblade",
    "player.characteristics.power": 300,
    "player.characteristics.playercountry": "USA",
    "player.arsenal.kinetic.name": "Sweet Business",
    "player.arsenal.kinetic.type": "Auto Rifle",
    "player.arsenal.kinetic.power": 300,
    "player.arsenal.kinetic.element": "Kinetic",
    "player.arsenal.energy.name": "MIDA Mini-Tool",
    "player.arsenal.energy.type": "Submachine Gun",
    "player.arsenal.energy.power": 300,
    "player.arsenal.energy.element": "Solar",
    "player.arsenal.power.name": "Play of the Game",
    "player.arsenal.power.type": "Grenade Launcher",
    "player.arsenal.power.power": 300,
    "player.arsenal.power.element": "Arc",
    "player.armor.head": "Eye of Another World",
    "player.armor.arms": "Philomath Gloves",
    "player.armor.chest": "Philomath Robes",
    "player.armor.leg": "Philomath Boots",
    "player.armor.classitem": "Philomath Bond",
    "player.location.map": "Titan",
    "player.location.waypoint": "The Rig"
}

JSONファイルからDynamicFrameを作成する方法

ファイルから直接DynamicFrameを生成する

ネストしていないキーバリュー形式のJSONファイルは、sparkSession.read.json(<file_path>)で、DataFrameに変換できます。しかし、ネストしたJSONファイルは、この方式ではネストしたJSON形式を認識できません。

1
2
_datasource0 = spark.read.json('s3://mybucket/players/players.json')
datasource0 = DynamicFrame.fromDF(_datasource0, glueContext, 'datasource0')

GlueデータカタログのテーブルからDynamicFrameを生成する

AWS GlueのコンソールからデータソースにGlueデータカタログのテーブル、データターゲットにS3(JSON)を指定すると、ApplyMapping.apply()にカラムの対応付けしたmappings引数を指定したコードが自動生成されます。

ApplyMapping.apply()でDynamicFrameを生成する

GlueデータカタログのテーブルからDynamicFrameを生成して、ApplyMapping.apply()で、フラットなJSONのDynamicFrameに変換します。フラットなJSONのDynamicFrameに変換するためのルールは、AWS Glueのコンソールのmappings引数で指定したキーと値に従います。mappings引数は、Glueが自動生成していますが、手で書くとなると辛い作業です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
applymapping1 = ApplyMapping.apply(
    frame = datasource_df,
    mappings = [
    ("player.username", "string", "`player.username`", "string"),
    ("player.characteristics.race", "string", "`player.characteristics.race`", "string"),
    ("player.characteristics.class", "string", "`player.characteristics.class`", "string"),
    ("player.characteristics.subclass", "string", "`player.characteristics.subclass`", "string"),
    ("player.characteristics.power", "int", "`player.characteristics.power`", "int"),
    ("player.characteristics.playercountry", "string", "`player.characteristics.playercountry`", "string"),
    ("player.arsenal.kinetic.name", "string", "`player.arsenal.kinetic.name`", "string"),
    ("player.arsenal.kinetic.type", "string", "`player.arsenal.kinetic.type`", "string"),
    ("player.arsenal.kinetic.power", "int", "`player.arsenal.kinetic.power`", "int"),
    ("player.arsenal.kinetic.element", "string", "`player.arsenal.kinetic.element`", "string"),
    ("player.arsenal.energy.name", "string", "`player.arsenal.energy.name`", "string"),
    ("player.arsenal.energy.type", "string", "`player.arsenal.energy.type`", "string"),
    ("player.arsenal.energy.power", "int", "`player.arsenal.energy.power`", "int"),
    ("player.arsenal.energy.element", "string", "`player.arsenal.energy.element`", "string"),
    ("player.arsenal.power.name", "string", "`player.arsenal.power.name`", "string"),
    ("player.arsenal.power.type", "string", "`player.arsenal.power.type`", "string"),
    ("player.arsenal.power.power", "int", "`player.arsenal.power.power`", "int"),
    ("player.arsenal.power.element", "string", "`player.arsenal.power.element`", "string"),
    ("player.armor.head", "string", "`player.armor.head`", "string"),
    ("player.armor.arms", "string", "`player.armor.arms`", "string"),
    ("player.armor.chest", "string", "`player.armor.chest`", "string"),
    ("player.armor.leg", "string", "`player.armor.leg`", "string"),
    ("player.armor.classitem", "string", "`player.armor.classitem`", "string"),
    ("player.location.map", "string", "`player.location.map`", "string"),
    ("player.location.waypoint", "string", "`player.location.waypoint`", "string")
    ],
    transformation_ctx = "applymapping1")

Relationalize.apply()でDynamicFrameを生成する

GlueデータカタログのテーブルからDynamicFrameを生成して、Relationalize.apply()で、フラットなJSONのDynamicFrameCollectionに変換します。フラットなJSONのDataFrameに変換するためのルールは、ネストしたJSONの項目をピリオドで区切ったキーと値に従います。

1
2
3
4
5
6
7
dfc_root_table_name = "root" # default value is "roottable"
relationalize1 = Relationalize.apply(
    frame = datasource_df,
    staging_path = "s3://mybucket/glue/tmp",
    name = dfc_root_table_name,
    transformation_ctx = "relationalize1")
    result_data = relationalize1.select(dfc_root_table_name)

ApplyMapping.apply()は、入出力のキーの対応を引数で渡す必要がありますが、Relationalize.apply()はネストしたJSONの項目をピリオドで区切った文字をキーにするので、コードが簡潔になりました。このコードは変換の設定が不要なので、ソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに変換する機能を共通化できます。よって、以降ではこのRelationalizeを用いた方法を解説します。

Relationalizeを用いた変換コードの解説

ネストされたJSONからCSVファイルに変換する

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
 
# Begin variables to customize with your information
glue_source_database = "default"
glue_source_table = "players"
glue_temp_storage = "s3://mybucket/glue/tmp"
dfc_root_table_name = "root" # default value is "roottable"
# End variables to customize with your information
 
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
 
sc = SparkContext()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
 
# glueContextで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")
 
# Glueのフレームワークで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する
relationalize1 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "relationalize1")
result_data = relationalize1.select(dfc_root_table_name)
 
# Write CSV
glue_relationalize_output_s3_path = "s3://mybucket/players_csv"
datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "csv", transformation_ctx = "datasink2")
 
job.commit()

ネストされたJSONからParquetファイルに変換する

上記のコードの29〜31行目を下記に置き換えることで、Parquetフォーマットに変換できます。

1
2
3
# Write Parquet
glue_relationalize_output_s3_path = "s3://mybucket/players_parquet"
datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "parquet", transformation_ctx = "datasink2")

最後に

Relationalizeを用いて、ネストされたJSONファイルをCSVファイルやParquetに変換する方法をご紹介しました。AWS GlueのRelationalizeはソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに簡単に変換できます。JSONファイルに対してより大きなデータを効率よくクエリするには、構造化データに変換することでパフォーマンスやスキャン性能を改善します。さらにスキャンのサイズを小さくすることで、Redshift SpectrumやAthenaにおいては利用費の削減できますので、蓄積データについては変換することをご検討してください。

参考

Invent2017japan portal