Kafka-客户端使用

这篇具有很好参考价值的文章主要介绍了Kafka-客户端使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

理解Kafka正确使用方式

Kafka提供了两套客户端API,HighLevel API和LowLevel API。

  • HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。

  • LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。

基础的客户端

引入Maven依赖:

<dependency>
	<groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.4.0</version>
</dependency>

消息发送者主流程

public class MyProducer
{
    // private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String BOOTSTRAP_SERVERS = "你的公网IP:9092";
    
    private static final String TOPIC = "disTopic";
    
    public static void main(String[] args)
        throws ExecutionException, InterruptedException
    {
        // PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        
        Producer<String, String> producer = new KafkaProducer<>(props);
        CountDownLatch latch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++)
        {
            // Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            // Part3:发送消息
            // 单向发送:不关心服务端的应答。
            // producer.send(record);
            // System.out.println("message "+i+" sended");
            
            // 同步发送:获取服务端应答消息前,会阻塞当前线程。
            // RecordMetadata recordMetadata = producer.send(record).get();
            // String topic = recordMetadata.topic();
            // int partition = recordMetadata.partition();
            // long offset = recordMetadata.offset();
            // String message = recordMetadata.toString();
            // System.out.println("message:[" + message + "] sended with topic:" + topic + "; partition:" + partition
            // + ";offset:" + offset);
            
            // 异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
            producer.send(record, new Callback()
            {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e)
                {
                    if (null != e)
                    {
                        System.out.println("消息发送失败," + e.getMessage());
                        e.printStackTrace();
                    }
                    else
                    {
                        String topic = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        String message = recordMetadata.toString();
                        System.out
                            .println("message:[" + message + "] sended with topic:" + topic + ";offset:" + offset);
                    }
                    latch.countDown();
                }
            });
        }
        // 消息处理完才停止发送者。
        latch.await();
        producer.close();
    }
}

构建Producer分为三个步骤:

1、设置Producer核心属性:Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。

2、构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。

3、使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

异常信息: Expiring 1 record(s) for disTopic-0:120010 ms has passed since batch creation

修改kafka安装路径的config/server.properties配置,然后重启解决问题

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://你的公网IP:9092

注意:云服务器配置安全组开放端口,虚拟机防火墙状态要关闭

消息消费者主流程

public class MyConsumer
{
    // private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String BOOTSTRAP_SERVERS = "你的公网IP:9092";
    
    private static final String TOPIC = "disTopic";
    
    public static void main(String[] args)
    {
        // PART1:设置发送者相关属性
        Properties props = new Properties();
        // kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        // value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        // 自行调整Offset
        // consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC,0)));
        while (true)
        {
            // PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            // records.partitions().forEach(topicPartition -> {
            // String key = topicPartition.topic()+topicPartition.partition();
            // List<ConsumerRecord<String, String>> partionRecords = records.records(topicPartition);
            // long value = partionRecords.get(partionRecords.size()-1).offset();
            //
            // });
            // PART3:处理消息
            for (ConsumerRecord<String, String> record : records)
            {
                System.out.println("partition = " + record.partition() + "offset = " + record.offset() + ";key = "
                    + record.key() + "; value= " + record.value());
            }
            
            // 提交offset,消息就不会重复推送。
            consumer.commitSync(); // 同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
            // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }
    }
}

Consumer同样是分为三个步骤:

1、设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。

2、拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。

3、处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

kafka官方配置: Apache Kafka

从客户端属性来梳理客户端工作机制

Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐。首先要理解基础模型。

消费者分组消费机制

最重要的一个机制

在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,表示当前Consumer所属的消费者组。

public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

//参数GROUP_INSTANCE_ID_CONFIG:给组成员设置一个固定的instanceId,可以减少Kafka不必要的rebalance

描述翻译:对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。

kafka客户端使用,kafka,分布式,java

消费者组作用:生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。

Offset偏移量:表示每个消费者组在每个Partiton中已经消费处理的进度

#查看消费者组Offset记录情况
bin/kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

这个Offset偏移量,需要消费者处理完成后主动向Kafka的Broker提交。提交完成后,Broker就会更新消费进度,表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交Offset,Broker就会认为这个消息还没有被处理过,就会重新往对应的消费者组进行推送,不过这次,一般会尽量推送给同一个消费者组当中的其他消费者实例。

Kafka自动提交:

public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";

如何提高Offset数据的安全性呢?

服务端:在Consumer中,实际上提供了AUTO_OFFSET_RESET_CONFIG参数,来指定消费者组在服务端的Offset不存在时如何进行后续消费。(有可能服务端初始化Consumer Group的Offset失败,也有可能Consumer Group当前的Offset对应的数据文件被过期删除了)

ConsumerConfig.AUTO_OFFSET_RESEWT_CONFIG :当Server端没有对应的Offset时如何处理。 可选项:

  • earliest: 自动设置为当前最早的offset

  • latest:自动设置为当前最晚的offset

  • none: 如果消费者组对应的offset找不到,就向Consumer抛异常。

  • 其他选项: 向Consumer抛异常。

客户端:

  • 异步提交:消费者在处理业务的同时,异步向Broker提交Offset。效率高,但是容易丢数据(处理失败但是offset提交了)

  • 同步提交:消费者保证处理完所有业务后,再提交Offset。消息不会因为offset丢失。业务处理失败不提交Offset,还可以重试。坏处是处理变慢,另外还可能重复消费(生产者推送消息给组内的其他消费者)

生产者拦截器机制

允许客户端在生产者消息发送到Kafka集群之前,对消息进行拦截,甚至可以修改消息内容

涉及到Producer中指定的一个参数:INTERCEPTOR_CLASSES_CONFIG

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";

按照说明,定义一个自己的拦截器实现类:

public class MyInterceptor implements ProducerInterceptor
{
    // 发送消息时触发
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord)
    {
        System.out.println("prudocerRecord : " + producerRecord.toString());
        return producerRecord;
    }
    
    // 收到服务端响应时触发
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e)
    {
        System.out.println("acknowledgement recordMetadata:" + recordMetadata.toString());
    }
    
    // 连接关闭时触发
    @Override
    public void close()
    {
        System.out.println("producer closed");
    }
    
    // 整理配置项
    @Override
    public void configure(Map<String, ?> map)
    {
        System.out.println("=====config start======");
        for (Map.Entry<String, ?> entry : map.entrySet())
        {
            System.out.println("entry.key:" + entry.getKey() + " === entry.value: " + entry.getValue());
        }
        System.out.println("=====config end======");
    }
}

然后在生产者中指定拦截器类(多个拦截器类,用逗号隔开)

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.gao.kfk.basic.MyInterceptor");

拦截器机制一般用得比较少,主要用在一些统一添加时间等类似的业务场景。比如,用Kafka传递一些POJO,就可以用拦截器统一添加时间属性。但是我们平常用Kafka传递的都是String类型的消息,POJO类型的消息,Kafka可以传吗?这就要用到下面的消息序列化机制。

消息序列化机制

Producer指定了两个属性KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG,对于这两个属性,在ProducerConfig中都有配套的说明属性。

public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";

通过这两个参数,可以指定消息生产者如何将消息的key和value序列化成二进制数据。

  • key是用来进行分区的可选项。Kafka通过key来判断消息要分发到哪个Partition。

    如果没有填写key,那么Kafka会使Round-robin轮询的方式,自动选择Partition。

    如果填写了key,那么会通过声明的Serializer序列化接口,将key转换成一个byte[]数组,然后对key进行hash,选择Partition。这样可以保证key相同的消息会分配到相同的Partition中。

  • Value是业务上比较关心的消息。Kafka同样需要将Value对象通过Serializer序列化接口,将Key转换成byte[]数组,这样才能比较好的在网络上传输Value信息,以及将Value信息落盘到操作系统的文件当中。

生产者要对消息进行序列化,那么消费者拉取消息时,自然需要进行反序列化。所以,在Consumer中,也有反序列化的两个配置

public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";

在Kafka中,对于常用的一些基础数据类型,都已经提供了对应的实现类。但是,如果需要使用一些自定义的消息格式,比如自己定制的POJO,就需要定制具体的实现类了。

在自己进行序列化机制时,需要考虑的是如何用二进制来描述业务数据:

一种类型是定长的基础类型,比如Integer,Long,Double等。这些基础类型转化成二进制数组都是定长的。这类属性可以直接转成序列化数组,在反序列化时,只要按照定长去读取二进制数据就可以反序列化了。

另一种是不定长的浮动类型,比如String,或者基于String的JSON类型等。这种浮动类型的基础数据转化成二进制数组,长度都是不一定的。对于这类数据,通常的处理方式都是先往二进制数组中写入一个定长的数据的长度数据(Integer或者Long类型),然后再继续写入数据本身。这样,反序列化时,就可以先读取一个定长的长度,再按照这个长度去读取对应长度的二进制数据,这样就能读取到数据的完整二进制内容。

kafka客户端使用,kafka,分布式,java

序列化机制是在高并发场景中非常重要的一个优化机制。高效的序列化实现能够极大的提升分布式系统的网络传输以及数据落盘的能力。例如对于一个User对象,即可以使用JSON字符串这种简单粗暴的序列化方式,也可以选择按照各个字段进行组合序列化的方式。但是显然后者的占用空间比较小,序列化速度也会比较快。而Kafka在文件落盘时,也设计了非常高效的数据序列化实现,这也是Kafka高效运行的一大支撑。

在很多其他业务场景中,也需要我们提供更高效的序列化实现。例如使用MapReduce框架时,就需要自行定义数据的序列化方式。使用Netty框架进行网络调用时,为了防止粘包,也需要定制数据的序列化机制

消息分区路由机制

消息如何进行路由?

  • Producer会根据消息的key选择Partition,具体如何通过key找Partition呢?

  • 一个消费者组会共同消费一个Topic下的多个Partition中的同一套消息副本,那Consumer节点是不是可以决定自己消费哪些Partition的消息呢?

首先,在Producer中,可以指定一个Partitioner来对消息进行分配。

public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
    "<ul>" +
    "<li>If not set, the default partitioning logic is used. " +
    "This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
    "<ul>" +
    "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
    "<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
    "</ul>" +
    "</li>" +
    "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
    "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
    "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
    "Please check KAFKA-9965 for more detail." +
    "</li>" +
    "</ul>" +
    "<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";

这里就说明了Kafka是通过一个Partitioner接口的具体实现来决定一个消息如何根据Key分配到对应的Partition上的。可以很简单的实现一个自己的分配策略。

在之前的3.2.0版本,Kafka提供了三种默认的Partitioner实现类,RoundRobinPartitioner,DefaultPartitioner和UniformStickyPartitioner。目前后面两种实现已经标记为过期,被替换成了默认的实现机制。

对于生产者,默认的Sticky策略在给一个生产者分配了一个分区后,会尽可能一直使用这个分区。等待该分区的batch.size(默认16K)已满,或者这个分区的消息已完成 linger.ms(默认0毫秒,表示如果batch.size迟迟没有满后的等待时间)。RoundRobinPartitioner是在各个Partition中进行轮询发送,这种方式没有考虑到消息大小以及各个Broker性能差异,用得比较少。

另外可以自行指定一个Partitioner实现类,定制分区逻辑。在Partitioner接口中,核心要实现的就是partition方法。根据相关信息,选择一个Partition。比如用key对partition的个数取模之类的。而Topic下的所有Partition信息都在cluster参数中。

//获取所有的Partition信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

然后,在Consumer中,可以指定一个PARTITION_ASSIGNMENT_STRATEGY分区分配策略,决定如何在多个Consumer实例和多个Partitioner之间建立关联关系。

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
    "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
    "partition ownership amongst consumer instances when group management is used. Available options are:" +
    "<ul>" +
    "<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
    "maximally balanced while preserving as many existing partition assignments as possible.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
    "logic, but allows for cooperative rebalancing.</li>" +
    "</ul>" +
    "<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
    "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +
    "<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
    "interface allows you to plug in a custom assignment strategy.</p>";

同样,Kafka内置了一些实现方式,在通常情况下也都是最优的选择。可以实现自己的分配策略。

从上面介绍可以看到Kafka默认提供了三种消费者的分区分配策略

  • range策略: 比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer1~3)。Range策略就会将分区0~3分给一个Consumer,4~6给一个Consumer,7~9给一个Consumer。

  • round-robin策略:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer

  • sticky策略:粘性策略。这个策略有两个原则:

    • 在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。

    • 分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。

另外可以通过继承AbstractPartitionAssignor抽象类自定义消费者的订阅方式。

官方默认提供的生产者端的默认分区器以及消费者端的RangeAssignor+CooperativeStickyAssignor分配策略,在大部分场景下都是非常高效的算法。深入理解这些算法,对于你深入理解MQ场景,以及借此去横向对比理解其他的MQ产品,都是非常有帮助的。

那么在哪些场景下我们可以自己来定义分区器呢?例如如果在部署消费者时,如果我们的服务器配置不一样,就可以通过定制消费者分区器,让性能更好的服务器上的消费者消费较多的消息,而其他服务器上的消费者消费较少的消息,这样就能更合理的运用上消费者端的服务器性能,提升消费者的整体消费速度。

生产者消息缓存机制

Kafka生产者为了避免高并发请求对服务端造成过大压力,每次发消息时并不是一条一条发往服务端,而是增加了一个高速缓存,将消息集中到缓存后,批量进行发送。这种缓存机制也是高并发处理时非常常用的一种机制。

Kafka的消息缓存机制涉及到KafkaProducer中的两个关键组件: accumulator 和 sender

//1.记录累加器
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,batchSize,this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs, partitionerConfig,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
//2. 数据发送线程
this.sender = newSender(logContext, kafkaClient, this.metadata);

其中RecordAccumulator,就是Kafka生产者的消息累加器。KafkaProducer要发送的消息都会在ReocrdAccumulator中缓存起来,然后再分批发送给kafka broker。

在RecordAccumulator中,会针对每一个Partition,维护一个Deque双端队列,这些Dequeue队列基本上是和Kafka服务端的Topic下的Partition对应的。每个Dequeue里会放入若干个ProducerBatch数据。KafkaProducer每次发送的消息,都会根据key分配到对应的Deque队列中。然后每个消息都会保存在这些队列中的某一个ProducerBatch中。而消息分发的规则,就是由上面的Partitioner组件完成的。

kafka客户端使用,kafka,分布式,java

这里主要涉及到两个参数

//RecordAccumulator缓冲区大小
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
                                                    + "sent faster than they can be delivered to the server the producer will block for <code>" + MAX_BLOCK_MS_CONFIG + "</code> after which it will throw an exception."
                                                    + "<p>"
                                                    + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
                                                    + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
                                                    + "compression is enabled) as well as for maintaining in-flight requests.";

//缓冲区每一个batch的大小
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"
                                                 + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
                                                 + "default batch size in bytes. "
                                                 + "<p>"
                                                 + "No attempt will be made to batch records larger than this size. "
                                                 + "<p>"
                                                 + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
                                                 + "<p>"
                                                 + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
                                                 + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
                                                 + "buffer of the specified batch size in anticipation of additional records."
                                                 + "<p>"
                                                 + "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "
                                                 + "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "
                                                 + "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
                                                 + "batch size is under this <code>batch.size</code> setting.";

这里面也提到了几个其他的参数,比如 MAX_BLOCK_MS_CONFIG ,默认60秒

接下来,sender就是KafkaProducer中用来发送消息的一个单独的线程。从这里可以看到,每个KafkaProducer对象都对应一个sender线程。他会负责将RecordAccumulator中的消息发送给Kafka。

kafka客户端使用,kafka,分布式,java

Sender也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然,如果消息比较少,ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话,Sender也不会一直等待。最多等待LINGER_MS_CONFIG时长。然后就会将ProducerBatch中的消息读取出来。LINGER_MS_CONFIG默认值是0。

然后,Sender对读取出来的消息,会以Broker为key,缓存到一个对应的队列当中。这些队列当中的消息就称为InflightRequest。接下来这些Inflight就会一一发往Kafka对应的Broker中,直到收到Broker的响应,才会从队列中移除。这些队列也并不会无限缓存,最多缓存MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION(默认值为5)个请求。

生产者缓存机制的主要目的是将消息打包,减少网络IO频率。所以,在Sender的InflightRequest队列中,消息也不是一条一条发送给Broker的,而是一批消息一起往Broker发送。而这就意味着这一批消息是没有固定的先后顺序的。

其中涉及到的几个主要参数如下:

public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
    + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "
    + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
    + "of artificial delay&mdash;that is, rather than immediately sending out a record, the producer will wait for up to "
    + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "
    + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
    + "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "
    + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
    + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
    + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";



public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
    + " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
    + " message reordering after a failed send due to retries (i.e., if retries are enabled); "
    + " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."
    + " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
    + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

最后,Sender会通过其中的一个Selector组件完成与Kafka的IO请求,并接收Kafka的响应。

//org.apache.kafka.clients.producer.KafkaProducer#doSend
if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
                this.sender.wakeup();
            }

Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数,对于Kafka集群性能提升是非常重要的。比如如果你的消息体比较大,那么应该考虑加大batch.size,尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多,那么就需要考虑加大total.memory参数,尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢,那么可以考虑提升max.in.flight.requests.per.connection参数,这样能加大消息发送的吞吐量。

发送应答机制

在Producer将消息发送到Broker后,要怎么确定消息是不是成功发到Broker上了呢?

public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
									   + " durability of records that are sent. The following settings are allowed: "
									   + " <ul>"
									   + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
									   + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
									   + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
									   + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
									   + " always be set to <code>-1</code>."
									   + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
									   + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
									   + " acknowledging the record but before the followers have replicated it then the record will be lost."
									   + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
									   + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
									   + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
									   + "</ul>"
									   + "<p>"
									   + "Note that enabling idempotence requires this config value to be 'all'."
									   + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
  • acks=0,生产者不关心Broker端有没有将消息写入到Partition,只发送消息。吞吐量最高,数据安全性最低。

  • acks=all or -1,生产者需要等Broker端的所有Partiton(Leader Partition以及其对应的Follower Partition都写完了才能得到返回结果,数据最安全,但是发消息等待更长时间,吞吐量最低。

  • acks设置成1,Leader Partition在完成自己的消息写入后,就向生产者返回结果。

在生产环境中,acks=0可靠性太差,很少使用。acks=1,一般用于传输日志等,允许个别数据丢失的场景。使用范围最广。acks=-1,一般用于传输敏感数据。

ack设置为all或者-1 ,Kafka也并不是强制要求所有Partition都写入数据后才响应。在Kafka的Broker服务端会有一个配置参数min.insync.replicas,控制Leader Partition在完成多少个Partition的消息写入后,往Producer返回响应。这个参数可以在broker.conf文件中进行配置。

min.insync.replicas
When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.

Type:	int
Default:	1
Valid Values:	[1,...]
Importance:	high
Update Mode:	cluster-wide

生产者消息幂等性

idempotence:幂等性

Kafka如何保证无论Producer向Broker发送多少次重复的数据,Broker端都只保留一条消息,而不会重复保存多条消息呢?这就是Kafka消息生产者的幂等性问题。

public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer " 
	+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
	+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
	+ " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
	+ ACKS_CONFIG + "</code> must be 'all'. "
	+ "<p>"
	+ "Idempotence is enabled by default if no conflicting configurations are set. "
	+ "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
	+ "If idempotence is explicitly enabled and conflicting configurations are set, a <code>ConfigException</code> is thrown.";

// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;

/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
																		+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
																		+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
																		+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
																		+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";

理解分布式数据传递过程中的三个数据语义:at-least-once:至少一次;at-most-once:最多一次;exactly-once:精确一次。

at-least-once可以保证数据不丢失,但是不能保证数据不重复,ack级别设置为1或-1

at-most-once保证数据不重复,但是又不能保证数据不丢失,ack级别设置为0

Kafka为了保证消息发送的Exactly-once语义,增加了几个概念:

  • PID:为每个新的Producer在初始化过程中分配一个唯一的PID。这个PID对用户不可见。

  • Sequence Numer: 对于每个PID,这个Producer针对Partition会维护一个sequenceNumber。这是一个从0开始单调递增的数字。当Producer往同一个Partition发送消息时,这个Sequence Number就会加1。然后会随着消息一起发往Broker。

  • Broker端则会针对每个<PID,Partition>维护一个序列号(SN),只有当对应的SequenceNumber = SN+1时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过小就认为消息已经写入了,不需要再重复写入。而如果SequenceNumber过大,就会认为中间可能有数据丢失了。对生产者就会抛出一个OutOfOrderSequenceException。

Kafka在打开idempotence幂等性控制后,在Broker端就会保证每条消息在一次发送过程中,Broker端最多只会刚刚好持久化一条。这样就能保证at-most-once语义。再加上将生产者的acks参数设置成1或-1,保证at-least-once语义,这样就整体上保证了Exactaly-once语义。

kafka客户端使用,kafka,分布式,java

生产者消息事务

通过生产者消息幂等性问题,能够解决单生产者消息写入单分区的的幂等性问题。

但是如果写入多个分区Partition,而这些Partition分布在不同Broker上。这时候就需要有一个事务机制,保证这一批消息同时成功的保持幂等性。或者这一批消息同时失败,这样生产者就可以开始进行整体重试。

Kafka引入了消息事务机制:

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

测试示例:

public class TransactionErrorDemo {

    private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 事务ID
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"111");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        producer.beginTransaction();
        for(int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //异步发送。
            producer.send(record);
            if(i == 3){
                //第三条消息放弃事务之后,整个这一批消息都回退了。
                System.out.println("error");
                producer.abortTransaction();
            }
        }
        System.out.println("message sended");
        try {
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
//        producer.commitTransaction();
        producer.close();
    }
}

实际上,Kafka的事务消息还会做两件事情:

1、一个TransactionId只会对应一个PID

如果当前一个Producer的事务没有提交,而另一个新的Producer保持相同的TransactionId,这时旧的生产者会立即失效,无法继续发送消息。

2、跨会话事务对齐

如果某个Producer实例异常宕机了,事务没有被正常提交。那么新的TransactionId相同的Producer实例会对旧的事务进行补齐。保证旧事务要么提交,要么终止。这样新的Producer实例就可以以一个正常的状态开始工作。

一个Producer需要发送多条消息,通常比较安全的发送方式:

public class TransactionProducer {
    private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String TOPIC = "disTopic";
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 事务ID。
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"111");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        producer.beginTransaction();
        try{
            for(int i = 0; i < 5; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
                //异步发送。
                producer.send(record);
            }
            producer.commitTransaction();
        }catch (ProducerFencedException e){
            producer.abortTransaction();
        }finally {
            producer.close();
        }
    }
}

事务ID这个参数可以任意起名,但是建议包含一定的业务唯一性。

生产者的事务消息机制保证了Producer发送消息的安全性,但并不保证已经提交的消息就一定能被所有消费者消费。

客户端流程总结

这些属性可以根据ProducerConfig和ConsumerConfig以及他们的父类CommonClientConfig去理解。

kafka客户端使用,kafka,分布式,java文章来源地址https://www.toymoban.com/news/detail-835155.html

SpringBoot集成Kafka

1、在SpringBoot项目中,引入Maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、在application.properties中配置kafka相关参数

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=worker1:9092,worker2:9093,worker3:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3、应用中使用框架注入的KafkaTemplate发送消息

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // 发送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }
}

4、使用@KafkaListener注解声明消息消费者

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}

到了这里,关于Kafka-客户端使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【实践篇】Redis最强Java客户端(三)之Redisson 7种分布式锁使用指南

    【实践篇】Redis最强Java客户端(三)之Redisson 7种分布式锁使用指南

    前两章我们了解了《【实践篇】Redis最强Java客户端(一)之Redisson入门介绍》和《【实践篇】Redis最强Java客户端(二)之Redisson基础概念》本章第三章主要介绍Redisson的七种分布式锁,分别是简单锁、公平锁、可重入锁、红锁、读写锁、信号量和闭锁。下面是每种锁的基本概念、使用

    2024年02月09日
    浏览(16)
  • 分布式软件架构——客户端缓存

    分布式软件架构——客户端缓存

    当万维网刚刚出现的时候,浏览器的缓存机制差不多就已经存在了。在 HTTP 协议设计之初,人们便确定了服务端与客户端之间“无状态”(Stateless)的交互原则,即要求客户端的每次请求是独立的,每次请求无法感知、也不能依赖另一个请求的存在,这既简化了 HTTP 服务器的

    2024年02月12日
    浏览(49)
  • kafka 02——三个重要的kafka客户端

    kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(20)
  • kafka客户端工具(Kafka Tool)的安装

    kafka客户端工具(Kafka Tool)的安装

    官方下载 根据不同的系统下载对应的版本,点击下载后双击,如何一直下一步,安装 kafka环境搭建请参考:CentOS 搭建Kafka集群 (1)连接kafka (2)简单使用  

    2024年04月23日
    浏览(22)
  • kafka客户端应用参数详解

    kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(15)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(47)
  • 一个基于Kafka客户端封装的工具,Kafka开发效率神器

    一个基于Kafka客户端封装的工具,Kafka开发效率神器

    GitHub源码https://github.com/zhangchuangiie/SimpleKafka 一个基于Kafka客户端封装的工具,Kafka开发效率神器 封装了常用的Kafka客户端操作,无需维护配置,无需初始化客户端,真正实现了一行代码调用 将连接池的维护封装在工具类里面,多线程使用也无需维护客户端集合 只需要集成1个

    2024年02月05日
    浏览(23)
  • 【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    kafka客户端是公司内部基于spring-kafka封装的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群认证方式:SASL_PLAINTEXT/SCRAM-SHA-512 经过多年的经验,以及实际验证,配置是没问题的,但是业务方反馈用相同的配置,还是报错! 封装的kafka客户端版本过低,高版本的配置项:secu

    2024年01月17日
    浏览(12)
  • Kafka客户端程序无法连接到Kafka集群的解决方法

    Kafka是一个高性能、分布式的流式数据平台,广泛用于构建实时数据流处理应用程序。然而,有时候我们可能会遇到Kafka客户端程序无法连接到Kafka集群的问题。在本文中,我将介绍一些可能导致连接问题的常见原因,并提供相应的解决方案。 网络配置问题 首先,确保Kafka集群

    2024年01月21日
    浏览(13)
  • kafka之java客户端实战

    kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(11)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包