延迟队列与SpringBoot实战

这篇具有很好参考价值的文章主要介绍了延迟队列与SpringBoot实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

延迟队列与SpringBoot实战

概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

TTL介绍

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

设置TTL
  • 消息设置TTL

    rabbitTemplate.convertAndSend("X", "XC", message + "ttl:" + ttl, msg -> {
                msg.getMessageProperties().setExpiration(ttl);
                return msg;
            });
    
  • 队列设置TTL

    args.put("x-message-ttl",15000);
    QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    
  • 如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

代码实战
配置POM
<dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
配置application
spring.rabbitmq.host=192.168.31.232
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
配置Swagger
package com.vmware.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select().build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("name", "url",
                        "email"))
                .build();
    }
}
代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下

springboot 延时队列,RabbitMQ,spring boot,java-rabbitmq,rabbitmq

RabbitMQ配置类
package com.vmware.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    //普通交换机
    private static final String X_EXCHANGE = "X";
    //死信交换机
    private static final String Y_EXCHANGE = "Y";
    //普通队列A
    private static final String QUEUE_A = "QA";
    //普通队列B
    private static final String QUEUE_B = "QB";
    //普通队列C
    private static final String QUEUE_C = "QC";
    //死信队列D
    private static final String QUEUE_D = "QD";


    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_EXCHANGE);
    }

    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>();
        //设置死信交换机
        args.put("x-dead-letter-exchange", Y_EXCHANGE);
        //设置死信Routing Key
        args.put("x-dead-letter-routing-key", "YD");
        //设置超时
        args.put("x-message-ttl", 10000);
        //构建队列
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>();
        //设置死信交换机
        args.put("x-dead-letter-exchange", Y_EXCHANGE);
        //设置死心Routing Key
        args.put("x-dead-letter-routing-key", "YD");
        //设置超时ttl
        args.put("x-message-ttl",15000);
        //构建队列
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    @Bean("queueC")
    public Queue queueC(){
        Map<String,Object> args=new HashMap<>();
        //设置死信交换机
        args.put("x-dead-letter-exchange", Y_EXCHANGE);
        //设置死信Routing Key
        args.put("x-dead-letter-routing-key", "YD");
        //构建队列
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    @Bean("queueD")
    public Queue queueD(){
        //构建死信队列D
        return QueueBuilder.durable(QUEUE_D).build();
    }

    //绑定普通交换机和队列A
    @Bean
    public Binding queueABindingX(){
        return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");
    }

    //绑定普通交换机与队列B
    @Bean
    public Binding queueBBindingX(){
        return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");
    }

    //绑定普通交换机与队列C
    @Bean
    public Binding queueCBindingX(){
        return BindingBuilder.bind(queueC()).to(xExchange()).with("XC");
    }

    //绑定死信交换机与死信队列
    @Bean
    public Binding queueDBindingY(){
        return BindingBuilder.bind(queueD()).to(yExchange()).with("YD");
    }
}
生产者
package com.vmware.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param message 消息
     * @apiNote 生产者代码
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送消息给两个队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10秒的队列" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为15秒的队列" + message);
    }

    @GetMapping("/sendMsg/{message}/{ttl}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttl) {
        rabbitTemplate.convertAndSend("X", "XC", message + "ttl:" + ttl, msg -> {
            msg.getMessageProperties().setExpiration(ttl);
            return msg;
        });
        log.info("当前时间:{},发送消息:{}给队列:XC,ttl:{}", new Date(), message, ttl);
    }
}

消费者
package com.vmware.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = {"QD"})
    public void receiveD(Message message, Channel channel) {
        log.info("当前时间:{} 死信队列收到消息:{}", new Date(), message);
    }
}
存在的问题

当生产者发布消息到延迟队列后,消息只能按顺序被消费者消费,当某一消息阻塞时间很长时则会导致其他消息一同阻塞,不能达到ttl到期优先被延时队列的消费者所消费的效果

优化

下载插件rabbitmq_delayed_message_exchange到rabbit的plugin目录下

  • 官网:https://www.rabbitmq.com/community-plugins.html

  • ubuntu下载方式

    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.2/plugins
    sudo wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
    
  • 启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  • 重启服务

    systemctl restart rabbitmq-server
    
  • 安装完成后可以在rabbit交换机页面看到x-delayed-message
    springboot 延时队列,RabbitMQ,spring boot,java-rabbitmq,rabbitmq

基于插件的延时队列代码实战

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
springboot 延时队列,RabbitMQ,spring boot,java-rabbitmq,rabbitmq文章来源地址https://www.toymoban.com/news/detail-546750.html

配置延时队列与交换机
package com.vmware.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayQueueConfig {
    private static final String DELAY_QUEUE_NAME = "delayed.queue";
    private static final String DELAY_EXCHANGE_NAME = "delayed.exchange";
    private static final String DELAY_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE_NAME);
    }

    @Bean
    public CustomExchange delayExchange(){
        Map<String,Object> args =new HashMap<>();
        args.put("x-delayed-type", "direct");
        /**
         * 1.交换机名称
         * 2.交换机类型:插件类型
         * 3.是否持久化
         * 4.是否自动删除
         */
        return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
    }

    @Bean
    public Binding delayQueueBindExchange(){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();
    }
}
生产者
package com.vmware.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message, msg -> {
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    }
}
消费者
package com.vmware.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = {"delayed.queue"})
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);
    }
}
实际效果
2022-07-19 23:33:18.021  INFO 23040 --- [nio-8080-exec-4] com.vmware.controller.SendMsgController  : 当前时间:Tue Jul 19 23:33:18 CST 2022,发送一条延迟20000毫秒的信息给队列 delayed.queue:哈哈哈
2022-07-19 23:33:23.349  INFO 23040 --- [nio-8080-exec-5] com.vmware.controller.SendMsgController  : 当前时间:Tue Jul 19 23:33:23 CST 2022,发送一条延迟2000毫秒的信息给队列 delayed.queue:哈
2022-07-19 23:33:25.332  INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer     : 当前时间:Tue Jul 19 23:33:25 CST 2022,收到延时队列的消息:哈
2022-07-19 23:33:37.830  INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer     : 当前时间:Tue Jul 19 23:33:37 CST 2022,收到延时队列的消息:哈哈哈
  • 可以看到前一条延时消息并没有阻塞到后面的消息

到了这里,关于延迟队列与SpringBoot实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • SpringBoot + RabbitMQ从延时队列中删除指定的值【RabbitMQ中的basicAck和basicNack的区别以及basicReject又是什么?】

    业务需求是,就是我本来是有一个order-queue队列绑定到了死信队列交换机order-dead-direct-exchange上,然后我的业务是,现在有一个用户下单但是没有付款,order-queue队列写入该条信息并计时24小时后如果用户还是未付款状态则移除到死信队列order-dead-queue中。问题来了,如果在这个

    2024年02月16日
    浏览(11)
  • 消息队列——spring和springboot整合rabbitmq

    消息队列——spring和springboot整合rabbitmq

    目录 spring整合rabbitmq——生产者 rabbitmq配置文件信息 倒入生产者工程的相关代码 简单工作模式 spring整合rabbitmq——消费者 spring整合rabbitmq——配置详解 SpringBoot整合RabbitMQ——生产者  SpringBoot整合RabbitMQ——消费者   使用原生amqp来写应该已经没有这样的公司了 创建两个工程

    2024年02月16日
    浏览(21)
  • RabbitMQ+SpringBoot企业版队列实战------【华为云版】

    RabbitMQ+SpringBoot企业版队列实战------【华为云版】

    安装Erlang 官网提示:https://www.erlang-solutions.com/resources/download.html 检测erlang 安装RabbitMQ  文件下载 官网下载地址:

    2024年02月07日
    浏览(11)
  • Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战

    Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战

    在项目中经常有延迟业务处理的背景,此时可以借助于Rabbitmq的延迟队列进行实现,但Rabbitmq本身并不支持延迟队列,但可以通过安装插件的方式实现延迟队列 首先确认目前项目使用的Rabbitmq的版本,这里博主的版本是3.9.15的。 访问 Rabbitmq的github网址,检索 delay 找到插件 rabb

    2024年02月02日
    浏览(15)
  • springboot kafka 实现延时队列

    springboot kafka 实现延时队列

    好文推荐: 2.5万字详解23种设计模式 基于Netty搭建websocket集群实现服务器消息推送 2.5万字讲解DDD领域驱动设计 延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。 小编已经做好了 Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时

    2024年02月03日
    浏览(9)
  • RabbitMQ+springboot用延迟插件实现延迟消息的发送

    RabbitMQ+springboot用延迟插件实现延迟消息的发送

    延迟队列:其实就是死信队列中消息过期的特殊情况 延迟队列应用场景: 可以用死信队列来实现,不过死信队列要等上一个消息消费成功,才会进行下一个消息的消费,这时候就需要用到延迟插件了,不过要线在docker上装一个插件 前置条件是在Docker中部署过RabbitMq。 1、打开

    2024年02月10日
    浏览(13)
  • Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

    Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

    erlang 和 rabbitmq 版本说明 https://www.rabbitmq.com/which-erlang.html 确认需要安装的mq版本以及对应的erlang版本。 RabbitMQ下载地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下载地址: https://packagecloud.io/rabbitmq/erlang RabbitMQ延迟消息插件下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exc

    2024年02月08日
    浏览(13)
  • 【SpringBoot笔记29】SpringBoot集成RabbitMQ消息队列

    这篇文章,主要介绍SpringBoot如何集成RabbitMQ消息队列。 目录 一、集成RabbitMQ 1.1、引入amqp依赖 1.2、添加连接信息 1.3、添加RabbitMQ配置类

    2023年04月08日
    浏览(10)
  • RabbitMQ - 死信队列,延时队列

    RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(11)
  • springboot整合rabbitmq死信队列

    springboot整合rabbitmq死信队列

    什么是死信 需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置: 从上面的逻辑图中,可以发现大致的思路: .1. 消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。 2. 正常队列针对死信信息,需要将数据 重新 发送至死信交换机

    2024年02月11日
    浏览(11)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包