第十四章 RabbitMQ应用

这篇具有很好参考价值的文章主要介绍了第十四章 RabbitMQ应用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。



第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

前言

一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。

1、RabbitMQ概念

RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

1.1、生产者和消费者

  • Producer:生产者,就是投递消息的一方。消息一般可以包含2个部分:消息体和标签(Label)。消息的标签用来描述这条消息,比如一个交换器的名称和一个路由键。
  • Consumer:消费者,就是接受消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)
  • Broker:消息中间件的服务节点。一个RabbitMQ Broker看做一台RabbitMQ服务器
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

1.2、队列

Queue:队列,是RabbitMQ的内部对象,用于存储消息
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

1.3、交换机、路由键、绑定

Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

1.3.1、交换机类型

  • Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
  • Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
  • Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
  • Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
    自学参考:https://blog.csdn.net/qq_38550836/article/details/95358353

2、RabbitMQ运转流程

2.1、生产者发送消息流程

  • 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
  • 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
  • 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
  • 生产者通过路由键将交换器和队列绑定起来
  • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
  • 相应的交换器根据接收到的路由键查找相匹配的队列。
  • 如果找到,则将从生产者发送过来的消息存入相应的队列。
  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • 关闭信道
  • 关闭连接

2.2、消费者接收消息的过程

  • 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
  • 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  • 等待RabbitMQ Broker回应并投递相应队列中队列的消息,消费者接收消息。
  • 消费者确认(ack)接收到的消息。
  • RabbitMQ从队列中删除相应已经被确认的消息。
  • 关闭信道
  • 关闭连接
    第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

2.3、AMQP协议

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

  • Broker:接收和分发消息的应用,RabbitMQ 就是 Message Broker
  • Virtual Host:虚拟 Broker,将多个单元隔离开
  • Connection:publisher / consumer 和 broker 之间的 tcp 连接
  • Channel:connection 内部建立的逻辑连接,通常每个线程创建单独的 channel
  • Routing key:路由键,用来指示消息的路由转发,相当于快递的地址
  • Exchange:交换机,相当于快递的分拨中心
  • Queue:消息队列,消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,用于 message 的分发依据

3、RabbitMQ windows安装

3.1、下载

https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe

3.2、安装

配置环境变量
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin

开启rabbitmq-plugins插件

rabbitmq-plugins enable rabbitmq_management
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式

打开地址
http://127.0.0.1:15672/
第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式
输入用户名/密码:guest/guest

4、Spring Boot 整合RabbitMQ

4.1、在user-service添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2、配置文件添加

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4.3、增加RabbitMQ配置类

package com.xxxx.user.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    /******************direct**********************/
    /**
     * 创建direct队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue("directQueue");
    }

    /**
     * 创建direct交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    /**
     * 把队列和交换机绑定在一起
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("routingKey");
    }

    /******************topic**********************/
    @Bean
    public Queue topicQuerue1(){
        return new Queue("topicQuerue1");
    }

    @Bean
    public Queue topicQuerue2(){
        return new Queue("topicQuerue2");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");
    }

    /**
     * 通配符:* 表示一个词,# 表示零个或多个词
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
    }

    /******************fanout**********************/
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanoutQueue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanoutQueue2");
    }
    @Bean
    public Queue fanoutQueue3(){
        return new Queue("fanoutQueue3");
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

4.4、新增消费监听类

package com.xxxx.user.consumer;

import com.xxxx.springCloud.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = "directQueue")
public class DataDirectReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到directQueue队列信息:" + data);
    }

    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到directQueue队列信息:" + data);
    }
}
package com.xxxx.user.consumer;

import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
public class DataFanoutReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到topicQuerue队列信息:" + data);
    }

    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到topicQuerue队列信息:" + data);
    }
}
package com.xxxx.user.consumer;

import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
public class DataTopicReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到topicQuerue队列信息:" + data);
    }

    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到topicQuerue队列信息:" + data);
    }
}

4.5、消息生产端

package com.xxxx.user;

import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DataSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendDirect(){
        UserInfo userInfo = new UserInfo();
        userInfo.setUserAccount("tiger");
        userInfo.setPassword("12345");
        this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo);
    }
    @Test
    public void sendTopic(){
        this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic");
    }

    @Test
    public void sendFanout(){
        this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic");
    }
}


第十四章 RabbitMQ应用,微服务,rabbitmq,ruby,分布式文章来源地址https://www.toymoban.com/news/detail-824300.html



到了这里,关于第十四章 RabbitMQ应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第十四章 代理模式

    CGlib 代理和 jdk 代理的最大区别是不需要 被代理类实现接口,使用方法拦截的形式进行动态代理

    2023年04月26日
    浏览(57)
  • 第十四章 ObjectScript - 系统函数

    本节重点介绍 ObjectScript 中一些最常用的系统函数。 这些函数的名称不区分大小写。 类库还提供了大量实用方法,可以像使用函数一样使用它们。 在给定一些输入的情况下,可以使用以下函数来选择一个值: $CASE 将给定的测试表达式与一组比较值进行比较,然后返回与匹配

    2024年02月10日
    浏览(19)
  • Nodejs 第十四章(process)

    process 是Nodejs操作当前进程和控制当前进程的API,并且是挂载到globalThis下面的全局API API 介绍 1. process.arch 返回操作系统 CPU 架构 跟我们之前讲的os.arch 一样 \\\'arm\\\' 、 \\\'arm64\\\' 、 \\\'ia32\\\' 、 \\\'mips\\\' 、 \\\'mipsel\\\' 、 \\\'ppc\\\' 、 \\\'ppc64\\\' 、 \\\'s390\\\' 、 \\\'s390x\\\' 、以及  \\\'x64\\\' 2. process.cwd() 返回当前的工作目

    2024年02月10日
    浏览(18)
  • [C国演义] 第十四章

    力扣链接 常见的子数组问题 ⇒ 要使用动态规划的解法 那么要确定dp数组的含义 ⇒ do[i] — — 以 s[i] 结尾的子数组可不可以用 wordDict中的字符串来表示 那么问题来了, 如何判断字符串[j, i] 在没在wordDict中呢? 我们可以用一个 哈希表 . 将wordDict导入一个哈希表中, count 判读一个

    2024年02月08日
    浏览(27)
  • Android 第十四章 FragmentContainerView

    FragmentContainerView extends FrameLayout FragmentContainerView是专门为Fragments设计的自定义布局。它扩展了FrameLayout,因此它可以可靠地处理Fragment 事务,并且它还具有与Fragment 行为协调的附加特性 FragmentContainerView应用作Fragments的容器,通常设置在活动的xml布局 FragmentContainerView将只允许

    2024年02月11日
    浏览(29)
  • Rabbitmq----分布式场景下的应用

    如果单机模式忘记也可以看看这个快速回顾rabbitmq,在做学习 消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消

    2024年02月08日
    浏览(20)
  • 第十四章 Unity 移动和旋转(下)

    本章节我们介绍另外两种形式的旋转,也对应了两个方法。首先是RotateAround方法,他是围绕穿过世界坐标中的 point 点的 axis轴旋转 angle 度。这个方法虽然比较晦涩难懂,但是我们使用一个案例,大家就非常明白了。我们创建一个新的“SampleScene5”场景,然后创建一个“Cube”

    2024年02月08日
    浏览(26)
  • 第十四章 TIM基本定时器

    目录 13.1 定时器的分类 13.2 TIM基本定时器简介 13.2.1 定时器的概念和作用 13.2.2 TIM基本定时器的工作原理和使用场景 13.3 TIM基本定时器功能框图 13.3.1 时钟源 13.3.2 控制器 13.3.3 时基(定时器的心脏) 13.3.4 影子寄存器 13.4 TIM基本定时器的初始化和配置方法 13.4.1 定时时间的计算

    2024年02月05日
    浏览(20)
  • 第十四章 使用Vercel部署在线文档

    文档网站需要发布到互联网上才能让更多的人知道。传统的发布方法需要做以下准备。 Linux服务器; 网页服务软件 Nginx; 购买域名 + 实名认证; HTTPS 证书; Sftp 上传工具; Github Action CI 自动发布最新文档。 这里面租用服务器和域名需要一笔花费。安装 Linux、Nginx,配置域名

    2024年02月07日
    浏览(29)
  • 第十四章 开放条件下的宏观经济

    国际收支是指一个经济体的居民与非居民之间因各种经济往来而发生的收入和支付的系统记录。 国际收支是一个经济概念。 国际收支反映的是以货币数量记录的全部国际经济交易。商品和服务买卖、物物交换、金融资产之间的交换、无偿的单项产品和服务的转移、无偿的单

    2024年02月09日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包