RocketMQ 常见面试题(三)

这篇具有很好参考价值的文章主要介绍了RocketMQ 常见面试题(三)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RocketMQ如何保证消息不丢失

消息丢失主要发生在下面三个地方:

  1. 消息生产者将消息发送到RocketMQ Broker的这个过程可能出现消息丢失。

  2. RocketMQ Broker接收到生产者发送的消息存储的过程消息可能丢失。

  3. 消费者处理失败,但是将错误进行捕捉,导致消息出现虚假的消费成功。实际上没有消费,但是在MQ看来消费完成了消费。

所以我们可以分别从 Producer、Broker 和 Consumer 三个方法来考虑。

Producer 方面

RocketMQ消息生产方式有三种:同步发送消息、异步发送消息、One-Way发送消息。不同的发送方式试用不同的场景:

  • 同步发送消息: 重要的通知(订单状态的更新)、短信系统。

  • 异步发送消息: 通常用于响应时间敏感的业务场景。

  • One-way: 主要用于对可靠性要求不高的场景,在金融的场景下不适用。一般是用于日志收集。

如果想要消息不丢失,这里可以采取采取send()同步发消息,发送结果是同步感知的。发送失败后可以重试,设置重试次数。默认3次。

// 甚至重试次数
producer.setRetryTimesWhenSendFailed(10);

下面给大家看看同步和异步发送消息的测试案例

RocketMQ Java SDK提供了同步发送消息和异步发送消息的方式。

下面是两种普通Java代码实现同步和异步发送消息的例子

同步发送消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 指定NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者实例
        producer.start();
        // 创建消息实例,指定主题、标签和消息内容
        Message message = new Message("topic_test", "tag_test", "Hello RocketMQ".getBytes());
        // 同步发送消息
        producer.send(message);
        // 关闭生产者实例
        producer.shutdown();
    }
}

异步发送消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 指定NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者实例
        producer.start();
        // 创建消息实例,指定主题、标签和消息内容
        Message message = new Message("topic_test", "tag_test", "Hello RocketMQ".getBytes());
        // 异步发送消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功,消息ID:" + sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败,原因:" + throwable.getMessage());
            }
        });
        // 关闭生产者实例
        producer.shutdown();
    }
}

在同步发送消息中,调用producer.send(message)即可发送消息,代码会一直阻塞等待消息发送成功或失败的响应。在异步发送消息中,通过producer.send(message, sendCallback)方法发送消息,同时传入一个回调函数sendCallback,当发送成功或失败时,会回调该函数进行处理。

下面使用SpringBoot整合RocketMQ实现同步和异步发送消息的例子

同步发送消息:

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SyncProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/sync")
    public String syncProducer() {
        // 发送消息,指定主题、标签和消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("topic_test", "Hello RocketMQ");
        // 返回消息ID
        return sendResult.getMsgId();
    }
}

异步发送消息:

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/async")
    public String asyncProducer() {
        // 发送消息,指定主题、标签和消息内容,并传入回调函数
        rocketMQTemplate.asyncSend("topic_test", "Hello RocketMQ", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 发送成功
                System.out.println("发送成功,消息ID:" + sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable throwable) {
                // 发送失败
                System.out.println("发送失败,原因:" + throwable.getMessage());
            }
        });
        // 返回消息
        return "消息发送中...";
    }
}

在SpringBoot中,我们可以通过注入RocketMQTemplate来发送消息,同时在发送消息时指定主题、标签和消息内容。在异步发送消息中,同样可以传入回调函数来处理发送成功或失败的情况。

Broker 方面

Broker方面主要可以做两件事:

  1. 修改 Broker 刷盘策略:修改刷盘策略为同步刷盘,默认情况下是异步刷盘的(ASYNC_FLUSH ),可以修改为同步刷盘flushDiskType = SYNC_FLUSH;
  2. 还有就是对 Broker 进行集群部署,当然部署方案有很多,选择适合业务需求的即可。

Consumer 方面

  1. 设置消费者端的 ACK 模式为“同步刷盘”,即当消息被消费后,立即向磁盘刷写确认信息。这样可以确保消息在消费后立即被持久化,避免了因系统故障或其他原因导致消息丢失的可能性。

  2. 使用 RocketMQ 提供的顺序消费功能,确保消息能够按照指定的顺序被消费。这样可以避免由于消息顺序混乱导致的业务错误。

  3. 在消费者端开启消息重试机制,当消费失败时自动进行重试。可以设置重试次数和重试时间间隔,确保消息能够尽快被消费成功。

  4. 对于关键业务消息,可以采用多级别的消息存储方式,确保消息在多个层面都有备份,从而提高数据可靠性。

  5. 定期备份消息数据,并建立容灾机制。在发生故障时,可以及时恢复数据,避免消息丢失。

大家还可以参考一下这篇文章——如何保证RocketMQ消息不丢失文章来源地址https://www.toymoban.com/news/detail-503534.html

到了这里,关于RocketMQ 常见面试题(三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • java常见面试题:什么是迭代器模式(Iterator Pattern)?如何实现迭代器模式?

    迭代器模式(Iterator Pattern)是设计模式中的一种,它提供了一种顺序访问一个聚合对象(如列表、集合等)中各个元素的方法,而又不需要暴露该对象的内部表示。使用迭代器模式,可以方便地遍历一个聚合对象的所有元素,而不需要了解该对象的底层结构。 迭代器模式主

    2024年01月18日
    浏览(19)
  • RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?

    RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?

    consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试) 什么是ACK 消息确认机制 在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者

    2023年04月13日
    浏览(14)
  • RocketMQ和Kafka的区别,以及如何保证消息不丢失和重复消费

    RocketMQ和Kafka的区别,以及如何保证消息不丢失和重复消费

    性能(单台) 语言 多语言支持客户端 优缺点 RocketMQ 十万级 java java 模型简单、接口易用,在阿里有大规模应用 文档少,支持的语言少 Kafka 百万级 服务端scala,客户端java 主流语言均支持 天生分布式、性能最好,常用于大数据领域 运维难度大,对zookeeper强依赖,多副本机制

    2024年01月16日
    浏览(11)
  • 常见面试题之框架篇

    常见面试题之框架篇

    不是线程安全的,是这样的。 当多用户同时请求一个服务时,容器会给每一个请求分配一个线程,这是多个线程会并发执行该请求对应的业务逻辑(成员方法),如果该处理逻辑中有对该单列状态的修改(体现为该单例的成员属性),则必须考虑线程同步问题。 Spring 框架并

    2024年02月10日
    浏览(10)
  • MYSQL常见面试题汇总

    MYSQL常见面试题汇总

    英杰社区 https://bbs.csdn.net/topics/617804998 1、三大范式 2、DML 语句和 DDL 语句区别 3、主键和外键的区别 4、drop、delete、truncate 区别 5、基础架构 6、MyISAM 和 InnoDB 有什么区别? 7、推荐自增id作为主键问题 8、为什么 MySQL 的自增主键不连续 9、redo log 是做什么的? 10、redo log 的刷盘时

    2024年02月16日
    浏览(7)
  • Docker常见面试题 | 答案

    目录 1、Docker 是什么? 2、Docker的三大核心是什么? 3、仓库、镜像、容器的关系是? 4、Docker与虚拟机的区别 5、Docker容器的集中状态 6、如何把主机的东西拷贝到容器内部? 7、进入容器的方法有哪些? 8、如何让容器随着 Docker 服务启动而自动启动? 9、如何指定容器的端口映

    2024年01月20日
    浏览(14)
  • Java并发常见面试题

    Java并发常见面试题

    何为进程? 进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行程序,是一个进程从创建、运行到消亡的过程。 在Java中,当我们启动main函数时其实就是启动了一个JVM的进程,而main函数所在的线程就是这个进程中的一个线程,也称主线程

    2024年02月05日
    浏览(11)
  • ZooKeeper常见面试题

    1、Zookeeper是什么框架 分布式的、开源的分布式应用程序协调服务,原本是Hadoop、HBase的一个重要组件。 应用场景 Zookeeper的功能很强大,应用场景很多,结合我实际工作中使用Dubbo框架的情况,Zookeeper主要是做注册中心用。 基于Dubbo框架开发的提供者、消费者都向Zookeeper注册

    2024年02月10日
    浏览(9)
  • React常见面试题

    React 是一个用于构建用户界面的 JavaScript 库。它由 Facebook 开发,被认为是 MV* 模式中 V(视图)层的一部分。React 的核心思想是组件化编程,通过将应用切分成多个组件,开发者可以更 小而简单地管理代码,并且复用性更高。 优点: 高效:React使用虚拟DOM技术,可以最小化

    2024年02月07日
    浏览(7)
  • Redis常见面试题

    用缓存,主要有两个用途: 高性能 、 高并发 。 高性能 假设这么个场景,你有个操作,一个请求过来,吭哧吭哧你各种乱七八糟操作 mysql,半天查出来一个结果,耗时 600ms。但是这个结果可能接下来几个小时都不会变了,或者变了也可以不用立即反馈给用户。那么此时咋办

    2023年04月09日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包