この記事の続きです。
以下にもある通り、今書いている時点ではApache BeamのPython SDKはビルトインでJDBC対応していません。
PythonでJDBCドライバ使いたかったのはDataflowのPython SDK使ってもJDBC接続使いたかったからです。
上記の記事でJDBCをPythonから使えるところは確認できているので、今度はDataflowにテンプレート登録してみます。
Pythonコード準備
requirements.txt
を準備します。
※ 記事書いている時点のJayDeBeApiのPyPi上のバージョンだとJPype1==0.7では動かないが修正される模様
JPype1==0.6.3 JayDeBeApi
最初の参照記事に沿って、 mssql-jdbc-7.4.1.jre8.jar
も準備しておきます。
最初に参照した記事と同じくSQL Serverに接続するスクリプトです。
ポイントは以下のオプションでローカルPC上のJARを指定すると、Dataflowのテンプレートに取り込んでくれることです。
worker_options.dataflow_worker_jar = JDBC_JAR_FILE
from __future__ import absolute_import import argparse from contextlib import contextmanager from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions, StandardOptions, WorkerOptions import jaydebeapi import logging ########################### # DB接続関連 ########################### CLASS_NAME = 'com.microsoft.sqlserver.jdbc.SQLServerDriver' USER_NAME = 'DBユーザ名' PASSWORD = 'DBユーザパスワード' DATABASE = 'データベース名' HOST = 'DBホスト名' # JDBCのSQL Server用の書式です JDBC_URL = 'jdbc:sqlserver://{}:1433;database={};encrypt=true;'.format( HOST, DATABASE) SQL_QUERY = 'SELECT TOP(100) * FROM dbo.TableA' # ローカルPCにあるJDBCドライバのJARファイルを指定 JDBC_JAR_FILE = './mssql-jdbc-7.4.1.jre8.jar' ########################### # Dataflow関連 ########################### JOB_TEMPLATE_NAME = 'azuredatabasejdbc' REGION = 'asia-northeast1' VPC_NAME = 'GCPのVPC名' SUBNET = 'GCPのサブネット名' # ローカルPCに準備しておいたrequirements.txtを指定 REQUIREMENTS_FILE = './requirements.txt' def logging_row(row): logging.info(row) def parse_jdbc_entry(table_data): for r in table_data: yield [c.value if hasattr(c, 'value') else c for c in r] @contextmanager def open_db_connection(class_name, jdbc_url, user, password): conn = jaydebeapi.connect(class_name, jdbc_url, driver_args=[user, password]) try: yield conn finally: conn.close() if __name__ == '__main__': import sys parser = argparse.ArgumentParser() parser.add_argument('--project') parser.add_argument('--region') parser.add_argument('--base-bucket') known_args, _ = parser.parse_known_args(sys.argv) options = PipelineOptions() options.view_as(StandardOptions).runner = 'DataflowRunner' worker_options = options.view_as(WorkerOptions) worker_options.dataflow_worker_jar = JDBC_JAR_FILE worker_options.network = VPC_NAME # サブネットはregions/REGION/subnetworks/SUBNETWORK の形式で指定 worker_options.subnetwork = 'regions/{}/subnetworks/{}'.format( REGION, SUBNET) setup_option = options.view_as(SetupOptions) setup_option.requirements_file = REQUIREMENTS_FILE gcp_options = options.view_as(GoogleCloudOptions) gcp_options.project = known_args.project gcp_options.region = known_args.region gcp_options.template_location = 'gs://{}/dataflow/templates/{}'.format( known_args.base_bucket, JOB_TEMPLATE_NAME) gcp_options.temp_location = 'gs://{}/temp/{}/'.format( known_args.base_bucket, JOB_TEMPLATE_NAME) gcp_options.staging_location = 'gs://{}/dataflow/staging/{}/'.format( known_args.base_bucket, JOB_TEMPLATE_NAME) with open_db_connection(CLASS_NAME, JDBC_URL, USER_NAME, PASSWORD) as conn: cur = conn.cursor() cur.execute(SQL_QUERY) p = beam.Pipeline(options=options) (p | 'Select Table' >> beam.Create(parse_jdbc_entry(cur.fetchall())) | 'Logging Rows' >> beam.Map(logging_row)) result = p.run() result.wait_until_finish()
作業用GCSバケット作成
作業用のGCSバケットを作成します。
$ gsutil mb -l asia-northeast1 gs://my-df-bucket-1220
Dataflowテンプレート登録
Pythonスクリプトに引数を渡していくのですが、JayDeBeApiが内部でJDBCのJarを探せるようにCLASSPATHを設定してから実行します。
$ export CLASSPATH=./mssql-jdbc-7.4.1.jre8.jar $ python azure_database.py --project your-project --region asia-northeast1 --base-bucket my-df-bucket-1220
テンプレートが登録されると、指定したJDBCのJARファイルは、 staging_location
として設定したGCSの場所に dataflow-worker.jar
として纏められて配置されます。
Dataflowテンプレートの実行
テンプレートからジョブを実行で、先ほど登録したテンプレートを実行してみます。
以下の定数で設定してあったSELECT文の実行結果が表示されました。
SQL_QUERY = 'SELECT TOP(100) * FROM dbo.TableA'