Netty基础

这篇具有很好参考价值的文章主要介绍了Netty基础。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

2.1Netty是什么

是一个基于异步的(多线程处理结果和接收)、事件驱动的网络应用框架,用于基于快速开发可维护、高性能的网络服务器和客户端

异步是指调用时的异步,他的IO还是多路复用的IO

许多中间件都依赖与Netty

  • zookper
  • hadoop
  • Spring 5.x - flux 抛弃了Tomact 使用netty作为服务器端

Netty优势

  • 解决很多网络传输中的问题,比如黏包和半包
  • 对API进行增强
  • 防止epoll空轮询导致CPU100%

2.2Hello World

目标:

开发一个简单的服务器端和客户端

  • 客户端向服务器发送hello world
  • 服务器仅接收,不返回
  • HelloServer
public class HelloServer {
    public static void main(String[] args) {
        //1. 启动器 负责组装netty组件 启动服务器
        new ServerBootstrap()
                //2.BossEventLoop WorkerEventLoop(selector,thread) 加入一个eventGroup组
            	//EventLoop用来处理数据的容器
                .group(new NioEventLoopGroup())
                //3. 选择服务器的实现
                .channel(NioServerSocketChannel.class)
                //4. boss 负责处理连接 worker(child)负责处理读写 决定了worker(child) 能执行哪些操作(handler)
                .childHandler(
                        //5. channel代表和客户端进行数据读写的通道 Initializer初始化器  负责添加别的handler
                        new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //6.添加具体的handler
                        //用于解码 将传输过来的数据类型(byteBuf) 转换成字符串
                        //服务器收到接收的结果后,调用handler的初始化方法  然后一步一步执行自己写的handler即可
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        //自定义的handler
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            //处理读事件
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                    //7. 绑定 监听端口
                }).bind(8080);
    }
}
  • HelloClient
public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //1.创建启动器类 启动客户端
        new Bootstrap()
                //2.添加EventLoop  //EventLoop用来处理数据的容器
                .group(new NioEventLoopGroup())
                //3.选择客户端的channel实现
                .channel(NioSocketChannel.class)
                //4.添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  //连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //编码器 将string -> byteBuf
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                //5.连接到服务器
                .connect(new InetSocketAddress("localhost",8080))
                .sync()     //阻塞方法 直到连接建立
                .channel()
                //  向客户端发送数据
                .writeAndFlush("hello,world");  //收发数据都要走handler  这里将str -> ByteBuf 然后发给服务器
    }
}

2.3组件

2.3.1EventLoop

本质是一个单线程执行器(同时维护了一个selectror),里面有run方法处理Channel上源源不断的io事件。

继承关系:

  • juc.ScheduledExecutorService,所以包含了线程池中的所有方法
  • 继承netty自己的OrderEventExecutor
    • 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看自己属于哪个EventLoop
事件循环组:

EventLoopGroup是一组EventGroup,Channel一般会调用EventLoopGroup

示例:

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        //1.NioEventLoopGroup功能比较全面:定时任务、Io、普通任务
        //不指定参数,默认线程数为cpu核心数*2
        EventLoopGroup group = new NioEventLoopGroup(2);
//        System.out.println(NettyRuntime.availableProcessors());

        //普通任务、定时任务
//        EventLoopGroup group2 = new DefaultEventLoopGroup();

        //2.获取下一个事件循环对象  每次调用得到下一个 然后回过头从第一个开始
        System.out.println(group.next()); //NioEventLoop@d70c109
        System.out.println(group.next()); //NioEventLoop@17ed40e0
        System.out.println(group.next()); //NioEventLoop@d70c109

        //3.执行普通任务  异步处理
//        group.next().submit(()->{
//            log.debug("new Runnable OK"); //[nioEventLoopGroup-2-1]
//        });

        //4.执行定时任务  初始延迟1s后 2s打印一次
        group.next().scheduleAtFixedRate(()->{
            log.debug("data ok");
        },1,2, TimeUnit.SECONDS);

        log.debug("Main OK");

    }
}

接收和发送案例

  • client
@Sl4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //1.创建启动器类 启动客户端
        Channel channel = new Bootstrap()
                //2.添加EventLoop  EventLoop用来处理数据的容器
                .group(new NioEventLoopGroup())
                //3.选择客户端的channel实现
                .channel(NioSocketChannel.class)
                //4.添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  //连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //编码器 将string -> byteBuf
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                //5.连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()     //阻塞方法 直到连接建立
                .channel();
        //  向客户端发送数据
        System.out.println(channel);
        System.out.println("");  //此处断点  右键断点选择Thread模式
    }
}
  • server
@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override  //没有数据转化的处理器 此处msg是ByteBuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug(buffer.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

断点处右键 发送不同的案例 跑多个client

10:29:30.078 [nioEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 1
10:30:00.447 [nioEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 2
10:30:37.059 [nioEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 2cccc
10:33:34.727 [nioEventLoopGroup-2-4] DEBUG com.rpc.netty.test2.EventLoopServer - 122222

不同的client绑定的nioEventLoopGroup不同,一个client只绑定一个,固定的绑定一个,所以从这可以看到,nioEventLoopGroup其实就像一个大的线程池,里面包含了不同的和client连接的线程,用来处理任务

流程图:

Netty基础

对server的改进:

当多个线程来的时候,有的是accept,有的是读写操作,我们现在想将他们区分开。那么区分开后会不会有什么问题呢?因为我们使用的是NIO模型,所以当一个线程耗时比较长的时候,可能就会造成其他线程无法处理问题,这个时候怎么办呢?

我们的解决办法都是用不同的EventLoop去处理不同的问题:

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        //改进2: 防止一个线程等待时间过长影响其他线程 创一个独立的EventLoopGroup去处理
        DefaultEventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                //改进1,负责accept和read
                //boos 和 worker boos只负责处理accept事件 worker只负责读写
                .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))//worker 只有两个 可以一个线程连接多个client
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //todo 改进2:命名为handler1 和handler2进行对比
                        nioSocketChannel.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
                            @Override  //没有数据转化的处理器 此处msg是ByteBuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug(buffer.toString(Charset.defaultCharset()));
                                log.debug("handler1被{}线程处理",Thread.currentThread().getName());
                                //todo 将消息传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                          //todo 改进2:使用group里的线程进行处理,命名为handler2
                        }).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override  //没有数据转化的处理器 此处msg是ByteBuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buffer = (ByteBuf) msg;
                                log.debug(buffer.toString(Charset.defaultCharset()));
                                log.debug("handler2被{}线程处理",Thread.currentThread().getName());
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

用三个不同的客户端发消息,结果:

14:35:15.206 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - 1
14:35:15.206 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - handler1被nioEventLoopGroup-4-1线程处理
14:35:15.207 [defaultEventLoopGroup-2-1] DEBUG com.rpc.netty.test2.EventLoopServer - 1
14:35:15.207 [defaultEventLoopGroup-2-1] DEBUG com.rpc.netty.test2.EventLoopServer - handler2被defaultEventLoopGroup-2-1线程处理

14:37:33.377 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - 2
14:37:33.377 [nioEventLoopGroup-4-1] DEBUG com.rpc.netty.test2.EventLoopServer - handler1被nioEventLoopGroup-4-1线程处理
14:37:33.377 [defaultEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - 2
14:37:33.377 [defaultEventLoopGroup-2-3] DEBUG com.rpc.netty.test2.EventLoopServer - handler2被defaultEventLoopGroup-2-3线程处理

14:37:42.990 [nioEventLoopGroup-4-2] DEBUG com.rpc.netty.test2.EventLoopServer - 3
14:37:42.990 [nioEventLoopGroup-4-2] DEBUG com.rpc.netty.test2.EventLoopServer - handler1被nioEventLoopGroup-4-2线程处理
14:37:42.990 [defaultEventLoopGroup-2-4] DEBUG com.rpc.netty.test2.EventLoopServer - 3
14:37:42.990 [defaultEventLoopGroup-2-4] DEBUG com.rpc.netty.test2.EventLoopServer - handler2被defaultEventLoopGroup-2-4线程处理

其实可以看到,读写操作给一个EventLoop去执行了,hander2用的也是另外一个eventLoop去执行的

流程图:

Netty基础

handler执行过程中如何切换给不同EventLoop处理handler的呢?
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    	//下一个handler的时间循环是否与当前的事件循环是一个线程
        EventExecutor executor = next.executor();
    	//是 直接调用
        if (executor.inEventLoop()) { //是否是同一个线程
            next.invokeChannelRead(m);
        } else {
            //不是 将要执行的diamante作为任务提交给下一个事件循环处理
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
  • 如果两个handler绑定的是同一个线程,那么直接调用
  • 如果不是,那么将会作为一个runnable传递给下一个handler的线程来调用

2.3.2Channel

channle主要作用:

  • close可以用来关闭channel
  • closeFuture用来处理channel的关闭
    • sync方法作用是同步等待channel的关闭
    • 而addList方法是异步等待channel关闭
  • plpeline方法添加处理器
  • write方法将数据写入
  • writeAndFlush方法将数据写入
Channel常用方法

1.处理异步连接

我们思考一个问题,为什么要用sync方法?

我们先看这样一个初始化客户端的写法:

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //xxxFuture、Promise基本都是和异步方法配合使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override  //连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                //1.连接到服务器
                //异步非阻塞   main 调用该方法  但是真正连接服务器的 是另一个线程 Nio线程
                .connect(new InetSocketAddress("localhost", 8080)); //可能1s后执行
}

返回了一个ChannelFuture对象,那么他有什么用呢?

他会异步非阻塞的去连接服务器,如果我们不写channel.sync方法,我们尝试去连接服务器发数据,我们的数据是没办法发送成功的

Channel channel = channelFuture.channel();
log.debug("{}",channel);
channel.writeAndFlush("hello world");

原因也很简单,因为他是异步的,所以当channelFuture.channel();先被执行时,因为没连接到,所以此时的channel是个空的,自然无法连接到服务器。

针对这个原因,我们提供了如下两个方法:

  //todo 1.使用sync同步处理结果,sync的作用就是等待connect连接成功,再继续执行下面的方法
//        channelFuture.sync();  //阻塞住 等待nio连接建立完成

//        Channel channel = channelFuture.channel();
//        log.debug("{}",channel);
//        channel.writeAndFlush("hello world");

        //todo 2: 使用addListener(回调对象) 方法异步处理结果
        channelFuture.addListener(new ChannelFutureListener(){
            @Override  //在nio线程连接建立完之后,会调用该方法
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                //调用下面的 发送的不是主线程 是nio线程
                Channel channel = channelFuture.channel();
                log.debug("{}",channel);
                channel.writeAndFlush("222");
            }
        });

方式2日志结果:

21:45:24.571 [nioEventLoopGroup-2-1] DEBUG com.rpc.netty.test3.EventLoopClient - [id: 0xc70019e8, L:/127.0.0.1:1752 - R:localhost/127.0.0.1:8080]

可见,是个nio的线程进行处理的

⎛⎝≥⏝⏝≤⎛⎝

2.处理异步关闭后的操作
我们现在有另一个需求,我们需要让客户端断开连接之后,继续执行一些操作,我们现在有这样的想法:

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); //日志级别 debug
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        log.debug("{}",channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String next = scanner.next();
                if("q".equals(next)){
                    channel.close();
                    break;
                }
                channel.writeAndFlush(next);

            }
        },"input").start();

        log.debug("处理关闭之后的操作");
    }
}

当我们将这个客户端启动之后,我们就可以看见已经开始处理关闭在之后的操作了,显然,这不是我们需要的,原因就是因为客户端的连接以及在线程里进行收取信息,这些其实都是异步的,而主线程在启动之后就会打印日志内容了。

那么我们在channel.close();之后处理可以吗?

这也是不行的,因为close也是异步方法,所有后面的也不一定会被处理掉。

那该怎么办?

其实和1内连接类似,提供下面两种方法:

     	//todo 3.正确写法:
        //获取CloseFuture对象
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close....");
        //3.1同步模式处理关闭
//        closeFuture.sync();
//        log.debug("处理关闭之后的操作"); //main线程打印

        //3.2 异步关闭 类似channel的
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("处理关闭之后的操作");  //[nioEventLoopGroup-2-1]  线程执行
            }
        });

这些都可以实现

但是新的问题又来了,我们close之后,客户端并不会自动的停止运行,这是怎么回事?

因为NioEventLoopGroupe内还有些线程在运行,所以我们需要在处理关闭之后的操作后单独的调用一下他的关闭方法

//优雅关闭Nio线程,拒绝接收新的任务
group.shutdownGracefully();

他将会拒绝接收新的任务,然后优美的结束

为什么netty需要异步?

异步效率高?

相比于只使用一个一个的线程去处理相同的事情,不如用相同的线程,每个线程只处理一个事件,就像 医院的挂号、看病、结账、拿药一样,分派给不同的人去做。

  • 单线程没有办法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
  • 异步没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键

2.3.3Future & Promise

异步处理常用的两个接口

netty的Futrue继承自jdk的Futrue,而Promise继承netty的Future

  • jdk:Future只能同步等待任务结束(成功或失败),才能得到结果
  • netty:
    • Future可以同步等待任务结束结果,也可以异步获取结果,但是都要等待任务结束
    • Promise:不仅有netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

jdk Future 、Netty Future & Promise

具体例子:

  • jdk Future
@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.jdk的future主要关联线程池使用
        ExecutorService pool = Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Integer> future = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });

        //3.main线程通过future获取结果
        log.debug("等待结果");
        Integer res = future.get();
        log.debug("结果是:{}",res);
    }
}
  • netty Future
@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup eventLoop = new NioEventLoopGroup();

        EventLoop next = eventLoop.next();
        Future<Integer> future = next.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("处理结果");
                Thread.sleep(1000);
                return 100;
            }
        });

        log.debug("等待结果");
//        log.debug("结果{}",future.get());  //同步方式获取结果
        //异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.debug("接收结果:{}",future.getNow()); //立刻获取结果 非阻塞
            }
        });
    }
}
  • Netty Promise
@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.准备eventLoop对象
        EventLoop eventLoop = new NioEventLoopGroup().next();

        //2.主动拿到promise对象  结果容器
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        //3.线程执行计算,像promise对象填充结果
        new Thread(()->{
            log.debug("开始计算1");
            try {
                int i = 1/0;
                Thread.sleep(1000);
                //计算成功结果放入容器
                promise.setSuccess(80);
            } catch (InterruptedException e) {
                e.printStackTrace();
                //异常 将异常也放进容器
                promise.setFailure(e);
            }
            

        }).start();

        //4.接收结果的线程
        log.debug("等待结果");
        log.debug("结果:{}",promise.get());
    }
}

2.3.4Handler & Pipeline

业务处理主要在Handler工作

ChannelHandler用来处理Channel上的各种事件,分为入站和出站。所有ChannelHandler连成一起,就是Pipeline

看看他们之间的关系吧

@Slf4j
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //1.通过nio拿到pipeline(pipeline)
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();

                        //2.添加处理器 handler链(双向链表):head ->  h1 -> h2 -> h3 -> tail
                        pipeline.addLast("handler1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                ByteBuf buf = (ByteBuf) msg;
                                String name = buf.toString(Charset.defaultCharset());

                                //传递给h2 用来往下走 此时用来传递处理的是name 也就是这里处理过的
                                super.channelRead(ctx, name);
                            }
                        });
                        pipeline.addLast("handler2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("2");
                                //获取到name后 转换成student
                                Student student = new Student(msg.toString());
                                super.channelRead(ctx, student);
                                //作用和super.channelRead效果一样
//                                ctx.fireChannelRead(student);
                            }
                        });
                        pipeline.addLast("handler3",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object student) throws Exception {
                                log.debug("3,结果:{},类型:{}",student,student.getClass());
                                //后面没有入站处理器,所以写出操作
                                //为了触发下面的出站处理器4 5 6 否则下面的只有写入操作才会触发
                                //ctx是从当前处理器,向前去找出站处理器,不是往后找,所以下面的出站处理器不会被执行
                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello".getBytes()));
                                //下面的这个可以往后调用整条
//                                nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                            }
                        });

                        //出站处理器 出站处理器会从后往前走 所以流程是6-> 5 -> 4
                        pipeline.addLast("handler4",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handler5",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handler6",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }

    @Data
    @AllArgsConstructor
    static class Student{
        private String name;
    }
}

客户端使用CloseFutureClient

整个调用结果:

10:06:11.458 [nioEventLoopGroup-2-2] DEBUG com.rpc.netty.test5.TestPipeline - 1
10:06:11.460 [nioEventLoopGroup-2-2] DEBUG com.rpc.netty.test5.TestPipeline - 2
10:06:11.461 [nioEventLoopGroup-2-2] DEBUG com.rpc.netty.test5.TestPipeline - 3,结果:TestPipeline.Student(name=jj),类型:class com.rpc.netty.test5.TestPipeline$Student

因为里面ctx的操作问题,具体看里面的注释

下面是整个调用链的结构:

Netty基础

2.3.5ByteBuf

对字节数据的封装

创建
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

/**
* @author 我见青山多妩媚
* @date 2023/5/3 0003 11:03
* @Description TODO
*/
public class TestByteBuf {
   public static void main(String[] args) {
       //不知道容量为256
       ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
       log(buffer);         //read index:0 write index:0 capcacity:256
       StringBuilder builder = new StringBuilder();
       for (int i = 0; i < 32; i++) {
           builder.append("a");
       }

       buffer.writeBytes(builder.toString().getBytes());
       log(buffer);         //read index:0 write index:300 capcacity:512

   }

   private static void log(ByteBuf buffer){
       int len = buffer.readableBytes();
       int rows = len/16 + (len % 15 == 0 ? 0 : 1) + 4;
       StringBuilder buf = new StringBuilder(rows * 80 * 2)
               .append("read index:").append(buffer.readerIndex())
               .append(" write index:").append(buffer.writerIndex())
               .append(" capcacity:").append(buffer.capacity())
               .append(NEWLINE);
       appendPrettyHexDump(buf,buffer);
       System.out.println(buf.toString());
   }
}
直接内存 vs 堆内存

直接内存:

NIO的Buffer提供了一个可以不经过JVM内存直接访问系统物理内存的类——DirectBuffer。 DirectBuffer类继承自ByteBuffer,但和普通的ByteBuffer不同,普通的ByteBuffer仍在JVM堆上分配内存,其最大内存受到最大堆内存的限制;而DirectBuffer直接分配在物理内存中,并不占用堆空间,其可申请的最大内存受操作系统限制。

堆内存:

Java 虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在 Java 虚拟机启动时创建的。对象的堆内存由称为垃圾回收器的自动内存管理系统回收。

区别:

  • 直接内存的读写操作比普通Buffer快,但它的创建、销毁比普通Buffer慢。
  • 因此直接内存使用于需要大内存空间且频繁访问的场合,不适用于频繁申请释放内存的场合。

可以用下面的代码创建池化基于堆的ByteBuf

ByteBuf buffer  = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的ByteBuff

ByteBuf buffer = ByteBufAllocatoe.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁代价昂贵,但读写性能高(少一次内存复制,适合配合池化功能一起使用)
  • 直接内存对GC内存压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化

池化的最大意义在于可以重用ByteBuf,优点包括

  • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
  • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio,netty.allocator.type={unpooled|pooled}
  • 4.1以后,非安卓平台默认开启池化实现,安卓采用非池化
  • 4.1之前,池化不成熟,都是非池化ww
组成

ByteBuf由四部分组成:

Netty基础

最开始读写指针都在0位置

写入

常用方法如下

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian(大端写入),即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串 CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

使用方法

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);

        buffer.writeInt(5);
        ByteBufUtil.log(buffer);

        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);

        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
} 

运行结果

read index:0 write index:0 capacity:16

read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+

read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+ 

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

扩容

当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer); 
// 扩容前
read index:0 write index:12 capacity:16
...

// 扩容后
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+ 
扩容规则
  • 如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容

    • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
  • 如果写入后数据大小超过 512 字节,则选择下一个 2

    n

    • 例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 已经不够了)
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)
... 
读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通过mark与reset实现重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);

        // 恢复到mark标记处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}

结果:

1
2
3
4
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+ 

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置

释放

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

  • 入站 ByteBuf 处理原则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则

    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则

    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

      while (!buffer.release()) {} 
      

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

TailConext中释放ByteBuf的源码

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        // 具体的释放方法
        ReferenceCountUtil.release(msg);
    }
} 

判断传过来的是否为ByteBuf,是的话才需要释放

public static boolean release(Object msg) {
	return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
} 
切片

ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

Netty基础

public class TestSlice {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将buffer分成两部分
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要让分片的buffer引用计数加一
        // 避免原Buffer释放导致分片buffer无法使用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        buffer.setByte(0,5);

        System.out.println("===========打印slice1===========");
        ByteBufUtil.log(slice1);
    }
} 

运行结果

read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========打印slice1===========
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+ 
优势
  • 池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如
    • slice、duplicate、CompositeByteBuf

关于零拷贝文章来源地址https://www.toymoban.com/news/detail-435714.html

到了这里,关于Netty基础的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 从无到有手写一个基于Netty+Kyro+Zookeeper的RPC框架,javaweb面试题目整理

    从无到有手写一个基于Netty+Kyro+Zookeeper的RPC框架,javaweb面试题目整理

    通过这个简易的轮子,你可以学到 RPC 的底层原理和原理以及各种 Java 编码实践的运用。 你甚至可以把这个当做你的毕设/项目经验的选择,这是非常不错!对比其他求职者的项目经验都是各种系统,造轮子肯定是更加能赢得面试官的青睐。 介绍 这是一款基于 Netty+Kyro+Zookee

    2024年04月15日
    浏览(8)
  • Netty学习——源码篇9 Handler其他处理与异步处理

    Netty学习——源码篇9 Handler其他处理与异步处理

            每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、Cha

    2024年04月13日
    浏览(32)
  • linux 创建一个线程的基础开销探讨

    linux 创建一个线程的基础开销探讨

    测试方法比较笨,每修改一次线程数,就重新编译一次,再运行。在程序运行过程中,查看到进程 pid,然后通过以下命令查看进程的运行状态信息输出到以线程数为名字的日志文件中,最后用 vimdiff 对比文件来查看内存上的差异。 每多创建一个线程,虚拟内存增长 8M 左右(

    2024年02月13日
    浏览(23)
  • 构建异步高并发服务器:Netty与Spring Boot的完美结合

    构建异步高并发服务器:Netty与Spring Boot的完美结合

    「作者主页」 :雪碧有白泡泡 「个人网站」 :雪碧的个人网站 ChatGPT体验地址 在Java基础中,IO流是一个重要操作,先上八股 BIO:传统的IO,同步阻塞,一个连接一个线程。一般不怎么使用 AIO:JDK7引入的,异步非阻塞IO NIO:JDK1.4之后新的API,是多路复用,允许你一次性处理

    2024年02月03日
    浏览(18)
  • 11.Netty源码之线程模型

    11.Netty源码之线程模型

    NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。 开发工作量和难度都

    2024年02月16日
    浏览(11)
  • 2.1 搭建第一个区块链网络流程说明

    区块链技术正在迅速发展,成为创新和改变的关键驱动力。在此,我将向您介绍 如何在Ubuntu操作系统下搭建您的第一个区块链网络 。无论您是初学者还是有一定经验的开发者,通过本指南,您将了解搭建区块链网络的基本步骤和必要工具。     首先,确保您的计算机具备足

    2024年02月09日
    浏览(6)
  • Java基础:为什么hashmap是线程不安全的?

    HashMap 是线程不安全的主要原因是它的内部结构和操作不是线程安全的。下面是一些导致 HashMap 线程不安全的因素: 非同步操作:HashMap 的操作不是线程同步的,也就是说,在多线程环境下同时对 HashMap 进行读写操作可能会导致数据不一致的问题。 非原子操作:HashMap 的操作

    2024年02月10日
    浏览(8)
  • vue3 一个基于pinia简单易懂的系统权限管理实现方案,vue-router动态路由异步问题解决

    vue3 一个基于pinia简单易懂的系统权限管理实现方案,vue-router动态路由异步问题解决

    作为项目经验稀少的vue开发者来说,在关键技术点上的经验不多,我希望通过我的思想和实践,把好的东西分享在这里,目的是进一步促进技术交流。项目即将完成,权限是最后的收尾工作,好的权限实现方案,可以让我们没有后顾之忧,也可以提升项目的运行速度。 在开发

    2023年04月08日
    浏览(43)
  • 基于Java Socket写一个多线程的聊天室(附源码)

    基于Java Socket写一个多线程的聊天室(附源码)

    Socket编程是在TCP/IP上的网络编程,但是Socket在上述模型的什么位置呢。这个位置被一个天才的理论家或者是抽象的计算机大神提出并且安排出来 ​ 我们可以发现Socket就在应用程序的传输层和应用层之间,设计了一个Socket抽象层,传输层的底一层的服务提供给Socket抽象层,S

    2024年02月10日
    浏览(16)
  • 2.1Qt基础按钮控件

    2.1Qt基础按钮控件

    QT Creator UI 设计界面的按钮组截图如下: 以下是对按钮组控件的一些功能介绍: Push Button 按压按钮:最普通的按钮,按(点击)按钮命令计算机执行一些动作,或者回答问题,比如 windows 开始菜单里的重启,注销,关机等按钮。 Tool Button 工具按钮:工具按钮通常是一个集合,一

    2024年02月06日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包