Java netty发送接收(TCP、UDP)

这篇具有很好参考价值的文章主要介绍了Java netty发送接收(TCP、UDP)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

最下方附项目地址

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

yml配置

gps:
  netty:
    tcp:
      port: 8888
      read-timeout: 15 #读超时 15分钟
    udp:
      port: 7777
    threads:
      boss: 1
      worker: 4
      business:
        num: 1 #业务线程数量
        max-pending: 100000

配置类

@Configuration
public class EventLoopGroupConfig {

    @Value("${gps.netty.threads.boss}")
    private int bossNum;

    @Value("${gps.netty.threads.worker}")
    private int workerNum;

    @Value("${gps.netty.threads.business.num}")
    private int businessNum;

	@Value("${gps.netty.threads.business.max-pending}")
    private int maxPending;


    /**
     * TCP连接处理
     * @return
     */
    @Bean(name = "bossGroup")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossNum);
    }

    /**
     * Socket数据读写
     * @return
     */
    @Bean(name = "workerGroup")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerNum);
    }

    /**
     * Handler业务处理
     * @return
     */
    @Bean(name = "businessGroup")
    public EventExecutorGroup businessGroup() {
        return new DefaultEventExecutorGroup(businessNum,new BusinessThreadFactory(),maxPending, RejectedExecutionHandlers.reject());
    }
    
    static class BusinessThreadFactory implements ThreadFactory {
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        BusinessThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "business-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon()){
                t.setDaemon(false);
            }

            if (t.getPriority() != Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

}

TcpServer

@Slf4j
@Component
public class NettyTcpServer implements ApplicationListener<ApplicationStartedEvent> {

    @Value("${gps.netty.tcp.port}")
    private int port;
    
    @Value("${gps.netty.tcp.read-timeout}")
    private int readTimeOut;

    @Autowired
    @Qualifier("bossGroup")
    private NioEventLoopGroup bossGroup;

    @Autowired
    @Qualifier("workerGroup")
    private NioEventLoopGroup workerGroup;
    
    @Autowired
    @Qualifier("businessGroup")
    private EventExecutorGroup businessGroup;

    @Autowired
    private TcpServerHandler tcpServerHandler;

    /**
     * 启动Server
     *
     */
    @Override
	public void onApplicationEvent(ApplicationStartedEvent event) {
    	try {
	        ServerBootstrap serverBootstrap = new ServerBootstrap();
	        serverBootstrap.group(bossGroup, workerGroup)
	                .channel(NioServerSocketChannel.class)
	                .childHandler(new ChannelInitializer<SocketChannel>() { //
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast(new IdleStateHandler(readTimeOut, 0, 0, TimeUnit.MINUTES));
							// 1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
							ch.pipeline().addLast(
									new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { MsgUtil.DELIMITER }),
											Unpooled.copiedBuffer(new byte[] { MsgUtil.DELIMITER, MsgUtil.DELIMITER })));
							ch.pipeline().addLast(businessGroup,tcpServerHandler);
						}
					})
	                .option(ChannelOption.SO_BACKLOG, 1024) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
	                .childOption(ChannelOption.TCP_NODELAY, true)//立即写出
	                .childOption(ChannelOption.SO_KEEPALIVE, true);//长连接
	        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
	        if (channelFuture.isSuccess()) {
				log.info("TCP服务启动完毕,port={}", port);
	        }
    	}catch(Exception e) {
			log.info("TCP服务启动失败", e);
    	}
    }

    /**
     * 销毁资源
     */
    @PreDestroy
    public void destroy() {
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("TCP服务关闭成功");
    }
}

UdpServer

@Slf4j
@Configuration
public class NettyUdpServer implements ApplicationListener<ApplicationStartedEvent> {

    @Value("${gps.netty.udp.port}")
    private int port;

    @Resource
    private UdpServerHandler udpServerHandler;

    private EventLoopGroup group = null;

    @Override
    public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {

        try {
            Bootstrap b = new Bootstrap();
            String osName= SystemPropertyUtil.get("os.name").toLowerCase();
            if("linux".equals(osName)) {
                group = new EpollEventLoopGroup();
                b.group(group)
                        .channel(EpollDatagramChannel.class);
            }else {
                group = new NioEventLoopGroup();
                b.group(group)
                        .channel(NioDatagramChannel.class);
            }
            //广播
            b.option(ChannelOption.SO_BROADCAST, true)
                    //接收缓存区  10M
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10 )
                    //发送缓存区  10M
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10 )
                    .handler(udpServerHandler);

            ChannelFuture channelFuture = b.bind(port).sync();
            if (channelFuture.isSuccess()) {
                log.info("UDP服务启动完毕,port={}", port);
            }

        } catch (InterruptedException e) {
            log.info("UDP服务启动失败", e);
        }

    }

    /**
     * 销毁资源
     */
    @PreDestroy
    public void destroy() {
        if(group!=null) {
            group.shutdownGracefully();
        }
        log.info("UDP服务关闭成功");
    }
}

TcpHandler

@Slf4j
@ChannelHandler.Sharable
@Component
public class TcpServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

	@Autowired
	private KafkaSender kafkaSender;

	@Autowired
	@Qualifier("businessGroup")
	private EventExecutorGroup businessGroup;

    /**
     * 使用
     * @param ctx
     * @param byteBuf
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
		String content = byteBuf.toString(StandardCharsets.UTF_8);

		log.info("TCP服务端接收到消息:{}",  content);


		ByteBuf buf = Unpooled.copiedBuffer("TCP已经接收到消息:".getBytes(StandardCharsets.UTF_8));

		businessGroup.execute(()->{
			try {
				kafkaSender.sendMessage("hello", content);
				send2client(ctx,buf.array());
			}catch(Throwable e) {
				log.error("TCP数据接收处理出错",e);
				ByteBuf err = Unpooled.copiedBuffer("系统错误:".getBytes(StandardCharsets.UTF_8));
				send2client(ctx,err.array());
			}
		});

    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    	log.error("TCP数据接收处理出错:",cause);
    }

    /**
     * 返回消息给客户端
     * @param ctx
     * @param msg
     */
    void send2client(ChannelHandlerContext ctx, byte[] msg) {
    	ByteBuf buf= Unpooled.buffer(msg.length+1);
    	buf.writeBytes(msg);
    	buf.writeByte(MsgUtil.DELIMITER);
    	ctx.writeAndFlush(buf).addListener(future->{
    		if(!future.isSuccess()) {
				log.error("TCP发送给客户端消息失败");
    		}
    	});
    }
}

 UdpHandler

@ChannelHandler.Sharable
@Component
@Slf4j
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    @Autowired
    @Qualifier("businessGroup")
    private EventExecutorGroup businessGroup;

   @Autowired
   private KafkaSender kafkaSender;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        String content = packet.content().toString(StandardCharsets.UTF_8);

        log.info("UDP服务端接收到消息:{}",  content);


        ByteBuf buf = Unpooled.copiedBuffer("UDP已经接收到消息:".getBytes(StandardCharsets.UTF_8));


        businessGroup.execute(()->{
            try {
                kafkaSender.sendMessage("hello", content);
                ctx.writeAndFlush(new DatagramPacket(buf, packet.sender()));
            }catch(Throwable e) {
                log.info("UDP数据接收处理出错{}",  e);
            }
        });
    }

Tcp消息发送

@Slf4j
@RestController
public class TcpClientController {

    @Value("${gps.netty.tcp.port}")
    private int port;

    @PostMapping("sendTcp")
    public String send(String msg){
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        Channel ch =null;
        TcpServerHandler handler=new TcpServerHandler();

        try {
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
                                            Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e })));
                            ch.pipeline().addLast(handler);
                        }});
            ch =b.connect("localhost", port).sync().channel();

        } catch (Exception e) {
            e.printStackTrace();
        }

        ByteBuf tcpMsg = Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8));

        ByteBuf buf= Unpooled.buffer(msg.length() + 1);
        try{
            log.info("TCP客户端发送消息:{}", msg);
            buf.writeBytes(tcpMsg);//消息体
            buf.writeByte(MsgUtil.DELIMITER);//消息分割符
            ch.writeAndFlush(buf).sync();
        }catch (Exception e){
            log.info("TCP客户端发送消息失败:{}", e);
        }

        //关闭链接
        group.shutdownGracefully();
        return "success";
    }

Udp消息发送

@Slf4j
@RestController
public class UdpClientController {

    @Value("${gps.netty.udp.port}")
    private int port;

    @PostMapping("sendUdp")
    public String send(String msg){
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        Channel ch =null;
        UdpServerHandler handler=new UdpServerHandler();

        try {
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {

                        @Override
                        protected void initChannel(NioDatagramChannel ch) throws Exception {
                            ch.pipeline().addLast(handler.getClass().getSimpleName(),handler);
                        }});
            ch =b.bind(0).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
        }

        ByteBuf buf = Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8));

        try{
            log.info("UDP客户端发送消息:{}", msg);
            ch.writeAndFlush(new DatagramPacket(
                    Unpooled.copiedBuffer(buf.array()),
                    SocketUtils.socketAddress("localhost", port))).sync();
        }catch (Exception e){
            log.info("UDP客户端发送消息失败:{}", e);
        }

        //关闭链接
        group.shutdownGracefully();
        return "success";
    }
}

项目地址https://gitee.com/xn-mg/netty_kafka文章来源地址https://www.toymoban.com/news/detail-595660.html

到了这里,关于Java netty发送接收(TCP、UDP)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • TCP/IP UDP广播无法发送或接收

    在看《TCP/IP 网络编程》这本书的时候,看到广播那一节,跟着书上写代码,怎么写都不行,广播就是没法发送/接收,发送端一直在发送数据,接收端就是没有反应。 对了好几遍源码,没有问题。实在是愁人。 最后查了很多资料,确定是网卡的问题。 现在的计算机都是多网

    2024年02月04日
    浏览(8)
  • Linux 下 nc 发送接收 udp、tcp数据

    Linux 下 nc 发送接收 udp、tcp数据

    nc,全名叫 netcat,它可以用来完成很多的网络功能,譬如端口扫描、建立TCP/UDP连接,数据传输、网络调试等等,因此,它也常被称为网络工具的 瑞士军刀 。 一、只服务端使用nc 备注:这种方式只能发送一次数据,不能互相发送 1、udp发送接收数据,udp本质不区分客户端服务

    2024年02月16日
    浏览(13)
  • C/C++ Socket UDP 广播消息的发送与接收

    C/C++ Socket UDP 广播消息的发送与接收

    局域网内全网段广播消息的IP地址为:255.255.255.255,向该IP地址发送广播消息,局域网下的任何网段的客户机都能收到广播。 对于发送端,如果你只想给某个特定的网段发送消息,例如你的IP地址为192.168.31.107,那么你的广播地址是192.168.31.255,向该广播地址发送广播消息,只

    2024年02月12日
    浏览(12)
  • 如何在前端实现WebSocket发送和接收UDP消息(多线程模式)

    如何在前端实现WebSocket发送和接收UDP消息(多线程模式)

    本文将继续介绍如何在前端应用中利用WebSocket技术发送和接收UDP消息,并引入多线程模式来提高发送效率和性能。我们将使用JavaScript语言来编写代码,并结合WebSocket API、UDP数据包、Web Workers和UDP消息监听器来实现这一功能。 首先,我们需要在前端应用中建立一个WebSocket连接

    2024年02月12日
    浏览(12)
  • java后端使用websocket实现与客户端之间接收及发送消息

    客户端请求websocket接口,连接通道=》我这边业务成功客户端发消息=》客户端自动刷新。 接口:ws://localhost:8080/websocket/xx 经测试,成功 如果是线上服务器连接,则需要在nginx里配置websocket相关内容,再重启nginx,代码如下 本地连接的时候用的是ws://,因为是http链接,但是如果是

    2024年02月16日
    浏览(19)
  • JAVA Socket实现实时接收TCP消息,让你的服务端通信更高效!

    JAVA Socket实现实时接收TCP消息,让你的服务端通信更高效!

    本文主要介绍如何利用socket实现实时接收服务端发送的TCP消息。 目录 一、需要掌握 二、程序源码 三、运行演示 网络调试助手下载:https://www.aliyundrive.com/s/6Y8L7Wv5sT6 网络通信协议的理解:JAVA socket是基于TCP/IP协议实现的,需要对TCP/IP协议有一定的了解,包括TCP连接的建立、数

    2024年02月11日
    浏览(11)
  • 如何在前端实现WebSocket发送和接收TCP消息(多线程模式)

    如何在前端实现WebSocket发送和接收TCP消息(多线程模式)

    当在前端实现WebSocket发送和接收TCP消息时,可以使用以下步骤来实现多线程模式。本文将详细介绍如何在前端实现WebSocket发送和接收TCP消息,并解释使用到的相关函数及原理。 在前端实现WebSocket发送和接收TCP消息的第一步是创建一个WebSocket连接。我们可以使用浏览器提供的

    2024年02月12日
    浏览(14)
  • 【Java发送邮箱】spring boot 发送邮箱

    【Java发送邮箱】spring boot 发送邮箱

    打开网页版的QQ邮箱, 登录邮箱,进入设置-》帐户 然后,在“帐户”设置中,找到服务设置项,进行设置,如下: 开启POP3/SMTP服务器,验证密保   用正确的手机好发送正确的验证内容到指定的号码,成功获取授权码

    2024年01月23日
    浏览(47)
  • 探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty

    探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty

    🎉欢迎来到Java面试技巧专栏~探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty ☆* o(≧▽≦)o *☆嗨~我是IT·陈寒🍹 ✨博客主页:IT·陈寒的博客 🎈该系列文章专栏:Java面试技巧 📜其他专栏:Java学习路线 Java面试技巧 Java实战项目 AIGC人工智能 数据结构学习

    2024年02月08日
    浏览(17)
  • java实现阿里云rocketMQ消息的发送与消费(tcp协议sdk)

    java实现阿里云rocketMQ消息的发送与消费(tcp协议sdk)

    登录阿里云官网,先申请rocketMQ,再申请Topic、Group ID,然后就是参考阿里云的JAVA SDK进行编程实现。 环境要求: 安装JDK 1.8或以上版本 安装Maven 安装Java SDK 参照 阿里云 官方文档,来一步一步操作。 文档提供的SDK有 TCP 和Http协议,这里使用 TCP协议 来实现rocketMQ消息的发送与消

    2024年02月07日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包