SpringBoot 集成 Kafka 高级实现

这篇具有很好参考价值的文章主要介绍了SpringBoot 集成 Kafka 高级实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、简介

        之前博客中记录了直接使用Kafka客户端实现生产者和消费者之间的交互,这种方式通过设置各种参数编码繁琐,因此通过SpringBoot集成Kafka成为一种常用的实现,下面就详细介绍 SpringBoot 是如何和Kafka进行集成的,本文主要参考官网进行学习(Messaging)。

2、引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3、生产者
3.1、生产者配置文件
spring:
  kafka:
    bootstrap-servers: node-1:9092,node-2:9092
    producer:
      # 指定key-value 序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 指定缓冲区一批大小,默认16k
      batch-size: *
      # 指定缓冲区总大小,默认32m
      buffer-memory: *
      # 指定消息确认方式
      acks: -1
      # 指定消息发送压缩方式
      compression-type: snappy
      # 配置额外参数
      properties:
        # 自定义分区
        partitioner.class:
        # 指定发送延迟时间,默认0ms
        linger.ms: 
3.2、java代码实现生产者发送消息
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> template;

    public String test(){
// 发送消息,send方法有不同的重载
        template.send("topic1", "hello long!");
        return "ok!";
    }
}
3.3、自定义 KafkaTemplate

         通过自定义的 KafkaTemplate 可以快速指定需要的生产者参数,能够做到高度可控,灵活编码。

@SpringBootConfiguration
@ConfigurationProperties(prefix="kafka")
public class KafkaProducerConfig {
    private String bootstrap_servers_config;
    private String retries_config;
    private String batch_size_config;
    private String linger_ms_config;
    private String buffer_memory_config;
    private String topic;
    @Bean
    public KafkaTemplate kafkaTemplate(){
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers_config);
        configs.put(ProducerConfig.RETRIES_CONFIG,retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG,batch_size_config);
        configs.put(ProducerConfig.LINGER_MS_CONFIG,linger_ms_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG,buffer_memory_config);
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class);
        //设置序列化
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        //设置自定义分区
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs);
        return new KafkaTemplate(producerFactory);
    }
}
4、消费者
4.1、消费者配置文件
spring:
  kafka:
    bootstrap-servers: node-1:9092,node-2:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test  # 指定消费者组
4.2、消费者java代码实现
@SpringBootConfiguration
public class KafkaConsumerConfig {
    @KafkaListener(topics = {"topic1"})
    public void consumer(String msg){
        System.out.println("consumer massage from kafka: " + msg);
    }
}
4.3、@KafkaListener参数详解

注:topicPartitions和topics、topicPattern不能同时使用

示例如下:

@KafkaListener(id = "test1", // 监听器ID(唯一)
               groupId = "test", // 设置消费者组
               topicPartitions = { // 配置topic和分区:有两个topic,分别为topic1、topic2,topic1只接收分区0,2的消息;
                    // topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为10
                    @TopicPartition(topic = "topic1", partitions = { "0", "2" }),                      
                    @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = 
                                 @PartitionOffset(partition = "1", initialOffset = "9"))},
                    properties = {"enable.auto.commit:false","max.poll.interval.ms:6000"})

参数详解: 

参数 描述

 topic

指定要监听哪些topic(与topicPattern、topicPartitions 三选一)
topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一)
topicPartitions 显式分区分配
errorHandler 异常处理,填写 beanName
properties 配置其他属性

注意:

1)、消费者组使用优先级:groupId > id > 配置文件中指定的消费者组。

2)、如果 groupId 不存在,id 存在,但是在注解中将 idIsGroup 设置为 false,则使用配置文件中的消费者组。

3)、@KafkaListener 注解中和配置文件中的相同配置优先级高于配置文件。

4.4、自定义消费者异常
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}
5、总结

        本文详细介绍 SpringBoot 集成 kafka,举例说明生产者和消费者的使用方式,以及一些自定义参数如何配置,帮助大家进一步熟悉在 SpringBoot 框架下 kafka的使用。关于kafka 幂等性、事务等更高级用法,将会在公众号分享。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

SpringBoot 集成 Kafka 高级实现,java技术,kafka,spring boot,kafka,后端文章来源地址https://www.toymoban.com/news/detail-793935.html

到了这里,关于SpringBoot 集成 Kafka 高级实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(43)
  • SpringBoot3集成Kafka优雅实现信息消费发送

           首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。        这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据

    2024年02月02日
    浏览(51)
  • HBase高级特性:HBase与Kafka集成

    HBase高级特性:HBase与Kafka集成 HBase是一个分布式、可扩展、高性能的列式存储系统,基于Google的Bigtable设计。它可以存储大量数据,并提供快速的随机读写访问。HBase是Hadoop生态系统的一部分,可以与HDFS、ZooKeeper等其他组件集成。 Kafka是一个分布式流处理平台,可以处理实时

    2024年02月21日
    浏览(26)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(38)
  • 在Spring Boot微服务集成Kafka客户端(spring-kafka)操作Kafka

    记录 :457 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka。使用Spring封装的KafkaTemplate操作Kafka生产者Producer。使用Spring封装的@KafkaListener操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen1

    2024年02月09日
    浏览(51)
  • Spring集成Kafka

    我负责的其中一个项目,接口的交互量在千万级/d,所以要存储大量的日志,为了防止日志的存储影响到系统的性能,所以在技术选型就决定了使用Kafka中间件和一个日志存储系统来负责日志的存储。 使用Kafka 的优点: 1.Kafka 是一种高吞吐量的分布式消息系统,可以支持水平

    2024年02月14日
    浏览(29)
  • 在Spring Boot微服务集成spring-kafka操作Kafka集群

    记录 :461 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群。使用KafkaTemplate操作Kafka集群的生产者Producer。使用@KafkaListener操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details

    2024年02月10日
    浏览(43)
  • SpringBoot3.1.7集成Kafka和Kafka安装

    我们在很多系统开发都需要用到消息中间件,目前来说Kafka凭借其优秀的性能,使得它的使用率已经是名列前茅了,所以今天我们将它应用到我们的系统 在使用一个中间件一定要考虑版本的兼容性,否则后面会遇到很多问题,首先我们打开Spring的官网:Spring for Apache Kafka Spr

    2024年01月23日
    浏览(40)
  • Springboot Kafka 集成配置

    Springboot 配置使用 Kafka 前言 一、Linux 安装 Kafka 二、构建项目 三、引入依赖 四、配置文件 生产者 yml 方式 Config 方式 消费者 yml 方式 Config 方式 五、开始写代码 生产者 发送 成功回调和异常处理 消费者 接收 异常处理 七、开始测试 测试普通单条消息 测试消费者异常处理 测试

    2024年02月08日
    浏览(43)
  • SpringBoot 集成 Kafka 配置

    自定义分区器 生产者 消费者 配置文件 自定义分区器 生产者 消费者配置 消费数据过滤 消费异常处理类 消费者配置  @KafkaListener

    2024年02月15日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包