Google BigQueryからAmazon Redshiftにデータを移行してみる
こんにちは、データサイエンスチームのmotchieです。
クラウド上のデータウェアハウス(DWH)といえば、Google BigQueryやAmazon Redshiftなどが有名です。
DWH間でデータ移行の方法については、
RedshiftからBigQueryに移行する場合、BigQuery Data Transfer Service APIを使う方法等がありますが、BigQueryからRedshiftに移行する場合については、あまり情報が見当たらないように思います。
そこで本記事では、BigQueryからRedshiftへデータを移行する方法を一つご紹介したいと思います。
リソースの全体像
流れ
以下の流れでリソースを作成していきたいと思います。
- CloudFormationでRedshiftクラスターを作成する
- GCS経由でBigQueryのデータでS3に出力する
- Glueを使ってデータをRedshiftに読み込ませる
- Redshiftに接続してクエリを投げてみる
- リソースの削除
前提条件として、AWS・GCP上でリソースにアクセスできる十分な権限が必要になります。
CloudFormationでRedshiftクラスターを作成する
まず、AWS上にRedshiftクラスターを作成していきます。
以下のAWSブログのCloudFormationテンプレートを活用したいと思います。
Amazon Web Services ブログ:AWS CloudFormation を使用して Amazon Redshift クラスターの作成を自動化する
上記のブログのテンプレートを活用することで、
AWSのベストプラクティスに沿って設計されたRedshiftクラスターを一発で作成することができます。
設計とテンプレートの詳細については、上記のブログをご覧ください。
各テンプレートのパラメーターについては、基本的にデフォルトの値を使用したいと思います。
ただし、今回の用途では、
- GCSからS3へデータをコピーするための認証情報とバケット
- GlueからRedshiftへアクセスするためのネットワーク設定
を追加する必要があるため、上記のテンプレートをカスタマイズして、リソースを追加したいと思います。
テンプレートの取得
以下の3つのCloudFormationテンプレートを取得します。
- aws-vpc-blog.template
- linux-bastion-blog.templat
- redshift-blog.template
1 2 3 | aws s3 cp s3: //aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/aws-vpc-blog .template ./ aws s3 cp s3: //aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/linux-bastion-blog .template ./ aws s3 cp s3: //aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/redshift-blog .template ./ |
テンプレートの修正
まず、linux-bastion-blog.template
(Bastionスタック用テンプレート)を修正していきます。
なお、修正後のテンプレートの内容は以下で確認できます。
linux-bastion-blog.template
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 | - Name: AutoScalingGroupName Value: !Ref BastionAutoScalingGroup BigQueryDataS3Bucket: Type: 'AWS::S3::Bucket' DeletionPolicy: Delete Properties: AccessControl: Private BucketName: !Join [ "-", [ !Ref 'AWS::StackName', "bigquery-data-bucket", !Ref 'AWS::AccountId' ] ] S3UserForBoto: Type: AWS::IAM::User Properties: Policies: - PolicyDocument: Version: 2012-10-17 Statement: - Action: - 's3:PutObject' Effect: Allow Resource: !Join - "/" - - !GetAtt BigQueryDataS3Bucket.Arn - "*" PolicyName: s3-put-policy-from-gcs-for-boto UserName: !Join [ "-", [ !Ref 'AWS::StackName', "s3-user-for-boto", !Ref 'AWS::Region' ] ] AccessKeyForBoto: Type: AWS::IAM::AccessKey Properties: UserName: !Ref S3UserForBoto SecretsForBoto: Type: AWS::SecretsManager::Secret Properties: Name: !Sub "secrets-for-boto-user-${AWS::StackName}" SecretString: !Sub '{"aws_access_key_id": "${AccessKeyForBoto}", "aws_secret_access_key": "${AccessKeyForBoto.SecretAccessKey}"}' Outputs: |
上記の修正によって、以下のリソースが追加されます。
- データ格納用S3バケット
- gsutilコマンド用のIAMユーザー・認証情報
- 認証情報格納用のSecretsManager
また、Redshiftスタックでデータ格納用S3バケット名が必要なため、以下の修正によって、値を出力するようにします。
581 582 583 584 585 586 587 588 589 590 591 | BastionSecurityGroupID: Description: Use this Security Group to reference incoming traffic from the SSH bastion host/instance Value: !Ref BastionSecurityGroup Export: Name: !Sub '${AWS::StackName}-BastionSecurityGroupID' BigQueryDataS3BucketName: Description: Bucket name will be imported in redshift cluter stack Value: !Ref BigQueryDataS3Bucket Export: Name: !Sub '${AWS::StackName}-BigQueryDataS3BucketName' |
次に、redshift-blog.template
(Redshiftスタック用テンプレート) を修正していきます。
なお、修正後のテンプレートの内容は以下で確認できます。
redshift-blog.template
417 418 419 420 421 422 423 424 425 426 427 428 | - Key: Compliance Value: !Ref TagCompliance RedshiftSecurityGroupIngress: Type: 'AWS::EC2::SecurityGroupIngress' Properties: IpProtocol: "-1" SourceSecurityGroupId: !Ref RedshiftSecurityGroup GroupId: !Ref RedshiftSecurityGroup RedshiftLoggingS3Bucket: |
GlueからVPC内のRedshiftにアクセスする際、ENIを利用します。
AWS ドキュメント:VPC の JDBC データストアに接続する
Redshift・Glueが利用するセキュリティグループに対して、すべてのTCP ポートに対する自己参照のインバウンドルールを追加します。
テンプレートの修正は以上です。
それでは、各スタックをデプロイしていきます。
スタックの作成
VPCスタック、Bastionスタック、Redshiftスタックの順で作成します。
まずパラメータをまとめて作成しておきます。
スタック名などは同じで問題ないですが、EMAIL_NOTIFICATION_LIST
と KEY_PAIR_NAME
(事前にEC2キーペアの作成が必要です)は自身の値に変更してください。
1 2 3 4 5 6 7 8 9 10 11 12 | # VPC stack parameters VPC_STACK_NAME=techblog-vpc-stack # Redshift stack parameters REDSHIFT_STACK_NAME=techblog-redshift-stack MASTER_USER_PASSWORD= "TestPass2019" GLUE_CATALOG_DB_NAME= "bq-test-db" #Bastion server stack parameters BASTION_STACK_NAME=techblog-bastion-stack EMAIL_NOTIFICATION_LIST= "YOUR_EMAIL_ADDRESS" KEY_PAIR_NAME= "YOUR_EC2_KEY_PAIR_NAME" # your ipaddress with CIDR REMOTE_ACCESS_CIDR= "$(dig +short myip.opendns.com @resolver1.opendns.com)/32" |
VPCスタックを作成します。完了まで数分かかります。
1 2 3 4 5 6 | aws cloudformation create-stack \ --stack-name $VPC_STACK_NAME \ --template-body file : //aws-vpc-blog .template \ --parameters \ ParameterKey=ClassB,ParameterValue=0 \ --capabilities CAPABILITY_IAM |
マネジメントコンソールでCloudFormationを開き、スタック作成の完了を確認してから、Bastionスタックの作成に移ってください。
Bastionスタックを作成します。以下のコマンドを実行します。
1 2 3 4 5 6 7 8 9 10 | aws cloudformation create-stack\ --stack-name $BASTION_STACK_NAME\ --template-body file : //linux-bastion-blog .template\ --parameters\ ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\ ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\ ParameterKey=KeyPairName,ParameterValue=$KEY_PAIR_NAME\ ParameterKey=RemoteAccessCIDR,ParameterValue=$REMOTE_ACCESS_CIDR\ ParameterKey=EnableX11Forwarding,ParameterValue= true \ --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM |
スタックの作成まで数分かかります。
スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。
最後に、Redshiftスタックを作成します。以下のコマンドを実行します。
1 2 3 4 5 6 7 8 9 10 11 12 | aws cloudformation create-stack\ --stack-name $REDSHIFT_STACK_NAME\ --template-body file : //redshift-blog .template\ --parameters\ ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\ ParameterKey=ParentSSHBastionStack,ParameterValue=$BASTION_STACK_NAME\ ParameterKey=MasterUserPassword,ParameterValue=$MASTER_USER_PASSWORD\ ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\ ParameterKey=S3BucketForRedshiftIAMRole,ParameterValue=$S3_BUCKET_NAME\ ParameterKey=GlueCatalogDatabase,ParameterValue=$GLUE_CATALOG_DB_NAME\ ParameterKey=TagOwner,ParameterValue=$EMAIL_NOTIFICATION_LIST\ --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM |
同様に、スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。
以上でAWSのリソースが作成できました。次はGCPに移ります。
GCS経由でBigQueryのデータでS3に出力する
一般公開データセット
BigQueryでは、一般公開データセットと呼ばれる様々なデータがホストされており、
ユーザーは自身のデータと組み合わせて分析を行うことができます。
今回は、BigQueryの一般公開データセットの中から、Google Analytics Sampleのデータを使いたいと思います。
以下のようなクエリが実行できます。後ほど、Redshiftでも同様のクエリを投げてみたいと思います。
GCSバケットの作成
GCPのコンソールを開き、Cloud Shellを起動します。以下のコマンドはCloud Shellから実行します。
GCSのバケットを作成します。バケット名は適宜変更して実行してください。
1 2 | GCS_BUCKET_NAME= "techblog-bq-to-rs-test-1" gsutil mb -l us-east1 gs: // $GCS_BUCKET_NAME/ |
BigQueryからGCSにエクスポート
データをBigQueryからGCSに出力します。今回は bq extract
CLIコマンドを使用します。
なお、出力データが1GBを超える場合は、ワイルドカード *
を使用して、データを分割する必要があります。
Google Cloud ドキュメント:テーブルデータのエクスポート
1 | bq --location=US extract --destination_format AVRO --compression SNAPPY 'bigquery-public-data:google_analytics_sample.ga_sessions_20170801' gs: // $GCS_BUCKET_NAME /file *.avro |
GCSからS3にデータをコピー
gsutilツールを利用して、GCSからS3にデータをコピーします。
Google Cloud ドキュメント:Cloud Storage の相互運用性
AWSマネジメントコンソールでSecret Managersを開き、Bastionスタックで作成した認証情報を取得します。
IAMユーザーの認証情報を ~/.boto
に書き込みます。
1 2 3 | [Credentials] aws_access_key_id = XXXXXXXXXXXXXXXXXXXX aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX |
GCSからS3にファイルをコピーします。
変数**S3_BUCKET_NAME**
には、Bastionスタック作成時 BigQueryDataS3BucketName
として出力されていたバケット名を入力してください。
1 2 | S3_BUCKET_NAME= "REPLACE_VALUE_WITH_YOUR_S3_BUCKET_NAME" gsutil cp gs: // $GCS_BUCKET_NAME/*.avro s3: // $S3_BUCKET_NAME/ |
以上で、BigQueryのデータをS3に出力できました。
次にAWSに移って、Glueの設定を行っていきます。
Glueを使ってデータをRedshiftに読み込ませる
AWS Glueを使うことで、様々なデータソースと連携するETLジョブが作成できます。
まず、Glueのクローラを使い、S3データからデータカタログを作成します。
データのクロール
Connectionの設定
GlueからRedshiftへの接続を設定します。
ETLジョブの作成
最後にETLジョブを作成します。
なお、RedshiftはArray・Structのデータ型に対応していないため、ETLで型を変換してからRedshiftに読み込む必要があります。
Arrayはstring型で読み込まれているようです。
Arrayの中身をそれぞれ展開し、適切なデータ型を付与して格納したい場合、自身で変換を追加する必要があります。
今回、hits
の列は除外し、customdimmensions
のArrayの中身を展開し、long型の customdimensions.index
とstring型のcustomdimensions.value
としてRedshiftに格納したいと思います。
コード例は以下のようになります。修正箇所をハイライトしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [TempDir, JOB_NAME] args = getResolvedOptions(sys.argv, [ 'TempDir' , 'JOB_NAME' ]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args[ 'JOB_NAME' ], args) ## @type: DataSource ## @args: [database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "bq-test-db" , table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx" , transformation_ctx = "datasource0" ) ## @type: ExplodeArray ## @args: [] ## @return: explodedframe0 ## @inputs: [frame = datasource0] import pyspark.sql.functions as F from awsglue.dynamicframe import DynamicFrame df = datasource0.toDF() df = df.withColumn( "exploded_customdimensions" , F.explode( "customdimensions" )) df = df.drop( "customdimensions" ) df = df.withColumn( "customdimensions.index" , df.exploded_customdimensions.index) df = df.withColumn( "customdimensions.value" , df.exploded_customdimensions.value) explodedframe0 = DynamicFrame.fromDF(df, glueContext, "exploded" ) ## @type: ApplyMapping ## @args: [mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"), ("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = explodedframe0] applymapping1 = ApplyMapping. apply (frame = explodedframe0, mappings = [( "visitorid" , "long" , "visitorid" , "long" ), ( "visitnumber" , "long" , "visitnumber" , "long" ), ( "visitid" , "long" , "visitid" , "long" ), ( "visitstarttime" , "long" , "visitstarttime" , "long" ), ( "date" , "string" , "date" , "string" ), ( "totals.visits" , "long" , "`totals.visits`" , "long" ), ( "totals.hits" , "long" , "`totals.hits`" , "long" ), ( "totals.pageviews" , "long" , "`totals.pageviews`" , "long" ), ( "totals.timeOnSite" , "long" , "`totals.timeOnSite`" , "long" ), ( "totals.bounces" , "long" , "`totals.bounces`" , "long" ), ( "totals.transactions" , "long" , "`totals.transactions`" , "long" ), ( "totals.transactionRevenue" , "long" , "`totals.transactionRevenue`" , "long" ), ( "totals.newVisits" , "long" , "`totals.newVisits`" , "long" ), ( "totals.screenviews" , "long" , "`totals.screenviews`" , "long" ), ( "totals.uniqueScreenviews" , "long" , "`totals.uniqueScreenviews`" , "long" ), ( "totals.timeOnScreen" , "long" , "`totals.timeOnScreen`" , "long" ), ( "totals.totalTransactionRevenue" , "long" , "`totals.totalTransactionRevenue`" , "long" ), ( "totals.sessionQualityDim" , "long" , "`totals.sessionQualityDim`" , "long" ), ( "trafficsource.referralPath" , "string" , "`trafficsource.referralPath`" , "string" ), ( "trafficsource.campaign" , "string" , "`trafficsource.campaign`" , "string" ), ( "trafficsource.source" , "string" , "`trafficsource.source`" , "string" ), ( "trafficsource.medium" , "string" , "`trafficsource.medium`" , "string" ), ( "trafficsource.keyword" , "string" , "`trafficsource.keyword`" , "string" ), ( "trafficsource.adContent" , "string" , "`trafficsource.adContent`" , "string" ), ( "trafficsource.adwordsClickInfo.campaignId" , "long" , "`trafficsource.adwordsClickInfo.campaignId`" , "long" ), ( "trafficsource.adwordsClickInfo.adGroupId" , "long" , "`trafficsource.adwordsClickInfo.adGroupId`" , "long" ), ( "trafficsource.adwordsClickInfo.creativeId" , "long" , "`trafficsource.adwordsClickInfo.creativeId`" , "long" ), ( "trafficsource.adwordsClickInfo.criteriaId" , "long" , "`trafficsource.adwordsClickInfo.criteriaId`" , "long" ), ( "trafficsource.adwordsClickInfo.page" , "long" , "`trafficsource.adwordsClickInfo.page`" , "long" ), ( "trafficsource.adwordsClickInfo.slot" , "string" , "`trafficsource.adwordsClickInfo.slot`" , "string" ), ( "trafficsource.adwordsClickInfo.criteriaParameters" , "string" , "`trafficsource.adwordsClickInfo.criteriaParameters`" , "string" ), ( "trafficsource.adwordsClickInfo.gclId" , "string" , "`trafficsource.adwordsClickInfo.gclId`" , "string" ), ( "trafficsource.adwordsClickInfo.customerId" , "long" , "`trafficsource.adwordsClickInfo.customerId`" , "long" ), ( "trafficsource.adwordsClickInfo.adNetworkType" , "string" , "`trafficsource.adwordsClickInfo.adNetworkType`" , "string" ), ( "trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId" , "long" , "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`" , "long" ), ( "trafficsource.adwordsClickInfo.isVideoAd" , "boolean" , "`trafficsource.adwordsClickInfo.isVideoAd`" , "boolean" ), ( "trafficsource.isTrueDirect" , "boolean" , "`trafficsource.isTrueDirect`" , "boolean" ), ( "trafficsource.campaignCode" , "string" , "`trafficsource.campaignCode`" , "string" ), ( "device.browser" , "string" , "`device.browser`" , "string" ), ( "device.browserVersion" , "string" , "`device.browserVersion`" , "string" ), ( "device.browserSize" , "string" , "`device.browserSize`" , "string" ), ( "device.operatingSystem" , "string" , "`device.operatingSystem`" , "string" ), ( "device.operatingSystemVersion" , "string" , "`device.operatingSystemVersion`" , "string" ), ( "device.isMobile" , "boolean" , "`device.isMobile`" , "boolean" ), ( "device.mobileDeviceBranding" , "string" , "`device.mobileDeviceBranding`" , "string" ), ( "device.mobileDeviceModel" , "string" , "`device.mobileDeviceModel`" , "string" ), ( "device.mobileInputSelector" , "string" , "`device.mobileInputSelector`" , "string" ), ( "device.mobileDeviceInfo" , "string" , "`device.mobileDeviceInfo`" , "string" ), ( "device.mobileDeviceMarketingName" , "string" , "`device.mobileDeviceMarketingName`" , "string" ), ( "device.flashVersion" , "string" , "`device.flashVersion`" , "string" ), ( "device.javaEnabled" , "boolean" , "`device.javaEnabled`" , "boolean" ), ( "device.language" , "string" , "`device.language`" , "string" ), ( "device.screenColors" , "string" , "`device.screenColors`" , "string" ), ( "device.screenResolution" , "string" , "`device.screenResolution`" , "string" ), ( "device.deviceCategory" , "string" , "`device.deviceCategory`" , "string" ), ( "geonetwork.continent" , "string" , "`geonetwork.continent`" , "string" ), ( "geonetwork.subContinent" , "string" , "`geonetwork.subContinent`" , "string" ), ( "geonetwork.country" , "string" , "`geonetwork.country`" , "string" ), ( "geonetwork.region" , "string" , "`geonetwork.region`" , "string" ), ( "geonetwork.metro" , "string" , "`geonetwork.metro`" , "string" ), ( "geonetwork.city" , "string" , "`geonetwork.city`" , "string" ), ( "geonetwork.cityId" , "string" , "`geonetwork.cityId`" , "string" ), ( "geonetwork.networkDomain" , "string" , "`geonetwork.networkDomain`" , "string" ), ( "geonetwork.latitude" , "string" , "`geonetwork.latitude`" , "string" ), ( "geonetwork.longitude" , "string" , "`geonetwork.longitude`" , "string" ), ( "geonetwork.networkLocation" , "string" , "`geonetwork.networkLocation`" , "string" ), ( "fullvisitorid" , "string" , "fullvisitorid" , "string" ), ( "userid" , "string" , "userid" , "string" ), ( "clientid" , "string" , "clientid" , "string" ), ( "channelgrouping" , "string" , "channelgrouping" , "string" ), ( "socialengagementtype" , "string" , "socialengagementtype" , "string" ),( "customdimensions.index" , "long" , "`customdimensions.index`" , "long" ), ( "customdimensions.value" , "string" , "`customdimensions.value`" , "string" )], transformation_ctx = "applymapping1" ) ## @type: ResolveChoice ## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"] ## @return: resolvechoice2 ## @inputs: [frame = applymapping1] resolvechoice2 = ResolveChoice. apply (frame = applymapping1, choice = "make_cols" , transformation_ctx = "resolvechoice2" ) ## @type: DropNullFields ## @args: [transformation_ctx = "dropnullfields3"] ## @return: dropnullfields3 ## @inputs: [frame = resolvechoice2] dropnullfields3 = DropNullFields. apply (frame = resolvechoice2, transformation_ctx = "dropnullfields3" ) ## @type: DataSink ## @args: [catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"] ## @return: datasink4 ## @inputs: [frame = dropnullfields3] datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "techblog-redshift-connection" , connection_options = { "dbtable" : "techblog_bastion_stack_bigquery_data_bucket_xxx" , "database" : "rsdev01" }, redshift_tmp_dir = args[ "TempDir" ], transformation_ctx = "datasink4" ) job.commit() |
ETLジョブが完了し、データがRedshiftに読み込まれました。
Redshiftに接続してクエリを投げてみる
Redshiftに接続し、クエリを投げてみたいと思います。
Bastionサーバにssh接続のコマンド、Redshiftへの接続コマンドは、それぞれBastionスタック・Redshiftスタックの出力で確認できます。
Bastionサーバにssh接続します。
1 | ssh -i "your-keypair-name.pem" ec2-user@xxx.xxx.xxx.xxx |
Redshiftに接続します。パスワードはスタック作成時に自身で設定したもの(MASTER_USER_PASSWORD
)になります。
1 | psql -h rsdev01-redshiftmain.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com -p 8200 -U rsadmin -d rsdev01 |
Redshiftにテーブルが作成されていることを確認します。
1 2 3 4 5 6 | rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01= # \d List of relations schema | name | type | owner --------+----------------------------------------------------------+-------+--------- public | techblog_bastion_stack_bigquery_data_bucket_xxxx| table | rsadmin (1 row) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01= # \d techblog_bastion_stack_bigquery_data_bucket_xxx Table "public.techblog_bastion_stack_bigquery_data_bucket_xx" Column | Type | Modifiers -----------------------------------------------------------------+--------------------------+----------- visitorid | bigint | visitnumber | bigint | visitid | bigint | visitstarttime | bigint | date | character varying(65535) | totals.visits | bigint | totals.hits | bigint | totals.pageviews | bigint | totals.timeonsite | bigint | totals.bounces | bigint | totals.transactions | bigint | totals.transactionrevenue | bigint | totals.newvisits | bigint | totals.screenviews | bigint | totals.uniquescreenviews | bigint | totals.timeonscreen | bigint | totals.totaltransactionrevenue | bigint | totals.sessionqualitydim | bigint | trafficsource.referralpath | character varying(65535) | trafficsource.campaign | character varying(65535) | trafficsource. source | character varying(65535) | trafficsource.medium | character varying(65535) | trafficsource.keyword | character varying(65535) | trafficsource.adcontent | character varying(65535) | trafficsource.adwordsclickinfo.campaignid | bigint | trafficsource.adwordsclickinfo.adgroupid | bigint | trafficsource.adwordsclickinfo.creativeid | bigint | trafficsource.adwordsclickinfo.criteriaid | bigint | trafficsource.adwordsclickinfo.page | bigint | trafficsource.adwordsclickinfo.slot | character varying(65535) | trafficsource.adwordsclickinfo.criteriaparameters | character varying(65535) | trafficsource.adwordsclickinfo.gclid | character varying(65535) | trafficsource.adwordsclickinfo.customerid | bigint | trafficsource.adwordsclickinfo.adnetworktype | character varying(65535) | trafficsource.adwordsclickinfo.targetingcriteria.boomuserlistid | bigint | trafficsource.adwordsclickinfo.isvideoad | boolean | trafficsource.istruedirect | boolean | trafficsource.campaigncode | character varying(65535) | device.browser | character varying(65535) | device.browserversion | character varying(65535) | device.browsersize | character varying(65535) | device.operatingsystem | character varying(65535) | device.operatingsystemversion | character varying(65535) | device.ismobile | boolean | device.mobiledevicebranding | character varying(65535) | device.mobiledevicemodel | character varying(65535) | device.mobileinputselector | character varying(65535) | device.mobiledeviceinfo | character varying(65535) | device.mobiledevicemarketingname | character varying(65535) | device.flashversion | character varying(65535) | device.javaenabled | boolean | device.language | character varying(65535) | device.screencolors | character varying(65535) | device.screenresolution | character varying(65535) | device.devicecategory | character varying(65535) | geonetwork.continent | character varying(65535) | geonetwork.subcontinent | character varying(65535) | geonetwork.country | character varying(65535) | geonetwork.region | character varying(65535) | geonetwork.metro | character varying(65535) | geonetwork.city | character varying(65535) | geonetwork.cityid | character varying(65535) | geonetwork.networkdomain | character varying(65535) | geonetwork.latitude | character varying(65535) | geonetwork.longitude | character varying(65535) | geonetwork.networklocation | character varying(65535) | fullvisitorid | character varying(65535) | userid | character varying(65535) | clientid | character varying(65535) | channelgrouping | character varying(65535) | socialengagementtype | character varying(65535) | customdimensions.index | bigint | customdimensions.value | character varying(65535) | |
BigQueryと同様のクエリを投げてみます。
1 2 3 4 5 6 7 8 9 | SELECT "device.browser" , SUM ( "totals.transactions" ) AS total_transactions FROM techblog_bastion_stack_bigquery_data_bucket_xxx GROUP BY "device.browser" ORDER BY total_transactions DESC ; |
以下の通り、同じ実行結果が確認できました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | device.browser | total_transactions --------------------------+-------------------- Opera | Internet Explorer | YaBrowser | Safari (in-app) | Mozilla Compatible Agent | Opera Mini | Edge | UC Browser | Android Browser | Android Webview | Nokia Browser | Chrome | 40 Safari | 3 Firefox | 1 (14 rows) |
以上でBigQueryからRedshiftへの移行手順は終了です。
リソースの削除
削除が必要なリソースは以下の通りです。
- GCSバケット (
GCS_BUCKET_NAME
) - S3バケット (
S3_BUCKET_NAME
)の中身(スタック削除時に中身が空である必要があります) - Redshiftスタック・Bastionスタック・VPCスタック(スタック間に依存関係があるため、この順番で一つずつ削除してください)
不要なコストを避けるため、検証後はAWS/GCPのマネジメントコンソールからリソースの削除をお願いします。
まとめ
この記事では、BigQueryからRedshiftにデータを移行する流れをご紹介しました。
少しでも参考になれば幸いです。
2017年4月、NHNテコラスに新卒入社。データサイエンスチームに所属し、AWSを活用したデータ分析サービスの設計開発を担当。 注力分野は自然言語処理、好きなAWSサービスはEMR、SageMaker。お洒落な音楽が好き。
Recommends
こちらもおすすめ
-
機械学習の受託案件を通じて気づいた5つのこと
2019.3.8
-
AWS初心者におすすめの勉強法と運用のポイント
2019.5.30
-
AWS GlueとAmazon Machine Learningでの予測モデル
2017.12.17
-
Google BigQueryからAmazon Redshiftにデータを移行してみる
2019.11.29
-
推薦システムの基本的な評価指標について整理してみた
2017.6.16
Special Topics
注目記事はこちら
【資料ダウンロード】よく分かる!「AWS請求代行サービス」
2019.9.13
AWSを活用したAIによるキャスト評価システムの構築
2019.7.15