一.目的

本着服务和业务逻辑分离的思想,将 websocket 服务进行项目分离,不再集成在 web 服务中(业务端),很大的好处是,当发布 web 服务的时候,websocket 的连接不会断开,保证了线上用户不受服务中断的影响

并且 websocket 的连接只用于 客户端 消息的接收客户端 消息的发送由 web 服务进行处理,并通过 redis 消息订阅 的功能去推送消息(而不是 websocket 连接里面去推送消息),保证了消息发送的稳定性(毕竟 ws 在网络不好的情况下很容易断连)

二.设计思路

代码: Github

1.提供一个 api 获取 ws的链接,并使用 Appid,APPsecret 作为请求方的校验
请求方带入 clientId 的参数作为用户凭据,然后接口通过给出 ws 链接(并且带上token,并且设置时效性,保证安全性)所以如果 ws 断连后,需要重新获取 ws 链接进行重连

[HttpPost]
public BaseResultOutput GetWsLink(GetWsServerInput input)
{
    BaseResultOutput baseResultOutput = new BaseResultOutput();
    //授权校验
    if (auth == null)
    {
        auth = GetAuthConfig();
    }

    if (auth.IsCheck && (auth.AppId != input.AppId || auth.AppSecret != input.AppSecret))
    {
        baseResultOutput.Code = ResultCode.Error;
        baseResultOutput.Message = "AppId 或 AppSecret 错误";
        return baseResultOutput;
    }
              
    var token = $"{Guid.NewGuid()}{Guid.NewGuid()}".Replace("-", "");
    if (string.IsNullOrEmpty(input.ClientId))
    {
        baseResultOutput.Code = Enum.Base.ResultCode.Error;
        baseResultOutput.Message = "参数异常";
        return baseResultOutput;
    }
    //token值
    var token_value = new ImUserTokenModel()
    {
        ClientId = input.ClientId
    };

    //生成token校验数据和60s内连接的有效期
    RedisHelper.Set($"IM_Token_{token}", $"{StringHelper.ToJson(token_value)}", 60);
    //返回ws链接
    string host = "";
    if (Request.IsHttps)
    {   //配置文件获取ws服务的域名或ip
        host = $"wss://{config.Host}/imserver/ws?token={token}";
    }
    else
    {
        host = $"ws://{config.Host}/imserver/ws?token={token}";
    }

    var getWsServerOutput = new GetWsServerOutput()
    {
        Url = host
    };
    baseResultOutput.Data = getWsServerOutput;
    return baseResultOutput;
}

2.请求方获取到正确 ws连接 后进行连接,这时候我们使用 .net core 原生的 websocket 服务去实现
具体可参考MSDN文档
因为我们使用了带 token 参数的链接,所以我们还需要进行对 token 的校验
SocketHandle.cs

static async Task Acceptor(HttpContext context, Func n)
{
    if (context.WebSockets.IsWebSocketRequest)
    {
        string token = context.Request.Query["token"];
        if (string.IsNullOrEmpty(token)) return;

        //校验token有效性(token从业务端请求进行生成,服务端进行校验)
        var token_value_str = RedisHelper.Get($"IM_Token_{token}");
        if (string.IsNullOrEmpty(token_value_str)) return;
        var token_value = StringHelper.Json2Object(token_value_str);
        var socket = await context.WebSockets.AcceptWebSocketAsync();
        var h = new SocketHandle(socket, token_value.ClientId);
        AddWebsocket(token_value.ClientId, socket);
        await h.EchoLoop();
    }
    else
    {
        context.Response.StatusCode = 400;
    }
}

async Task EchoLoop()
{
    var buffer = new byte[BufferSize];
    var seg = new ArraySegment(buffer);
    await SendMsg(this.socket, "Link", "连接成功");
    try
    {
        while (this.socket.State == WebSocketState.Open)
        {
            WebSocketMsgOutput webSocketMsgModel = new WebSocketMsgOutput();
            var incoming = await this.socket.ReceiveAsync(seg, CancellationToken.None);
        }
        this.socket.Abort();
    }
    catch (Exception e)
    {
        var outgoing = Encoding.UTF8.GetBytes(e.Message);
        await SendMsg(this.socket, "Msg", outgoing);
    }
    //移除在线ws记录
    RemoveWebsocket(this.clientId);
}

3.然后,在上下线的时候,添加记录,并且进行上下线消息的订阅推送(ws服务推送,web服务订阅)

// 添加ws在线记录
private static void AddWebsocket(string clientId, WebSocket webSocket)
{
    var data = webSocketList.Count(p => p.Key == clientId);
    if (data == 0)
    {
        webSocketList.GetOrAdd(clientId, cli => new WebSocketListModel { WebSocket = webSocket });
    }
    else
    {
        //如果连接已存在,将已存在的移除,建立新的,保证新的连接可以收到消息
        RemoveWebsocket(clientId);
        webSocketList.GetOrAdd(clientId, cli => new WebSocketListModel { WebSocket = webSocket });
    }
    //订阅上线通知
    var onlinedata = new
    {
        Type = "Online",
        Client = clientId
    };
    RedisHelper.Publish("onoffline_msg", StringHelper.ToJson(onlinedata));
}

// 移除ws在线记录
private static void RemoveWebsocket(string clientId)
{
    webSocketList.TryRemove(clientId, out var oldcli);
    //订阅下线通知
    var data = new
    {
        Type = "OffLine",
        Client = clientId
    };
    RedisHelper.Publish("onoffline_msg", StringHelper.ToJson(data));
}

4.最后,因为我们的消息也是通过 redis 的订阅推送获取过来的(web服务推送,ws服务订阅),所以我们也需要在 ws 服务中进行获取消息并将消息推送到ws在线的目标用户

RedisHelper.Subscribe(("single_msg", RedisSubScribleMessage));
// Redis消息订阅
void RedisSubScribleMessage(CSRedis.CSRedisClient.SubscribeMessageEventArgs e)
{
        var data = StringHelper.Json2Object(e.Body);
        var outgoing = new ArraySegment(Encoding.UTF8.GetBytes(data.Content));

        var toClientSocket = webSocketList.GetValueOrDefault(data.ToUserId);
        if (toClientSocket != null)
        {
            var result = SendMsg(toClientSocket.WebSocket, data.Type, data.Content).Result;
        }
}

// ws消息发送 
public static async Task SendMsg(WebSocket webSocket, string type, object messageData)
{
        WebSocketMsgOutput webSocketMsgModel = new WebSocketMsgOutput();
        webSocketMsgModel.Type = type;
        webSocketMsgModel.Data = messageData;
        var message = Encoding.UTF8.GetBytes(StringHelper.ToJson(webSocketMsgModel));
        await webSocket.SendAsync(message, WebSocketMessageType.Text, true, CancellationToken.None);
        return true;
}

以上,便可实现了 ws 的消息的推送,不与业务场景相关,所有的业务处理都可在 web 服务中实现

还提供额外的 api功能 给 web服务 去使用,比如 在线的用户数量 和 某个用户是否在线,并提供消息发送测试接口

 

通过 ws 服务,我们能实现很多业务功能,比如 IM(即时消息通讯功能等),实时消息推送(web实时消息提醒,视频弹幕等)