Cloud DataflowのテンプレートにPythonの外部パッケージを利用する

Cloud Dataflow + Python で作るテンプレートを登録する際に、pipでインストール可能なPyPiなどの外部パッケージをどうやって組み込むか調べました。

使う設定

結局ドキュメントは見つからなかったのですが、ソースコード読んでいたら以下のオプションを見つけました。

class SetupOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    # Options for installing dependencies in the worker.
    parser.add_argument(
        '--requirements_file',
        default=None,
        help=
        ('Path to a requirements file containing package dependencies. '
         'Typically it is produced by a pip freeze command. More details: '
         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
         'If used, all the packages specified will be downloaded, '
         'cached (use --requirements_cache to change default location), '
         'and then staged so that they can be automatically installed in '
         'workers during startup. The cache is refreshed as needed '
         'avoiding extra downloads for existing packages. Typically the '
         'file is named requirements.txt.'))

引用元:https://github.com/apache/beam/blob/v2.16.0/sdks/python/apache_beam/options/pipeline_options.py#L721-L738

設定例

以下のようにPipeline OptionのSetupOptoinsに requirements_fileとして設定します。

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
setup_option = options.view_as(SetupOptions)
setup_option.requirements_file = "./requirements.txt"

# 省略

p = beam.Pipeline(options=options)

テンプレート作成

テンプレートを作成してみると、staging_locationに指定したバケットに requirements.txt がアップロードされます。

f:id:yomon8:20191216224005p:plain

テンプレート作成が完了してから見てみると、requirements.txtに記載したパッケージがアップロードされています。後はテンプレート実行すれば、パッケージを利用した処理が可能です。

f:id:yomon8:20191216224240p:plain

関連

Cloud ComposerからDataflowTemplateOperatorでwordcountジョブを実行する - YOMON8.NET