kafka-docker でローカルに kafka クラスタを構築する - 駄文型 の続き。kafkaex/kafka_ex という Kafka クライアントで Producer を作ってみます。
ライブラリの取得
mix.exs
を編集して mix deps.get
するだけ。
参考: Introduction to Mix - Elixir
defmodule ProducerSampleEx.Mixfile do # ... def application do [ applications: [ :kafka_ex, :snappy ] ] end defp deps do [ {:kafka_ex, "~> 0.6.5"}, {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"} ] end end
$ mix deps.get
設定
まず、kafka-docker でローカルに kafka クラスタを構築する - 駄文型 で作った Kafka コンテナのポートを確認。今回は 32776
32778
。
$ docker-compose ps Name Command State Ports ----------------------------------------------------------------------------------------------------------------------------------- 66f23109d78a_kafkadocker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:32777->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp kafkadocker_kafka_1 start-kafka.sh Up 0.0.0.0:32778->9092/tcp kafkadocker_kafka_2 start-kafka.sh Up 0.0.0.0:32776->9092/tcp
次にKafkaEx.Config – kafka_ex を参考に config.exs
を書く。以下は最低限の設定。
config :kafka_ex, brokers: [ {"192.168.145.65", 32776}, # {"hostname", port} {"192.168.145.65", 32778} ], consumer_group: :no_consumer_group, use_ssl: false
iex で確認
iex -S mix
で立ち上げる。設定がおかしいとここでエラーがでる。
$ iex -S mix Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace] 11:53:23.055 [debug] Succesfully connected to broker "192.168.145.65":32776 11:53:23.056 [debug] Succesfully connected to broker "192.168.145.65":32778 11:53:23.075 [debug] Establishing connection to broker 1009: "192.168.145.65" on port 32778 11:53:23.076 [debug] Succesfully connected to broker "192.168.145.65":32778 11:53:23.076 [debug] Establishing connection to broker 1010: "192.168.145.65" on port 32776 11:53:23.077 [debug] Succesfully connected to broker "192.168.145.65":32776 Interactive Elixir (1.4.2) - press Ctrl+C to exit (type h() ENTER for help) iex(1)>
つながった! metadata/1
で確認できる。
iex(1)> KafkaEx.metadata(topic: "topic") %KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65", node_id: 1009, port: 32778, socket: nil}, %KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65", node_id: 1010, port: 32776, socket: nil}], topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :leader_not_available, isrs: [], leader: -1, partition_id: 0, replicas: []}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1010], leader: 1010, partition_id: 3, replicas: [1010]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1010], leader: 1010, partition_id: 1, replicas: [1010]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1009], leader: 1009, partition_id: 2, replicas: [1009]}], topic: "topic"}]}
あとは produce/4
で送るだけ!トピック名、パーティション番号、メッセージを渡す。
iex(2)> KafkaEx.produce("topic", 0, "msg") # opt は省略 :leader_not_available 11:56:41.715 [error] Leader for topic topic is not available
あれ?どうやら、トピック名が topic
だとだめそう。別のトピックを指定すると通る。
iex(3)> KafkaEx.produce("new_topic", 0, "msg") :ok
kafka-console-consumer.sh
で確認
前回同様、 Kafka Shell を起動して
$ start-kafka-shell.sh 192.168.145.65 192.168.145.65:32777 # Kafka Shell を起動
kafka-console-consumer.sh
を叩く。 KafkaEx.produce/4
でもう一度送り、メッセージが表示されればOK!!!
$ kafka-console-consumer.sh --topic new_topic --zookeeper $ZK Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by pas sing [bootstrap-server] instead of [zookeeper]. msg
ハマったところ
config.exs
の設定brokers
を{"192.168.145.65", "32776"}
と設定してしまう凡ミスを犯していた。
- トピック名が
topic
だとだめそう。 mix.exs
の設定- kafka_ex の README.md には
mod: {MyApp, []},
という行があったので入れていた。 - これはモジュールのコールバックの設定を行うためのもの。
- 入れておくと
MyApp.start/2
(今回の場合ProducerSampleEx.start/2
)を実行しようとしてしまう。 - 今回は不要なので削除した。
- kafka_ex の README.md には
- 作者: 伊橋正義,原田勝憲
- 出版社/メーカー: リクルートテクノロジーズ
- 発売日: 2014/04/01
- メディア: Kindle版
- この商品を含むブログ (1件) を見る