springboot多kafka配置

这篇具有很好参考价值的文章主要介绍了springboot多kafka配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

第一步:引入maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

第二步:新增配置文件

以下为大致结构,供参考
spring:
  kafka:
    # 第一个kafka的配置
    first:
      bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
      producer:
        retries: x
        acks: -1
      consumer:
        enable-auto-commit: false
        group-id: first-consumer
      listener:
      	ack-mode: xx
    # 第二个kafka的配置
    second:
      bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
      producer:
        batch-size: xxxx
        buffer-memory: xxxxxx
      consumer:
        auto-offset-reset: earliest
        group-id: second-consumer
      listener:
      	concurrency: xx

第三步:新增配置类

第一个kafka的配置类
/**
 * 第一个kafka配置
 *
 * @author 小流慢影
 * @date 2023年5月15日
 */
@Configuration
public class FirstKafkaConfig {

    /**
     * 读取第一个kafka配置
     * Primary注解表示默认以这个为准
     *
     * @return 第一个kafka配置
     */
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.first")
    @Bean
    public KafkaProperties firstKafkaProperties() {
        return new KafkaProperties();
    }

    /**
     * 构建第一个kafka的生产者发送template
     *
     * @param firstKafkaProperties 第一个kafka配置
     * @return 第一个kafka的生产者发送template
     */
    @Primary
    @Bean
    public KafkaTemplate<String, String> firstKafkaTemplate(
            @Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
        return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties));
    }

    /**
     * 构建第一个kafka的消费者监听容器工厂
     *
     * @param firstKafkaProperties 第一个kafka配置
     * @return 第一个kafka的消费者监听容器工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties));
        return factory;
    }

    /**
     * 新建第一个kafka的消费者工厂
     *
     * @param firstKafkaProperties 第一个kafka配置
     * @return 第一个kafka的消费者工厂
     */
    private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties firstKafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties());
    }

    /**
     * 新建第一个kafka的生产者工厂
     *
     * @param firstKafkaProperties 第一个kafka配置
     * @return 第一个kafka的生产者工厂
     */
    private DefaultKafkaProducerFactory<String, String> firstProducerFactory(KafkaProperties firstKafkaProperties) {
        return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties());
    }
}
第二个kafka的配置类
/**
 * 第二个kafka配置
 *
 * @author 小流慢影
 * @date 2023年5月15日
 */
@Configuration
public class SecondKafkaConfig {

    /**
     * 读取第二个kafka配置
     *
     * @return 第二个kafka配置
     */
    @ConfigurationProperties(prefix = "spring.kafka.second")
    @Bean
    public KafkaProperties secondKafkaProperties() {
        return new KafkaProperties();
    }

    /**
     * 构建第二个kafka的生产者发送template
     *
     * @param secondKafkaProperties 第二个kafka配置
     * @return 第二个kafka的生产者发送template
     */
    @Bean
    public KafkaTemplate<String, String> secondKafkaTemplate(
            @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
        return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties));
    }

    /**
     * 构建第二个kafka的消费者监听容器工厂
     *
     * @param secondKafkaProperties 第二个kafka配置
     * @return 第二个kafka的消费者监听容器工厂
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));
        return factory;
    }

    /**
     * 新建第二个kafka的消费者工厂
     *
     * @param secondKafkaProperties 第二个kafka配置
     * @return 第二个kafka的消费者工厂
     */
    private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());
    }

    /**
     * 新建第二个kafka的生产者工厂
     *
     * @param secondKafkaProperties 第二个kafka配置
     * @return 第二个kafka的生产者工厂
     */
    private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) {
        return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());
    }
}

第四步:用

生产者用法
/**
 * 第一个kafka配置默认就可以用,因为配置了@Primary
 */
@Resource
private KafkaTemplate<String, String> firstKafkaTemplate;

/**
 * 第二个kafka配置需要指定下名字
 */
@Resource(name = "secondKafkaTemplate")
private KafkaTemplate<String, String> secondKafkaTemplate;
消费者用法
/**
 * 测试消费者
 * 这里要消费哪一个kafka消息,containerFactory就需要配成上面相对应的消费者监听容器工厂
 * @param record 消息
 * @param ack ack
 */
@KafkaListener(
        containerFactory = "secondKafkaListenerContainerFactory",
        topics = {"xxxx"},
        groupId = "second-consumer")
public void testConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    //do something
}

文章来源地址https://www.toymoban.com/news/detail-582213.html

到了这里,关于springboot多kafka配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(80)
  • SpringBoot整理-Spring Boot配置

    Spring Boot 的配置系统是其核心功能之一,旨在简化 Spring 应用的配置过程。Spring Boot 提供了一种灵活的方式来配置你的应用,无论是通过外部配置文件,环境变量,命令行参数还是在代码中直接配置。以下是关于 Spring Boot 配置的几个重要方面: 配置文件 application.prop

    2024年01月25日
    浏览(16)
  • Spring boot使用Kafka Java反序列化漏洞 CVE-2023-34040

    Spring boot使用Kafka Java反序列化漏洞 CVE-2023-34040

    背景:公司项目扫描到 Spring-Kafka上使用通配符模式匹配进行的安全绕过漏洞 CVE-2023-20873 中等风险 | 2023年8月23日 | CVE-2023-34040 在Spring for Apache Kafka 3.0.9及更早版本以及2.9.10及更早版本中,存在可能的反序列化攻击向量,但只有在应用了不常见的配置时才会出现。攻击者必须在

    2024年02月07日
    浏览(24)
  • 【SpringBoot应用篇】Spring Boot 配置HTTP 响应内容压缩

    5、默认情况下,要执行压缩,响应的长度至少为 2048 字节,可以通过 server.compression.min-response-size 属性配置。 6、默认情况下,仅当响应的内容类型为以下内容之一时,才会对其进行压缩,可以通过 mime-types 属性配置:text/html,text/xml,text/plain,text/css,text/javascript,application/javasc

    2024年02月16日
    浏览(11)
  • SpringBoot+jasypt-spring-boot-starter实现配置文件明文加密

    SpringBoot+jasypt-spring-boot-starter实现配置文件明文加密

    springboot:2.1.4.RELEASE JDK:8 jasypt-spring-boot-starter:3.0.2 Jasypt默认算法为PBEWithMD5AndDES,该算法需要一个加密密钥,可以在应用启动时指定(环境变量)。也可以直接写入配置文件 3.1 application.properties配置文件版 加密后,可删除jasypt.encryptor.password配置;发版时可在命令行中配置 3.2 函数

    2024年02月15日
    浏览(16)
  • Spring Boot入门(04):SpringBoot实现多环境配置文件切换 | 超级详细,建议收藏

    Spring Boot入门(04):SpringBoot实现多环境配置文件切换 | 超级详细,建议收藏

            在开发和部署Spring Boot应用的过程中,经常需要在不同的环境中进行配置,比如开发环境、测试环境、生产环境等。为了方便管理和部署,我们需要实现多环境配置文件切换。本篇教程将带你轻松搞定不同环境部署问题,让你的应用在各个环境中稳定运行。无论你是

    2024年02月12日
    浏览(18)
  • 【Java】Spring Boot配置动态数据源

    1.1 创建动态数据源 通过实现Spring提供的AbstractRoutingDataSource类,可以实现自己的数据源选择逻辑,从而可以实现数据源的动态切换。 1.2 创建动态数据源配置类 跟配置静态多数据源一样,需要手动配置下面的三个 Bean,只不过DynamicDataSource类的targetDataSources是空的。 1.3 创建动

    2024年02月09日
    浏览(12)
  • 解决Spring Boot跨域问题(配置JAVA类)

    解决Spring Boot跨域问题(配置JAVA类)

    跨域问题指的是不同端口之间,使用 ajax 无法相互调用的问题。跨域问题本质是浏览器的一种保护机制,它是为了保证用户的安全,防止恶意网站窃取数据。 比如前端用的端口号为8081,后端用的端口号为8080,后端想接收前端发送的数据就会出现跨域问题。 如图所示: 这里

    2024年01月17日
    浏览(44)
  • Java 框架面试题-Spring Boot自定义配置与自动配置共存

    Java 框架面试题-Spring Boot自定义配置与自动配置共存

    Spring Boot 是一个快速开发框架,可以简化 Spring 应用程序的开发,其中自定义配置是其中一个非常重要的特性。 在 Spring Boot 中,自定义配置允许开发者以自己的方式来配置应用程序。自定义配置可以用于覆盖默认配置,也可以用于添加新的配置项。本文将详细介绍 java框架面

    2023年04月11日
    浏览(13)
  • Java实战:Spring Boot application.yml配置文件详解

    本文将详细介绍Spring Boot application.yml 配置文件的使用和配置项。我们将探讨 application.yml 文件的基本概念,以及如何使用它来配置Spring Boot应用程序的各个方面。此外,我们将通过具体的示例来展示如何配置不同的Spring Boot组件,如数据源、数据库、缓存、邮件服务等。本文适

    2024年04月24日
    浏览(15)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包