Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享

这篇具有很好参考价值的文章主要介绍了Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 为了保险起见,防止生产方未启动队列未创建的情况下消费方启动后报404异常,最好在消费方中也声明创建队列,注意消费方和生产方声明的队列必须完全一致
* 注意:不用考虑队列是否会重复创建,在RabbitMQ中如果队列已经存在是不会被重新创建的
*/
channel.queueDeclare(“work-queue”,true,false,false,null);
/**
* 设置每次抓取的数据条数
* 不设置:默认平分队列中的消息,如果队列中中有100条数据,那么两个消费方各抓取50条数据进行消费
* 设置 :按照设置的条数抓取,如果设置1,那么消费方从队列中每次抓取1条数据进行消费,消费完成后再抓取1条,直到队列中没有消息
*/
channel.basicQos(1);

/**
* 第一个参数:队列名,所要消费的队列
* 第二个参数:是否自动确认
* true表示自送确认:消息拿到了就确认
* false表示手动确认:消息处理完成后确认
*/

channel.basicConsume(“work-queue”,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+envelope.getDeliveryTag());
/**
* 手动确认方式
* 第一个参数:包裹的标签(消息的标签,RabbitMQ将每个消息看成是一个包裹),是个整数
* 第二个参数:是否多条消息批量确认,如果第一、二、三…条消息没有确认,后面一条消息确认被消费了,那么前面所有的消息都会被确认消费了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}

}

消费者2:

package com.example.demo.rabbitMQ.work;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTwo {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 为了保险起见,防止生产方未启动队列未创建的情况下消费方启动后报404异常,最好在消费方中也声明创建队列,注意消费方和生产方声明的队列必须完全一致
* 注意:不用考虑队列是否会重复创建,在RabbitMQ中如果队列已经存在是不会被重新创建的
*/
channel.queueDeclare(“work-queue”,true,false,false,null);

/**
* 设置每次抓取的数据条数
* 不设置:默认平分队列中的消息,如果队列中中有100条数据,那么两个消费方各抓取50条数据进行消费
* 设置 :按照设置的条数抓取,如果设置1,那么消费方从队列中每次抓取1条数据进行消费,消费完成后再抓取1条,直到队列中没有消息
*/
channel.basicQos(1);

/**
* 第一个参数:队列名,所要消费的队列
* 第二个参数:是否自动确认
* true表示自送确认:消息拿到了就确认
* false表示手动确认:消息处理完成后确认
*/

channel.basicConsume(“work-queue”,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(new String(body)+envelope.getDeliveryTag());
/**
* 手动确认方式
* 第一个参数:包裹的标签(消息的标签,RabbitMQ将每个消息看成是一个包裹),是个整数
* 第二个参数:是否多条消息批量确认,如果第一、二、三…条消息没有确认,后面一条消息确认被消费了,那么前面所有的消息都会被确认消费了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}

}

广播模型-Fanout ============================================================

在广播模式下,生产者将消息发送给交换机后,交换机将消息推送给每一个绑定在交换机上的队列,每一个队列的消费者都能拿到消息。在Fanout模式中,一条消息,会被所有绑定在该交换机上的队列消费。

生产者:

package com.example.demo.rabbitMQ.fanout;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

/**
* 声明一个Fanout类型的交换机
* 第一个参数:交换机名称
* 第二个参数:交换机类型:BuiltinExchangeType.FANOUT、BuiltinExchangeType.TOPIC、BuiltinExchangeType.DIRECT
*/
channel.exchangeDeclare(“fanout-exchange”, BuiltinExchangeType.FANOUT);

/**
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,rabbitMQ服务关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
*/
channel.queueDeclare(“fanout-queue”,true,false,false,null);

/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey
*/
channel.queueBind(“fanout-queue”,“fanout-exchange”,“fanout-model”);

/**
* 第一个参数:交换机名称,如果为“”空串表示使用默认交换机,将消息投递到默认交换机,通过默认交换机投递到与之绑定的队列
* 第二个参数:RoutingKey==BindingKey,在Fanout模式下指定RoutingKey没有意义,Fanout模式下每个与该交换机绑定的队列都能拿到消息
* 第三个参数:消息的属性(消息是否持久化、消息存活时间)
* 第四个参数:消息内容
*/
channel.basicPublish(“fanout-exchange”,“fanout-model”,null,"Hellow World ".getBytes());
}
}

消费者1:

package com.example.demo.rabbitMQ.fanout;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerOne {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 声明一个Fanout类型的交换机
* 第一个参数:交换机名称
* 第二个参数:交换机类型
*/
channel.exchangeDeclare(“fanout-exchange”, BuiltinExchangeType.FANOUT);
/**
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,连接关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
* getQueue()方法可获取队列名称
*/
String queueName = channel.queueDeclare(“fanout-queue-one”, true, false, false, null).getQueue();
/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey
*/
channel.queueBind(queueName,“fanout-exchange”,“fanout-model-one”);
/**
* 从队列中消费消息
* 第一个参数:队列名称
* 第二个参数:是否自动确认
* 第三个参数:消费者
*/
channel.basicConsume(“fanout-queue-one”,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+“One”+" "+envelope.getDeliveryTag());
/**
* 确认消息是否消费,给队列反馈
* 第一个参数:包裹(消息)标签
* 第二个参数:是否多条消息批量确认,如果第一、二、三…条消息没有确认,后面一条消息确认被消费了,那么前面所有的消息都会被确认消费了
*/
channel.basicAck(envelope.getDeliveryTag(),true);
}
});
}
}

消费者2:

package com.example.demo.rabbitMQ.fanout;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTwo {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 声明一个Fanout类型的交换机
* 第一个参数:交换机名称
* 第二个参数:交换机类型
*/
channel.exchangeDeclare(“fanout-exchange”, BuiltinExchangeType.FANOUT);
/**
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,rabbitMQ服务关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
*/
channel.queueDeclare(“fanout-queue-two”,true,false,false,null);
/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey
*/
channel.queueBind(“fanout-queue-two”,“fanout-exchange”,“fanout-model-two”);
/**
* 从队列中消费消息
* 第一个参数:队列名称
* 第二个参数:是否自动确认
* 第三个参数:消费者
*/
channel.basicConsume(“fanout-queue-two”,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+“Two”);
}
});
}
}

消费者3:

package com.example.demo.rabbitMQ.fanout;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerThree {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 声明一个Fanout类型的交换机
* 第一个参数:交换机名称
* 第二个参数:交换机类型
*/
channel.exchangeDeclare(“fanout-exchange”, BuiltinExchangeType.FANOUT);
/**
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,连接关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
* getQueue()方法可获取队列名称
*/
String queueName = channel.queueDeclare(“fanout-queue-one”, true, false, false, null).getQueue();
/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey
*/
channel.queueBind(queueName,“fanout-exchange”,“fanout-model-one”);
/**
* 设置每次抓取的数据条数
* 不设置:默认平分队列中的消息,如果队列中中有100条数据,那么两个消费方各抓取50条数据进行消费
* 设置 :按照设置的条数抓取,如果设置1,那么消费方从队列中每次抓取1条数据进行消费,消费完成后再抓取1条,直到队列中没有消息
*/
channel.basicQos(1);
/**
* 从队列中消费消息
* 第一个参数:队列名称
* 第二个参数:是否自动确认
* 第三个参数:消费者
*/
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+“Three”+" "+envelope.getDeliveryTag());
}
});
}
}

定向模型-Direct ============================================================

在定向模型中,生产者将消息发送给交换机后,交换机将消息根据RoutingKey\BindingKey推送到对应的队列上。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的BindingKey与消息的 RoutingKey完全一致,才会接收到消息
生产者:

package com.example.demo.rabbitMQ.direct;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(“direct-exchange”, BuiltinExchangeType.DIRECT);
//队列1
String queue1Name = channel.queueDeclare(“direct-queue-one”, true, false, false, null).getQueue();
channel.queueBind(queue1Name,“direct-exchange”,“direct-model-one”);
//队列2
String queue2Name = channel.queueDeclare(“direct-queue-two”, true, false, false, null).getQueue();
channel.queueBind(queue2Name,“direct-exchange”,“direct-model-two”);

channel.basicPublish(“direct-exchange”,“direct-model-two”,null,"JAVA是世界上最好的语言 ".getBytes(StandardCharsets.UTF_8));
}
}

消费者1:

package com.example.demo.rabbitMQ.direct;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ConsumerOne {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(“direct-exchange”, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare(“direct-queue-one”, true, false, false, null).getQueue();
channel.queueBind(queueName,“direct-exchange”,“direct-model-one”);

channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, StandardCharsets.UTF_8)+“One”);
}
});
}
}

消费者2:

package com.example.demo.rabbitMQ.direct;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTwo {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(“direct-exchange”, BuiltinExchangeType.DIRECT);

String queueName = channel.queueDeclare(“direct-queue-two”, true, false, false, null).getQueue();
channel.queueBind(queueName,“direct-exchange”,“direct-model-two”);

channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+“Two”);
}
});
}
}

主题模型-Topic ============================================================

在主题模式下,生产者将消息发送给交换机后,交换机根据RoutingKey\BindingKey匹配队列,将消息推送到匹配的队列上。在topic模式下通过#号和*号进行模糊匹配,通过.进行分割,#号表示可能有一个或多个单词,也可能没有;*号表示有且仅有一个单词

生产者:

package com.example.demo.rabbitMQ.topic;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 声明一个TOPIC类型的交换机
* 第一个参数:交换机名称
* 第二个参数:交换机类型:BuiltinExchangeType.FANOUT、BuiltinExchangeType.TOPIC、BuiltinExchangeType.DIRECT
*/
channel.exchangeDeclare(“topic-exchange”, BuiltinExchangeType.TOPIC);
/**
* 声明队列1:
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,rabbitMQ服务关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
*/
String queue1Name = channel.queueDeclare(“topic-queue-one”, true, false, false, null).getQueue();
/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey,在topic模式下通过#和*进行模糊匹配,通过.进行分割,#号表示可能有一个或多个单词,也可能没有;*号表示有且仅有一个单词
*/
channel.queueBind(queue1Name,“topic-exchange”,“topic-model.#”);

/**
* 声明队列2:
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,rabbitMQ服务关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
*/
String queue2Name = channel.queueDeclare(“topic-queue-two”, true, false, false, null).getQueue();
/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey,在topic模式下通过#和*进行模糊匹配,通过.进行分割,#号表示可能有一个或多个单词,也可能没有;*号表示有且仅有一个单词
*/
channel.queueBind(queue2Name,“topic-exchange”,“topic-model.*”);

/**
* 声明队列3:
* 注意:通过channel.queueDeclare()方法创建的Queue绑定在默认的交换机上,且BindingKey和队列名一致
* 第一个参数:队列名
* 第二个参数:队列是否持久化,如果为false,rabbitMQ服务关闭队列消失
* 第三个参数:队列是否为排他队列,如果为true,队列仅供创建它的连接使用,当前连接关闭队列消失
* 第四个参数:队列是否自动删除,如果为true,队列中的消息消费完成,并且消费方关闭后,队列自动删除
*/
String queue3Name = channel.queueDeclare(“topic-queue-three”, true, false, false, null).getQueue();
/**
* 将队列绑定到交换机上
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:BindingKey,在topic模式下通过#和*进行模糊匹配,通过.进行分割,#号表示可能有一个或多个单词,也可能没有;*号表示有且仅有一个单词
*/
channel.queueBind(queue3Name,“topic-exchange”,“topic-model.three.#”);

/**
* 第一个参数:交换机名称,如果为“”空串表示使用默认交换机,将消息投递到默认交换机,通过默认交换机投递到与之绑定的队列
* 第二个参数:RoutingKey==BindingKey,在topic模式下通过#和*进行模糊匹配,通过.进行分割,#号表示可能有一个或多个单词,也可能没有;*号表示有且仅有一个单词
* 第三个参数:消息的属性(消息是否持久化、消息存活时间)
* 第四个参数:消息内容
*/
channel.basicPublish(“topic-exchange”,“topic-model.three.java”,null,“java是世界上最优秀的语言”.getBytes());

}
}

消费者1:

package com.example.demo.rabbitMQ.topic;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ConsumerOne {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 为了保险起见,防止生产方未启动队列\交换机未创建消的情况下费方启动后报404异常,最好在消费方中也声明创建队列,注意消费方和生产方声明的队列\交换机必须完全一致
* 注意:不用考虑队列\交换机是否会重复创建,在RabbitMQ中如果队列已经存在是不会被重新创建的
*/

//声明交换机
channel.exchangeDeclare(“topic-exchange”, BuiltinExchangeType.TOPIC);
//声明队列
String queue1Name = channel.queueDeclare(“topic-queue-one”, true, false, false, null).getQueue();
//将队列绑定到交换机上
channel.queueBind(queue1Name,“topic-exchange”,“topic-model.#”);
//设置每次抓取的数据条数
channel.basicQos(1);
/**
* 第一个参数:被消费的队列名
* 第二个参数:是否自动确认
* 第三个参数:使用默认的消费者
*/
channel.basicConsume(queue1Name,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, StandardCharsets.UTF_8));
//在手动确认的情况下使用
/**
* 确认消息是否消费,给队列反馈
* 第一个参数:包裹(消息)标签
* 第二个参数:是否多条消息批量确认,如果第一、二、三…条消息没有确认,后面一条消息确认被消费了,那么前面所有的消息都会被确认消费了
*/
//channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}

消费者2:

package com.example.demo.rabbitMQ.topic;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ConsumerTwo {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 为了保险起见,防止生产方未启动队列\交换机未创建消的情况下费方启动后报404异常,最好在消费方中也声明创建队列,注意消费方和生产方声明的队列\交换机必须完全一致
* 注意:不用考虑队列\交换机是否会重复创建,在RabbitMQ中如果队列已经存在是不会被重新创建的
*/

//声明交换机
channel.exchangeDeclare(“topic-exchange”, BuiltinExchangeType.TOPIC);
//声明队列
String queue2Name = channel.queueDeclare(“topic-queue-two”, true, false, false, null).getQueue();
//将队列绑定到交换机上
channel.queueBind(queue2Name,“topic-exchange”,“topic-model.*”);
//设置每次抓取的数据条数
channel.basicQos(1);
/**
* 第一个参数:被消费的队列名
* 第二个参数:是否自动确认
* 第三个参数:使用默认的消费者
*/
channel.basicConsume(queue2Name,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, StandardCharsets.UTF_8));
//在手动确认的情况下使用
/**
* 确认消息是否消费,给队列反馈
* 第一个参数:包裹(消息)标签
* 第二个参数:是否多条消息批量确认,如果第一、二、三…条消息没有确认,后面一条消息确认被消费了,那么前面所有的消息都会被确认消费了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}

消费者3:

package com.example.demo.rabbitMQ.topic;

import com.example.demo.rabbitMQ.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ConsumerThree {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 为了保险起见,防止生产方未启动队列\交换机未创建消的情况下费方启动后报404异常,最好在消费方中也声明创建队列,注意消费方和生产方声明的队列\交换机必须完全一致
* 注意:不用考虑队列\交换机是否会重复创建,在RabbitMQ中如果队列已经存在是不会被重新创建的
*/

//声明交换机
channel.exchangeDeclare(“topic-exchange”, BuiltinExchangeType.TOPIC);
//声明队列
String queue3Name = channel.queueDeclare(“topic-queue-two”, true, false, false, null).getQueue();
//将队列绑定到交换机上
channel.queueBind(queue3Name,“topic-exchange”,“topic-model”);
//设置每次抓取的数据条数
channel.basicQos(1);
/**
* 第一个参数:被消费的队列名
* 第二个参数:是否自动确认
* 第三个参数:使用默认的消费者
*/
channel.basicConsume(queue3Name,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, StandardCharsets.UTF_8));
//在手动确认的情况下使用
/**
* 确认消息是否消费,给队列反馈
* 第一个参数:包裹(消息)标签
* 第二个参数:是否多条消息批量确认,如果第一、二、三…条消息没有确认,后面一条消息确认被消费了,那么前面所有的消息都会被确认消费了
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}

SpringBoot+RabbitMQ

第一步:先导依赖

org.springframework.boot spring-boot-starter-amqp com.alibaba fastjson 1.2.4

第二步:配置RabbitMQ

spring:
rabbitmq:
host: 192.168.119.134
port: 5672
username: rabbit
password: 123456

第三步:配置RabbitMQ使用的序列化

// 消息的消费方json数据的反序列化
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}

// 定义使用json的方式转换数据
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate amqpTemplate = new RabbitTemplate();
amqpTemplate.setConnectionFactory(connectionFactory);
amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return amqpTemplate;
}

第四步:根据情况选择合适的模型

一、简单模型 ============================================================

特点:队列是绑定在默认交换机上,BindingKey就是队列名

消息投递方:
在SpringBoot中即可使用RabbitTemplate进行消息的投递,也可使用原生的方式进行消息投递

package com.example.demo.rabbitMQ.springBootrabbitMQ.simple;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(“/rabbit”)
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping(“simple”)
public String sendMessage(){
rabbitTemplate.convertAndSend(“simple-queue”,“落霞与孤鹜齐飞,秋水共长天一色”);
return “成功”;
}
}

消息消费方
在SpringBoot中即可使用注解@RabbitListener进行消息的投递,也可使用原生的方式进行消息投递

package com.example.demo.rabbitMQ.springBootrabbitMQ.simple;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
/**
* 1、@RabbitListener(queues = {“topic-queue-one”} ),一个@RabbitListener就是一个消费者
* 2、使用queues,如果队列不存在就会报异常,使用queuesToDeclare,如果队列不存在就创建队列
* 3、注意:SpringBoot提供了一个很好的消息确认机制,如果消费方消费的过程中有异常,一定要像service层一样抛出异常不能捕捉
* 4、监听队列,如果有消息就进行消费
* 5、简单模型和工作模型的queue都是绑定在默认交换机上
*/
@RabbitListener(queuesToDeclare = {@Queue(name = “simple-queue”,durable = “true”)})
public void getMessage(String message){
System.out.println("springBoot+rabbitMQ: "+message);
}
}

二、Work模型 ============================================================

特点:工作模型只是在简单模型的基础上增加了多个消费者

消息投递方

package com.example.demo.rabbitMQ.springBootrabbitMQ.work;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(“/rabbit”)
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping(value = “/work”,method = RequestMethod.GET)
public void sendMessage(){
rabbitTemplate.convertAndSend(“work-queue-one”,“祖国您好!”);
}
}

消息消费方

package com.example.demo.rabbitMQ.springBootrabbitMQ.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

/**
* 1、@RabbitListener(queues = {“topic-queue-one”} ),一个@RabbitListener就是一个消费者
* 2、使用queues,如果队列不存在就会报异常,使用queuesToDeclare,如果队列不存在就创建队列
* 3、注意:SpringBoot提供了一个很好的消息确认机制,如果消费方消费的过程中有异常,一定要像service层一样抛出异常不能捕捉
* 4、监听队列,如果有消息就进行消费
* 5、简单模型和工作模型的queue都是绑定在默认交换机上,工作模型就是在简单模型的基础上增加了多个消费者
*/

@RabbitListener(queuesToDeclare = @Queue(“work-queue-one”))
public void getMessageOne(String message){
System.out.println(message);
}

@RabbitListener(queuesToDeclare = @Queue(“work-queue-two”))
public void getMessageTwo(String message){
System.out.println(message);
}
}

三、Fanout模型 ============================================================

特点:凡是绑定在该交换机上的队列都会收到消息,无关路由键

消息投递方

package com.example.demo.rabbitMQ.springBootrabbitMQ.fanout;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(“rabbit”)
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping(“fanout”)
//fanout类型的交换机无关路由键
public void sendMessage(){
rabbitTemplate.convertAndSend(“fanout-exchange”,“”,“这是fanout类型的交换机”);
}
}

消息消费方

package com.example.demo.rabbitMQ.springBootrabbitMQ.fanout;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享,2024年程序员学习,rabbitmq,大数据,ruby
Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享,2024年程序员学习,rabbitmq,大数据,ruby
Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享,2024年程序员学习,rabbitmq,大数据,ruby
Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享,2024年程序员学习,rabbitmq,大数据,ruby
Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享,2024年程序员学习,rabbitmq,大数据,ruby

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享,2024年程序员学习,rabbitmq,大数据,ruby

te.convertAndSend(“fanout-exchange”,“”,“这是fanout类型的交换机”);
}
}

消息消费方

package com.example.demo.rabbitMQ.springBootrabbitMQ.fanout;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
[外链图片转存中…(img-kdSdEgbK-1712513731160)]
[外链图片转存中…(img-A5rWcrQG-1712513731161)]
[外链图片转存中…(img-bD5ZrjXL-1712513731161)]
[外链图片转存中…(img-BhvSH84G-1712513731162)]
[外链图片转存中…(img-hXo1vlAm-1712513731162)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-iMitFADB-1712513731162)]文章来源地址https://www.toymoban.com/news/detail-853141.html

到了这里,关于Centos7安装RabbitMQ与使用(超详细),21年大数据开发面经分享的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Centos7安装配置RabbitMQ

    Centos7安装配置RabbitMQ

    在开始演示安装RabbitMQ前,小编决定先介绍一下前置知识,想看安装步骤的小伙伴可以直接跳过这个章节,本篇博客算是小编学习RabbitMQ的笔记,部分内容都是摘录的,如果有感兴趣的小伙伴可以私信小编,后续小编将会更新更多的关于RabbitMQ的知识,感兴趣的小伙伴可以点个订阅。 1

    2023年04月25日
    浏览(11)
  • Centos7下安装RabbitMQ教程

    Centos7下安装RabbitMQ教程

    看我这个文章安装如果不会,你顺着网线来揍我,不能说最简单,我的是超级简单!!! 一、做准备(我是在vm虚拟机上的Centos7镜像上安装的)     1、安装rabbitmq得准备他的安装包(rabbitmq-server-3.8.5-1.el7.noarch)        下载地址mq              2、还得准备erlang语言(er

    2024年02月07日
    浏览(10)
  • RabbitMQ离线安装(Centos7)

    RabbitMQ离线安装(Centos7)

    摘要: 本文介绍在Centos7上离线安装RabbitMQ 目录 一、安装RabbitMQ 1、下载rpm安装包 2、安装rpm包 二、开放相应端口白名单 1、停止 Firewall 2、打开iptables配置文件 3、追加相应端口的配置内容 4、重启iptables 三、配置并启动RabbitMQ 1、开启用户远程访问 2、 启动RabbitMQ服务 3、开启

    2024年02月06日
    浏览(14)
  • centos7安装erlang及rabbitMQ

    centos7安装erlang及rabbitMQ

    第一:自己的系统版本,centos中uname -a指令可以查看,el8,el7,rabbitMQ的包不一样! 第二:根据rabbitMQ中erlang version找到想要下载rabbitMQ对应erlang版本,地址地=:https://www.rabbitmq.com/which-erlang.html 下载地址 erlang下载: https://www.erlang.org/ RabbitMQ下载: https://github.com/rabbitmq/rabbitmq

    2024年02月12日
    浏览(14)
  • 大数据-安装 Hadoop3.1.3 详细教程-伪分布式配置(Centos7)

    大数据-安装 Hadoop3.1.3 详细教程-伪分布式配置(Centos7)

    **相关资源:**https://musetransfer.com/s/q43oznf6f(有效期至2023年3月16日)|【Muse】你有一份文件待查收,请点击链接获取文件 1.检查是否安装ssh (CentOS 7 即使是最小化安装也已附带openssh 可跳过本步骤) 若已安装进行下一步骤 若未安装 请自行百度 本教程不做过多讲解 2.配置ss

    2023年04月08日
    浏览(12)
  • 安装RabbitMQ及配置Centos7 方式(2)

    安装RabbitMQ及配置Centos7 方式(2)

    自行搭建学习参考使用,这里采用的Centos7 方式,这已经是多年前的方式了,现在主流方式是容器化安装、部署,docker、ks8,同学们可自行去学习参考。 环境:centos7 、otp_src_21.3、rabbitmq-server-generic-unix-3.7.9、c++。 注意 : Erlang 和 RabbitMQ版本对照 RabbitMQ版本 Erlang最低版本要求

    2024年03月10日
    浏览(11)
  • Centos7系统 Docker 安装和使用教程(超详细附带常用的容器部署代码)

    简介 Docker 是一个开源的容器化平台,可帮助开发者轻松地创建、部署和运行应用程序。 Docker 使开发人员能够在一个独立的容器中打包应用程序及其依赖项,这样他们就可以轻松地将应用程序移植到任何其他环境中。 使用 Docker 的主要优势: 便于部署:由于 Docker 容器可以在

    2024年02月08日
    浏览(35)
  • 手把手教你安装RabbitMQ(基于CentOS7系统)

    手把手教你安装RabbitMQ(基于CentOS7系统)

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 可靠性 灵活的路由 消息集群 高可用 插件机制 多种协议 多语言客户端 管理界面 跟踪机制 先安装一些必要的依赖

    2023年04月08日
    浏览(14)
  • Centos7的安装步骤【详细】

    Centos7的安装步骤【详细】

    centos7下载地址:http://mirrors.aliyun.com/centos/7/isos/x86_64/ 1、打开虚拟机,点击主页,创建新的虚拟机 2、使用典型和自定义都行,这里我选的是自定义,点击下一步 3、点击下一步 3、这里选择稍后安装操作系统 4、点击下一步 5、给虚拟机命名,然后在电脑上为这个虚拟机创建一

    2024年02月14日
    浏览(13)
  • CentOs7安装nginx【详细】

    CentOs7安装nginx【详细】

    先查看是否启动了 nginx 服务 出现这个则 nginx 没启动服务 出现这个则 nginx 启动了服务 如果 nginx 启动了服务,则需要先关闭 nginx 服务 【没启动就略过这一步】 查看所有与 nginx 有关的文件夹 删除与 nginx 有关的文件夹 卸载Nginx相关的依赖 这样就卸载完成了 查看安装nginx所需

    2024年02月02日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包