根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)

这篇具有很好参考价值的文章主要介绍了根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、客户端代码实现

1.1、需求分析

1.2、具体实现

1)实现 ConnectionFactory

2)实现 Connection

3)实现 Channel

二、编写 Demo 

2.1、实例 

2.1、实例演示


一、客户端代码实现


1.1、需求分析

RabbitMQ 的客户端设定:一个客户端可以有多个模块,每个模块都可以和 broker server 之间建立 “逻辑上的连接” (channel),这几个模块的channel 彼此之间是互相不影响的,同时这几个 channel 又复用的同一个 TCP 连接,省去了频繁 建立/销毁 TCP 连接的开销(三次握手、四次挥手......).

这里,我们也按照这样的逻辑实现 消息队列 的客户端,主要涉及到以下三个核心类:

  1. ConnectionFactory:连接工厂,这个类持有服务器的地址,主要功能就是创建 Connection 对象.
  2. Connection:表示一个 TCP连接,持有 Socket 对象,用来 写入请求/读取响应,管理多个Channel 对象.
  3. Channel:表示一个逻辑上的连接,需要提供一系列的方法,去和服务器提供的核心 API 对应(客户端提供的这些方法的内部,就是写入了一个特定的请求,然后等待服务器响应).

1.2、具体实现

1)实现 ConnectionFactory

主要用来创建 Connection 对象.

public class ConnectionFactory {

    //broker server 的 ip 地址
    private String host;
    //broker server 的端口号
    private int port;

//    //访问 broker server 的哪个虚拟主机
//    //这里暂时先不涉及
//    private String virtualHostName;
//    private String username;
//    private String password;

    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host, port);
        return connection;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}

2)实现 Connection

属性如下

    private Socket socket;
    //一个 socket 连接需要管理多个 channel
    private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    private InputStream inputStream;
    private OutputStream outputStream;
    // DataXXX 主要用来 读取/写入 特定格式数据(例如 readInt())
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;
    //用来处理 0xc 的回调,这里开销可能会很大,不希望把 Connection 阻塞住,因此使用 线程池 来处理
    private ExecutorService callbackPool;

构造如下

这里不光需要初始化属性,还需要创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理

    public Connection(String host, int port) throws IOException {
        socket = new Socket(host, port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);

        callbackPool = Executors.newFixedThreadPool(4);

        //创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理
        Thread t = new Thread(() -> {
            try {
                while(!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                //连接正常断开的,此时这个异常可以忽略
                System.out.println("[Connection] 连接正常断开!");
            } catch(IOException | ClassNotFoundException | MqException e) {
                System.out.println("[Connection] 连接异常断开!");
                e.printStackTrace();
            }
        });
        t.start();
    }

释放 Connection 相关资源

    public void close() {
        try {
            callbackPool.shutdown();
            channelMap.clear();
            inputStream.close();
            outputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

使用这个方法来区别,当前的响应是一个针对控制请求的响应,还是服务器推送过来的消息.

如果是服务器推送过来的消息,就响应表明是 0xc,也就是一个回调,通过线程池来进行处理;

如果只是一个普通的响应,就把这个结果放到 channel 的 哈希表中(随后 channel 会唤醒所有阻塞等待响应的线程,去 map 中拿数据).

    public void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if(response.getType() == 0xc) {
            //服务器推送过来的消息数据
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            //根据 channelId 找到对应的 channel 对象
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if(channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 再客户端中不存在!channelId=" + channel.getChannelId());
            }
            //执行该 channel 对象内部的回调(这里的开销未知,有可能很大,同时不希望把这里阻塞住,所以使用线程池来执行)
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handlerDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch(MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            //当前响应是针对刚才的控制请求的响应
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            //把这个结果放到 channel 的 哈希表中
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if(channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId=" + channel.getChannelId());
            }
            channel.putReturns(basicReturns);
        }
    }

发送请求和读取响应

    /**
     * 发送请求
     * @param request
     * @throws IOException
     */
    public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        System.out.println("[Connection] 发送请求!type=" + request.getType() + ", length=" + request.getLength());
    }

    /**
     * 读取响应
     */
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if(n != response.getLength()) {
            throw new IOException("读取的响应格式不完整! n=" + n + ", responseLen=" + response.getLength());
        }
        response.setPayload(payload);
        System.out.println("[Connection] 收到响应!type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

在 Connection 中提供创建 Channel 的方法

    public Channel createChannel() throws IOException {
        String channelId = "C-" + UUID.randomUUID().toString();
        Channel channel = new Channel(channelId, this);
        //放到 Connection 管理的 channel 的 Map 集合中
        channelMap.put(channelId, channel);
        //同时也需要把 “创建channel” 这个消息告诉服务器
        boolean ok = channel.createChannel();
        if(!ok) {
            //如果创建失败,就说明这次创建 channel 操作不顺利
            //把刚才加入 hash 表的键值对再删了
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }

Ps:代码中使用了很多次 UUID ,这里我们和之前一样,使用加前缀的方式来进行区分.

3)实现 Channel

属性和构造如下

    private String channelId;
    // 当前这个 channel 是属于哪一个连接
    private Connection connection;
    //用来存储后续客户端收到的服务器响应,已经辨别是哪个响应(要对的上号) key 是 rid
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    //如果当前 Channel 订阅了某个队列,就需要记录对应的回调是什么,当该队列消息返回回来的时候,调用回调
    //此处约定一个 Channel 只能有一个回调
    private Consumer consumer;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {
        return basicReturnsMap;
    }

    public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {
        this.basicReturnsMap = basicReturnsMap;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;

实现 0x1 创建 channel

主要就是构造构造出 request,然后发送请求到 BrokerServer 服务器,阻塞等待服务器响应.

    /**
     * 0x1
     * 和服务器进行交互,告诉服务器,此处客户端已经创建了新的 channel 了
     * @return
     */
    public boolean createChannel() throws IOException {
        //构造 payload
        BasicArguments arguments = new BasicArguments();
        arguments.setChannelId(channelId);
        arguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(arguments);
        //发送请求
        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);

        //等待服务器响应
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 生成 rid
     * @return
     */
    public String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }


    /**
     * 阻塞等待服务器响应
     * @param rid
     * @return
     */
    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while((basicReturns = basicReturnsMap.get(rid)) == null) {
            //查询结果为空,就说明咱们去菜鸟驿站要取的包裹还没到
            //此时就需要阻塞等待
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        basicReturnsMap.remove(rid);
        return basicReturns;
    }


    /**
     * 由 Connection 中的方法调用,区分为普通响应之后触发
     * 将响应放回到 channel 管理的 map 中,并唤醒所有线程
     * @param basicReturns
     */
    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            //当前也不知道有多少线程再等待上述的这个响应
            //因此就把所有等待的线程唤醒
            notifyAll();
        }
    }

Ps:其他的 请求操作也和 0x1 的方式几乎一样,这里不一一展示了,主要说一下 0xa

0xa 消费者订阅队列消息,这里要先设置好回调到属性中,方便 Connection 通过这个属性来 处理回调

值得注意的一点, 我们约定 channelId 就是 consumerTag

    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws IOException, MqException {
        //先设置回调
        if(this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息回调了,不能重复!");
        }
        this.consumer = consumer;
        BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
        basicConsumeArguments.setRid(generateRid());
        basicConsumeArguments.setChannelId(channelId);
        basicConsumeArguments.setConsumerTag(channelId); // 注意:此处的 consumerTag 使用 channelId 来表示
        basicConsumeArguments.setQueueName(queueName);
        basicConsumeArguments.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toBytes(basicConsumeArguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
        return basicReturns.isOk();
    }

二、补充:关于回调执行流程


流程如下:

  1. 客户端调用 basicConsume 方法,创建一个消费者订阅队列消息,并带上 Consumer(消费者拿到消息之后具体要做的动作);
  2. Channel 保存了客户端发来的 Consumer(等待接收 0xc 响应,真正执行回调),发送订阅队列消息请求(0xa),并等待响应
  3. BrokerServer 接收到请求后,解析出 0xa ,又创建了一个 Consumer(目的是为了等服务器拿到要消费的消息后,将消息的数据包装成 0xc 的响应,客户端接收到响应之后,执行 “消费者拿到消息后具体要做的动作”  这是 RabbitMQ 的设定),接着 BrokerServer 调用 VirtualHost.
  4. VirtualHost 创建一个新的消费者订阅队列后,如果发现队列中又消息,立即进行消费
  5. 具体的,就是调用刚刚新创建的回调,然后向客户端返回 0xc 的响应,客户端接收到响应之后,执行 “消费者拿到消息后具体要做的动作”.

三、编写 Demo 


3.1、实例 

到了这里基本就实现完成了一个 跨主机/服务器 之间的生产者消费者模型了(功能上可以满足日常开发对消息队列的使用),但是还具有很强的扩展性,可以继续参考 RabbitMQ,如果有想法的,或者是遇到不会的问题,可以私信我~

以下我来我来编写一个 demo,模拟 跨主机/服务器 之间的生产者消费者模型(这里为了方便,就在本机演示).

首先再 spring boot 项目的启动类中 创建 BrokerServer ,绑定端口号,然后启动

@SpringBootApplication
public class RabbitmqProjectApplication {
    public static ConfigurableApplicationContext context;
    public static void main(String[] args) throws IOException {
        context = SpringApplication.run(RabbitmqProjectApplication.class, args);
        BrokerServer brokerServer = new BrokerServer(9090);
        brokerServer.start();
    }

}

编写消费者

public class DemoConsumer {

    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建交换机和队列(这里和生产者创建交换机和队列不冲突,谁先启动,就按照谁的创建,即使已经存在交换机和队列,再创建也不会有什么副作用)
        channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("demoQueue", true, false, false, null);

        //消费者消费消息
        channel.basicConsume("demoQueue", true, new Consumer() {
            @Override
            public void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("开销消费");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("body=" + new String(body));
                System.out.println("消费完毕");
            }
        });

        //由于消费者不知道生产者要生产多少,就在这里通过循环模拟一直等待
        while(true) {
            Thread.sleep(500);
        }
    }

}

编写生产者

public class DemoProducer {

    public static void main(String[] args) throws IOException, InterruptedException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建交换机和队列(这里和消费者创建交换机和队列不冲突,谁先启动,就按照谁的创建,即使已经存在交换机和队列,再创建也不会有什么副作用)
        channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("demoQueue", true, false, false, null);

        //生产消息
        byte[] body1 = "Im cyk1 !".getBytes();
        channel.basicPublish("demoExchange", "demoQueue", null, body1);

        Thread.sleep(500);

        //关闭连接
        channel.close();
        connection.close();
    }

}

3.2、实例演示

启动 spring boot 项目(启动 BrokerServer)

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式

运行消费者(消费者和生产者谁先后运行都可以)

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式

运行生产者

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式

根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结),RabbitMQ,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-676353.html

到了这里,关于根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

    根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

    目录 一、需求分析 1.1、对 Message Queue 的认识 1.2、消息队列核心概念 1.3、Broker Server 内部关键概念 1.4、Broker Server 核心 API (重点实现) 1.5、交换机类型 Direct 直接交换机 Fanout 扇出交换机 Topic 主题交换机 1.6、持久化 1.7、网络通信 通信流程 远程调用设计思想 1.8、模块设计图

    2024年02月12日
    浏览(9)
  • 根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)

    根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)

    目录 一、虚拟主机 + Consume设计 1.1、承接问题 1.2、具体实现 1.2.1、消费者订阅消息实现思路 1.2.2、消费者描述自己执行任务方式实现思路 1.2.3、消息推送给消费者实现思路 1.2.4、消息确认 前面已经实现了虚拟主机大部分功能以及转发规则的判定,也就是说,现在消息已经可

    2024年02月11日
    浏览(10)
  • 根据源码,模拟实现 RabbitMQ - 实现消息持久化,统一硬盘操作(3)

    根据源码,模拟实现 RabbitMQ - 实现消息持久化,统一硬盘操作(3)

    目录 一、实现消息持久化 1.1、消息的存储设定 1.1.1、存储方式 1.1.2、存储格式约定 1.1.3、queue_data.txt 文件内容  1.1.4、queue_stat.txt 文件内容 1.2、实现 MessageFileManager 类 1.2.1、设计目录结构和文件格式 1.2.2、实现消息的写入 1.2.3、实现消息的删除(随机访问文件) 1.2.4、获取队

    2024年02月12日
    浏览(12)
  • C++ 简单实现RPC网络通讯

    C++ 简单实现RPC网络通讯

            RPC是远程调用系统简称,它允许程序调用运行在另一台计算机上的过程,就像调用本地的过程一样。RPC 实现了网络编程的“过程调用”模型,让程序员可以像调用本地函数一样调用远程函数。最近在做的也是远程调用过程,所以通过重新梳理RPC来整理总结一下。  

    2023年04月08日
    浏览(15)
  • 根据源码,模拟实现 RabbitMQ - 通过 SQLite + MyBatis 设计数据库(2)

    根据源码,模拟实现 RabbitMQ - 通过 SQLite + MyBatis 设计数据库(2)

    目录 一、数据库设计 1.1、数据库选择 1.2、环境配置 1.3、建库建表接口实现 1.4、封装数据库操作 1.5、针对 DataBaseManager 进行单元测试 1.6、心得 MySQL 是我们最熟悉的数据库,但是这里我们选择使用 SQLite,原因如下: SQLite 比 MySQL 更轻量:一个完整的 SQLite 数据库,只有一个单

    2024年02月13日
    浏览(9)
  • [Qt网络编程]之UDP通讯的简单编程实现

    [Qt网络编程]之UDP通讯的简单编程实现

    hello!欢迎大家来到我的Qt学习系列之 网络编程之UDP通讯的简单编程实现。 希望这篇文章能对你有所帮助!!! 本篇文章的相关知识请看我的上篇文章: 目录 UDP通讯  基于主窗口的实现  基于线程的实现          UDP数据报协议是一个面向无连接的传输层报文协议 ,它简

    2024年04月25日
    浏览(14)
  • 物联网网络通讯知识

    物联网网络通讯知识

    RTU英文全称Remote Terminal Units,中文全称为远程终端单元。远程终端设备(RTU)是安装在远程现场的 电子设备 ,用来监视和测量安装在远程现场的传感器和设备。通俗理解就是能够编程的还可以将数据传输到服务器的工具。RTU内部是包含 通讯模块 的,RTU仪表配置服务器后,就可

    2024年02月05日
    浏览(9)
  • 网络通讯组件性能优化之路

    网络通讯组件性能优化之路

    BIO为同步阻塞IO,blocking queue的简写,也就是说多线程情况下只有一个线程操作内核的queue,当前线程操作完queue后,才能给下一个线程操作; 问题 在BIO下,一个连接就对应一个线程,如果连接特别多的情况下,就会有特别多的线程,很费线程;在早期的时候,世界上的计算机

    2024年02月02日
    浏览(9)
  • 网络通讯录服务器

    网络通讯录服务器

    简易版本 服务端完整版本 客户端完整版本 Protobuf还常⽤于通讯协议、服务端数据交换场景。那么在这个⽰例中,我们将实现⼀个⽹络版本的 通讯录,模拟实现客⼾端与服务端的交互,通过Protobuf来实现各端之间的协议序列化。 需求如下: 客⼾端可以选择对通讯录进⾏以下操

    2024年02月12日
    浏览(12)
  • 【Linux Day15 TCP网络通讯】

    【Linux Day15 TCP网络通讯】

    接口介绍 socket()方法是用来创建一个套接字 ,有了套接字就可以通过网络进行数据的收发。创建套接字时要指定使用的服务类型,使用 TCP 协议选择流式服务(SOCK_STREAM)。 **bind()方法是用来指定套接字使用的 IP 地址和端口。**IP 地址就是自己主机的地址,测试程序时可以使

    2024年02月19日
    浏览(7)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包