java整合WebSocket

这篇具有很好参考价值的文章主要介绍了java整合WebSocket。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、WebSocket介绍

1、简介

WebSocket协议通过在客户端和服务端之间提供全双工通信来进行Web和服务器的交互功能。在WebSocket应用程序中,服务器发布WebSocket端点,客户端使用url连接到服务器。建立连接后,服务器和客户端就可以互相发送消息。客户端通常连接到一台服务器,服务器接受多个客户端的连接。

2、优势

HTPP协议是基于请求响应模式,并且无状态的。HTTP通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
如果我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)

3、服务端注解

@ServerEndpoint(“/websocket/{uid}”)
申明这是一个websocket服务;
需要指定访问该服务的地址,在地址中可以指定参数,需要通过{}进行占位;

@OnOpen
用法:public void onOpen(Session session, @PathParam(“uid”) String uid) throws IOException{}
该方法将在建立连接后执行,会传入session对象,就是客户端与服务端建立的长连接通道,通过@PathParam获取url中声明的参数;

@OnClose
用法:public void onClose() {}
该方法是在连接关闭后执行;

@OnMessage
用法:public void onMessage(String message, Session session) throws IOException {}
该方法用于接收客户端发送的消息;
message:发来的消息数据;
session:会话对象(也是长连接通道);
发送消息到客户端;
用法:session.getBasicRemote().sendText(“hello,websocket.”);
通过session进行消息发送;

二、springboot整合

1、引入依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2、配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3、业务代码

>>群聊

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
@Slf4j
@ServerEndpoint(value = "/api/pushMessageMulti/{roomId}/{userId}",encoders = { ServerEncoder.class })
public class WebSocketServerMulti {

    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
    private static Map<String, WebSocketServerMulti> userMap = new ConcurrentHashMap<>();
    //存放房间对象
    private static Map<String, Set<WebSocketServerMulti>> roomMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId = "";
    //存出当前群聊在线的人数(使用原因:roomMap中无法获取到当前用户id)
    private static Map<String, List<String>> multiUser = new ConcurrentHashMap<>();




    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("roomId") String roomId , @PathParam("userId") String userId) throws IOException, EncodeException {
        synchronized (session){
            try {
                this.session = session;
                this.userId=userId;
                userMap.put(userId,this);
                if (!roomMap.containsKey(roomId)) {
                    Set<WebSocketServerMulti> set = new HashSet<>();
                    set.add(userMap.get(userId));
                    roomMap.put(roomId,set);

                    List<String> dd = new LinkedList<>();
                    dd.add(userId);
                    multiUser.put(roomId,dd);
                } else {
                    if(multiUser.get(roomId).contains(userId)){

                        log.info("存在:房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());
                    }else{
                        multiUser.get(roomId).add(userId);

                        roomMap.get(roomId).add(this);
                    }
                }
                System.out.println(multiUser.get(roomId).size());
                log.info("房间号:"+roomId+"用户连接:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());
                Map<String,Object> map = new HashMap<>();
                map.put("online_num",multiUser.get(roomId).size());//在线人数
                map.put("online_list",roomMap.get(roomId));//人员列表
                map.put("roomId",roomId);//群id
                map.put("message","用户"+***+"加入群聊");//消息
				//自定义发送消息
                sendMessageObject(map,roomId);
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }

    /**
     * 连接关闭@PathParam("userId") String userId
     * 调用的方法
     */
    @OnClose
    public void onClose( @PathParam("roomId") String roomId,@PathParam("userId") String userId) {
        try {
            if (roomMap.containsKey(roomId)) {
                Set<WebSocketServerMulti> set = roomMap.get(roomId);
                Iterator<WebSocketServerMulti> it = set.iterator();
                while (it.hasNext()) {
                    if (it.next().userId.equals(userId)) {
                        it.remove();
                    }
                }

                multiUser.get(roomId).remove(userId);
                log.info("房间号:"+roomId+"用户退出:"+userId+",当前在线人数为:" + multiUser.get(roomId).size());
                Map<String,Object> map = new HashMap<>();
                map.put("online_num",multiUser.get(roomId).size());//在线人数
                map.put("online_list",roomMap.get(roomId));//人员列表
                map.put("roomId",roomId);//群id
                map.put("message","用户"+***+"加入群聊");//消息
				//自定义发送消息
                sendMessageObject(map,roomId);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     * @param message
     * 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message) {
        //注意,要给session加上同步锁,否则会出现多个线程同时往同一个session写数据,导致报错的情况。
        synchronized (session){
            //可以群发消息
            if(StringUtils.isNotBlank(message)){
                try {
                    //解析发送的报文
                    JSONObject jsonObject = JSONObject.parseObject(message);
                    //追加发送人(防止串改)
                    jsonObject.put("fromUserId",this.userId);
                    int chatType=jsonObject.getInteger("chatType");
                    String myUserId=jsonObject.getString("myUserId");
                    String toRoomId=jsonObject.getString("toRoomId");
                    log.info("房间号:"+toRoomId+"用户消息:"+userId+",报文:"+message);
                    sendMessageTo(message,toRoomId);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 群聊
     * @param message 消息
     * @param roomId 房间号
     */
    public void sendMessageTo(String message , String roomId) throws IOException {
        if (roomMap.containsKey(roomId)) {
            for (WebSocketServerMulti item : roomMap.get(roomId)) {
                    item.session.getAsyncRemote().sendText(message);
            }
        }
    }


    /**
     * @param error
     */
    @OnError
    public SystemResult onError(Throwable error) {

        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());

        ChatError chatError = new ChatError();
        chatError.setUserId(Integer.valueOf(this.userId));
        chatError.setDetails(error.getMessage());
        chatError.setAddTime(new Date());
        chatErrorMapper.insert(chatError);
        SystemResult systemResult = new SystemResult();
        return systemResult.error(error.getMessage());

    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 实现服务传送Object类型
     * 器主动推送
     */
    public void sendMessageObject(Object message,String roomId) {
        try {
            if (roomMap.containsKey(roomId)) {
                for (WebSocketServerMulti item : roomMap.get(roomId)) {
                    item.session.getBasicRemote().sendObject(message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 获得此时的
     * 在线人数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addOnlineCount() {
        WebSocketServerMulti.onlineCount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subOnlineCount() {
        WebSocketServerMulti.onlineCount--;
    }

}

>>单人聊天

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


@Component
@Slf4j
@ServerEndpoint("/api/pushMessageSolo/{userId}")
public class WebSocketServerSolo {

    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
    private static ConcurrentHashMap<String, WebSocketServerSolo> webSocketMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId = "";

    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //加入set中
            webSocketMap.put(userId,this);
        }else{
            //加入set中
            webSocketMap.put(userId,this);
            //在线数加1
            addOnlineCount();
        }
        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
        sendMessage("连接成功");
    }

    /**
     * 连接关闭
     * 调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     * @param message
     * 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析发送的报文
                JSONObject jsonObject = JSONObject.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("fromUserId",this.userId);
                String toUserId=jsonObject.getString("toUserId");
                String myUserId=jsonObject.getString("myUserId");
                //传送给对应toUserId用户的websocket
                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                    webSocketMap.get(toUserId).sendMessage(message);
                }else{
                    //否则不在这个服务器上,发送到mysql或者redis
                    log.error("请求的userId:"+toUserId+"不在该服务器上");
                }
                if(StringUtils.isNotBlank(myUserId)&&webSocketMap.containsKey(myUserId)){
                    webSocketMap.get(myUserId).sendMessage(message);
                }else{
                    //否则不在这个服务器上,发送到mysql或者redis
                    log.error("请求的userId:"+myUserId+"不在该服务器上");
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *发送自定
     *义消息
     **/
    public static void sendInfo(String message, String userId) {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    /**
     * 获得此时的
     * 在线人数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addOnlineCount() {
        WebSocketServerSolo.onlineCount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subOnlineCount() {
        WebSocketServerSolo.onlineCount--;
    }

}

三、部署websocket项目问题

1、webSocket功能失效

本地开发的时候都可以正常使用,但是在部署到nginx代理服务器的时候发现报了错误,连不上,报错:Error in connection establishment: net::ERR_NAME_NOT_RESOLVED
发现是nginx服务器默认是不打开webSocket的功能的,这需要我们在nginx服务器上配置:

 location /test/ {
                proxy_pass http://test.com;
                proxy_redirect default;
                proxy_set_header Upgrade $http_upgrade; # allow websockets
                proxy_set_header Connection "upgrade";
                proxy_http_version 1.1;
                }

2、断线重连

如果nginx没有设置读取超时时间,websocket会一直断线重连,大约一分钟重连一次
可以设置长时间得超时时间,避免一直断线重连,避免消耗内存文章来源地址https://www.toymoban.com/news/detail-720058.html

location /test{
	    proxy_pass  http://test.com;
	    proxy_set_header Upgrade $http_upgrade; # allow websockets
    	proxy_set_header Connection "upgrade";
    	proxy_http_version 1.1;
        proxy_connect_timeout 60s;#l连接超时时间,不能设置太长会浪费连接资源
	    proxy_read_timeout 500s;#读超时时间
	    proxy_send_timeout 500s;#写超时时间
            index  index.html index.htm;
        }


到了这里,关于java整合WebSocket的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • java springboot 整合webSocket接入调用chatGPT3.5接口实现自由返回

    java springboot 中使用webSocket接入openAI接口调用chatGPT3.5接口实现自由返回 @Component @Anonymous @ServerEndpoint(“/websocket/{id}”) // 访问路径: ws://localhost:8080/websocket public class WebSocketServer { // try { // sendMessage(“WebSocket连接成功”); // } catch (Exception e) { // // } } /** * 发送消息 * @param message 要

    2024年02月14日
    浏览(12)
  • java之路 —— Shiro与Springboot整合开发

    java之路 —— Shiro与Springboot整合开发

    在 Spring Boot 中做权限管理,一般来说,主流的方案是 Spring Security ,但是,仅仅从技术角度来说,也可以使用 Shiro。 在 Spring Boot 中做权限管理,一般来说,主流的方案是 Spring Security ,但是,仅仅从技术角度来说,也可以使用 Shiro。 在整合之前,让我们先来了解一下Shiro开发

    2024年02月11日
    浏览(8)
  • 《Java Web轻量级整合开发入门》学习笔记

    《Java Web轻量级整合开发入门》学习笔记

    轻量级Java Web整合开发 第一章 轻量级Java Web开发概述 1.2  java web 开发概述 1.JSP是一种编译执行的前台页面技术。对于每个JSP页面,Web服务器都会生成一个相应的Java文件,然后再编译该Java文件,生成相应的Class类型文件。在客户端访问到的JSP页面,就是相应Class文件执行的结果

    2024年02月08日
    浏览(46)
  • 【WebSocket】SpringBoot整合WebSocket实现聊天室(一)

    【WebSocket】SpringBoot整合WebSocket实现聊天室(一)

    目录 一、准备 1、引入依赖 2、创建配置类 二、相关注解 首先我们需要在项目中引入依赖,有两种方式。第一种我们可以在创建Spring Boot项目时搜索WebSocket然后勾选依赖 第二种是我们可以直接在项目的pom.xml文件中插入以下依赖 我们需要进行如下配置 ServerEndpointExporter 是一个

    2024年02月13日
    浏览(44)
  • WebSocket--整合springboot

    目录 握手拦截器 WebSocket处理程序 HttpSessionHandshakelnterceptor (抽象类):   握手拦截器,在握手前后添加操作 AbstractWebSocketHandler (抽象类) :   WebSocket处理程序,监听连接前,连接中,连接后WebSocketConfigurer (接口):    配置程序,比如配置监听哪个端口,上面的握手拦截器,处理

    2024年01月16日
    浏览(16)
  • springboot整合websocket教程

    Websocket是一种网络协议,它允许浏览器和服务器之间进行实时双向数据传输。 在本教程中,我们将创建一个简单的聊天应用程序,使用Websocket实现实时通信。 添加依赖 我们需要添加Spring Boot Websocket依赖,以便我们可以使用Spring Boot中提供的Websocket支持。 配置WebSocket 在Sprin

    2023年04月23日
    浏览(10)
  • 大华摄像头实时预览(spring boot+websocket+flv.js)Java开发

    大华摄像头实时预览(spring boot+websocket+flv.js)Java开发

    1.大华NetSDK_JAVA; 这里使用的是 Linux64的架包 2.websocket 前端使用的vue框架    3.flv.js的播放插件     4.大华摄像头提供的平台(后面称为官方平台) 根据大华《NetSDK_JAVA编程指导手册》的流程图 根据图可以得知关键流程为: 初始化sdk——登录设备——打开实时预览——设置视

    2024年02月04日
    浏览(20)
  • SpringBoot整合WebSocket详细教程

    SpringBoot整合WebSocket详细教程

    共开启两个页面,实现一对一聊天。 服务端代码:https://gitee.com/lianaozhe/springboot-websocket.git 导入相关依赖: WebSocketConfig配置类: WebSocket操作类: TestController测试接口类: test.html文件: 复制test.html文件为test2.html文件,将上面的userId由’20’改为’10’,后面测试使用。 运行服

    2024年02月01日
    浏览(12)
  • ruoyi若依整合websocket

    ruoyi若依整合websocket

    注:本文档中的ruoyi框架为前后 不分离 版本,nginx配置与前后分离版有所不同。 一、导pom,版本需与springboot版本一致  如果是新建module,则需要在ruoyi-admin的pom.xml中导入新建的module,否则注解不生效 二、注册websocket 三、创建拦截器 MyWebSocketInterceptor 四、创建处理器 MyWebSo

    2024年02月06日
    浏览(10)
  • [超详细]SpringBoot整合WebSocket

    WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它允许在浏览器和服务器之间进行实时的、双向的通信。相对于传统的基于请求和响应的 HTTP 协议,WebSocket 提供了一种更有效、更实时的通信方式,适用于需要实时更新、实时通知和实时交互的应用。 WebSocket 的一些关

    2024年02月11日
    浏览(9)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包