RabbitTemplate的创建与配置

这篇具有很好参考价值的文章主要介绍了RabbitTemplate的创建与配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

创建与配置

RabbitTemplate是我们在与SpringAMQP整合的时候进行发送消息的关键类
该类提供了丰富的发送消息的方法,包括可靠性消息投递、回调监听消息接口ConfirmCallback、返回值确认接口
ReturnCallback等等同样我们需要注入到Spring容器中,然后直接使用。
在与spring整合时需要实例化,但是在与Springboot整合时,只需要添加配置文件即可
首先将其注入到bean里面:
如果自己不注入
那么RabbitAutoConfiguration中也会最RabbitTemplate进行自动装配。

        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean({RabbitOperations.class})
        public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate();
            configurer.configure(template, connectionFactory);
            return template;
        }

connectionFactory是用于创建与rabbitMq服务器之间通信的链接工场类。
configurer对rabbitTemplate进行配置
配置方法

    public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) {
        PropertyMapper map = PropertyMapper.get();
        template.setConnectionFactory(connectionFactory);
        if (this.messageConverter != null) {
            template.setMessageConverter(this.messageConverter);
        }

        template.setMandatory(this.determineMandatoryFlag());
        Template templateProperties = this.rabbitProperties.getTemplate();
        if (templateProperties.getRetry().isEnabled()) {
            template.setRetryTemplate((new RetryTemplateFactory(this.retryTemplateCustomizers)).createRetryTemplate(templateProperties.getRetry(), Target.SENDER));
        }

        templateProperties.getClass();
        map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
        templateProperties.getClass();
        map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
        templateProperties.getClass();
        map.from(templateProperties::getExchange).to(template::setExchange);
        templateProperties.getClass();
        map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
        templateProperties.getClass();
        map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
    }

从配置方法中可以看出来,用户可以对rabbitTemplate做的一些自定义操作。

mandatory

一. Confirm消息确认机制和Return机制
Confirm消息确认机制: 生产者向MQ投递完消息后,要求MQ返回一个应答,生产者异步接收该应答,用来确定该消息是否正常的发送到了Broker, 从而保障消息的可靠性投递
Return消息返回机制:该机制用于处理一些不可路由的消息。如果生产在发送消息时,发现当前的exchange不存在或者指定的路由key找不到时,生产者可以开启该模式来监听这种不可达的消息,以进行后续。(如果不开启的话,broker会自动删除该消息)
这里要注意的是,只要消息到达了MQ就换返回Confirm消息,接下来MQ再去判断能不能找到路由方式,找不到再返回Return消息

Confirm消息确认机制的实现

confirm-type有none、correlated、simple这三种类型
none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发 ConfirmCallback回调。
correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调
simple:simple模式下如果消息成功到达Broker后一样会触发

  1. 首先在配置文件中设置
spring:
  rabbitmq:
    publisher-confirm-type: correlated

开启消息确认模式

  1. 实现ConfirmCallback接口
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    log.info(correlationData.toString() + "发送成功");
                }else {
                    log.error(correlationData.toString() + "发送失败, 原因: " + cause);
                }
            }
        };

其中重写的方法包含三个参数:

correlationData:发送消息时设置的correlationData。由于confirm消息是异步监听的,因此需要在发送消息时传递一个correlationData,从而在返回confirm消息时判断其属于哪个消息,所以correlationData通常设置为消息的唯一ID;
ack:broker返回的应答,如果broker成功接收消息,则返回true代表接收成功,如果因为各种原因没有成功接收(如消息队列满了),则返回false
这里要注意,由于各种原因(如网络波动),生产端可能并没有收到confirm消息,因此不能将后续的补偿处理仅仅寄希望于在else内完成,else内做的补偿仅仅是在生产端收到confirm消息后nack的情况

  • cause: 如果没有被成功接收,则返回原因
  1. 为rabbitTemplate添加刚刚的Confirm监听器
rabbitTemplate.setConfirmCallback(confirmCallback());

Return消息返回机制的实现
  1. Return消息返回机制的实现与上面的Confirm消息确认机制的实现类似
spring:
  rabbitmq:
    publisher-returns: true
    template:
      mandatory: true

注意,mandatory一定要设置为true,否则找不到路由规则的消息会被broker直接抛弃

  1. 实现returnCallback接口
RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyTest, String exchange, String routingKey) {
               log.error("消息{}路由失败,失败原因:{}",message.getMessageProperties().getMessageId(),replyTest);
            }
};

MessageConverter

Rabbit原始的序列化方法是把数据转化为字节数组。
我们调用rabbitTemplate的convertAndSend方法时

	@Override
	public void convertAndSend(String exchange, String routingKey, final Object object,
			@Nullable CorrelationData correlationData) throws AmqpException {

		send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
	}

rabbitmq会进行序列化操作

protected Message convertMessageIfNecessary(final Object object) {
		if (object instanceof Message) {
			return (Message) object;
		}
		return getRequiredMessageConverter().toMessage(object, new MessageProperties());
	}
	private MessageConverter messageConverter = new SimpleMessageConverter();

如果不指定。那么使用的是SimpleMessageConverter
使用的是转化为二进制字节数组

	public final class SerializationUtils {
    private SerializationUtils() {
    }

    public static byte[] serialize(Object object) {
        if (object == null) {
            return null;
        } else {
            ByteArrayOutputStream stream = new ByteArrayOutputStream();

            try {
                (new ObjectOutputStream(stream)).writeObject(object);
            } catch (IOException var3) {
                throw new IllegalArgumentException("Could not serialize object of type: " + object.getClass(), var3);
            }

            return stream.toByteArray();
        }
    }
	 /**
     * Creates a newly allocated byte array. Its size is the current
     * size of this output stream and the valid contents of the buffer
     * have been copied into it.
     *
     * @return  the current contents of this output stream, as a byte array.
     * @see     java.io.ByteArrayOutputStream#size()
     */
    public synchronized byte toByteArray()[] {
        return Arrays.copyOf(buf, count);
    }

接受消息的时候需要准换回string,再进一步进行操作

   new String(message.getBody()));
  public void ListenerQueue01(Message message){
        String body =  new String(message.getBody()));
        System.out.println("body===="+body);
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

那么如果我们需要指定序列化方法,只要自定义一个返回json数据类型的MessageConverter就可以了。
代码:

@Configuration
public class MyMsgConverter {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

在发送和接受消息的时候,会自动进行序列化操作
可以这么接收消息

  @RabbitListener(queues = "QUEUE_DEMO_DIRECT")
    public void ListenerQueue01(Message message, Map data){
        System.out.println("mess===="+message);
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

RetryTemplate

这个等待研究rabbitMq的重试策略的时候再进行说明

receiveTimeout

receive() 操作的超时时间
这个方法是消费者使用的方法
rabbitTemplate的receive方法是用来接收消息的方法
例如配置

spring:
  # RabbitMQ 配置项,对应 RabbitProperties 配置类
  rabbitmq:
    template:
      receiveTimeout: 5s

  @Test
    public void receiveTest() {
        Message queue_demo_direct = rabbitTemplate.receive("QUEUE_DEMO_DIRECT");
        System.out.println(queue_demo_direct);
    }

那么receive方法会阻塞5秒。等待拉消费的新消息,如5秒内都没有新消息过来,那么返回空

ReplyTimeout

sendAndReceive() 操作的超时时间
这个方法是生产者会使用的方法

// 向发布订阅模式里面发送消息
public void sendPublish() {
    for (int i = 0; i < 5; i++) {
        // rabbitTemplate.convertSendAndReceive("exchange_fanout", "", "测试发布订阅模型:" + i);
        rabbitTemplate.convertAndSend("exchange_fanout", "", "测试发布订阅模型:" + i);
    }
}

使用 convertAndSend 方法时的结果:输出时没有顺序,不需要等待,直接运行
RabbitTemplate的创建与配置
使用 convertSendAndReceive 方法时的结果:使用此方法,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间
RabbitTemplate的创建与配置总结
convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。
convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。

示例

rabbitmq:
   # host: 192.168.19.128 # RabbitMQ 服务的地址
    host: 192.168.159.100 # RabbitMQ 服务的地址
    port: 5672 # RabbitMQ 服务的端口
    username: ping # RabbitMQ 服务的账号
    password: 123456 # RabbitMQ 服务的密码
    template:
      mandatory: true
      receiveTimeout: 5s
      replyTimeout: 60s

生产者

  @Test
    public void DirectExchange() {
        Map map =  new HashMap<>();
        CorrelationData correlationData = new CorrelationData();
        map.put("msg","json");
        rabbitTemplate.convertSendAndReceive (RabbitmqDirectConfig.EXCHANGE_NAME, "ROUTING_KEY_01", map, correlationData);
        System.out.println("发送消息boot mq hello Direct成功");
    }

消费者需要使用 receiveAndReply进行恢复,达到rpc调用的效果

 @Test
    public void receiveTest() {
      rabbitTemplate.receiveAndReply("QUEUE_DEMO_DIRECT",
                (o)->{
                    System.out.println(o);
                    return o;
                });
       // System.out.println(queue_demo_direct);
    }

Exchange,routingKey,defaultReceiveQueue

三个默认值,如果不指定将使用默认配置的交换机和routingkey
defaultReceiveQueue的作用是,receive方法默认拉取消息的队列文章来源地址https://www.toymoban.com/news/detail-485151.html

@Override
	@Nullable
	public Message receive() throws AmqpException {
		return this.receive(getRequiredQueue());
	}

到了这里,关于RabbitTemplate的创建与配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 芋道SpringBoot配置Maven、创建SpringBoot项目、创建Web接口、读取配置信息

    🌹作者主页:青花锁 🌹简介:Java领域优质创作者🏆、Java微服务架构公号作者😄 🌹简历模板、学习资料、面试题库、技术互助 🌹文末获取联系方式 📝 第一章 芋道 Spring Boot 快速入门 芋道 SpringBoot是一款国产的SpringCloud微服务框架,包括Outh2.0、微服务网关、微服务注册中

    2024年04月23日
    浏览(37)
  • springboot创建并配置环境(一) - 创建环境

    在springboot的启动流程中, 启动环境 Environment 是 可以说是 除了应用上下文 ApplicationContext 之外 最重要的一个组件了,而且启动环境为应用上下文提供了最基本的前提基础。 在启动环境中,主要保存大量配置信息和当前操作系统的配置信息以及环境变量。 对于它的重要性,我

    2024年02月15日
    浏览(35)
  • 虚拟环境创建、配置及激活

    虚拟环境(Virtual Environment)是在计算机上使用特定版本的编程语言(如python 3.9)和其所需包及依赖项的一种方法(如pandas 2.4),它可以被看作是一个隔离出来的操作平台。每个虚拟环境都有一个自己的 Python 解释器和库,与系统中全局安装的 Python 解释器和库分隔开,多种工具

    2024年02月14日
    浏览(37)
  • Vue创建项目配置情况

    刚开始接触vue项目创建和运行因为node版本和插件版本不一致时长遇到刚装好插件,项目就跑不起来的情况,特此记录一下 vue -V @vue/cli 5.0.8 node -v v12.22.12 npm -v 6.14.16 关闭驼峰命名检查、未使用语法检查  package.json文件内容:  后续会不断更新完善......

    2024年01月16日
    浏览(26)
  • vue3之vite创建h5项目1(创建vite项目、配置IP访问项目、配置多环境变量与预览打包生产效果、配置别名)

    初始化项目模块 添加环境变量文件,每个文件写入配置,定义 env 环境变量前面必须加 VITE_ dev环境 test环境 prod环境 在项目根目录下创建 03-1:配置多环境变量之dev环境 .env.development 03-2:配置多环境变量之test环境 .env.test 03-3:配置多环境变量之prod环境 .env.production 03-4 修改

    2024年02月02日
    浏览(91)
  • IDEA创建Java类时自动配置注释(作者,创建时间,版本等)

    在使用IDEA创建Java类时,自动配置注释功能可以通过以下步骤启用: 打开IDEA的设置窗口。 选择“Editor”选项卡,然后选择“File and Code Templates”。 在“File and Code Templates”窗口中,选择“Includes”选项卡。 在“Includes”选项卡中,选择“File Header”并编辑以下内容: 单击“

    2024年02月16日
    浏览(48)
  • 使用配置文件创建conda环境

    参考链接: conda install 与 pip install 的区别 https://www.cnblogs.com/yibeimingyue/p/14660246.html 注意:使用官方给出的部署文件进行环境的创建时,特别要注意 cuda 版本是否和服务器的版本( 显卡驱动版本 )一致(这里的一致是指:显卡驱动以及 CUDA 版本不能低于需要安装的 CUDA 版本)

    2024年02月05日
    浏览(33)
  • 【Docker私有仓库】创建与配置

    (1)拉取私有仓库镜像(此步省略) (2)启动私有仓库容器 (3)打开浏览器 输入地址 http://192.168.1.103:5000/v2/_catalog 看到 {\\\"repositories\\\":[]} 表示私有仓库搭建成功并且内容为空 (4)修改daemon.json 添加以下内容,保存退出。 此步用于让 docker信任私有仓库地址 (5)重启docker

    2024年02月11日
    浏览(51)
  • Nacos配置中心中配置文件的创建、微服务读取nacos配置中心

    在企业项目中会有非常多的服务,不同的开发环境还有不同的配置文件,所以就导致配置文件非常多。 那么肯定就会有一些公共配置,多个服务都是使用过一样的,那么就可以使用配置中心来进行统一管理,避免修改一个配置项要去各个服务都改一遍。 使用传统方式的配置

    2024年02月02日
    浏览(44)
  • 创建密码库/创建用户帐户/更新 Ansible 库的密钥/ 配置cron作业

    目录 创建密码库 创建用户帐户 更新 Ansible 库的密钥  配置cron作业       按照下方所述,创建一个 Ansible 库来存储用户密码:         库名称为 /home/curtis/ansible/locker.yml         库中含有两个变量,名称如下:             pw_developer,值为 Imadev             pw_manager,值为

    2024年02月12日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包