サービスの最適化と実行

Dataflow パイプラインを作成し、テストした後で、Cloud Dataflow マネージド サービスを使用してそれをデプロイ、実行できます。Dataflow サービスでは、パイプライン コードが Dataflow ジョブになります。

Dataflow サービスは、Google Compute EngineGoogle Cloud Storage などの Google Cloud Platform サービスを完全に管理して Dataflow ジョブを実行し、必要なリソースを自動的に起動および破棄します。Dataflow サービスは、Dataflow Monitoring InterfaceDataflow コマンドライン インターフェースなどのツールを通じてジョブの公開設定を提供します。

パイプライン コードで実行パラメータを設定することで、Dataflow サービスがジョブを実行する方法の一部の側面を制御できます。

Cloud Platform リソースの管理に加えて、Dataflow サービスは分散並列処理の多くの側面を自動的に実行し、最適化します。たとえば、次のようなものです。

  • 並列化と分散。Dataflow は、データを自動的に分割し、並列処理のためにワーカーコードを Compute Engine インスタンスに分散します。
  • 最適化。Dataflow は、パイプライン コードを使用して、パイプラインの PCollection と変換を表す実行グラフを作成し、最も効率的なパフォーマンスとリソース使用率を実現するためにグラフを最適化します。Dataflow は、データ集計など、コストがかかる可能性のある操作も自動的に最適化します。
  • 自動チューニング機能。Dataflow サービスには、自動スケーリングや動的作業再調整など、リソース割り当てとデータ分割のオンザフライ調整を提供する機能がいくつか含まれます。これらの機能は、Dataflow サービスがジョブをできるだけ素早く効率的に実行するのに役立ちます。

パイプラインのライフサイクル: パイプライン コードから Dataflow ジョブまで

Dataflow プログラムを実行すると、Dataflow はすべての変換とその関連処理関数(DoFn など)を含む Pipeline オブジェクトを作成するコードから実行グラフを作成します。このフェーズは、グラフ作成時間と呼ばれます。グラフ作成中に、Dataflow はさまざまなエラーをチェックし、パイプライン グラフに無効な操作が含まれないことを確認します。実行グラフは JSON 形式に変換され、JSON 実行グラフは Dataflow サービス エンドポイントに転送されます。

注: グラフ作成は、パイプラインをローカルで実行したときにも行われますが、グラフは JSON に変換されたりサービスに転送されたりしません。代わりに、グラフは Dataflow プログラムを起動したのと同じマシンでローカルに実行されます。詳しくは、ローカル実行の設定をご覧ください。

Cloud Dataflow サービスは、JSON 実行グラフを検証します。グラフが検証されると、Dataflow サービスでジョブになります。Dataflow Monitoring Interface を使用して、ジョブ、その実行グラフ、ステータス、ログ情報を参照できるようになります。

Java

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの jobId を含むオブジェクト DataflowPipelineJob にカプセル化されます。jobId を使用し、Dataflow Monitoring InterfaceDataflow コマンドライン インターフェースを使用してジョブの監視、追跡、トラブルシューティングを行うことができます。詳しくは、DataflowPipelineJob の API リファレンス情報をご覧ください。

Python

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの job_id を含むオブジェクト DataflowPipelineResult にカプセル化されます。job_id を使用し、Dataflow Monitoring InterfaceDataflow コマンドライン インターフェースを使用してジョブの監視、追跡、トラブルシューティングを行うことができます。

実行グラフ

Dataflow は、変換と、Pipeline オブジェクトの作成時に使用したデータに基づいて、パイプラインを表すステップのグラフを作成します。これはパイプライン実行グラフです。

Cloud Dataflow SDK に含まれる WordCount サンプル プログラムには、テキストのコレクション内の個々の単語と各単語の発生数の読み取り、抽出、カウント、書式設定、書き込みを行う一連の変換が含まれます。次の図は、WordCount パイプライン内の変換が実行グラフにどのように展開されるかを示しています。

The transforms in the WordCount example program expanded into an execution graph
              of steps to be executed by the Cloud Dataflow service.
図 1: WordCount サンプルの実行グラフ

多くの場合、実行グラフは、パイプラインの作成時に変換を指定した順序とは異なります。これは、Dataflow サービスが管理対象のクラウド リソースに対して実行される前に実行グラフに対してさまざまな最適化と融合を実行するためです。Dataflow サービスは、パイプラインの実行時にデータの依存関係を尊重しますが、間にデータ依存関係がないステップは任意の順序で実行できます。

Dataflow Monitoring Interface でジョブを選択するときに Dataflow がパイプラインに対して生成した最適化されていない実行グラフを参照できます。

リソースの使用率と管理

Dataflow サービスは、Google Cloud Platform のリソースをジョブごとに完全に管理します。これには Google Compute Engine インスタンス(ワーカーまたは VM と呼ばれることがあります)の起動と停止、I/O と一時ファイル ステージングの両方に対するプロジェクトの Google Cloud Storage バケットへのアクセスが含まれます。 ただし、パイプラインが Google Cloud BigQueryGoogle Cloud Pub/Sub などの Cloud Platform データ ストレージ テクノロジーとやりとりする場合は、これらのサービスのリソースと割り当てを管理する必要があります。

Dataflow は、ユーザーが提供する Google Cloud Storage 内の場所をファイルのステージングに使用します。この場所はユーザーの制御下にあり、ジョブがそこから読み取る限り場所の有効性が維持されることを確認する必要があります。SDK の組み込みキャッシュはジョブの開始時間を短縮できるため、複数のジョブ実行に同じステージング場所を再利用できます。

ジョブ

Cloud Platform プロジェクトあたり 25 個までの Dataflow ジョブを同時に実行できます。

Dataflow サービスは現在、10 MB 以下のサイズのジョブ リクエストの処理に限定されています。ジョブ リクエストのサイズはパイプラインの JSON 表現に関連付けられています。パイプラインが大きいほど、リクエストが大きくなります。

パイプラインの JSON リクエストのサイズを見積もるには、次のオプションを指定してパイプラインを実行します。

Java

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

このコマンドは、ジョブの JSON 表現をファイルに書き込みます。シリアル化されたファイルのサイズはリクエストのサイズの適切な推定値です。リクエストにはいくつかの追加情報が含まれるため、実際のサイズはわずかに大きくなります。

ワーカー

Dataflow サービスでは、現在最大でジョブあたり 1,000 個の Compute Engine インスタンスが許可されます。 デフォルトのマシンタイプは、バッチジョブの場合は n1-standard-1、ストリームの場合は n1-standard-4 です。したがって、デフォルトのマシンタイプを使用している場合、Dataflow サービスは最大でジョブあたり 4,000 コアを割り当てることができます。

Dataflow では、n1 シリーズのワーカーとカスタム マシンタイプがサポートされます。パイプラインの作成時に適切な実行パラメータを設定することで、パイプラインのマシンタイプを指定できます。

Java

マシンタイプを変更するには、--workerMachineType オプションを設定します。

Python

マシンタイプを変更するには、--worker_machine_type オプションを設定します。

リソース割り当て

Dataflow サービスは、ジョブの開始とワーカー インスタンスの最大数へのスケーリングのために、ジョブの実行に必要な Compute Engine リソース割り当てが Cloud Platform プロジェクトにあることをチェックします。十分なリソース割り当てが使用可能でない場合、ジョブの開始は失敗します。

Dataflow の自動スケーリング機能は、プロジェクトの使用可能 Compute Engine 割り当てによって制限されます。ジョブの開始時に十分な割り当てがあっても、別のジョブがプロジェクトの使用可能割り当ての残りを使用する場合、最初のジョブは実行されますが、完全にはスケーリングできません。

しかし、Dataflow サービスはプロジェクトのリソース割り当てを超えるジョブの割り当ての増加を管理しません。ユーザーは、Google Cloud Platform Console を使用できる追加のリソース割り当てについて必要なリクエストを行う責任があります。

永続ディスク リソース

Dataflow サービスは、ストリーミング ジョブの実行時に、ワーカー インスタンスあたり 15 個の永続ディスクに制限されています。各永続ディスクは、個々の Compute Engine 仮想マシンに対してローカルです。ジョブは永続ディスクより多くのワーカーを持つことができません。ワーカーとディスク間の 1:1 の比率が最小リソース割り当てです。

各永続ディスクのデフォルト サイズは、バッチモードで 250 GBストリーミング モードで 400 GB です。

場所

Dataflow サービスは、Compute Engine リソースをデフォルトでゾーン us-central1-f にデプロイします。この設定は、パイプラインの作成時に --zone オプションを指定することでオーバーライドできます。

並列化と分散

Dataflow サービスは、パイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。Dataflow は、プログラミング モデル内の抽象化を使用して、並列処理機能を表します。たとえば、ParDo 変換により、Dataflow は処理コード(DoFn で表現)を、並列に実行する複数のワーカーに自動的に分散します。

ユーザーコードを構成する

DoFn コードは小さな独立したエンティティと考えることができます。異なるマシンで多くのインスタンスが実行され、それぞれお互いに関する知識がない可能性があります。 このため、純粋な関数(隠蔽された状態または外部状態に依存せず、目に見える副作用がなく、確定的な関数)は、DoFn の並列的、分散的な性質に理想的なコードです。

純粋な関数モデルは厳しく固定されているわけではありません。Dataflow サービスが保証しない事項にコードが依存しない限り、状態情報や外部初期化データは DoFn とその他の関数オブジェクトに対して有効である場合があります。ParDo 変換を構成し、DoFn を作成する場合は、次のガイドラインを念頭に置いてください。

  • Dataflow サービスは、入力 PCollection 内のすべての要素が DoFn インスタンスによって必ず 1 回のみ処理されることを保証します。
  • Dataflow サービスは、DoFn が何回呼び出されるかを保証しません。
  • Dataflow サービスは、分散された要素が厳密にどのようにグループ化されるかを保証しません。つまり、どの要素(存在する場合)がまとめて処理されるかを保証しません。
  • Dataflow サービスは、パイプラインで作成される DoFn インスタンスの正確な数を保証しません。
  • Dataflow サービスはフォールト トレラントであり、ワーカーに問題が発生した場合にコードを複数回再試行することがあります。Dataflow サービスはコードのバックアップ コピーを作成する場合があり、手動の副作用が問題になることがあります(たとえば、コードが一意の名前を持たない一時ファイルに依存するか、これを作成する場合など)。
  • Dataflow サービスは、DoFn インスタンスごとに要素の処理をシリアル化します。 コードは厳密にスレッドセーフである必要はありませんが、複数の DoFn インスタンス間で共有される状態はスレッドセーフである必要があります。

ユーザーコードの作成について詳しくは、プログラミング モデル ドキュメントのユーザー指定関数オブジェクトの要件をご覧ください。

エラーと例外の処理

データの処理中にパイプラインによって例外がスローされることがあります。これらのエラーのいくつかは一過性ですが(たとえば、一時的に外部サービスにアクセスできないなど)、破損したか解析不能な入力データや計算中の null ポインタを原因とするエラーなど、一部のエラーは永続的です。

エラーが任意のバンドル内の要素についてスローされた場合、Dataflow はそのバンドル内の要素を処理し、バンドル全体を再試行します。バッチモードで実行している場合、失敗した項目を含むバンドルは 4 回再試行されます。単一のバンドルが 4 回失敗した場合はパイプラインが完全に失敗します。ストリーミング モードで実行している場合、失敗した項目を含むバンドルは無期限に再試行され、パイプラインが恒久的に滞るおそれがあります。

融合の最適化

パイプラインの実行グラフの JSON フォームが検証されると、Dataflow サービスは最適化を実行するためにグラフを修正することがあります。このような最適化には、パイプラインの実行グラフ内の複数のステップまたは変換を単一のステップに融合することが含まれます。ステップを融合すると、Dataflow サービスはパイプラインの中間 PCollection をすべて実体化する必要がなくなります。実体化はメモリと処理のオーバーヘッドの点でコストが高くなることがあります。

パイプラインの作成で指定したすべての変換がサービスで実行されますが、それらは異なる順序で実行されることも、パイプラインを最も効率よく実行するために融合された、より大きな変換の一部として実行されることもあります。Dataflow サービスは、実行グラフのステップ間のデータ依存関係を尊重しますが、それ以外ではステップを任意の順序で実行できます。

融合の例

次の図は、Dataflow SDK for Java に含まれる WordCount サンプル プログラムの実行グラフを、効率的な実行のために Dataflow サービスによって最適化および融合する方法を示しています。

The execution graph for the WordCount example program optimized and with steps fused
              by the Cloud Dataflow service.
図 2: WordCount サンプルの最適化された実行グラフ

融合を阻止する

パイプラインで、Dataflow サービスが融合最適化を実行しないようにする必要があるケースがいくつかあります。これらは、Dataflow サービスがパイプラインの操作を融合する最適な方法を間違って推測し、それによって Dataflow サービスがすべての使用可能ワーカーを利用する能力が制限される可能性があるケースです。

たとえば、Dataflow がワーカー使用状況を最適化する能力が融合によって制限される 1 つのケースは、「高ファンアウト」の ParDo です。このような操作では、入力コレクションの要素が比較的少数でも、ParDo は数百または数千倍の要素数の出力を生成し、その後に別の ParDo が続くことがあります。Dataflow サービスがこれらの ParDo 操作を融合すると、中間 PCollection により多くの要素が含まれている場合でも、このステップの並列性は入力コレクション内のアイテム最大数に制限されます。

Dataflow サービスに中間 PCollection の実体化を強制する操作をパイプラインに追加することで、このような融合を防ぐことができます。次の操作の 1 つを使用することを検討してください。

  • GroupByKey を挿入し、最初の ParDo の後でグループ化解除できます。Dataflow サービスは、集約で ParDo 操作を決して融合しません。
  • 中間 PCollection副入力として別の ParDo に渡すことができます。Dataflow サービスは常に副入力を実体化します。

結合最適化

集約操作は、大規模なデータ処理における重要な概念です。集約は、概念的に非常に異なるデータをまとめて、関連付けに極めて有用にします。 Dataflow プログラミング モデルは、集約操作を GroupByKeyCoGroupByKeyCombine 変換として表します。

Dataflow の集約操作は、データセット全体でデータを結合します。これには、複数のワーカーにまたがる可能性のあるデータも含まれます。多くの場合、このような集約操作中に、インスタンスをまたがるデータを結合する前にデータをできるだけローカルに結合するのが最も効率的です。GroupByKey またはその他の集約変換を適用する場合、Dataflow サービスはメインのグループ化操作の前に部分的な結合を自動的にローカルで実行します

部分結合または複数レベル結合を実行する場合、Dataflow サービスはパイプラインがバッチデータとストリーミング データのどちらを操作するかに基づいて異なる決定を行います。制限付きデータの場合、サービスは効率性を重視し、できるだけローカルの結合を実行します。制限なしデータの場合、サービスは低レイテンシを重視し、部分結合は実行しないことがあります(レイテンシが増加するため)。

自動チューニング機能

Java

Dataflow サービスには、いくつかの自動チューニング機能が含まれます。この機能が有効になっている場合、Dataflow ジョブを実行中に動的にさらに最適化できます。これらの機能には、自動スケーリング動的作業再調整が含まれます。

自動スケーリング

自動スケーリングを有効にすると、Dataflow サービスは、ジョブの実行に必要な適切な数のワーカー インスタンスを自動的に選択します。また、Dataflow サービスは、実行時にジョブの特性を考慮して、より多くのワーカーまたは少数のワーカーを動的に再割り当てします。 パイプラインの特定の部分は他よりも計算負荷が高い場合があり、Dataflow サービスはジョブのこれらのフェーズ中に追加のワーカーを自動的に起動できます(また、不要になったときにそれらをシャットダウンします)。

自動スケーリングは、Dataflow SDK for Java バージョン 1.6.0 以降を使用して作成されたバッチ Dataflow ジョブでは既定で有効化されます。 パイプラインの実行時にオプション --autoscalingAlgorithm=NONE指定することで自動スケーリングを明示的に無効化できます。その場合、Dataflow サービスは --numWorkers オプションに基づいてワーカー数を設定します。これはデフォルトで 3 になります。

Dataflow ジョブが以前のバージョンの SDK を使用する場合は、パイプラインの実行時にオプション --autoscalingAlgorithm=THROUGHPUT_BASED指定することで自動スケーリングを有効化できます。

バッチ自動スケーリング

バッチモードの制限付きデータでは、Dataflow はパイプラインの各ステージのワーカー数とそのステージの現在のスループットに基づいてワーカー数を自動的に選択します。

ユーザーが実装したカスタム データソースがパイプラインで使用される場合、より多くの情報を Dataflow サービスの自動スケーリング アルゴリズムに提供し、パフォーマンスを向上させる可能性のあるいくつかのメソッドを実装できます。

  • BoundedSource サブクラスで、メソッド getEstimatedSizeBytes を実装します。Dataflow サービスは、パイプラインで使用するワーカーの初期数を計算するときに getEstimatedSizeBytes を使用します。
  • BoundedReader サブクラスで、メソッド getFractionConsumed を実装します。Dataflow サービスは、getFractionConsumed を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。

ストリーミング自動スケーリング

ストリーミングモードのパイプラインの自動スケーリングは、現在早期アクセスであり、参加は招待制です。ストリーミング自動スケーリングの早期アクセス プログラムへの参加に関心をお持ちの場合は、お問い合わせフォームに入力してお知らせください。

ストリーミング パイプラインを手動でスケーリングする

ストリーミング モードでの自動スケーリングが一般提供されるまでは、Dataflow の更新機能を使用して、ストリーミング パイプラインを実行するワーカー数を手動でスケーリングできます。

実行中にストリーミング パイプラインのスケーリングが必要になることがわかっている場合は、パイプラインの起動時に次の実行パラメータを設定してください。

  • --maxNumWorkers は、パイプラインで使用可能にするワーカーの最大数と同じに設定します。
  • --numWorkers は、実行開始時にパイプラインで使用するワーカーの初期数と同じに設定します。

パイプラインが実行されたら、パイプラインを更新し、--numWorkers パラメータを使用して新規ワーカー数を指定します。新規 --numWorkers に設定する値は、N から --maxNumWorkers までである必要があり、N--maxNumWorkers÷15 と等しくなります。

更新により、実行中のジョブは新しいワーカー数を使用して新規ジョブで置換されますが、前のジョブに関連付けられている状態情報はすべて保持されます。

動的作業再調整

Dataflow サービスの動的作業再調整機能では、サービスが実行時の条件に基づいて作業を動的に再分割できます。これらの条件は、次のものが含まれることがあります。

  • 作業割り当ての不均衡
  • 終了に予想より長い時間がかかるワーカー
  • 予想よりも早く終了するワーカー

Dataflow サービスはこれらの条件を自動的に検出し、未使用または十分に使用されていないワーカーに作業を動的に再割り当てして、ジョブの全体的な処理時間を短縮します。

制限事項

動的作業再調整は、Dataflow サービスが一部の入力データを並列に処理している場合にのみ行われます。データを外部入力ソースから読み取っている場合、実体化された中間 PCollection を操作している場合、または GroupByKey などの集約の結果を操作している場合です。ジョブの多数のステップが融合される場合、ジョブの中間 PCollection は少なくなり、動的作業再調整はソースの実体化された PCollection 内の要素の数に制限されます。動的作業再調整をパイプラインの特定の PCollection に適用できるようにするには、いくつかの異なる方法で融合を阻止し、動的並列性を確保します。

動的作業再調整は、単一のレコードよりも細かくデータを再並列化できません。処理時間の大幅な遅延を引き起こす個別レコードがデータに含まれる場合、Dataflow は個々の「ホット」レコードを分割して複数のワーカーに再分散できないため、ジョブが遅延する可能性があります。

パイプラインの最終出力に固定のシャード数を(たとえば TextIO.Write.withNumShards を記述することで)設定した場合、Dataflow は選択したシャード数に基づいて並列性を制限し、そのステップ(およびそれと融合されたステップ)の動的作業再調整を無効にします。

固定シャード制限は一時的とみなすことができ、Dataflow サービスの今後のリリースで変更の対象となる可能性があります。

カスタム データソースの操作

自分が提供するカスタム データソースをパイプラインで使用する場合は、メソッド splitAtFraction を実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

Python

この機能は、Dataflow SDK for Python ではまだサポートされていません。

フィードバックを送信...