SpringBoot 集成 Kafka 高级实现

这篇具有很好参考价值的文章主要介绍了SpringBoot 集成 Kafka 高级实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、简介

        之前博客中记录了直接使用Kafka客户端实现生产者和消费者之间的交互,这种方式通过设置各种参数编码繁琐,因此通过SpringBoot集成Kafka成为一种常用的实现,下面就详细介绍 SpringBoot 是如何和Kafka进行集成的,本文主要参考官网进行学习(Messaging)。

2、引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3、生产者
3.1、生产者配置文件
spring:
  kafka:
    bootstrap-servers: node-1:9092,node-2:9092
    producer:
      # 指定key-value 序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 指定缓冲区一批大小,默认16k
      batch-size: *
      # 指定缓冲区总大小,默认32m
      buffer-memory: *
      # 指定消息确认方式
      acks: -1
      # 指定消息发送压缩方式
      compression-type: snappy
      # 配置额外参数
      properties:
        # 自定义分区
        partitioner.class:
        # 指定发送延迟时间,默认0ms
        linger.ms: 
3.2、java代码实现生产者发送消息
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> template;

    public String test(){
// 发送消息,send方法有不同的重载
        template.send("topic1", "hello long!");
        return "ok!";
    }
}
3.3、自定义 KafkaTemplate

         通过自定义的 KafkaTemplate 可以快速指定需要的生产者参数,能够做到高度可控,灵活编码。

@SpringBootConfiguration
@ConfigurationProperties(prefix="kafka")
public class KafkaProducerConfig {
    private String bootstrap_servers_config;
    private String retries_config;
    private String batch_size_config;
    private String linger_ms_config;
    private String buffer_memory_config;
    private String topic;
    @Bean
    public KafkaTemplate kafkaTemplate(){
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers_config);
        configs.put(ProducerConfig.RETRIES_CONFIG,retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG,batch_size_config);
        configs.put(ProducerConfig.LINGER_MS_CONFIG,linger_ms_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG,buffer_memory_config);
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class);
        //设置序列化
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        //设置自定义分区
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs);
        return new KafkaTemplate(producerFactory);
    }
}
4、消费者
4.1、消费者配置文件
spring:
  kafka:
    bootstrap-servers: node-1:9092,node-2:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test  # 指定消费者组
4.2、消费者java代码实现
@SpringBootConfiguration
public class KafkaConsumerConfig {
    @KafkaListener(topics = {"topic1"})
    public void consumer(String msg){
        System.out.println("consumer massage from kafka: " + msg);
    }
}
4.3、@KafkaListener参数详解

注:topicPartitions和topics、topicPattern不能同时使用

示例如下:

@KafkaListener(id = "test1", // 监听器ID(唯一)
               groupId = "test", // 设置消费者组
               topicPartitions = { // 配置topic和分区:有两个topic,分别为topic1、topic2,topic1只接收分区0,2的消息;
                    // topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为10
                    @TopicPartition(topic = "topic1", partitions = { "0", "2" }),                      
                    @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = 
                                 @PartitionOffset(partition = "1", initialOffset = "9"))},
                    properties = {"enable.auto.commit:false","max.poll.interval.ms:6000"})

参数详解: 

参数 描述

 topic

指定要监听哪些topic(与topicPattern、topicPartitions 三选一)
topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一)
topicPartitions 显式分区分配
errorHandler 异常处理,填写 beanName
properties 配置其他属性

注意:

1)、消费者组使用优先级:groupId > id > 配置文件中指定的消费者组。

2)、如果 groupId 不存在,id 存在,但是在注解中将 idIsGroup 设置为 false,则使用配置文件中的消费者组。

3)、@KafkaListener 注解中和配置文件中的相同配置优先级高于配置文件。

4.4、自定义消费者异常
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}
5、总结

        本文详细介绍 SpringBoot 集成 kafka,举例说明生产者和消费者的使用方式,以及一些自定义参数如何配置,帮助大家进一步熟悉在 SpringBoot 框架下 kafka的使用。关于kafka 幂等性、事务等更高级用法,将会在公众号分享。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

SpringBoot 集成 Kafka 高级实现,java技术,kafka,spring boot,kafka,后端文章来源地址https://www.toymoban.com/news/detail-793935.html

到了这里,关于SpringBoot 集成 Kafka 高级实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(22)
  • SpringBoot3集成Kafka优雅实现信息消费发送

           首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。        这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据

    2024年02月02日
    浏览(22)
  • HBase高级特性:HBase与Kafka集成

    HBase高级特性:HBase与Kafka集成 HBase是一个分布式、可扩展、高性能的列式存储系统,基于Google的Bigtable设计。它可以存储大量数据,并提供快速的随机读写访问。HBase是Hadoop生态系统的一部分,可以与HDFS、ZooKeeper等其他组件集成。 Kafka是一个分布式流处理平台,可以处理实时

    2024年02月21日
    浏览(18)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(19)
  • 在Spring Boot微服务集成Kafka客户端(spring-kafka)操作Kafka

    记录 :457 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka。使用Spring封装的KafkaTemplate操作Kafka生产者Producer。使用Spring封装的@KafkaListener操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen1

    2024年02月09日
    浏览(33)
  • Spring集成Kafka

    我负责的其中一个项目,接口的交互量在千万级/d,所以要存储大量的日志,为了防止日志的存储影响到系统的性能,所以在技术选型就决定了使用Kafka中间件和一个日志存储系统来负责日志的存储。 使用Kafka 的优点: 1.Kafka 是一种高吞吐量的分布式消息系统,可以支持水平

    2024年02月14日
    浏览(15)
  • 在Spring Boot微服务集成spring-kafka操作Kafka集群

    记录 :461 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群。使用KafkaTemplate操作Kafka集群的生产者Producer。使用@KafkaListener操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details

    2024年02月10日
    浏览(25)
  • SpringBoot3.1.7集成Kafka和Kafka安装

    我们在很多系统开发都需要用到消息中间件,目前来说Kafka凭借其优秀的性能,使得它的使用率已经是名列前茅了,所以今天我们将它应用到我们的系统 在使用一个中间件一定要考虑版本的兼容性,否则后面会遇到很多问题,首先我们打开Spring的官网:Spring for Apache Kafka Spr

    2024年01月23日
    浏览(15)
  • Springboot 集成kafka

    一、创建项目并导入pom依赖 二、修改application.yml配置 1. producer 生产端的配置 2. consumer 消费端的配置,需要给consumer配置一个group-id 三、生产者生产消息,消费者消费消息 1. 简单消费 producer生产者中使用自动注入的方式创建KafkaTemplate 对象 consumer消费消息,使用@KafkaListener注解

    2024年02月09日
    浏览(17)
  • SpringBoot 集成 Kafka 配置

    自定义分区器 生产者 消费者 配置文件 自定义分区器 生产者 消费者配置 消费数据过滤 消费异常处理类 消费者配置  @KafkaListener

    2024年02月15日
    浏览(16)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包