These are my notes from trying out Kafka Consumer Groups.
Up to the previous post I had only used a single consumer,
so this time I tried multiple consumers and multiple Consumer Groups.
- Consumer Group
- Setup
- Producer
- Consumer
- Consumer × 1
- Consumer × 2
- Consumer × 4
- Consumer × 5
- 2 Group (Consumer × 2, Consumer × 4)
Consumer Group
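Consumer Groups are explained in the official documentation:
https://kafka.apache.org/documentation/#intro_consumers
The official documentation includes a diagram showing how
topics, partitions, consumers, and consumer groups relate to each other.
It looks complicated, but my understanding is as follows.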
- A consumer belongs to exactly one Consumer Group.
- A Consumer Group can contain multiple consumers.
- Messages in a topic are read per Consumer Group.
  (Multiple Consumer Groups can each read the messages of the same topic.)
- Within a Consumer Group, the messages of each partition of a topic are read by only one consumer.
  (Multiple consumers in the same Consumer Group cannot read the same partition.)
- A consumer in a Consumer Group can read one or more partitions of the topic.
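The rules above can be sketched as a small model. This is only an illustration under my own assumptions: the class `AssignmentSketch` and its `assign` method are hypothetical names, and the contiguous-range split is a simplification, not Kafka's actual assignor code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    // Simplified model (an assumption, not Kafka's implementation):
    // split partitions 0..numPartitions-1 into contiguous ranges,
    // so every partition has exactly one owner within the group.
    public static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();   // partitions every consumer gets
        int extra = numPartitions % consumers.size();  // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same group sizes as the experiments in this post, with 4 partitions.
        for (int n : new int[] {1, 2, 4, 5}) {
            List<String> consumers = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                consumers.add("myConsumer" + i);
            }
            System.out.println(n + " consumer(s): " + assign(4, consumers));
        }
    }
}
```

With 5 consumers and 4 partitions, the last consumer in this model ends up with an empty list, which is the case where a consumer has nothing to read.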
I will check the behavior when one Consumer Group has multiple consumers,
and when multiple Consumer Groups each have multiple consumers.
I tried this with the following versions.
Setup
Change the broker configuration so that topics are created with 4 partitions by default (num.partitions).
Then start ZooKeeper and Kafka.
server.properties
$ sudo vi /usr/local/kafka/config/server.properties
num.partitions=4
Producer
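For the Producer, I reuse the one from an earlier post.
It does not specify a partitioner (the default is used), and it produces records with a null key and sequentially numbered values.
It prints the partition, topic, and value of each produced message to standard output.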
ProducerWithDefaultPartition.java
public void syncSend() {
    // configuration
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // try-with-resources closes the producer once all records are sent
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        IntStream.range(0, 100)
                .forEach(i -> {
                    String value = "num" + i;
                    // no key is given, so the record key is null
                    ProducerRecord<String, String> record = new ProducerRecord<>("topicPar", value);
                    try {
                        // sync send: block until the broker acknowledges the record
                        Future<RecordMetadata> send = producer.send(record);
                        RecordMetadata recordMetadata = send.get();
                        System.out.print("partition: " + recordMetadata.partition() + ", ");
                        System.out.print("topic: " + recordMetadata.topic() + ", ");
                        System.out.println("value: " + value);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
    }
}
Consumer
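A consumer's poll loop occupies its thread, so several consumers cannot run on a single thread.
I therefore created ConsumeTask, a Runnable, so that each consumer can consume on its own thread.
The consumer group name, consumer name, and topic name are passed in at construction time.
If no consumer group name is given, a fixed consumer group name is used.
It prints the consumer group, consumer, partition, topic, key, and value of each message it reads to standard output.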
ConsumeTask.java
package com.example.producer.client.kafka.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeTask implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final String consumerGroupName;
    private final String consumerName;
    private final String topic;

    public ConsumeTask(String consumerName, String topic) {
        this("myConsumerGroup", consumerName, topic);
    }

    public ConsumeTask(String consumerGroupName, String consumerName, String topic) {
        this.consumerGroupName = consumerGroupName;
        this.consumerName = consumerName;
        this.topic = topic;

        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
                // print information about topic record
                records.forEach(record -> {
                    System.out.println("group: " + consumerGroupName
                            + ", consumer: " + consumerName
                            + ", partition: " + record.partition()
                            + ", topic: " + record.topic()
                            + ", key: " + record.key()
                            + ", value: " + record.value()
                    );
                });
            }
        } finally {
            consumer.close();
        }
    }
}
ConsumeTask is then submitted to an ExecutorService, one task per thread, like this.
This makes it possible to run multiple consumers at once.
int numConsumers = 2;
ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
IntStream.rangeClosed(1, numConsumers)
        .forEach(i -> {
            ConsumeTask consumeTask = new ConsumeTask("myConsumer" + i, "five");
            executorService.submit(consumeTask);
        });
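The snippet above starts the tasks but never stops them. Below is a minimal sketch of one way to shut such a pool down, using only the JDK. `ConsumerPoolSketch` and `FakeConsumeTask` are hypothetical stand-ins for the real ConsumeTask, and the stop flag is my assumption; with a real KafkaConsumer, calling `wakeup()` from another thread is the usual way to break out of `poll`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerPoolSketch {

    // Stand-in for ConsumeTask: loops until asked to stop, like a poll loop.
    static class FakeConsumeTask implements Runnable {
        private final AtomicBoolean running;
        private final AtomicInteger closed;

        FakeConsumeTask(AtomicBoolean running, AtomicInteger closed) {
            this.running = running;
            this.closed = closed;
        }

        @Override
        public void run() {
            try {
                while (running.get()) {
                    Thread.sleep(10); // placeholder for consumer.poll(...)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                closed.incrementAndGet(); // placeholder for consumer.close()
            }
        }
    }

    // Starts numConsumers tasks, lets them run briefly, then stops them.
    // Returns how many tasks reached their cleanup step.
    public static int runAndStop(int numConsumers) {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger closed = new AtomicInteger();
        ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
        for (int i = 0; i < numConsumers; i++) {
            executorService.submit(new FakeConsumeTask(running, closed));
        }
        try {
            Thread.sleep(50);          // let the loops spin for a moment
            running.set(false);        // ask every task to leave its loop
            executorService.shutdown();
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // interrupt tasks that did not stop in time
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks stopped cleanly: " + runAndStop(2));
    }
}
```

The point of the design is that every task passes through its `finally` block, which is where the real ConsumeTask closes its consumer.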
Consumer × 1
Pattern: one Consumer in the ConsumerGroup
I try one ConsumerGroup with one Consumer.
Results
Producer log
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.
partition: 3, topic: sin, value: num0
partition: 0, topic: sin, value: num1
partition: 2, topic: sin, value: num2
partition: 1, topic: sin, value: num3
partition: 3, topic: sin, value: num4
partition: 0, topic: sin, value: num5
partition: 2, topic: sin, value: num6
partition: 1, topic: sin, value: num7
partition: 3, topic: sin, value: num8
partition: 0, topic: sin, value: num9
:
:
partition: 2, topic: sin, value: num90
partition: 1, topic: sin, value: num91
partition: 3, topic: sin, value: num92
partition: 0, topic: sin, value: num93
partition: 2, topic: sin, value: num94
partition: 1, topic: sin, value: num95
partition: 3, topic: sin, value: num96
partition: 0, topic: sin, value: num97
partition: 2, topic: sin, value: num98
partition: 1, topic: sin, value: num99
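The pattern in the Producer log above (partitions 3, 0, 2, 1, repeating) can be reproduced with a few lines of plain Java. This is only a sketch of round-robin selection under my own assumptions: `RoundRobinSketch` is a hypothetical name, the partition order is copied from the log, and it is not the Kafka partitioner's actual code. Other client versions may distribute keyless records differently.

```java
public class RoundRobinSketch {

    // Cycles through partitionOrder: record i goes to partitionOrder[i % length].
    public static int[] assign(int numRecords, int[] partitionOrder) {
        int[] result = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            result[i] = partitionOrder[i % partitionOrder.length];
        }
        return result;
    }

    public static void main(String[] args) {
        int[] order = {3, 0, 2, 1}; // order observed in the Producer log above
        int[] partitions = assign(100, order);
        for (int i = 0; i < 10; i++) {
            System.out.println("partition: " + partitions[i] + ", value: num" + i);
        }
    }
}
```

Because 100 is a multiple of 4, each partition receives exactly 25 of the 100 records in this model.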
Consumer log
The single Consumer reads messages from all of the partitions.
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num9
:
:
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num89
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num87
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num99
Consumer × 2
Pattern: two Consumers in the ConsumerGroup
I try one ConsumerGroup with two Consumers.
Results
Producer log
Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.
partition: 0, topic: two, value: num0
partition: 2, topic: two, value: num1
partition: 1, topic: two, value: num2
partition: 3, topic: two, value: num3
partition: 0, topic: two, value: num4
partition: 2, topic: two, value: num5
partition: 1, topic: two, value: num6
partition: 3, topic: two, value: num7
partition: 0, topic: two, value: num8
partition: 2, topic: two, value: num9
:
:
partition: 1, topic: two, value: num90
partition: 3, topic: two, value: num91
partition: 0, topic: two, value: num92
partition: 2, topic: two, value: num93
partition: 1, topic: two, value: num94
partition: 3, topic: two, value: num95
partition: 0, topic: two, value: num96
partition: 2, topic: two, value: num97
partition: 1, topic: two, value: num98
partition: 3, topic: two, value: num99
Consumer log
myConsumer1 reads messages from partition 0 and partition 1, and
myConsumer2 reads messages from partition 2 and partition 3.
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num9
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num10
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num11
:
:
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num88
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num89
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num90
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num99
Consumer × 4
Pattern: four Consumers in the ConsumerGroup
I try one ConsumerGroup with four Consumers.
The number of Consumers equals the number of partitions.
Results
Producer log
Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.
partition: 2, topic: four, value: num0
partition: 1, topic: four, value: num1
partition: 3, topic: four, value: num2
partition: 0, topic: four, value: num3
partition: 2, topic: four, value: num4
partition: 1, topic: four, value: num5
partition: 3, topic: four, value: num6
partition: 0, topic: four, value: num7
partition: 2, topic: four, value: num8
partition: 1, topic: four, value: num9
:
:
partition: 3, topic: four, value: num90
partition: 0, topic: four, value: num91
partition: 2, topic: four, value: num92
partition: 1, topic: four, value: num93
partition: 3, topic: four, value: num94
partition: 0, topic: four, value: num95
partition: 2, topic: four, value: num96
partition: 1, topic: four, value: num97
partition: 3, topic: four, value: num98
partition: 0, topic: four, value: num99
Consumer log
myConsumer1 reads from partition 0,
myConsumer2 from partition 1,
myConsumer3 from partition 2, and
myConsumer4 from partition 3.
Each consumer reads exactly one partition.
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num9
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num10
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num11
:
:
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num90
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num99
Consumer × 5
Pattern: five Consumers in the ConsumerGroup
I try one ConsumerGroup with five Consumers.
There is one more consumer than there are partitions.
Results
Producer log
Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.
partition: 3, topic: five, value: num0
partition: 0, topic: five, value: num1
partition: 2, topic: five, value: num2
partition: 1, topic: five, value: num3
partition: 3, topic: five, value: num4
partition: 0, topic: five, value: num5
partition: 2, topic: five, value: num6
partition: 1, topic: five, value: num7
partition: 3, topic: five, value: num8
partition: 0, topic: five, value: num9
:
:
partition: 2, topic: five, value: num90
partition: 1, topic: five, value: num91
partition: 3, topic: five, value: num92
partition: 0, topic: five, value: num93
partition: 2, topic: five, value: num94
partition: 1, topic: five, value: num95
partition: 3, topic: five, value: num96
partition: 0, topic: five, value: num97
partition: 2, topic: five, value: num98
partition: 1, topic: five, value: num99
Consumer log
myConsumer1 reads from partition 0,
myConsumer2 from partition 1,
myConsumer3 from partition 2, and
myConsumer4 from partition 3.
myConsumer5 has no partition left to read, so it sits idle.
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num9
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num10
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num11
:
:
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num90
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num99
2 Group (Consumer × 2, Consumer × 4)
Pattern: two ConsumerGroups,
with two Consumers in ConsumerGroup1 and
four Consumers in ConsumerGroup2.
This is close to the diagram in the official documentation (although here the brokers are not split across servers).
I create two ConsumerGroups and consume from each of them.
Results
Producer log
Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.
partition: 3, topic: gtopic, value: num0
partition: 0, topic: gtopic, value: num1
partition: 2, topic: gtopic, value: num2
partition: 1, topic: gtopic, value: num3
partition: 3, topic: gtopic, value: num4
partition: 0, topic: gtopic, value: num5
partition: 2, topic: gtopic, value: num6
partition: 1, topic: gtopic, value: num7
partition: 3, topic: gtopic, value: num8
partition: 0, topic: gtopic, value: num9
:
:
partition: 2, topic: gtopic, value: num90
partition: 1, topic: gtopic, value: num91
partition: 3, topic: gtopic, value: num92
partition: 0, topic: gtopic, value: num93
partition: 2, topic: gtopic, value: num94
partition: 1, topic: gtopic, value: num95
partition: 3, topic: gtopic, value: num96
partition: 0, topic: gtopic, value: num97
partition: 2, topic: gtopic, value: num98
partition: 1, topic: gtopic, value: num99
Consumer log
group1
consumer1 reads messages from partition 0 and partition 1, and
consumer2 reads messages from partition 2 and partition 3.
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num1
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num0
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num2
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num3
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num4
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num5
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num6
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num7
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num8
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num9
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num10
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num11
:
:
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num90
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num91
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num92
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num93
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num94
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num95
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num96
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num97
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num98
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num99
group2
consumer1 reads from partition 0,
consumer2 from partition 1,
consumer3 from partition 2, and
consumer4 from partition 3.
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num0
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num1
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num2
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num3
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num4
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num5
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num6
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num7
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num8
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num9
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num10
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num11
:
:
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num90
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num91
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num92
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num93
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num94
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num95
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num96
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num97
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num98
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num99
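Both groups in the logs above received the same messages from gtopic, because the read position is tracked per group. A toy in-memory model of that idea, assuming one partition and a per-group offset map, might look like the following. `GroupOffsetsSketch` and its methods are hypothetical names and this is not how Kafka stores logs or offsets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupOffsetsSketch {

    // One partition's messages, kept in memory in append order.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer group.
    private final Map<String, Integer> offsets = new HashMap<>();

    public void append(String value) {
        log.add(value);
    }

    // Returns everything the group has not read yet and advances
    // only that group's offset; other groups are unaffected.
    public List<String> poll(String group) {
        int from = offsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        GroupOffsetsSketch partition = new GroupOffsetsSketch();
        for (int i = 0; i < 3; i++) {
            partition.append("num" + i);
        }
        System.out.println("group1: " + partition.poll("group1"));
        System.out.println("group2: " + partition.poll("group2"));
        System.out.println("group1 again: " + partition.poll("group1"));
    }
}
```

Reading as group1 does not consume the messages for group2, while a second poll by group1 returns nothing new.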
That covers everything I checked.
[References]
The consume task is based on this article:
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
The sample code is available here:
Producer
github.com
Consumer
github.com
That's all.