kafka的消费者分区分配策略

这篇具有很好参考价值的文章主要介绍了kafka的消费者分区分配策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka有三种分区分配策略

1.RoundRobin

2.Range

3.Sticky

一、RoundRobin

RoundRobin策略很简单~假设我们有三个Topic10个Partition,上图!

假设顺序为A-0,A-1,A-2...C-2

kafka的消费者分区分配策略

 不难看出轮询策略是将partition当做最小分配单位,将所有topic的partition都看作一个整体。然后为消费者轮询分配partition。当然得到此结果的前提是Consumer Group种的消费者订阅信息是一致的,如果订阅信息不一致,得到的结果也不均匀,下面举个例子:

kafka的消费者分区分配策略

 如图,Consumer0订阅Topic-A、B,Consumer1订阅Topic-B、C

顺序注意图中的Seq,先分配TopicA

第一轮 : Consumer-0: Topic-A-Partition0

由于Consumer-1没有订阅Topic-A,所以只能找到Topic-B给Consumer-1分配

于是 Consumer-1: Topic-B-Partition0

------------------------------------------------------------------------------------------------------

第二轮: Consumer-0: Topic-A-Partition0,Topic-A-Partition1

             Consumer-1: Topic-B-Partition0,Topic-B-Partition1

------------------------------------------------------------------------------------------------------

第三轮: Consumer-0: Topic-A-Partition0,Topic-A-Partition1,Topic-A-Partition2

             Consumer-1: Topic-B-Partition0,Topic-B-Partition1,Topic-B-Partition2

------------------------------------------------------------------------------------------------------

第四、五、六轮:

Consumer-0: Topic-A-Partition0,Topic-A-Partition1,Topic-A-Partition2

Consumer-1: Topic-B-Partition0,Topic-B-Partition1,Topic-B-Partition2,Topic-C-Partition-0,Topic-C-Partition-1,Topic-C-Partition-2

------------------------------------------------------------------------------------------------------

可以看到Consumer-1多消费了3个分区。所以在Consumer Group有订阅消息不一致的情况下,我们最好不要选用RoundRobin。

二、Range(默认的分配策略)

Range策略不同于RoundRobin之处在于Range策略是面向Topic分配的(RoundRobin面向Partition)

,假设顺序为Topic-A,Topic-B,Topic-C,那么分配过程为:

第一轮: Consumer-0: Topic-A-Partition0

             Consumer-1:Topic-A-Partition1

             Consumer-2:Topic-A-Partition2

------------------------------------------------------------------------------------------------------------------------------

第二轮: Consumer-0: Topic-A-Partition0,Topic-B-Partition0,Topic-B-Partition1

             Consumer-1:Topic-A-Partition1,Topic-B-Partition2

             Consumer-2:Topic-A-Partition2,Topic-B-Partition3

-------------------------------------------------------------------------------------------------------------------------------

第三轮: Consumer-0: Topic-A-Partition0,Topic-B-Partition0,Topic-B-Partition1,Topic-C-Partition-0

             Consumer-1:Topic-A-Partition1,Topic-B-Partition2,Topic-C-Partition-1

             Consumer-2:Topic-A-Partition2,Topic-B-Partition3,Topic-C-Partition-2

最终结果如图,很明显我们可以看到它的特点是以topic为主进行划分的,假设消费者数量为N,主题分区数量为M则有当前主题分配数量 = M%N==0? M/N +1 : M/N ;

kafka的消费者分区分配策略

 Range策略的缺点在于如果Topic足够多、且分区数量不能被平均分配时,会出现消费过载的情景,举一个例子

kafka的消费者分区分配策略

 可以看到此种情况已经相差3个分区,如果主题进一步扩大差距会愈发明显。

--------------------------------------------------------------------------------------------------------------------

三 、Sticky

kafka在0.11版本引入了Sticky分区分配策略,它的两个主要目的是:

1.分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;

2.分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。以RoundRobin的不均衡为例,kafka的消费者分区分配策略

 此时的结果明显非常不均衡,如果使用Sticky策略的话结果应该是如此:

kafka的消费者分区分配策略

在这里我给出实际测试结果参考,稍后会将代码贴出供读者自行测试。

Sticky:

consumer-0:

topic :Topic-A;;;partition:2
topic :Topic-A;;;partition:1
topic :Topic-B;;;partition:2
topic :Topic-A;;;partition:0
topic :Topic-B;;;partition:0

 consumer-1:

topic :Topic-B;;;partition:1
topic :Topic-C;;;partition:1
topic :Topic-C;;;partition:0
topic :Topic-C;;;partition:2

笔者描述的逻辑仅供参考,是在仅笔者描述的情况下才出现的结果,实际结果以读者自行测试为准。

1.导入kafka依赖

 <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.12</artifactId>
       <version>1.1.1</version>
 </dependency>

2.启动zookeeper,启动kafka;(windows版本)

3.命令行切换到kafka目录下;执行以下命令创建Topic & partition

kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic Topic-A

kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic Topic-B

kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic Topic-C

测试代码:

 consumer-0:

  public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9092,127.0.0.1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-0");
        //设置分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RangeAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList("Topic-A","Topic-B"), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                System.err.println("onPartitionsRevoked=========================================== ");
                for (TopicPartition pt: collection
                     ) {
                    System.err.println("topic :"+pt.topic() + ";;;partition:"+pt.partition());
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                System.err.println("onPartitionsAssigned=========================================== ");
                for (TopicPartition pt: collection
                ) {
                    System.err.println("topic :"+pt.topic() + ";;;partition:"+pt.partition());
                }
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                ConsumerRebalanceListener.super.onPartitionsLost(partitions);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            }
        }
    }

consumer-1

  public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9092,127.0.0.1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-0");
        //设置分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RangeAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList("Topic-B","Topic-C"), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                System.err.println("onPartitionsRevoked=========================================== ");
                for (TopicPartition pt: collection
                     ) {
                    System.err.println("topic :"+pt.topic() + ";;;partition:"+pt.partition());
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                System.err.println("onPartitionsAssigned=========================================== ");
                for (TopicPartition pt: collection
                ) {
                    System.err.println("topic :"+pt.topic() + ";;;partition:"+pt.partition());
                }
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                ConsumerRebalanceListener.super.onPartitionsLost(partitions);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            }
        }
    }

关于kafka的消费者分区分配策略就聊到这里,感谢大家,如果错误描述请指正,下章我们讨论kafka消费者重复消以及漏消费的问题

Kafka消费者重复消费数据以及漏消费问题_一念花开_的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-456928.html

到了这里,关于kafka的消费者分区分配策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(16)
  • Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)

    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)

    RoundRobin 针对集群中 所有Topic而言。 RoundRobin 轮询分区策略,是把 所有的 partition 和所有的consumer 都列出来 ,然后 按照 hashcode 进行排序 ,最后通过 轮询算法 来分配 partition 给到各个消费者。 2.1、创建带有7个分区的sixTopic主题 在 Kafka 集群控制台,创建带有7个分区的sixTopi

    2024年02月07日
    浏览(11)
  • Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)

    Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)

    粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面, 在出现同一消

    2024年02月09日
    浏览(28)
  • kafka消费者api和分区分配和offset消费

    kafka消费者api和分区分配和offset消费

    消费者的消费方式为主动从broker拉取消息,由于消费者的消费速度不同,由broker决定消息发送速度难以适应所有消费者的能力 拉取数据的问题在于,消费者可能会获得空数据 Consumer Group(CG):消费者组 由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同

    2024年02月16日
    浏览(13)
  • Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)

    Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)

    1.1 Kafka消费方式 1、pull(拉)模式:consumer采用从broker中主动拉取数据。 2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。 pull模式不足之处是如果Kafka没有数

    2024年02月16日
    浏览(13)
  • 【消息队列】细说Kafka消费者的分区分配和重平衡

    【消息队列】细说Kafka消费者的分区分配和重平衡

    我们直到在性能设计中异步模式,一般要么是采用pull,要么采用push。而两种方式各有优缺点。 pull :说白了就是通过消费端进行主动拉去数据,会根据自身系统处理能力去获取消息,上有Broker系统无需关注消费端的消费能力。kafka采用pull模式 push : Broker主动推送消息到消费端

    2024年02月12日
    浏览(11)
  • Kafka3.0.0版本——消费者(分区的分配以及再平衡)

    Kafka3.0.0版本——消费者(分区的分配以及再平衡)

    1.1、消费者分区及消费者组的概述 一个consumer group中有多个consumer组成,一个 topic有多个partition组成。 1.2、如何确定哪个consumer来消费哪个partition的数据 Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数 partition.assignment.strategy ,修改分

    2024年02月07日
    浏览(15)
  • 实现 Kafka 分区内消费者多线程顺序消费

    生产者在写的时候,可以指定一个 key,被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。 消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是没有错乱的。 但是消费者里可能会有多个线程来并发处理消息,而多个线程并发

    2024年02月07日
    浏览(14)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(17)
  • kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

    kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

    默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. 二、定义两个消费者,给其配置上述PartitionAssignor. 在kafka创建只有一个分区的topic : study2023 创建一个生产者往study2023这个 topic发送消息: 分别

    2024年02月10日
    浏览(17)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包