自定义Dubbo RPC通信协议

这篇具有很好参考价值的文章主要介绍了自定义Dubbo RPC通信协议。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

Dubbo 协议层的核心SPI接口是org.apache.dubbo.rpc.Protocol,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。

设计思路

Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

首先,我们要实现 Protocol 接口,服务暴露时开启我们的 GrpcServer,绑定本地端口,用于后续处理连接和请求。
服务端如何处理grpc请求呢???
方案一,是把暴露的所有服务 Invoker 都封装成grpc的 Service,全部统一让 GrpcServer 处理,但是这么做太麻烦了。方案二,是提供一个 DispatcherService,统一处理客户端发来的grpc请求,再根据参数查找要调用的服务,执行本地调用返回结果。本文采用方案二。
客户端引用服务时,我们创建 GrpcInvoker 对象,和服务端建立连接并生成 DispatcherService 的本地存根 Stub 对象,发起 RPC 调用时只需把 RpcInvocation 转换成 Protobuf 消息发出去即可。

实现GrpcProtocol

项目结构

首先,我们新建一个dubbo-extension-protocol-grpc模块,引入必要的依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-rpc-api</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-all</artifactId>
        <version>1.56.1</version>
    </dependency>
</dependencies>

项目结构:

main
--java
----dubbo.extension.rpc.grpc
------message
--------RequestData.java
--------ResponseData.java
------Codec.java
------DispatcherService.java
------DispatcherServiceGrpc.java
------GrpcExporter.java
------GrpcInvoker.java
------GrpcProtocol.java
------GrpcProtocolServer.java
--resources
----META-INF/dubbo
------org.apache.dubbo.rpc.Protocol

服务&消息定义

然后是定义grpc的 Service 和消息格式
DispatcherService.proto 请求分发服务的定义

syntax = "proto3";

option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc";
option java_outer_classname = "DispatcherServiceProto";
option objc_class_prefix = "HLW";

import "RequestData.proto";
import "ResponseData.proto";

service DispatcherService {
  rpc dispatch (RequestData) returns (ResponseData) {}
}

RequestData.proto 请求消息的定义,主要是对 Invocation 的描述

syntax = "proto3";

option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "RequestDataProto";
option objc_class_prefix = "HLW";

message RequestData {
  string targetServiceUniqueName = 1;
  string methodName = 2;
  string serviceName = 3;
  repeated bytes parameterTypes = 4;
  string parameterTypesDesc = 5;
  repeated bytes arguments = 6;
  bytes attachments = 7;
}

ResponseData.proto 响应消息的定义,主要是对 AppResponse 的描述

syntax = "proto3";

option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "ResponseataProto";
option objc_class_prefix = "HLW";

message ResponseData {
  int32 status = 1;
  string errorMessage = 2;
  bytes result = 3;
  bytes attachments = 4;
}

使用protobuf-maven-plugin插件把 proto 文件生成对应的 Java 类。

协议实现

新建 GrpcProtocol 类,继承 AbstractProtocol,实现 Protocol 协议细节。
核心是:服务暴露时开启 Grpc 服务,引用服务时生成对应的 Invoker。

public class GrpcProtocol extends AbstractProtocol {

    @Override
    protected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {
        return new GrpcInvoker<>(type, url);
    }

    @Override
    public int getDefaultPort() {
        return 18080;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        GrpcExporter<T> exporter = new GrpcExporter<>(invoker);
        exporterMap.put(invoker.getInterface().getName(), exporter);
        openServer(invoker.getUrl());
        return exporter;
    }

    private void openServer(URL url) {
        String key = serviceKey(url);
        ProtocolServer protocolServer = serverMap.get(key);
        if (protocolServer == null) {
            synchronized (serverMap) {
                protocolServer = serverMap.get(key);
                if (protocolServer == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        }
    }

    private ProtocolServer createServer(URL url) {
        return new GrpcProtocolServer(url, exporterMap);
    }
}

新建 GrpcProtocolServer 类实现 ProtocolServer 接口,核心是启动 GrpcServer,并添加 DispatcherService 处理请求。

public class GrpcProtocolServer implements ProtocolServer {

    private final Server server;

    public GrpcProtocolServer(URL url, Map<String, Exporter<?>> exporterMap) {
        server = ServerBuilder.forPort(url.getPort())
                .addService(new DispatcherService(exporterMap))
                .build();
        try {
            server.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getAddress() {
        return null;
    }

    @Override
    public void setAddress(String address) {

    }

    @Override
    public void close() {
        server.shutdown();
    }
}

新建 DispatcherService 类实现 Grpc Service,用来处理客户端的grpc请求。核心是把 RequestData 解码成 RpcInvocation,再查找本地 Invoker 调用并返回结果。

public class DispatcherService extends DispatcherServiceGrpc.DispatcherServiceImplBase {

    private final Map<String, Exporter<?>> exporterMap;

    public DispatcherService(Map<String, Exporter<?>> exporterMap) {
        this.exporterMap = exporterMap;
    }

    @Override
    public void dispatch(RequestData request, StreamObserver<ResponseData> responseObserver) {
        RpcInvocation invocation = Codec.decodeInvocation(request);
        ResponseData responseData;
        try {
            Invoker<?> invoker = exporterMap.get(invocation.getServiceName()).getInvoker();
            Object returnValue = invoker.invoke(invocation).get().getValue();
            responseData = Codec.encodeResponse(returnValue, null);
        } catch (Exception e) {
            responseData = Codec.encodeResponse(null, e);
        }
        responseObserver.onNext(responseData);
        responseObserver.onCompleted();
    }
}

新建 GrpcInvoker 类实现 Invoker 接口,服务引用时会创建它,目的是发起 RPC 调用时通过 Stub 发一个请求到 DispatcherService,实现grpc协议的 RPC 调用。

public class GrpcInvoker<T> extends AbstractInvoker<T> {

    private static final Map<String, DispatcherServiceGrpc.DispatcherServiceFutureStub> STUB_MAP = new ConcurrentHashMap<>();

    public GrpcInvoker(Class<T> type, URL url) {
        super(type, url);
    }

    private DispatcherServiceGrpc.DispatcherServiceFutureStub getStub() {
        String key = getUrl().getAddress();
        DispatcherServiceGrpc.DispatcherServiceFutureStub stub = STUB_MAP.get(key);
        if (stub == null) {
            synchronized (STUB_MAP) {
                stub = STUB_MAP.get(key);
                if (stub == null) {
                    STUB_MAP.put(key, stub = createClient(getUrl()));
                }
            }
        }
        return stub;
    }

    private DispatcherServiceGrpc.DispatcherServiceFutureStub createClient(URL url) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort()).usePlaintext().build();
        return DispatcherServiceGrpc.newFutureStub(channel);
    }

    @Override
    protected Result doInvoke(Invocation invocation) throws Throwable {
        RequestData requestData = Codec.encodeInvocation((RpcInvocation) invocation);
        ResponseData responseData = getStub().dispatch(requestData).get();
        return Codec.decodeResponse(responseData, invocation);
    }
}

最后是编解码器 Codec,它的作用是对 RequestData、ResponseData 对象的编解码。对于请求来说,要编解码的是 RpcInvocation;对于响应来说,要编解码的是返回值和异常信息。
方法实参是 Object[] 类型,附带参数是 Map 类型,本身不能直接通过 Protobuf 传输,我们会先利用 Serialization 序列化成字节数组后再传输。

public class Codec {

    private static final Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getDefaultExtension();

    public static RequestData encodeInvocation(RpcInvocation invocation) {
        RequestData.Builder builder = RequestData.newBuilder()
                .setTargetServiceUniqueName(invocation.getTargetServiceUniqueName())
                .setMethodName(invocation.getMethodName())
                .setServiceName(invocation.getServiceName());
        for (Class<?> parameterType : invocation.getParameterTypes()) {
            builder.addParameterTypes(serialize(parameterType));
        }
        builder.setParameterTypesDesc(invocation.getParameterTypesDesc());
        for (Object argument : invocation.getArguments()) {
            builder.addArguments(serialize(argument));
        }
        builder.setAttachments(serialize(invocation.getAttachments()));
        return builder.build();
    }

    public static RpcInvocation decodeInvocation(RequestData requestData) {
        RpcInvocation invocation = new RpcInvocation();
        invocation.setTargetServiceUniqueName(requestData.getTargetServiceUniqueName());
        invocation.setMethodName(requestData.getMethodName());
        invocation.setServiceName(requestData.getServiceName());
        List<ByteString> parameterTypesList = requestData.getParameterTypesList();
        Class<?>[] parameterTypes = new Class[parameterTypesList.size()];
        for (int i = 0; i < parameterTypesList.size(); i++) {
            parameterTypes[i] = (Class<?>) deserialize(parameterTypesList.get(i));
        }
        invocation.setParameterTypes(parameterTypes);
        invocation.setParameterTypesDesc(requestData.getParameterTypesDesc());
        List<ByteString> argumentsList = requestData.getArgumentsList();
        Object[] arguments = new Object[argumentsList.size()];
        for (int i = 0; i < argumentsList.size(); i++) {
            arguments[i] = deserialize(argumentsList.get(i));
        }
        invocation.setArguments(arguments);
        invocation.setAttachments((Map<String, String>) deserialize(requestData.getAttachments()));
        return invocation;
    }

    public static Result decodeResponse(ResponseData responseData, Invocation invocation) {
        AppResponse appResponse = new AppResponse();
        if (responseData.getStatus() == 200) {
            appResponse.setValue(deserialize(responseData.getResult()));
            appResponse.setAttachments((Map<String, String>) deserialize(responseData.getAttachments()));
        } else {
            appResponse.setException(new RuntimeException(responseData.getErrorMessage()));
        }
        return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation);
    }

    private static Object deserialize(ByteString byteString) {
        try {
            InputStream inputStream = new ByteArrayInputStream(byteString.toByteArray());
            ObjectInput objectInput = serialization.deserialize(null, inputStream);
            return objectInput.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static ByteString serialize(Object obj) {
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            ObjectOutput output = serialization.serialize(null, outputStream);
            output.writeObject(obj);
            output.flushBuffer();
            return ByteString.copyFrom(outputStream.toByteArray());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static ResponseData encodeResponse(Object returnValue, Throwable throwable) {
        ResponseData.Builder builder = ResponseData.newBuilder();
        if (throwable == null) {
            builder.setStatus(200);
            builder.setResult(serialize(returnValue));
            builder.setAttachments(serialize(new HashMap<>()));//先忽略
        } else {
            builder.setStatus(500);
            builder.setErrorMessage(throwable.getMessage());
        }
        return builder.build();
    }
}

实现完毕,最后是让 Dubbo 可以加载到我们自定义的 GrpcProtocol,可以通过 SPI 的方式。新建META-INF/dubbo/org.apache.dubbo.rpc.Protocol文件,内容:

grpc=dubbo.extension.rpc.grpc.GrpcProtocol

服务提供方使用自定义协议:

ProtocolConfig protocolConfig = new ProtocolConfig("grpc", 10880);

消费方使用自定义协议:

ReferenceConfig#setUrl("grpc://127.0.0.1:10880");

尾巴

Protocol 层关心的是如何暴露服务和引用服务,以及如何让双方使用某个具体的协议来通信,以完成 RPC 调用。如果你觉得官方提供的 dubbo 协议无法满足你的业务,就可以通过扩展 Protocol 接口来实现你自己的私有协议。文章来源地址https://www.toymoban.com/news/detail-805950.html

到了这里,关于自定义Dubbo RPC通信协议的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 用Netty自己实现Dubbo RPC

    用Netty自己实现Dubbo RPC

    1. RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议. 该协议 允许运行在一台计算机中的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程; 2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样 (如图): 3.常

    2024年02月10日
    浏览(11)
  • 分布式RPC框架Dubbo详解

    分布式RPC框架Dubbo详解

    目录   1.架构演进 1.1 单体架构 1.2  垂直架构 1.3 分布式架构 1.4 SOA架构 1.5 微服务架构 2.RPC框架 2.1 RPC基本概念介绍 2.1.1 RPC协议 2.1.2 RPC框架 2.1.3 RPC与HTTP、TCP/ UDP、Socket的区别 2.1.4 RPC的运行流程  2.1.5 为什么需要RPC 2.2 Dubbo  2.2.1 Dubbo 概述 2.2.2 Dubbo实战   架构演进如下图: 这

    2024年02月07日
    浏览(22)
  • 深入浅出:理解 RPC 和 Dubbo 架构

    深入浅出:理解 RPC 和 Dubbo 架构

    Apache Dubbo是一款高性能的Java RPC框架.其前身是阿里巴巴公司开源的一个高性能,轻量级的开源Java RPC框架,可以和Spring框架无缝集成. Dubbo 官网 RPC介绍 Remote Procedure Call 远程过程调用,是分布式架构的核心,按响应方式分以下两种: 同步调用:客户端调用服务方方法,等待直到服务方返

    2023年04月12日
    浏览(12)
  • 应用架构演变过程、rpc及Dubbo简介

            单一应用架构 - 垂直应用架构 - 分布式服务架构 - 微服务架构。 单一应用架构         当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。 此时,用于简化增删改查工作量的 数据访问框架(ORM) 是关键。         缺点:

    2024年02月02日
    浏览(11)
  • 【Dubbo3云原生微服务开发实战】「Dubbo前奏导学」 RPC服务的底层原理和实现

    【Dubbo3云原生微服务开发实战】「Dubbo前奏导学」 RPC服务的底层原理和实现

    Dubbo是一款高效而强大的RPC服务框架,它旨在解决微服务架构下的服务监控和通信问题。该框架提供了Java、Golang等多语言的SDK,使得使用者可以轻松构建和开发微服务。Dubbo具备远程地址发现和通信能力,可通过Dubbo独有的身临其境的服务治理特验为主导,以提高开发人员的功

    2024年02月05日
    浏览(10)
  • 不满足于RPC,详解Dubbo的服务调用链路

    不满足于RPC,详解Dubbo的服务调用链路

    【收藏向】从用法到源码,一篇文章让你精通Dubbo的SPI机制 面试Dubbo ,却问我和Springcloud有什么区别? 超简单,手把手教你搭建Dubbo工程(内附源码) Dubbo最核心功能——服务暴露的配置、使用及原理 并不简单的代理,Dubbo是如何做服务引用的 经过前面一系列的铺垫,今天终

    2024年02月16日
    浏览(14)
  • 微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用

    微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用

    🏷️ 个人主页 :鼠鼠我捏,要死了捏的主页  🏷️ 系列专栏 :Golang全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站AI学习网站。 目录 前言 快速上手

    2024年02月19日
    浏览(10)
  • Java 【dubbo rpc改feign调用】controller注解处理

    【框架改造问题点记录,dubbo改为spring cloud alibaba】 【第三篇】controller注解处理 【描述】项目之前用了jboss,引入了很多ws.rs包,controller参数注解使用QueryParam。改造时批量替换成了@RequestParam(代表必传)。但是前端并不会传全部参数,会导致400,持续更新… 不加注解,表示

    2024年02月17日
    浏览(10)
  • Dubbo源码解析第一期:如何使用Netty4构建RPC

    Dubbo源码解析第一期:如何使用Netty4构建RPC

            早期学习和使用Dubbo的时候(那时候Dubbo还没成为Apache顶级项目),写过一些源码解读,但随着Dubbo发生了翻天覆地的变化,那些文章早已过时,所以现在计划针对最新的Apache Dubbo源码来进行“阅读理解”,希望和大家一起再探Dubbo的实现。由于能力有限,如果文章

    2024年01月21日
    浏览(11)
  • Netty核心技术十一--用Netty 自己 实现 dubbo RPC

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC

    RPC(Remote Procedure Call) :远程 过程调用,是一个计算机 通信协议。该协议允许运 行于一台计算机的程序调 用另一台计算机的子程序, 而程序员无需额外地为这 个交互作用编程 两个或多个应用程序都分 布在不同的服务器上,它 们之间的调用都像是本地 方法调用一样(如图

    2024年02月16日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包