第一步:引入maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
第二步:新增配置文件
以下为大致结构,供参考
spring:
kafka:
# 第一个kafka的配置
first:
bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
producer:
retries: x
acks: -1
consumer:
enable-auto-commit: false
group-id: first-consumer
listener:
ack-mode: xx
# 第二个kafka的配置
second:
bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
producer:
batch-size: xxxx
buffer-memory: xxxxxx
consumer:
auto-offset-reset: earliest
group-id: second-consumer
listener:
concurrency: xx
第三步:新增配置类
第一个kafka的配置类
/**
* 第一个kafka配置
*
* @author 小流慢影
* @date 2023年5月15日
*/
@Configuration
public class FirstKafkaConfig {
/**
* 读取第一个kafka配置
* Primary注解表示默认以这个为准
*
* @return 第一个kafka配置
*/
@Primary
@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean
public KafkaProperties firstKafkaProperties() {
return new KafkaProperties();
}
/**
* 构建第一个kafka的生产者发送template
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的生产者发送template
*/
@Primary
@Bean
public KafkaTemplate<String, String> firstKafkaTemplate(
@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties));
}
/**
* 构建第一个kafka的消费者监听容器工厂
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的消费者监听容器工厂
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties));
return factory;
}
/**
* 新建第一个kafka的消费者工厂
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的消费者工厂
*/
private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties firstKafkaProperties) {
return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties());
}
/**
* 新建第一个kafka的生产者工厂
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的生产者工厂
*/
private DefaultKafkaProducerFactory<String, String> firstProducerFactory(KafkaProperties firstKafkaProperties) {
return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties());
}
}
第二个kafka的配置类
/**
* 第二个kafka配置
*
* @author 小流慢影
* @date 2023年5月15日
*/
@Configuration
public class SecondKafkaConfig {
/**
* 读取第二个kafka配置
*
* @return 第二个kafka配置
*/
@ConfigurationProperties(prefix = "spring.kafka.second")
@Bean
public KafkaProperties secondKafkaProperties() {
return new KafkaProperties();
}
/**
* 构建第二个kafka的生产者发送template
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的生产者发送template
*/
@Bean
public KafkaTemplate<String, String> secondKafkaTemplate(
@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties));
}
/**
* 构建第二个kafka的消费者监听容器工厂
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的消费者监听容器工厂
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));
return factory;
}
/**
* 新建第二个kafka的消费者工厂
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的消费者工厂
*/
private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) {
return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());
}
/**
* 新建第二个kafka的生产者工厂
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的生产者工厂
*/
private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) {
return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());
}
}
第四步:用
生产者用法
/**
* 第一个kafka配置默认就可以用,因为配置了@Primary
*/
@Resource
private KafkaTemplate<String, String> firstKafkaTemplate;
/**
* 第二个kafka配置需要指定下名字
*/
@Resource(name = "secondKafkaTemplate")
private KafkaTemplate<String, String> secondKafkaTemplate;
消费者用法
/**
* 测试消费者
* 这里要消费哪一个kafka消息,containerFactory就需要配成上面相对应的消费者监听容器工厂
* @param record 消息
* @param ack ack
*/
@KafkaListener(
containerFactory = "secondKafkaListenerContainerFactory",
topics = {"xxxx"},
groupId = "second-consumer")
public void testConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
//do something
}
文章来源地址https://www.toymoban.com/news/detail-582213.html
文章来源:https://www.toymoban.com/news/detail-582213.html
到了这里,关于springboot多kafka配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!