kafka-consumer-消费者代码实例

这篇具有很好参考价值的文章主要介绍了kafka-consumer-消费者代码实例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1 消费一个主题

2 消费一个分区

3 消费者组案例


1 消费一个主题

消费topic为first的消息。

public class ConsumerTest{
    public void main(string[] args){
        // 0 配置
        Properties properties = new Properties();
        //连接bootstrap . servers
        properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092,                     hadoop103:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,        StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组id,必须配置,没有也要配置,不然会抛出异常
        properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );

        // 1 创建一个消费者" ", "hello"
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题first,可以订阅多个topic。
        ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
        kafkaConsumer.subscribe(topics);
        
        // 3 消费数据
        while (true){
            //每一秒拉取一次数据。
            ConsumerRecords<String,String> consumerRecords =      kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

2 消费一个分区

应用场景:当生产者将所有消息发往特定的某个主题分区。

消费first主题0号分区代码:

public class customConsumerPartition {
    public static void main(String[ ] args) {
        // 0 配置
        Properties properties = new Properties();
        //连接
        properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,    StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_    CONFI6,StringDeserializer.class.getName());
        //组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
        // 1 创建一个消费者
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
r<>(properties);
        // 2 订阅主题对应的分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);

        // 3 消费数据
        while (true){
            ConsumerRecords<String,String> consumerReconds =     kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                system.out.println(consumerRecord);
            }
        }
    }
}

3 消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。

kafka-consumer-消费者代码实例

创建三个消费者对某一分区进行消费 。

将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。文章来源地址https://www.toymoban.com/news/detail-515365.html

  1. 生产者发送消息(方便阅读)
    kafka-consumer-消费者代码实例
    此时消息分布在0,1,2三个分区中。
  2. 消费结果发现,三个消费者每个消费者消费一个分区的数据。
    kafka-consumer-消费者代码实例
    kafka-consumer-消费者代码实例
    kafka-consumer-消费者代码实例

到了这里,关于kafka-consumer-消费者代码实例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • kafka-consumer-groups.sh消费者组管理

    kafka-consumer-groups.sh消费者组管理

      先调用 MetadataRequest 拿到所有在线Broker列表 再给每个Broker发送 ListGroupsRequest 请求获取 消费者组数据。 查看指定消费组详情 --group 查看所有消费组详情 --all-groups 查询消费者成员信息 --members 查询消费者状态信息 --state 删除指定消费组 --group 删除所有消费组 --all-groups 想要

    2024年02月03日
    浏览(26)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(26)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(10)
  • kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池

    kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池

    网上搜索kafka消费者通过多线程进行顺序消费的内容都不太理想,或者太过复杂,所以自己写了几个demo,供大家参考指正。         单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。 1、如果1秒钟生产

    2024年02月15日
    浏览(9)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(14)
  • kafka消费者报错Offset commit ......it is likely that the consumer was kicked out of the group的解决

    2022年10月份接到一个小功能,对接kafka将数据写到数据库,开始的需求就是无脑批量insert,随着时间的推移,业务需求有变更,kafka的生产消息频次越来越高,到今年7月份为止就每秒会有几十条甚至上百条,然后消费消息的代码就报错: Caused by: org.apache.kafka.clients.consumer.Com

    2024年02月07日
    浏览(11)
  • rabbitmq之Consumer Prefetch(消费者预取)

    官方文档: https://www.rabbitmq.com/consumer-prefetch.html https://www.rabbitmq.com/confirms.html#channel-qos-prefetch 测试”消息积压“场景:在消费者没有启动的情况下,生产者先生产很多消息。然后先开启一个a消费者,再开启b消费者,发现只有a消费者不断的消费旧的消息,而b消费者”无动于

    2024年02月11日
    浏览(11)
  • [RocketMQ] Consumer消费者启动主要流程源码 (六)

    [RocketMQ] Consumer消费者启动主要流程源码 (六)

    客户端常用的消费者类是DefaultMQPushConsumer, DefaultMQPushConsumer的构造器以及start方法的源码。 1.创建DefaultMQPushConsumer实例 最终都是调用下面四个参数的构造函数: 指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器, 创建了一个DefaultMQPushConsumerImpl实例

    2024年02月16日
    浏览(12)
  • RocketMQ生产者和消费者都开启Message Trace后,Consume Message Trace没有消费轨迹

    RocketMQ生产者和消费者都开启Message Trace后,Consume Message Trace没有消费轨迹

    1、生产者和消费者所属同一个程序 2、生产者开启消息轨迹 3、消费者开启消息轨迹 4、生产者和消费者一起开启后,在RocketMQ可视化界面,无法查看到消息的消费轨迹 注:如果只开启生产者或消费者其中之一的消息轨迹,则消息的消费轨迹是正常的 无法展示消费轨迹 具体原

    2024年02月14日
    浏览(8)
  • RabbitMQ多消费者实例时,保证只有一个消费者进行消费(单活消费者模式)

    RabbitMQ多消费者实例时,保证只有一个消费者进行消费(单活消费者模式)

    有一种业务场景,当人员组织结构变更时,会有大量数据进行推送。这些数据类型有的是add,有的是update,并且必须先add,才能进行update。 这时,为了保证消费顺序,需要 只有一个实例进行按顺序消费,其他实例仅提供日常对外服务 ,不进行消息消费。当唯一消费实例无法

    2024年02月11日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包