Kafka系列之消息重新消费

这篇具有很好参考价值的文章主要介绍了Kafka系列之消息重新消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

需求来源,在review前人留下的屎山代码时发现如下截图所示的代码片段:
kafka设置从最新消费,Kafka,kafka
也就是说代码是空实现的。另外,从类名定义也知道需求未实现。

于是有此需求:已经消费过的消息重新消费。

调研

调研下来,主要有以下3种可能性方案

实现方案

  1. 修改偏移量,即offset,可通过脚本快速实现
  2. 新增Group,这将使 Kafka 认为您正在使用一个新的消费者组,并从起始偏移量开始重新消费消息。需通过代码实现
  3. 消息产生者重新发送消息,实际业务中不太常见,也不现实

Kafka的偏移量的保存方式,根据不同版本号有3种方式:保存在zookeeper中、保存在kafka的自带_consumer_offset这个topic中、保存在自定义的存储系统中。

版本

首先需要知道Kafka什么版本,找到Kafka的安装目录。可以通过find命令:find / -name '*kafka*'
根据命令输出得知安装目录为:/usr/local/kafka,进入到libs目录,发现很多kafka_2.11-2.2.0.*文件。其中,2.11为scala版本,2.2.0为kafka版本。

脚本

使用Kafka自带的bin目录下的kafka-consumer-groups.sh脚本设置消费者组(consumer group)的位移, 这是0.11.0.0版本提供的新功能且只适用于新版本consumer。在此版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用KafkaConsumer.seek()方法。

使用此脚本可修改consumer group的位移,有个前提:consumer group必须是inactive的,即不能是处于正在工作中的状态。

格式:
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --group <> --topic <topic> --reset-offsets <offset_option>

如:./kafka-consumer-groups.sh --bootstrap-server 172.100.200.200:9092 --group collect_data_business_service --topic topic_event:0 --reset-offsets --to-offset 195725 --execute

topic:指定topic的作用域

  • --all-topics:为consumer group下所有topic的所有分区调整位移
  • --topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
  • --topic t1:0,1,2:为指定的topic的某个分区调整位移

reset-offsets:确定位移重设策略

  • --to-earliest:把位移调整到分区当前最小位移
  • --to-latest:把位移调整到分区当前最新位移
  • --to-current:把位移调整到分区当前位移
  • --to-offset <offset>:把位移调整到指定位移处
  • --shift-by N:把位移调整到当前位移 + N处,N为负数表示向前移动
  • --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx
  • --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,如PT0H5M0S
  • --from-file <file>:从CSV文件中读取调整策略

常见报错:

  • Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    原因:命令指定的IP有误,或端口不对,或未开放等原因导致连接超时
  • Error: Assignments can only be reset if the group ‘collect_data_business_service’ is inactive, but the current state is Stable.
    原因:在修改offset时,需要停止消费者应用程序,如杀掉Java进程,停止container容器等
  • WARN New offset (0) is lower than earliest offset for topic partition topic_event-0. Value will be set to 195551 (kafka.admin.ConsumerGroupCommand$)
    原因:

代码

也可以通过代码的方式,指定一个新的消费者组,即group.id

@Override
public void afterPropertiesSet() throws Exception {
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", "172.100.200.200:9092");
    p.setProperty("group.id", "collect_data_business_service");
    p.setProperty("auto.offset.reset", "earliest");
    p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
    consumer.subscribe(Collections.singleton("topic_event"));
    consumer.poll(Duration.ofSeconds(1));
    consumer.seekToBeginning(consumer.assignment());
    while (true) {
        ConsumerRecords<String, String> message = consumer.poll(Duration.ofSeconds(1));
        log.info("topic_event---collect_data_business_service--消费消息:" + message);
        CollectDataV2Content content = JsonUtil.jsonToBean(message.toString(), CollectDataV2Content.class);
        service.saveContent(content);
    }
}

配置:文章来源地址https://www.toymoban.com/news/detail-781278.html

  • auto.offset.reset:只能配置以下3种情况:latest, earliest, none,Spring-Kafka应用启动时会检查此项配置,不正确的配置会报错,应用启动失败
  • key.deserializer:必须配置,否则报错:Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.,应用启动失败。
  • value.deserializer:同上

参考

到了这里,关于Kafka系列之消息重新消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(10)
  • kafka入门(一):kafka消息发送与消费

    kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Consumer Group (消费者组) 每个消费

    2024年04月12日
    浏览(10)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(14)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(11)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(13)
  • Kafka消息消费流程详解

    在分布式系统中,Kafka是一种常用的消息队列系统,用于实现高可靠性的消息传递。本文将介绍Kafka消息消费的流程,并提供相应的示例代码。 Kafka消费者的流程可以概括为以下几个步骤: 创建Kafka消费者实例; 订阅一个或多个主题; 拉取消息记录; 处理消息; 提交消费位

    2024年02月09日
    浏览(9)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(14)
  • kafka如何避免消息重复消费

    kafka如何避免消息重复消费

    Kafka 避免消息重复消费通常依赖于以下策略和机制: Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。 Kafka会记录每个消费者组消

    2024年01月15日
    浏览(13)
  • 查看kafka消息消费堆积情况

    查看kafka消息消费堆积情况

    查看主题命令 展示topic列表 描述topic 查看topic某分区偏移量最大(小)值 增加topic分区数 删除topic:慎用,只会删除zookeeper中的元数据,消息文件须手动删除 方法一: 方法二: 待验证 查看topic消费进度,必须参数为–group, 不指定–topic,默认为所有topic, 列出所有主题中的

    2024年03月13日
    浏览(9)
  • Kafka 消息发送和消费流程

    流程如下: Producer 端直接将消息发送到 Broker 中的 Leader 分区中 Broker 对应的 Leader 分区收到消息会先写入 Page Cache,定时刷盘进行持久化(顺序写入磁盘) Follower 分区拉取 Leader 分区的消息,并保持与 Leader 分区数据一致,待消息拉取完毕后需要给 Leader 分区回复 ACK 确认消息

    2024年02月12日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包