RocketMQ如何保证消息不丢失
消息丢失主要发生在下面三个地方:
-
消息生产者将消息发送到RocketMQ Broker的这个过程可能出现消息丢失。
-
RocketMQ Broker接收到生产者发送的消息存储的过程消息可能丢失。
-
消费者处理失败,但是将错误进行捕捉,导致消息出现虚假的消费成功。实际上没有消费,但是在MQ看来消费完成了消费。
所以我们可以分别从 Producer、Broker 和 Consumer 三个方法来考虑。
Producer 方面
RocketMQ消息生产方式有三种:同步发送消息、异步发送消息、One-Way发送消息。不同的发送方式试用不同的场景:
-
同步发送消息: 重要的通知(订单状态的更新)、短信系统。
-
异步发送消息: 通常用于响应时间敏感的业务场景。
-
One-way: 主要用于对可靠性要求不高的场景,在金融的场景下不适用。一般是用于日志收集。
如果想要消息不丢失,这里可以采取采取send()同步发消息,发送结果是同步感知的。发送失败后可以重试,设置重试次数。默认3次。
// 甚至重试次数
producer.setRetryTimesWhenSendFailed(10);
下面给大家看看同步和异步发送消息的测试案例
RocketMQ Java SDK提供了同步发送消息和异步发送消息的方式。
下面是两种普通Java代码实现同步和异步发送消息的例子
同步发送消息:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 指定NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者实例
producer.start();
// 创建消息实例,指定主题、标签和消息内容
Message message = new Message("topic_test", "tag_test", "Hello RocketMQ".getBytes());
// 同步发送消息
producer.send(message);
// 关闭生产者实例
producer.shutdown();
}
}
异步发送消息:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 指定NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者实例
producer.start();
// 创建消息实例,指定主题、标签和消息内容
Message message = new Message("topic_test", "tag_test", "Hello RocketMQ".getBytes());
// 异步发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功,消息ID:" + sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败,原因:" + throwable.getMessage());
}
});
// 关闭生产者实例
producer.shutdown();
}
}
在同步发送消息中,调用producer.send(message)即可发送消息,代码会一直阻塞等待消息发送成功或失败的响应。在异步发送消息中,通过producer.send(message, sendCallback)方法发送消息,同时传入一个回调函数sendCallback,当发送成功或失败时,会回调该函数进行处理。
下面使用SpringBoot整合RocketMQ实现同步和异步发送消息的例子
同步发送消息:
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SyncProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sync")
public String syncProducer() {
// 发送消息,指定主题、标签和消息内容
SendResult sendResult = rocketMQTemplate.syncSend("topic_test", "Hello RocketMQ");
// 返回消息ID
return sendResult.getMsgId();
}
}
异步发送消息:
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/async")
public String asyncProducer() {
// 发送消息,指定主题、标签和消息内容,并传入回调函数
rocketMQTemplate.asyncSend("topic_test", "Hello RocketMQ", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功
System.out.println("发送成功,消息ID:" + sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
// 发送失败
System.out.println("发送失败,原因:" + throwable.getMessage());
}
});
// 返回消息
return "消息发送中...";
}
}
在SpringBoot中,我们可以通过注入RocketMQTemplate来发送消息,同时在发送消息时指定主题、标签和消息内容。在异步发送消息中,同样可以传入回调函数来处理发送成功或失败的情况。
Broker 方面
Broker方面主要可以做两件事:
- 修改 Broker 刷盘策略:修改刷盘策略为同步刷盘,默认情况下是异步刷盘的(ASYNC_FLUSH ),可以修改为同步刷盘flushDiskType = SYNC_FLUSH;
- 还有就是对 Broker 进行集群部署,当然部署方案有很多,选择适合业务需求的即可。
Consumer 方面
-
设置消费者端的 ACK 模式为“同步刷盘”,即当消息被消费后,立即向磁盘刷写确认信息。这样可以确保消息在消费后立即被持久化,避免了因系统故障或其他原因导致消息丢失的可能性。
-
使用 RocketMQ 提供的顺序消费功能,确保消息能够按照指定的顺序被消费。这样可以避免由于消息顺序混乱导致的业务错误。
-
在消费者端开启消息重试机制,当消费失败时自动进行重试。可以设置重试次数和重试时间间隔,确保消息能够尽快被消费成功。
-
对于关键业务消息,可以采用多级别的消息存储方式,确保消息在多个层面都有备份,从而提高数据可靠性。
-
定期备份消息数据,并建立容灾机制。在发生故障时,可以及时恢复数据,避免消息丢失。文章来源:https://www.toymoban.com/news/detail-503534.html
大家还可以参考一下这篇文章——如何保证RocketMQ消息不丢失文章来源地址https://www.toymoban.com/news/detail-503534.html
到了这里,关于RocketMQ 常见面试题(三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!