SpringBoot3集成Kafka优雅实现信息消费发送

这篇具有很好参考价值的文章主要介绍了SpringBoot3集成Kafka优雅实现信息消费发送。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

       首先,你的JDK是否已经是8+了呢?
       其次,你是否已经用上SpringBoot3了呢?
       最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。


一、场景说明

       这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据解析处理入库TDengine。

二、使用步骤

1.引入库

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

       简简单单,就这一个依赖就够了。

2.配置

spring:
  #kafka配置
  kafka:
    #bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092
    #bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092
    bootstrap-servers: localhost:9092
    client-id: dc-device-flow-analyze
    consumer:
      group-id: dc-device-flow-analyze-consumer-group
      max-poll-records: 10
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: earliest
      #是否开启自动提交
      enable-auto-commit: false
      #自动提交的时间间隔
      auto-commit-interval: 1000
    producer:
      acks: 1
      batch-size: 4096
      buffer-memory: 40960000
      client-id: dc-device-flow-analyze-producer
      compression-type: zstd
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      properties:
        spring.json.add.type.headers: false
        max.request.size: 126951500
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1  #推荐设置为topic的分区数
      type: BATCH #开启批量监听

#消费topic配置
xiaotian:
  analyze:
    device:
      flow:
        topic:
          consumer: device-flow

3.消费


import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 消费者listener
 *
 * @author zhengwen
 **/
@Slf4j
@Component
public class KafkaListenConsumer {

    @Autowired
    private DataTransService dataTransService;

    /**
     * 设备流水listenner
     *
     * @param records 消费信息
     * @param ack     Ack机制
     */
    @KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")
    public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
        log.debug("=====设备流水deviceFlowListen消费者接收信息====");
        try {

            for (ConsumerRecord record : records) {
                log.debug("---开启线程解析设备流水数据:{}", record.toString());
                //具体service里取做逻辑
                dataTransService.deviceFlowTransSave(record);
            }

        } catch (Exception e) {
            log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
        } finally {
            //手动提交偏移量
            ack.acknowledge();
        }
    }


}

       消费与SpringBoot2的写法一样,没有任何改变。

4.发布信息


import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.easylinkin.datagenius.core.Result;
import com.easylinkin.datagenius.core.ResultGenerator;
import com.easylinkin.datagenius.vo.KafkaMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * kafka信息管理
 *
 * @author zhengwen
 **/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * kafka的信息push发送
     *
     * @param kafkaMessageVo kafka信息对象
     * @return 推送结果
     */
    @PostMapping("/sendMsg")
    public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {

        String topic = kafkaMessageVo.getTopic();
        String msg = kafkaMessageVo.getMessage();
        log.debug(msg);
      
        JSON msgJson = JSONUtil.parseObj(msg);
        /* springboot2的写法
        ListenableFuture<SendResult<String, Object>> listenableFuture =  kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

        //发送成功后回调
        SuccessCallback successCallback = new SuccessCallback() {
            @Override
            public void onSuccess(Object result) {
                log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), ex);
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
        */
        //SpringBoot3的写法
        CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

        //执行成功回调
        completableFuture.thenAccept(result -> {
            log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
        });
        //执行失败回调
        completableFuture.exceptionally(e -> {
            log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);
            return null;
        });

        return ResultGenerator.genSuccessResult();
    }
}

       这个发送信息就与springBoot2的写法一致了。原ListenableFuture类已过时了,现在SpringBoot3、JDK8+用CompletableFuture监听信息发送结果。


总结

1、SpringBoot3真香
2、Kafka的集成已经非常成熟了,资料也多。
       我这里这个SpringBoot3集成Kafka发送信息目前觉得是独家,你能找到的应该都还是使用的ListenableFuture类。
       好了,就写到这里,希望能帮到大家,uping!!!文章来源地址https://www.toymoban.com/news/detail-784156.html

到了这里,关于SpringBoot3集成Kafka优雅实现信息消费发送的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • kafka原理之springboot 集成批量消费

    由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。 手动提交非批量消费   String 类型接入 使用注解方式获取消息头、消息体   手动提交批量消费 想要批量消费,首先要开启批量

    2024年02月11日
    浏览(11)
  • springboot集成kafka消费手动启动停止

    在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 1.通过定时任务自动触发,通过@Scheduled,在某个时间点暂停kafka某个监听的消费,也可以在某个时间点

    2024年02月06日
    浏览(12)
  • kafka原理五之springboot 集成批量消费

    目录 前言 一、新建一个maven工程,添加kafka依赖 二、yaml配置文件 三、消息消费 手动提交非批量消费   String 类型接入 使用注解方式获取消息头、消息体 手动提交批量消费 ConsumerRecord类接收 String类接收 使用注解方式获取消息头、消息体,则也是使用 List 来接收: 并发消费

    2024年02月14日
    浏览(11)
  • springboot3使用自定义注解+AOP+redis优雅实现防重复提交

    springboot3使用自定义注解+AOP+redis优雅实现防重复提交

      ⛰️个人主页:     蒾酒 🔥 系列专栏 :《spring boot实战》 🌊 山高路远,行路漫漫,终有归途 目录 写在前面 实现思路 实现步骤 1.定义防重复提交注解 2.编写一个切面去发现该注解然后执行防重复提交逻辑 3.测试 依赖条件 1.接口上标记防重复提交注解 2.接口测试 写在最

    2024年04月11日
    浏览(11)
  • Springboot3.X整合Dubbo3.XSpringCloudAlibaba微服务 2022.0 + Springboot3.X 集成 Dubbo实现对外调用http内部调用RPC

    近期自己新开了一套SpringCloud Alibaba微服务项目,接口使用了对外HTTP,内部RPC的设计,具体点说就是外部用户或客户端通过Nginx访问到Gateway网关再分发到各个服务,内部各个服务之间统一使用Dubbo RPC进行通信。下面是Springboot3.x集成Dubbo的分享: 1. 需要的关键依赖 2. 启动程序入

    2024年02月15日
    浏览(14)
  • Java21 + SpringBoot3集成easy-captcha实现验证码显示和登录校验

    Java21 + SpringBoot3集成easy-captcha实现验证码显示和登录校验

    近日心血来潮想做一个开源项目,目标是做一款可以适配多端、功能完备的模板工程,包含后台管理系统和前台系统,开发者基于此项目进行裁剪和扩展来完成自己的功能开发。 本项目为前后端分离开发,后端基于 Java21 和 SpringBoot3 开发,后端使用 Spring Security 、 JWT 、 Spr

    2024年01月23日
    浏览(12)
  • SpringBoot 集成 EasyExcel 3.x 优雅实现 Excel 导入导出

    EasyExcel 是一个基于 Java 的、快速、简洁、解决大文件内存溢出的 Excel 处理工具。它能让你在不用考虑性能、内存的等因素的情况下,快速完成 Excel 的读、写等功能。 EasyExcel文档地址: https://easyexcel.opensource.alibaba.com/ 引入依赖 简单导出 以导出用户信息为例,接下来手把手教

    2024年02月15日
    浏览(18)
  • SpringBoot整合Kafka简单配置实现生产消费

    *本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考 spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html 搭建Kafka环境,参考Kafka集群环境搭建及使用 Java环境:JDK1.8 Maven版本:apach

    2024年02月16日
    浏览(7)
  • SpringBoot3集成Quartz

    SpringBoot3集成Quartz

    目录 一、简介 二、工程搭建 1、工程结构 2、依赖管理 3、数据库 4、配置文件 三、Quartz用法 1、初始化加载 2、新增任务 3、更新任务 4、暂停任务 5、恢复任务 6、执行一次 7、删除任务 8、任务执行 四、参考源码 标签:Quartz.Job.Scheduler; Quartz由Java编写的功能丰富的开源作业

    2024年02月13日
    浏览(10)
  • SpringBoot3集成ElasticSearch

    SpringBoot3集成ElasticSearch

    目录 一、简介 二、环境搭建 1、下载安装包 2、服务启动 三、工程搭建 1、工程结构 2、依赖管理 3、配置文件 四、基础用法 1、实体类 2、初始化索引 3、仓储接口 4、查询语法 五、参考源码 标签:ElasticSearch8.Kibana8; Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎

    2024年02月12日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包