外部でホストされている Kafka からのメッセージを Cloud Dataflow を使用して処理する

この記事では、Dataflow と KafkaIO を使用してメッセージを処理する場合に考慮すべき重要なネットワーキング関連の問題について説明します。この場合、Kafka は Google Cloud の外部にありますが、Dataflow を使用して Google Cloud の内部でメッセージを処理します。Kafka Apache Beam 変換のリリースにより、Apache Beam と Dataflow の機能を使用して Kafka からのメッセージを処理できるようになりました。次の図に、一般的なシナリオの 1 つを示します。このシナリオでは Dataflow を使用してメッセージを処理しますが、Kafka がホストされる場所はオンプレミスのときもあれば、別のパブリック クラウド、たとえば Amazon Web Services(AWS)のときもあります。

Google Cloud の外部で Kafka メッセージを処理する

Google Cloud 上のリソースと Google Cloud 以外のリソースをリンクするには、さまざまな接続オプションがあります。

Google Cloud では、予測可能なパフォーマンスと信頼性という点で Dedicated Interconnect が最良のオプションですが、サードパーティが新しい回線をプロビジョニングする必要があるため、セットアップに時間がかかることがあります。VPN ベースのトポロジを使用している場合は、ハイスループット VPN のセットアップを検討してください。Dedicated Interconnect と IPsec VPN の両方式は、Virtual Private Cloud(VPC)の RFC 1918 IP アドレスに直接アクセスできるため、Kafka の構成を簡素化できます。パブリック IP ベースのトポロジでは、必要となるネットワーキング作業がほとんどないため、すぐに使い始めることができます。

どちらのトポロジでも、Dataflow インスタンスと同じサブネットワーク内の、別の Compute Engine インスタンスの Kafka クライアントからメッセージを送受信して、接続を検証することをおすすめします。

ストリームを処理するワークロードでは、レイテンシも重要な考慮事項です。Dataflow が利用可能な Google Cloud リージョンをよく確認して、Kafka クラスタの近くにある Google Cloud リージョンを選択します。ネットワーク パフォーマンスを最適化するためのヒントについては、Google Cloud ネットワークのパフォーマンスを向上させるための 5 つのステップをご覧ください。

ここでは、次の図に示すネットワーク トポロジについて説明します。

共有 RFC 1918 アドレス空間

デフォルトでは、Dataflow はデフォルトの VPC ネットワークでインスタンスを起動します。この動作は、外部でホストされている Kafka クラスタにパブリック IP アドレスで到達できる場合には有効です。プライベート ネットワーク トポロジで、Cloud Router で明示的に定義されたルートによって Google Cloud 内のサブネットワークが Kafka クラスタに接続されている場合は、Dataflow インスタンスをどこに配置するかを自分で制御できることが必要になります。次のコードサンプルに示すように、Dataflow を使用して networksubnetwork実行パラメータを構成できます。

mvn compile exec:java \
    -Dexec.mainClass=[YOUR_PIPELINE_JAVA_CLASS] \
    -Dexec.args="--project=[YOUR_GCP_PROJECT]
    --network="[YOUR_DATAFLOW_NETWORK]" \
    --subnetwork="[YOUR_DATAFLOW_SUBNET]" \
    --runner=DataflowRunner"

対応するサブネットワークで、Dataflow がスケールアウトを目的にインスタンスを起動する際に十分な数の IP アドレスを使用できることを確認してください。また、Dataflow インスタンスを起動するために別のネットワークを作成する場合は、プロジェクト内のすべての仮想マシン間の TCP トラフィックを有効にするファイアウォール ルールを設定するようにしてください。このファイアウォール ルールは、デフォルトのネットワークにはすでに構成されています。

プライベート ネットワーク トポロジでは、通常通りに Kafka を構成して、可用性、セキュリティ、耐久性を確保するためのベスト プラクティスに従います。

次の図は、公共のインターネットからアクセスできる、3 つの Kafka ブローカーからなるクラスタを安全にホストするためのアーキテクチャの例を示しています。

パブリック IP アドレス空間

トラフィックは公共のインターネットで送受信されるため、ネットワークやサブネットワークを構成する必要はありません。ただし、プライベート ネットワーク トポロジの場合は、Dataflow ネットワークから対応する Kafka クラスタのパブリック IP アドレスへのルートが存在する限り、ネットワークとサブネットワークを指定できます。

上の図に示すアーキテクチャでは、セキュア ソケット レイヤ(SSL)を使用して外部クライアントと Kafka 間のトラフィックを保護し、ブローカー間の通信に平文を使用します。Kafka リスナーが、内部通信と外部通信の両方に使用されるネットワーク インターフェースにバインドする場合、リスナーを構成するのは簡単です。ただし、AWS にデプロイする場合などの多くのシナリオでは、外部にアドバタイズされる、クラスタ内の Kafka ブローカーのアドレスは、Kafka が使用する内部ネットワーク インターフェースのものとは異なります。このようなシナリオでは、このサンプルの server.properties スニペットに示す advertised.listeners プロパティを使用できます。

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

この構成では、外部クライアントはポート 9093 を使用して SSL チャネル経由で接続し、内部クライアントはポート 9092 を使用して平文チャネル経由で接続します。advertised.listeners でアドレスを指定する際は、外部トラフィックでも内部トラフィックでも同じインスタンスに解決される DNS 名(このサンプルの場合は、kafkabroker-n.mydomain.com)を使用してください。パブリック IP アドレスは内部トラフィックでは解決できない可能性があるため、パブリック IP アドレスを使用すると機能しない場合があります。