Java轻松使用Kafka生产者,消费者

这篇具有很好参考价值的文章主要介绍了Java轻松使用Kafka生产者,消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java轻松使用Kafka生产者,消费者

一、环境说明

  1. 项目中需要下面的依赖:(版本自定义)
<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.2</version>
        </dependency>

2.yml配置文件设置

 kafka:
    bootstrap-servers: ip:端口
    jaas:
      enabled: false
    listener:
      type: single
      concurrency: 3
    consumer:
#      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      group-id: group-id数据
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 100

  # kafka topic
  task:
    execute:
      topic: topic名称

二、生产者

1.简单生产者的书写:

@Component
public class SendKafkaUtil {

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;



    @Value("${spring.task.execute.topic}")
    private String topic;


    public void sendMessageKafka() {
        kafkaTemplate.send(topic, "{json数据}");
    }

}

三、消费者

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

1.简单消费者的书写:

 @KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}")
    public void processMessage(List<ConsumerRecord<String, String>> records){
        logger.info("kafka消费消息数量:" + records.size());
}

2.消费者批量消费的书写(批量控制:max-poll-records: 100)

 

@Bean
    public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
      Map<String, Object> consumerProperties = properties.buildConsumerProperties();
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new
        ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
      factory.setBatchListener(true); // 开启批量监听
      return factory;
    }
  }
  /**
   * 消费者1
   * 批处理统一方法
   *
   * @param records
   */

  @KafkaListener(topics = {"${spring.task.execute.topic}"},containerFactory = "batchFactory",topicPartitions = {
    @TopicPartition(partitions = {"0"}, topic = "${spring.task.execute.topic}") })
  public void consumer1(List<ConsumerRecord<String, Object>> records) throws IOException {
    log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
    log.info("Id1 records size " +  records.size());
    // todo数据逻辑处理
  }
  /**
   * 消费者2
   * 批处理统一方法
   *
   * @param records
   */

  @KafkaListener(topics = {"${spring.task.execute.topic}"}, containerFactory = "batchFactory",topicPartitions = {
    @TopicPartition(partitions = {"1"}, topic = "${spring.task.execute.topic}") })
  public void consumer2(List<ConsumerRecord<String, Object>> records) throws IOException {
    log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
    log.info("Id2 records size " + records.size());
   // todo数据逻辑处理
  }

注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费文章来源地址https://www.toymoban.com/news/detail-612885.html

到了这里,关于Java轻松使用Kafka生产者,消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • kafka生产者和消费者配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月12日
    浏览(13)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(9)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(19)
  • Kafka生产者与消费者api示例

    Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(15)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(18)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(11)
  • Linux安装Kafka,创建topic、生产者、消费者

    Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(19)
  • 探究:kafka生产者/消费者与多线程安全

    探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(12)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(26)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(11)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包