Kafka入门,分区的分配再平衡(二十)

这篇具有很好参考价值的文章主要介绍了Kafka入门,分区的分配再平衡(二十)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

分区的分配以及再平衡

Kafka入门,分区的分配再平衡(二十),kafka,linq,分布式

1、kafka有四种主流的分区策略:Range,RoundRobin,Sticky,CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Ranage+CooperativeSticky。Kafka可以同事使用多个分区分配策略。

参数 描述
heartbeat.interval.ms Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeoyt.ms的1/3
session.timeout.ms Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者执行再平衡
max.poll.interval.ms 消费者处理消息的最大时长,默认是5分钟,超过该值被移除,消费者执行再平衡
partition.assignment.strategy 消费者分区分配策略,默认策略是Range+CooperativeStickt。Kafka可以同事使用多个分区分配策略。可以选择策略包括:Range,RoundRobin,sticky,CooperativeSticky

Range以及再平衡

Kafka入门,分区的分配再平衡(二十),kafka,linq,分布式

Range分区策略原理
Range是对每个topic而言
首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
加入现在有7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6消费者排序完之后将会是C0,C1,C2
通过partitions数/consumer数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费一个分区。
例如。7/3=2余1,除不尽,那么消费者C0便会多消费者1个分区。8/3=2余2,除不尽,那么C0和C1分别多消费一个。
注意:如果只是针对一个topic而言,C0消费者多一个分区影响不是很大,但是如果有N个topic,那么针对每个topic,消费者C0都将多消费一个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区
容易尝试数据倾斜
测试代码

package com.longer.range;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 测试指定分区(partition)
 */
public class Producer {
    public static void main(String[] args) throws InterruptedException {
        //1、创建kafka生产者得配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //3、key value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //4、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 500; i++) {
            //指定数据发送到1号分区,key为空(IDEA中,ctrl+p查看参数)
            producer.send(new ProducerRecord<>("two", "longer " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if(e==null){
                        System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));
                        return;
                    }
                    e.printStackTrace();
                }

            });
            Thread.sleep(1000);
        }
        //关闭资源
        producer.close();
    }
}

package com.longer.range;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

用一个消费者每一秒发送一条信息,三个消费者接收。观察打印情况。再停止其中一个消费者,再观察情况。
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin 以及再平衡

Kafka入门,分区的分配再平衡(二十),kafka,linq,分布式
RoundRobin针对集群所有Topic而言
RoundRobin沦陷分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode而进行排序,最后通过沦陷算法来分配partition给各个消费者。
修改分区策略

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

测试代码

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
分别由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有”粘性的“,即再执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配变动,可以节省大量的开销。
粘性分区时Kafka从0.11.x版本开始引入这种分配策略,首先会尽量保持原有分配的分区不变化
测试代码

package com.longer.sticky;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5、3 号分区数据。
2 号消费者:消费到 4、6 号分区数据。
0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别
由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 2、3、5 号分区数据。
2 号消费者:消费到 0、1、4、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配

总结

range会造成数据倾斜,RoundRobin不会造成,但是分区调整不会考虑最小变动。sticky,尽量少的调整分配变动,可以节省大量的开销。文章来源地址https://www.toymoban.com/news/detail-528969.html

到了这里,关于Kafka入门,分区的分配再平衡(二十)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)

    Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)

    粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面, 在出现同一消

    2024年02月09日
    浏览(27)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(16)
  • kafka入门(九):kafka分区分配策略

    kafka分区分配策略 参数: Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。 默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka还提供了另外两种分配策略:RoundRobinAs

    2024年01月21日
    浏览(16)
  • kafka入门(八):kafka分区分配策略

    kafka分区分配策略 参数: Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。 默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka还提供了另外两种分配策略:RoundRobinAs

    2024年01月25日
    浏览(14)
  • 消息队列-Kafka-消费方如何分区与分区重平衡

    消息队列-Kafka-消费方如何分区与分区重平衡

    消费分区 资料来源于网络 消费者订阅的入口:KafkaConsumer#subscribe 消费者消费的入口:KafkaConsumer#poll 处理流程: 对元数据重平衡处理:KafkaConsumer#updateAssignmentMetadataIfNeeded 协调器的拉取处理:onsumerCoordinator#poll 执行已完成的【消费进度】提交请求的回调函数:invokeCompletedOff

    2024年04月14日
    浏览(14)
  • kafka分区分配策略

    kafka分区分配策略

    现有主流消息中间件都是生产者-消费者模型,主要角色都是:Producer - Broker - Consumer,上手起来非常简单,但仍有需要知识点需要我们关注,才能避免一些错误的使用情况,或者使用起来更加高效,例如本篇要讲的kafka分区分配策略。 在开始前我们先简单回顾一下kafka消息存储

    2024年02月16日
    浏览(15)
  • Kafka消费分组和分区分配策略

    Kafka消费分组和分区分配策略

    同一个消费组里的消费者不能消费同一个分区,不同消费组的消费组可以消费同一个分区 (即同一个消费组里面的消费者只能在一个分区中) 用过 Kafka 的同学用过都知道,每个 Topic 一般会有很多个 partitions。为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer

    2024年02月05日
    浏览(9)
  • Kafka 原理以及分区分配策略剖析

    Kafka 原理以及分区分配策略剖析

    一、简介 Apache Kafka 是一个分布式的流处理平台(分布式的基于发布/订阅模式的消息队列【Message Queue】)。 流处理平台有以下3个特性: 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。 可以储存流式的记录,并且有较好的容错性。 可以在流式

    2023年04月08日
    浏览(11)
  • kafka(一:分区数据不均衡(数据倾斜),分区分配策略)

    https://cloud.tencent.com/developer/article/1755177 可以设置一个新的列,根据这个列进行hash。

    2024年01月22日
    浏览(11)
  • kafka的消费者分区分配策略

    kafka的消费者分区分配策略

    kafka有三种分区分配策略 1.RoundRobin 2.Range 3.Sticky 一、RoundRobin RoundRobin策略很简单~假设我们有三个Topic10个Partition,上图! 假设顺序为A-0,A-1,A-2...C-2  不难看出轮询策略是将partition当做最小分配单位,将所有topic的partition都看作一个整体。然后为消费者轮询分配partition。当然得到

    2024年02月06日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包