一.目的
本着服务和业务逻辑分离的思想,将 websocket 服务进行项目分离,不再集成在 web 服务中(业务端),很大的好处是,当发布 web 服务的时候,websocket 的连接不会断开,保证了线上用户不受服务中断的影响
并且 websocket 的连接只用于 客户端 消息的接收, 客户端 消息的发送由 web 服务进行处理,并通过 redis 消息订阅 的功能去推送消息(而不是 websocket 连接里面去推送消息),保证了消息发送的稳定性(毕竟 ws 在网络不好的情况下很容易断连)
二.设计思路
代码: Github
1.提供一个 api 获取 ws的链接,并使用 Appid,APPsecret 作为请求方的校验
请求方带入 clientId 的参数作为用户凭据,然后接口通过给出 ws 链接(并且带上token,并且设置时效性,保证安全性)所以如果 ws 断连后,需要重新获取 ws 链接进行重连
[HttpPost] public BaseResultOutput GetWsLink(GetWsServerInput input) { BaseResultOutput baseResultOutput = new BaseResultOutput(); //授权校验 if (auth == null) { auth = GetAuthConfig(); } if (auth.IsCheck && (auth.AppId != input.AppId || auth.AppSecret != input.AppSecret)) { baseResultOutput.Code = ResultCode.Error; baseResultOutput.Message = "AppId 或 AppSecret 错误"; return baseResultOutput; } var token = $"{Guid.NewGuid()}{Guid.NewGuid()}".Replace("-", ""); if (string.IsNullOrEmpty(input.ClientId)) { baseResultOutput.Code = Enum.Base.ResultCode.Error; baseResultOutput.Message = "参数异常"; return baseResultOutput; } //token值 var token_value = new ImUserTokenModel() { ClientId = input.ClientId }; //生成token校验数据和60s内连接的有效期 RedisHelper.Set($"IM_Token_{token}", $"{StringHelper.ToJson(token_value)}", 60); //返回ws链接 string host = ""; if (Request.IsHttps) { //配置文件获取ws服务的域名或ip host = $"wss://{config.Host}/imserver/ws?token={token}"; } else { host = $"ws://{config.Host}/imserver/ws?token={token}"; } var getWsServerOutput = new GetWsServerOutput() { Url = host }; baseResultOutput.Data = getWsServerOutput; return baseResultOutput; }
2.请求方获取到正确 ws连接 后进行连接,这时候我们使用 .net core 原生的 websocket 服务去实现
具体可参考MSDN文档
因为我们使用了带 token 参数的链接,所以我们还需要进行对 token 的校验
SocketHandle.cs
static async Task Acceptor(HttpContext context, Func n) { if (context.WebSockets.IsWebSocketRequest) { string token = context.Request.Query["token"]; if (string.IsNullOrEmpty(token)) return; //校验token有效性(token从业务端请求进行生成,服务端进行校验) var token_value_str = RedisHelper.Get($"IM_Token_{token}"); if (string.IsNullOrEmpty(token_value_str)) return; var token_value = StringHelper.Json2Object(token_value_str); var socket = await context.WebSockets.AcceptWebSocketAsync(); var h = new SocketHandle(socket, token_value.ClientId); AddWebsocket(token_value.ClientId, socket); await h.EchoLoop(); } else { context.Response.StatusCode = 400; } } async Task EchoLoop() { var buffer = new byte[BufferSize]; var seg = new ArraySegment(buffer); await SendMsg(this.socket, "Link", "连接成功"); try { while (this.socket.State == WebSocketState.Open) { WebSocketMsgOutput webSocketMsgModel = new WebSocketMsgOutput(); var incoming = await this.socket.ReceiveAsync(seg, CancellationToken.None); } this.socket.Abort(); } catch (Exception e) { var outgoing = Encoding.UTF8.GetBytes(e.Message); await SendMsg(this.socket, "Msg", outgoing); } //移除在线ws记录 RemoveWebsocket(this.clientId); }
3.然后,在上下线的时候,添加记录,并且进行上下线消息的订阅推送(ws服务推送,web服务订阅)
// 添加ws在线记录 private static void AddWebsocket(string clientId, WebSocket webSocket) { var data = webSocketList.Count(p => p.Key == clientId); if (data == 0) { webSocketList.GetOrAdd(clientId, cli => new WebSocketListModel { WebSocket = webSocket }); } else { //如果连接已存在,将已存在的移除,建立新的,保证新的连接可以收到消息 RemoveWebsocket(clientId); webSocketList.GetOrAdd(clientId, cli => new WebSocketListModel { WebSocket = webSocket }); } //订阅上线通知 var onlinedata = new { Type = "Online", Client = clientId }; RedisHelper.Publish("onoffline_msg", StringHelper.ToJson(onlinedata)); } // 移除ws在线记录 private static void RemoveWebsocket(string clientId) { webSocketList.TryRemove(clientId, out var oldcli); //订阅下线通知 var data = new { Type = "OffLine", Client = clientId }; RedisHelper.Publish("onoffline_msg", StringHelper.ToJson(data)); }
4.最后,因为我们的消息也是通过 redis 的订阅推送获取过来的(web服务推送,ws服务订阅),所以我们也需要在 ws 服务中进行获取消息并将消息推送到ws在线的目标用户
RedisHelper.Subscribe(("single_msg", RedisSubScribleMessage)); // Redis消息订阅 void RedisSubScribleMessage(CSRedis.CSRedisClient.SubscribeMessageEventArgs e) { var data = StringHelper.Json2Object(e.Body); var outgoing = new ArraySegment(Encoding.UTF8.GetBytes(data.Content)); var toClientSocket = webSocketList.GetValueOrDefault(data.ToUserId); if (toClientSocket != null) { var result = SendMsg(toClientSocket.WebSocket, data.Type, data.Content).Result; } } // ws消息发送 public static async Task SendMsg(WebSocket webSocket, string type, object messageData) { WebSocketMsgOutput webSocketMsgModel = new WebSocketMsgOutput(); webSocketMsgModel.Type = type; webSocketMsgModel.Data = messageData; var message = Encoding.UTF8.GetBytes(StringHelper.ToJson(webSocketMsgModel)); await webSocket.SendAsync(message, WebSocketMessageType.Text, true, CancellationToken.None); return true; }
以上,便可实现了 ws 的消息的推送,不与业务场景相关,所有的业务处理都可在 web 服务中实现
还提供额外的 api功能 给 web服务 去使用,比如 在线的用户数量 和 某个用户是否在线,并提供消息发送测试接口
通过 ws 服务,我们能实现很多业务功能,比如 IM(即时消息通讯功能等),实时消息推送(web实时消息提醒,视频弹幕等)