Google BigQueryからAmazon Redshiftにデータを移行してみる

AWS

2019.11.29

Topics

こんにちは、データサイエンスチームのmotchieです。

クラウド上のデータウェアハウス(DWH)といえば、Google BigQueryAmazon Redshiftなどが有名です。
DWH間でデータ移行の方法については、
RedshiftからBigQueryに移行する場合、BigQuery Data Transfer Service APIを使う方法等がありますが、BigQueryからRedshiftに移行する場合については、あまり情報が見当たらないように思います。

そこで本記事では、BigQueryからRedshiftへデータを移行する方法を一つご紹介したいと思います。

リソースの全体像

流れ

以下の流れでリソースを作成していきたいと思います。

  • CloudFormationでRedshiftクラスターを作成する
  • GCS経由でBigQueryのデータでS3に出力する
  • Glueを使ってデータをRedshiftに読み込ませる
  • Redshiftに接続してクエリを投げてみる
  • リソースの削除

前提条件として、AWS・GCP上でリソースにアクセスできる十分な権限が必要になります。

CloudFormationでRedshiftクラスターを作成する

まず、AWS上にRedshiftクラスターを作成していきます。
以下のAWSブログのCloudFormationテンプレートを活用したいと思います。

Amazon Web Services ブログ:AWS CloudFormation を使用して Amazon Redshift クラスターの作成を自動化する

上記のブログのテンプレートを活用することで、
AWSのベストプラクティスに沿って設計されたRedshiftクラスターを一発で作成することができます。
設計とテンプレートの詳細については、上記のブログをご覧ください。

各テンプレートのパラメーターについては、基本的にデフォルトの値を使用したいと思います。
ただし、今回の用途では、

  • GCSからS3へデータをコピーするための認証情報とバケット
  • GlueからRedshiftへアクセスするためのネットワーク設定
    を追加する必要があるため、上記のテンプレートをカスタマイズして、リソースを追加したいと思います。

テンプレートの取得

以下の3つのCloudFormationテンプレートを取得します。

  • aws-vpc-blog.template
  • linux-bastion-blog.templat
  • redshift-blog.template
1
2
3
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/aws-vpc-blog.template ./
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/linux-bastion-blog.template ./
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/redshift-blog.template ./

テンプレートの修正

まず、linux-bastion-blog.template (Bastionスタック用テンプレート)を修正していきます。
なお、修正後のテンプレートの内容は以下で確認できます。
linux-bastion-blog.template

linux-bastion-blog.template
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
      - Name: AutoScalingGroupName
        Value: !Ref BastionAutoScalingGroup
 
  BigQueryDataS3Bucket:
    Type: 'AWS::S3::Bucket'
    DeletionPolicy: Delete
    Properties:
      AccessControl: Private
      BucketName: !Join [ "-", [ !Ref 'AWS::StackName', "bigquery-data-bucket", !Ref 'AWS::AccountId' ] ]
 
  S3UserForBoto:
    Type: AWS::IAM::User
    Properties:
      Policies:
        - PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Action:
                  - 's3:PutObject'
                Effect: Allow
                Resource: !Join
                  - "/"
                  - - !GetAtt BigQueryDataS3Bucket.Arn
                    - "*"
          PolicyName: s3-put-policy-from-gcs-for-boto
      UserName: !Join [ "-", [ !Ref 'AWS::StackName', "s3-user-for-boto", !Ref 'AWS::Region' ] ]
 
  AccessKeyForBoto:
    Type: AWS::IAM::AccessKey
    Properties:
      UserName: !Ref S3UserForBoto
 
  SecretsForBoto:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub "secrets-for-boto-user-${AWS::StackName}"
      SecretString: !Sub '{"aws_access_key_id": "${AccessKeyForBoto}", "aws_secret_access_key": "${AccessKeyForBoto.SecretAccessKey}"}'
 
Outputs:

上記の修正によって、以下のリソースが追加されます。

  • データ格納用S3バケット
  • gsutilコマンド用のIAMユーザー・認証情報
  • 認証情報格納用のSecretsManager

また、Redshiftスタックでデータ格納用S3バケット名が必要なため、以下の修正によって、値を出力するようにします。

linux-bastion-blog.template
581
582
583
584
585
586
587
588
589
590
591
BastionSecurityGroupID:
  Description: Use this Security Group to reference incoming traffic from the SSH bastion host/instance
  Value: !Ref BastionSecurityGroup
  Export:
    Name: !Sub '${AWS::StackName}-BastionSecurityGroupID'
  
BigQueryDataS3BucketName:
  Description: Bucket name will be imported in redshift cluter stack
  Value: !Ref BigQueryDataS3Bucket
  Export:
    Name: !Sub '${AWS::StackName}-BigQueryDataS3BucketName'

次に、redshift-blog.template (Redshiftスタック用テンプレート) を修正していきます。
なお、修正後のテンプレートの内容は以下で確認できます。
redshift-blog.template

redshift-blog.template
417
418
419
420
421
422
423
424
425
426
427
428
      -
        Key: Compliance
        Value: !Ref TagCompliance
 
RedshiftSecurityGroupIngress:
  Type: 'AWS::EC2::SecurityGroupIngress'
  Properties:
    IpProtocol: "-1"
    SourceSecurityGroupId: !Ref RedshiftSecurityGroup
    GroupId: !Ref RedshiftSecurityGroup
 
RedshiftLoggingS3Bucket:

GlueからVPC内のRedshiftにアクセスする際、ENIを利用します。
AWS ドキュメント:VPC の JDBC データストアに接続する
Redshift・Glueが利用するセキュリティグループに対して、すべてのTCP ポートに対する自己参照のインバウンドルールを追加します。

テンプレートの修正は以上です。
それでは、各スタックをデプロイしていきます。

スタックの作成

VPCスタック、Bastionスタック、Redshiftスタックの順で作成します。

まずパラメータをまとめて作成しておきます。
スタック名などは同じで問題ないですが、EMAIL_NOTIFICATION_LISTKEY_PAIR_NAME (事前にEC2キーペアの作成が必要です)は自身の値に変更してください。

1
2
3
4
5
6
7
8
9
10
11
12
# VPC stack parameters
VPC_STACK_NAME=techblog-vpc-stack
# Redshift stack parameters
REDSHIFT_STACK_NAME=techblog-redshift-stack
MASTER_USER_PASSWORD="TestPass2019"
GLUE_CATALOG_DB_NAME="bq-test-db"
#Bastion server stack parameters
BASTION_STACK_NAME=techblog-bastion-stack
EMAIL_NOTIFICATION_LIST="YOUR_EMAIL_ADDRESS"
KEY_PAIR_NAME="YOUR_EC2_KEY_PAIR_NAME"
# your ipaddress with CIDR
REMOTE_ACCESS_CIDR="$(dig +short myip.opendns.com @resolver1.opendns.com)/32"

VPCスタックを作成します。完了まで数分かかります。

1
2
3
4
5
6
aws cloudformation create-stack \
  --stack-name $VPC_STACK_NAME \
  --template-body file://aws-vpc-blog.template \
  --parameters \
    ParameterKey=ClassB,ParameterValue=0 \
  --capabilities CAPABILITY_IAM

マネジメントコンソールでCloudFormationを開き、スタック作成の完了を確認してから、Bastionスタックの作成に移ってください。
Bastionスタックを作成します。以下のコマンドを実行します。

1
2
3
4
5
6
7
8
9
10
aws cloudformation create-stack\
  --stack-name $BASTION_STACK_NAME\
  --template-body file://linux-bastion-blog.template\
  --parameters\
    ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\
    ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\
    ParameterKey=KeyPairName,ParameterValue=$KEY_PAIR_NAME\
    ParameterKey=RemoteAccessCIDR,ParameterValue=$REMOTE_ACCESS_CIDR\
    ParameterKey=EnableX11Forwarding,ParameterValue=true\
  --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM

スタックの作成まで数分かかります。
スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。

最後に、Redshiftスタックを作成します。以下のコマンドを実行します。

1
2
3
4
5
6
7
8
9
10
11
12
aws cloudformation create-stack\
  --stack-name $REDSHIFT_STACK_NAME\
  --template-body file://redshift-blog.template\
  --parameters\
    ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\
    ParameterKey=ParentSSHBastionStack,ParameterValue=$BASTION_STACK_NAME\
    ParameterKey=MasterUserPassword,ParameterValue=$MASTER_USER_PASSWORD\
    ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\
    ParameterKey=S3BucketForRedshiftIAMRole,ParameterValue=$S3_BUCKET_NAME\
    ParameterKey=GlueCatalogDatabase,ParameterValue=$GLUE_CATALOG_DB_NAME\
    ParameterKey=TagOwner,ParameterValue=$EMAIL_NOTIFICATION_LIST\
  --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM

同様に、スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。

以上でAWSのリソースが作成できました。次はGCPに移ります。

GCS経由でBigQueryのデータでS3に出力する

一般公開データセット

BigQueryでは、一般公開データセットと呼ばれる様々なデータがホストされており、
ユーザーは自身のデータと組み合わせて分析を行うことができます。

今回は、BigQueryの一般公開データセットの中から、Google Analytics Sampleのデータを使いたいと思います。

以下のようなクエリが実行できます。後ほど、Redshiftでも同様のクエリを投げてみたいと思います。

GCSバケットの作成

GCPのコンソールを開き、Cloud Shellを起動します。以下のコマンドはCloud Shellから実行します。
GCSのバケットを作成します。バケット名は適宜変更して実行してください。

1
2
GCS_BUCKET_NAME="techblog-bq-to-rs-test-1"
gsutil mb -l us-east1 gs://$GCS_BUCKET_NAME/

BigQueryからGCSにエクスポート

データをBigQueryからGCSに出力します。今回は bq extract CLIコマンドを使用します。
なお、出力データが1GBを超える場合は、ワイルドカード * を使用して、データを分割する必要があります。
Google Cloud ドキュメント:テーブルデータのエクスポート

1
bq --location=US extract --destination_format AVRO --compression SNAPPY 'bigquery-public-data:google_analytics_sample.ga_sessions_20170801' gs://$GCS_BUCKET_NAME/file*.avro

GCSからS3にデータをコピー

gsutilツールを利用して、GCSからS3にデータをコピーします。
Google Cloud ドキュメント:Cloud Storage の相互運用性
AWSマネジメントコンソールでSecret Managersを開き、Bastionスタックで作成した認証情報を取得します。

IAMユーザーの認証情報を ~/.boto に書き込みます。

~/.boto
1
2
3
[Credentials]
aws_access_key_id = XXXXXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX

GCSからS3にファイルをコピーします。
変数**S3_BUCKET_NAME**には、Bastionスタック作成時 BigQueryDataS3BucketName として出力されていたバケット名を入力してください。

1
2
S3_BUCKET_NAME="REPLACE_VALUE_WITH_YOUR_S3_BUCKET_NAME"
gsutil cp gs://$GCS_BUCKET_NAME/*.avro s3://$S3_BUCKET_NAME/

以上で、BigQueryのデータをS3に出力できました。
次にAWSに移って、Glueの設定を行っていきます。

Glueを使ってデータをRedshiftに読み込ませる

AWS Glueを使うことで、様々なデータソースと連携するETLジョブが作成できます。

まず、Glueのクローラを使い、S3データからデータカタログを作成します。

データのクロール

Connectionの設定

GlueからRedshiftへの接続を設定します。

ETLジョブの作成

最後にETLジョブを作成します。

なお、RedshiftはArray・Structのデータ型に対応していないため、ETLで型を変換してからRedshiftに読み込む必要があります。
Arrayはstring型で読み込まれているようです。
Arrayの中身をそれぞれ展開し、適切なデータ型を付与して格納したい場合、自身で変換を追加する必要があります。

今回、hits の列は除外し、customdimmensions のArrayの中身を展開し、long型の customdimensions.index とstring型のcustomdimensions.value としてRedshiftに格納したいと思います。
コード例は以下のようになります。修正箇所をハイライトしています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
 
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
 
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0")
## @type: ExplodeArray
## @args: []
## @return: explodedframe0
## @inputs: [frame = datasource0]
import pyspark.sql.functions as F
from awsglue.dynamicframe import DynamicFrame
df = datasource0.toDF()
df = df.withColumn("exploded_customdimensions", F.explode("customdimensions"))
df = df.drop("customdimensions")
df = df.withColumn("customdimensions.index", df.exploded_customdimensions.index)
df = df.withColumn("customdimensions.value", df.exploded_customdimensions.value)
explodedframe0 = DynamicFrame.fromDF(df, glueContext, "exploded")
## @type: ApplyMapping
## @args: [mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"), ("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = explodedframe0]
applymapping1 = ApplyMapping.apply(frame = explodedframe0, mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"),("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

ETLジョブが完了し、データがRedshiftに読み込まれました。

Redshiftに接続してクエリを投げてみる

Redshiftに接続し、クエリを投げてみたいと思います。
Bastionサーバにssh接続のコマンド、Redshiftへの接続コマンドは、それぞれBastionスタック・Redshiftスタックの出力で確認できます。

Bastionサーバにssh接続します。

1
ssh -i "your-keypair-name.pem" ec2-user@xxx.xxx.xxx.xxx

Redshiftに接続します。パスワードはスタック作成時に自身で設定したもの(MASTER_USER_PASSWORD)になります。

1
psql -h rsdev01-redshiftmain.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com -p 8200 -U rsadmin -d rsdev01

Redshiftにテーブルが作成されていることを確認します。

1
2
3
4
5
6
rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01=# \d
                                  List of relations
 schema |                           name                           | type  |  owner
--------+----------------------------------------------------------+-------+---------
 public | techblog_bastion_stack_bigquery_data_bucket_xxxx| table | rsadmin
(1 row)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01=# \d techblog_bastion_stack_bigquery_data_bucket_xxx
                Table "public.techblog_bastion_stack_bigquery_data_bucket_xx"
                             Column                              |           Type           | Modifiers
-----------------------------------------------------------------+--------------------------+-----------
 visitorid                                                       | bigint                   |
 visitnumber                                                     | bigint                   |
 visitid                                                         | bigint                   |
 visitstarttime                                                  | bigint                   |
 date                                                            | character varying(65535) |
 totals.visits                                                   | bigint                   |
 totals.hits                                                     | bigint                   |
 totals.pageviews                                                | bigint                   |
 totals.timeonsite                                               | bigint                   |
 totals.bounces                                                  | bigint                   |
 totals.transactions                                             | bigint                   |
 totals.transactionrevenue                                       | bigint                   |
 totals.newvisits                                                | bigint                   |
 totals.screenviews                                              | bigint                   |
 totals.uniquescreenviews                                        | bigint                   |
 totals.timeonscreen                                             | bigint                   |
 totals.totaltransactionrevenue                                  | bigint                   |
 totals.sessionqualitydim                                        | bigint                   |
 trafficsource.referralpath                                      | character varying(65535) |
 trafficsource.campaign                                          | character varying(65535) |
 trafficsource.source                                            | character varying(65535) |
 trafficsource.medium                                            | character varying(65535) |
 trafficsource.keyword                                           | character varying(65535) |
 trafficsource.adcontent                                         | character varying(65535) |
 trafficsource.adwordsclickinfo.campaignid                       | bigint                   |
 trafficsource.adwordsclickinfo.adgroupid                        | bigint                   |
 trafficsource.adwordsclickinfo.creativeid                       | bigint                   |
 trafficsource.adwordsclickinfo.criteriaid                       | bigint                   |
 trafficsource.adwordsclickinfo.page                             | bigint                   |
 trafficsource.adwordsclickinfo.slot                             | character varying(65535) |
 trafficsource.adwordsclickinfo.criteriaparameters               | character varying(65535) |
 trafficsource.adwordsclickinfo.gclid                            | character varying(65535) |
 trafficsource.adwordsclickinfo.customerid                       | bigint                   |
 trafficsource.adwordsclickinfo.adnetworktype                    | character varying(65535) |
 trafficsource.adwordsclickinfo.targetingcriteria.boomuserlistid | bigint                   |
 trafficsource.adwordsclickinfo.isvideoad                        | boolean                  |
 trafficsource.istruedirect                                      | boolean                  |
 trafficsource.campaigncode                                      | character varying(65535) |
 device.browser                                                  | character varying(65535) |
 device.browserversion                                           | character varying(65535) |
 device.browsersize                                              | character varying(65535) |
 device.operatingsystem                                          | character varying(65535) |
 device.operatingsystemversion                                   | character varying(65535) |
 device.ismobile                                                 | boolean                  |
 device.mobiledevicebranding                                     | character varying(65535) |
 device.mobiledevicemodel                                        | character varying(65535) |
 device.mobileinputselector                                      | character varying(65535) |
 device.mobiledeviceinfo                                         | character varying(65535) |
 device.mobiledevicemarketingname                                | character varying(65535) |
 device.flashversion                                             | character varying(65535) |
 device.javaenabled                                              | boolean                  |
 device.language                                                 | character varying(65535) |
 device.screencolors                                             | character varying(65535) |
 device.screenresolution                                         | character varying(65535) |
 device.devicecategory                                           | character varying(65535) |
 geonetwork.continent                                            | character varying(65535) |
 geonetwork.subcontinent                                         | character varying(65535) |
 geonetwork.country                                              | character varying(65535) |
 geonetwork.region                                               | character varying(65535) |
 geonetwork.metro                                                | character varying(65535) |
 geonetwork.city                                                 | character varying(65535) |
 geonetwork.cityid                                               | character varying(65535) |
 geonetwork.networkdomain                                        | character varying(65535) |
 geonetwork.latitude                                             | character varying(65535) |
 geonetwork.longitude                                            | character varying(65535) |
 geonetwork.networklocation                                      | character varying(65535) |
 fullvisitorid                                                   | character varying(65535) |
 userid                                                          | character varying(65535) |
 clientid                                                        | character varying(65535) |
 channelgrouping                                                 | character varying(65535) |
 socialengagementtype                                            | character varying(65535) |
 customdimensions.index                                          | bigint                   |
 customdimensions.value                                          | character varying(65535) |

BigQueryと同様のクエリを投げてみます。

1
2
3
4
5
6
7
8
9
SELECT
  "device.browser",
  SUM ( "totals.transactions" ) AS total_transactions
FROM
  techblog_bastion_stack_bigquery_data_bucket_xxx
GROUP BY
  "device.browser"
ORDER BY
  total_transactions DESC;

以下の通り、同じ実行結果が確認できました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
      device.browser      | total_transactions
--------------------------+--------------------
 Opera                    |
 Internet Explorer        |
 YaBrowser                |
 Safari (in-app)          |
 Mozilla Compatible Agent |
 Opera Mini               |
 Edge                     |
 UC Browser               |
 Android Browser          |
 Android Webview          |
 Nokia Browser            |
 Chrome                   |                 40
 Safari                   |                  3
 Firefox                  |                  1
(14 rows)

以上でBigQueryからRedshiftへの移行手順は終了です。

リソースの削除

削除が必要なリソースは以下の通りです。

  • GCSバケット (GCS_BUCKET_NAME)
  • S3バケット (S3_BUCKET_NAME )の中身(スタック削除時に中身が空である必要があります)
  • Redshiftスタック・Bastionスタック・VPCスタック(スタック間に依存関係があるため、この順番で一つずつ削除してください)

不要なコストを避けるため、検証後はAWS/GCPのマネジメントコンソールからリソースの削除をお願いします。

まとめ

この記事では、BigQueryからRedshiftにデータを移行する流れをご紹介しました。
少しでも参考になれば幸いです。

motchie

2017年4月、NHNテコラスに新卒入社。データサイエンスチームに所属し、AWSを活用したデータ分析サービスの設計開発を担当。 注力分野は自然言語処理、好きなAWSサービスはEMR、SageMaker。お洒落な音楽が好き。

Recommends

こちらもおすすめ

Special Topics

注目記事はこちら

 
:)