Golang RabbitMQ实现的延时队列

这篇具有很好参考价值的文章主要介绍了Golang RabbitMQ实现的延时队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。

一、延时队列与应用场景

延迟队列是一种特殊类型的消息队列,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景。使用延迟队列可以灵活地控制消息的发送和处理时间,适用于很多场景,如订单超时处理、提醒任务等

具体应用场景有如下:

  1. 订单取消:当订单生成时,将订单消息发送到延迟队列中,并设置延迟时间为十分钟。消费者在十分钟后接收到订单消息并进行关闭操作。

  2. 店铺商品提醒:在店铺创建时,将提醒消息发送到延迟队列中,并设置延迟时间为十天。消费者在十天后接收到消息并发送提醒通知。

  3. 用户登录提醒:用户注册成功后,将提醒消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并发送短信提醒。

  4. 退款通知:当用户发起退款时,将通知消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并通知相关运营人员。

  5. 会议提醒:在会议预定时,将提醒消息发送到延迟队列中,并设置延迟时间为预定时间前十分钟。消费者在指定时间点前十分钟接收到消息并发送会议参加通知。

通过使用延迟队列,可以在指定的时间点触发任务,避免了轮询的低效方式,并且能够满足大量数据和时效性的需求。这种方法提供了更高的性能和实时性,并有效减轻了系统的负载。
下图是订单超时处理的流程图。
Golang RabbitMQ实现的延时队列,golang,rabbitmq,开发语言

二、RabbitMQ如何实现延时队列

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的。
通过设置消息的 TTL 和 DLX 等参数,可以将消息转发到一个指定的队列中,以便在一定的时间后再进行处理。

实现延时队列的基本要素

1、存在一个倒计时机制:Time To Live(TTL)
2、当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)

基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
**基于第二点,**是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

整体的实现原理如下

发送者将消息发送到延时队列上并设置过期时间,当过期时间到达时,消息会被自动转发到指定的交换机和队列中供接收者消费。
1、建立与 RabbitMQ 服务器的连接并创建通道。
2、发送者通过 ch.Publish 方法将消息发送到延时队列(“test_delay”)上,设置消息的过期时间。
3、延时队列中的消息在到达过期时间后会自动被发送到 “logs” 交换机,由交换机将消息广播给所有绑定的队列。
4、接收者通过监听 “test_logs” 队列接收并处理消息。当有消息到达时,会触发回调函数进行处理。
也就是说要实现延时队列,消费者必须试实现两个队列。
一个是延时队列(“test_delay”),另一个是接收延时消息的队列(“test_logs”)。

这两个队列的作用如下:
延时队列(“test_delay”):这个队列用于接收需要延时发送的消息。发送者通过将消息发送到延时队列,设置消息的过期时间。当消息过期时,RabbitMQ 会自动将消息转发到指定的交换机和队列中。
接收延时消息的队列(“test_logs”):这个队列用于接收延时消息。在示例中,这个队列是通过将 “test_logs” 队列绑定到 “logs” 交换机上来实现的。交换机会将消息广播给所有绑定的队列,因此当延时消息到达过期时间后,会被发送到这个队列中供消费者进行处理。
通过使用两个队列,消息可以被延时发送到指定的队列,并在过期后自动转发到接收队列,实现了延时发送和消费的功能。

三、Go语言实战

生产者

首先建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,通过 ch.Publish 方法将消息发送到延时队列上。这里使用的是空字符串作为交换机(exchange),表示不选择任何交换机,只将消息发送到指定的队列(“test_delay”)。在消息的属性中,设置了消息的过期时间为 5 秒。


func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	body := "hello"
	// 将消息发送到延时队列上
	err = ch.Publish(
		"", 				// exchange 这里为空则不选择 exchange
		"test_delay",     	// routing key
		false,  			// mandatory
		false,  			// immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			Expiration: "5000",	// 设置五秒的过期时间
		})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

消费者

同样建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,声明了一个名为 “logs” 的交换机,类型为 “fanout”,并且可持久化,表示该交换机会将消息广播给所有绑定的队列。接着,声明了一个常规的队列 “test_logs”,并将其绑定到 “logs” 交换机上。之后,声明了一个延时队列 “test_delay”,并设置了该队列的 x-dead-letter-exchange 参数为 “logs”,即当消息过期时将消息发送到 “logs” 交换机。最后,通过 ch.Consume 方法监听 “test_logs” 队列,并在回调函数中处理接收到的消息。文章来源地址https://www.toymoban.com/news/detail-694202.html


func main() {
	// 建立链接
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明一个主要使用的 exchange
	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
	q, err := ch.QueueDeclare(
		"test_logs",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

    /**
     * 注意,这里是重点!!!!!
     * 声明一个延时队列, ß我们的延时消息就是要发送到这里
     */
	_, errDelay := ch.QueueDeclare(
		"test_delay",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		amqp.Table{
			// 当消息过期时把消息发送到 logs 这个 exchange
			"x-dead-letter-exchange":"logs",
		},   // arguments
	)
	failOnError(errDelay, "Failed to declare a delay_queue")

	err = ch.QueueBind(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // routing key
		"logs", // exchange
		false,
		nil)
	failOnError(err, "Failed to bind a queue")

	// 这里监听的是 test_logs
	msgs, err := ch.Consume(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

到了这里,关于Golang RabbitMQ实现的延时队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(48)
  • RabbitMQ延时队列的实现原理和应用实例

    RabbitMQ延时队列的实现原理和应用实例

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。 目前有两种方法可以设置消息的 TTL: 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间; 第二种方法是对消息本身进行单独设置,每条消息

    2024年02月05日
    浏览(11)
  • 一种多策略下RabbitMQ的延时队列实现

    一种多策略下RabbitMQ的延时队列实现

    场景: 最近在开发一款系统中遇到这样一个场景,A系统开通套餐需要把套餐信息以邮件的形式发送给相关工作人员,经过人工审核通过后,在B系统里面开通,A系统会调B系统套餐列表接口查询套餐是否开通成功,开通成功则从A系统去完成订单,假如超过设定时间未开通成功,则关闭订

    2024年02月12日
    浏览(8)
  • RabbitMQ延时队列的详细介绍以及Java代码实现

    RabbitMQ延时队列的详细介绍以及Java代码实现

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的延时队列以及其详细代码实现。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大

    2024年02月01日
    浏览(7)
  • RabbitMQ - 死信队列,延时队列

    RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(11)
  • 【技术分享】四、RabbitMQ “延时队列”

    【技术分享】四、RabbitMQ “延时队列”

    延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。 ex: 定时任务:十分钟后执行某种操作。 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

    2024年02月08日
    浏览(10)
  • .NET中使用RabbitMQ延时队列和死信队列

    .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(9)
  • 深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常

    2024年02月03日
    浏览(9)
  • rabbitmq延时队列自动解锁库存

    rabbitmq延时队列自动解锁库存

    一、库存服务自动解锁库存 使用了最终一致性来解决分布式事务 当order服务出现异常回滚,此时ware服务无法回滚,怎么办? 使用seata全局事务虽然能在order服务出现异常导致 回滚 时使其他服务的也能同时回滚,但在流量大的情况下是使用加锁的方式,效率 低不适合并发量大

    2024年02月16日
    浏览(9)
  • 创建延时队列、springboot配置多个rabbitmq

    创建延时队列、springboot配置多个rabbitmq

    type选择fanout (图中已经绑定,红框为绑定过程) (图中已经绑定,红框为绑定过程) 延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作 RabbitConfig配置类 mq1 mq2 application-prod.yaml mq1消费端,发消息给mq2 mq2消费端用于递归删除文件 FileHelper工具类递归删除文件或文

    2024年02月11日
    浏览(9)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包