Preface
First, has your JDK been upgraded to 17+? (Spring Boot 3 requires Java 17 as a minimum.)
Second, have you already moved to Spring Boot 3?
Finally, this post covers sending and consuming Kafka messages under Spring Boot 3.
I. Scenario
The scenario: a Spring Boot 3, multi-datasource data exchange center (data warehouse) needs to consume upstream messages pushed into Kafka, parse and process them, and write the results into TDengine.
II. Usage steps
1. Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Simple as that: this single dependency is all you need.
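No `<version>` is given above because, assuming the project inherits from `spring-boot-starter-parent` (or imports the `spring-boot-dependencies` BOM), Spring Boot manages a compatible `spring-kafka` version for you. If your build does not use the BOM, pin a version explicitly; the version number below is only an example and should match your Boot release:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- Only needed without the Spring Boot BOM; align with your Boot version -->
    <version>3.1.2</version>
</dependency>
```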
2. Configuration
spring:
  # Kafka configuration
  kafka:
    #bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092
    #bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092
    bootstrap-servers: localhost:9092
    client-id: dc-device-flow-analyze
    consumer:
      group-id: dc-device-flow-analyze-consumer-group
      max-poll-records: 10
      # What to do when there is no initial offset in Kafka, or the current offset no
      # longer exists on the server. Options: latest (default), earliest, none
      auto-offset-reset: earliest
      # Whether to enable auto-commit
      enable-auto-commit: false
      # Auto-commit interval in ms (ignored here since auto-commit is disabled)
      auto-commit-interval: 1000
    producer:
      acks: 1
      batch-size: 4096
      buffer-memory: 40960000
      client-id: dc-device-flow-analyze-producer
      compression-type: zstd
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      properties:
        spring.json.add.type.headers: false
        max.request.size: 126951500
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1 # recommended: set to the number of partitions of the topic
      type: BATCH # enable batch listening

# Topic configuration for the consumer
xiaotian:
  analyze:
    device:
      flow:
        topic:
          consumer: device-flow
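Note that the consumer block above sets no deserializers, so Spring Boot falls back to `StringDeserializer` for both keys and values and the listener receives raw JSON strings. If you want typed payloads on the consumer side to match the producer's `JsonSerializer`, a sketch would look like this (the trusted package name is a hypothetical placeholder; restrict it to wherever your payload classes actually live):

```yaml
spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Hypothetical package; JsonDeserializer refuses classes outside trusted packages
        spring.json.trusted.packages: com.xiaotian.datagenius.vo
```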
3. Consuming
import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Consumer listener
 *
 * @author zhengwen
 **/
@Slf4j
@Component
public class KafkaListenConsumer {

    @Autowired
    private DataTransService dataTransService;

    /**
     * Device-flow listener
     *
     * @param records consumed records (a batch, since the listener type is BATCH)
     * @param ack     manual acknowledgment handle
     */
    @KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")
    public void deviceFlowListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        log.debug("=====deviceFlowListen received a batch of device-flow records====");
        try {
            for (ConsumerRecord<String, String> record : records) {
                log.debug("---parsing device-flow record: {}", record);
                // The actual business logic lives in the service
                dataTransService.deviceFlowTransSave(record);
            }
        } catch (Exception e) {
            log.error("----device-flow consumer failed to parse data: {}", e.getMessage(), e);
        } finally {
            // Commit the offsets manually
            ack.acknowledge();
        }
    }
}
Consuming works exactly as it did in Spring Boot 2; nothing changes here.
4. Sending messages
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.easylinkin.datagenius.core.Result;
import com.easylinkin.datagenius.core.ResultGenerator;
import com.easylinkin.datagenius.vo.KafkaMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * Kafka message management
 *
 * @author zhengwen
 **/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Push a message to Kafka
     *
     * @param kafkaMessageVo the message wrapper (topic + payload)
     * @return the push result
     */
    @PostMapping("/sendMsg")
    public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {
        String topic = kafkaMessageVo.getTopic();
        String msg = kafkaMessageVo.getMessage();
        log.debug(msg);
        JSON msgJson = JSONUtil.parseObj(msg);
        /* The Spring Boot 2 way:
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);
        // Success callback
        SuccessCallback successCallback = new SuccessCallback() {
            @Override
            public void onSuccess(Object result) {
                log.debug("Send succeeded: {}", JSONUtil.toJsonStr(kafkaMessageVo));
            }
        };
        // Failure callback
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Send failed: {}", JSONUtil.toJsonStr(kafkaMessageVo), ex);
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
        */
        // The Spring Boot 3 way
        CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);
        // Success callback
        completableFuture.thenAccept(result ->
                log.debug("Send succeeded: {}", JSONUtil.toJsonStr(kafkaMessageVo)));
        // Failure callback
        completableFuture.exceptionally(e -> {
            log.error("Send failed: {}", JSONUtil.toJsonStr(kafkaMessageVo), e);
            return null;
        });
        return ResultGenerator.genSuccessResult();
    }
}
Sending is nearly identical to the Spring Boot 2 version. The difference is that ListenableFuture is deprecated; in Spring Boot 3, KafkaTemplate.send returns a CompletableFuture (part of the JDK since Java 8), and that is how we observe the send result.
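The callback wiring above can be tried outside of Spring. This minimal sketch (no broker, no KafkaTemplate; the class name and messages are made up for illustration) attaches `thenAccept` and `exceptionally` to already-completed futures, so both paths run synchronously and deterministically:

```java
import java.util.concurrent.CompletableFuture;

public class SendResultDemo {

    /**
     * Mirrors the controller's callback wiring (thenAccept for success,
     * exceptionally for failure). Because the futures passed in are already
     * completed, the callbacks execute immediately on the calling thread.
     */
    public static String describe(CompletableFuture<String> sendFuture) {
        StringBuilder out = new StringBuilder();
        // Success path: runs only if the future completed normally
        sendFuture.thenAccept(result -> out.append("sent:").append(result));
        // Failure path: runs only if the future completed exceptionally
        sendFuture.exceptionally(ex -> {
            out.append("failed:").append(ex.getMessage());
            return null;
        });
        return out.toString();
    }

    public static void main(String[] args) {
        // Simulate a successful send acknowledgment
        System.out.println(describe(CompletableFuture.completedFuture("rec-1")));

        // Simulate a broker error
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker down"));
        System.out.println(describe(failed));
    }
}
```

Note that `thenAccept` and `exceptionally` are two independent dependents of the same future here, just as in the controller; exactly one of them fires per outcome.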
Summary
1. Spring Boot 3 is a pleasure to work with.
2. The Kafka integration is very mature, and there is plenty of material on it.
That said, most write-ups you will find still use the deprecated ListenableFuture API; the CompletableFuture approach shown here is the Spring Boot 3 way.
That's it for now. I hope this helps!