123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 |
- using System;
- using System.Collections.Concurrent;
- using System.Net;
- using System.Net.WebSockets;
- using WingServerCommon.Log;
- using WingInterfaceLibrary.Enum;
- using WingInterfaceLibrary.Request.Authentication;
- using WingInterfaceLibrary.Interface;
- using WingNotificationModule.Channel.WebSocket;
- using System.Threading.Tasks;
- using WingNotificationModule.Adapter;
- using WingInterfaceLibrary.Notifications;
- using System.Collections.Generic;
- using System.Linq;
- using WingInterfaceLibrary.Internal.Request;
- using WingInterfaceLibrary.Enum.NotificationEnum;
- namespace WingNotificationModule
- {
- internal partial class NotificationServer
- {
// Registered WebSocket connections, keyed by connection token
// (bare token for the default connection, "token_type" for any other connection type).
private readonly ConcurrentDictionary<string, WebScoketLeaf> _leaves = new();

// Listener created by Start(); null until Start() has been called.
private HttpListener _httpListener;

// Service used to validate tokens and to resolve client ids to tokens; supplied to Start().
private IAuthenticationService _authenticationService;

// URI prefix registered on the HttpListener.
private readonly string _host;

// Queue manager that posted/broadcast messages are handed to; it is given SendMessage as its callback.
private readonly NotificationQueueManager _notifcationQueueManager;

/// <summary>
/// Creates a notification server for the given listener prefix.
/// </summary>
/// <param name="host">URI prefix that <see cref="Start"/> registers on the HTTP listener.</param>
/// <param name="maxQueueCount">Value forwarded to the <see cref="NotificationQueueManager"/> constructor.</param>
public NotificationServer(string host, int maxQueueCount)
{
    _notifcationQueueManager = new NotificationQueueManager(maxQueueCount, SendMessage);
    _host = host;
}
/// <summary>
/// Starts the server: creates the HTTP listener on the configured prefix and begins
/// accepting requests asynchronously.
/// </summary>
/// <param name="authenticationService">Service stored for later token validation and token lookup.</param>
public void Start(IAuthenticationService authenticationService)
{
    _authenticationService = authenticationService;

    // The field is assigned before the listener is configured, exactly as before,
    // so it is set even if registering the prefix or starting fails.
    _httpListener = new HttpListener();
    var listener = _httpListener;
    listener.Prefixes.Add(_host);
    listener.Start();

    // Arm the first asynchronous accept; the callback re-arms itself for later requests.
    listener.BeginGetContext(GetContextCallBackAsync, listener);
    Logger.WriteLineInfo($"{_host} is listening.");
}
/// <summary>
/// Completion callback for <c>HttpListener.BeginGetContext</c>: finishes the pending accept,
/// re-arms the listener for the next request and passes the accepted request on to
/// <c>HandleWebSocketAccept</c>.
/// </summary>
/// <remarks>
/// This runs on a callback thread, so no exception may escape it. <c>EndGetContext</c> throws
/// when the listener is stopped or closed while an accept is pending; that case is logged
/// instead of being left unhandled.
/// </remarks>
/// <param name="ar">Async result whose <c>AsyncState</c> is the <see cref="HttpListener"/> being served.</param>
private void GetContextCallBackAsync(IAsyncResult ar)
{
    var listener = (HttpListener)ar.AsyncState;

    HttpListenerContext context = null;
    try
    {
        context = listener.EndGetContext(ar);
    }
    catch (Exception ex)
    {
        // Typically the listener was stopped/closed while this accept was outstanding.
        Logger.WriteLineWarn($"GetContextCallBackAsync EndGetContext exception:{ex}");
    }

    try
    {
        // Re-arm first so the next client is not blocked behind the handling of this one.
        // BeginGetContext throws if the listener is no longer running; as before, the
        // current request is then aborted rather than processed.
        listener.BeginGetContext(GetContextCallBackAsync, listener);

        if (context == null)
        {
            // The accept itself failed; there is no request to process.
            return;
        }

        var token = context.Request?.QueryString["Token"];
        var type = context.Request?.QueryString["Type"];
        Logger.WriteLineInfo($"GetContextCallBackAsync token:{token},Type:{type}");
        HandleWebSocketAccept(context, token, type);
    }
    catch (Exception ex)
    {
        try
        {
            Logger.WriteLineWarn($"GetContextCallBackAsync exception:{ex}");
            context?.Response.Abort();
        }
        catch (Exception exx)
        {
            Logger.WriteLineWarn($"context.Response.Abort() exception:{exx}");
        }
    }
}
/// <summary>
/// Upgrades an accepted HTTP request to a WebSocket, validates its token and registers the
/// connection in <c>_leaves</c>.
/// </summary>
/// <remarks>
/// The registry key is the bare token for the default connection (<paramref name="type"/> empty
/// or "0") and <c>token_type</c> for every other connection type. Connections of type
/// <c>WSConnectTypeEnum.AppletAPI</c> are registered without token validation.
/// The method is <c>async void</c> because it is started fire-and-forget from the accept
/// callback; for that reason every exception is caught and logged here.
/// </remarks>
/// <param name="context">The accepted HTTP request/response pair.</param>
/// <param name="token">Value of the "Token" query parameter; may be null.</param>
/// <param name="type">Value of the "Type" query parameter; may be null.</param>
async void HandleWebSocketAccept(HttpListenerContext context, string token, string type)
{
    WebScoketLeaf leaf = null;
    var registered = false;
    try
    {
        if (!context.Request.IsWebSocketRequest)
        {
            // AcceptWebSocketAsync would throw for a plain HTTP request and the response
            // would never be completed; answer explicitly and release the connection.
            Logger.WriteLineWarn($"HandleWebSocketAccept rejected a non-websocket request, token {token}");
            context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
            context.Response.Close();
            return;
        }

        WebSocketContext webSocketContext = await context.AcceptWebSocketAsync(subProtocol: null);
        var webSocket = webSocketContext.WebSocket;
        var state = webSocket.State;

        // The default connection is keyed by the token itself; other connections by token_1, token_2, ...
        var isDefaultConnection = string.IsNullOrWhiteSpace(type) || type == "0";
        var wsToken = isDefaultConnection ? token : token + "_" + type;

        if (state != WebSocketState.Open)
        {
            Logger.WriteLineWarn($"Websocket connection with token {wsToken} status:{state}");
            return;
        }

        leaf = new WebScoketLeaf(webSocket, wsToken);

        // Check the raw token, not the composed key: "_" + type is never empty, so testing
        // the key let token-less non-default connections slip past this guard.
        if (string.IsNullOrWhiteSpace(token))
        {
            await leaf.SendAsync(new DisconnectNotification());
            // TODO: decide whether the server should also close the leaf here.
            return;
        }

        // Applet connections do not have their token validated.
        if (type != WSConnectTypeEnum.AppletAPI.ToString("D"))
        {
            // TODO: do we need to know why the token is invalid?
            var result = await _authenticationService.ValidateTokenAsync(new ValidateTokenRequest() { Token = token });
            if (result == null || result.Code != CustomerRpcCode.Ok)
            {
                await leaf.SendAsync(new DisconnectNotification());
                return;
            }
        }

        // Cache the connection; an existing connection under the same key is closed and replaced.
        _leaves.AddOrUpdate(wsToken, (t) =>
        {
            Logger.WriteLineInfo($"Websocket connection with token {wsToken} status:{state}, add leaf");
            return leaf;
        }, (t, exist) =>
        {
            Logger.WriteLineInfo($"Websocket connection with token {wsToken} status:{state}, replace leaf");
            exist.Close();
            return leaf;
        });
        registered = true;

        await leaf.SendAsync(new ConnectionNotification());
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"HandleWebSocketAccept with token {token} exception: {ex}");
        try
        {
            if (leaf == null)
            {
                // Failed before a leaf existed: release the underlying HTTP connection.
                context?.Response.Abort();
            }
            else if (!registered)
            {
                // Failed after the upgrade but before registration: nothing else references
                // this connection, so close it instead of leaving it open.
                leaf.Close();
            }
        }
        catch (Exception cleanupEx)
        {
            Logger.WriteLineWarn($"HandleWebSocketAccept cleanup exception:{cleanupEx}");
        }
    }
}
/// <summary>
/// Resolves the tokens of the client named in the request and sends the message directly
/// (not through the queue) to the first of them that has a usable connection.
/// </summary>
/// <param name="request">Carries the client id, the message and the target connection type.</param>
/// <returns>true when the message was sent on some connection; otherwise false.</returns>
public async Task<bool> SendMessageAsync(SendNotificationRequest request)
{
    var tokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
    return await SendMessageAsync(tokens, request);
}

/// <summary>
/// Sends the message directly (not through the queue) to the first of the given tokens that
/// has a usable connection. Duplicate token codes are tried only once.
/// </summary>
/// <param name="tokens">Candidate tokens; may be null or empty.</param>
/// <param name="request">Carries the message and the target connection type.</param>
/// <returns>true when the message was sent on some connection; otherwise false.</returns>
public async Task<bool> SendMessageAsync(IList<TokenDTO> tokens, SendNotificationRequest request)
{
    var wsConnectType = request.WSConnectType;
    foreach (var token in GetDistinctTokenInfos(tokens))
    {
        if (await SendToTokenConnectionsAsync(token.Code, wsConnectType, request.Message))
        {
            return true;
        }
    }
    return false;
}

/// <summary>
/// Sends a message on the connections registered for a single token code.
/// </summary>
/// <remarks>
/// If a second-window connection of the requested type exists it is used; otherwise the
/// main connection (keyed by the bare token code) is used.
/// </remarks>
/// <param name="tokenCode">Token code; second-window connections are keyed as <c>code_type</c>.</param>
/// <param name="wsConnectType">Connection type the message is aimed at.</param>
/// <param name="message">Payload handed to the leaf for sending.</param>
/// <returns>true when a send that decides the outcome succeeded; otherwise false.</returns>
private async Task<bool> SendToTokenConnectionsAsync(string tokenCode, WSConnectTypeEnum wsConnectType, object message)
{
    var wsToken = tokenCode + "_" + wsConnectType.ToString("D");
    if (wsConnectType == WSConnectTypeEnum.ConsultationSecondWindow || wsConnectType == WSConnectTypeEnum.EducationSecondWindow || wsConnectType == WSConnectTypeEnum.AppletAPI)
    {
        if (_leaves.TryGetValue(wsToken, out var secondWindowLeaf) && await secondWindowLeaf.SendAsync(message))
        {
            return true;
        }
    }
    else if (wsConnectType == WSConnectTypeEnum.RemoteConnectSecondWindow)
    {
        if (_leaves.TryGetValue(wsToken, out var remoteLeaf))
        {
            await remoteLeaf.SendAsync(message);
        }
        else
        {
            // Fall back to the consultation second window. The key must be built from the
            // token code; concatenating the TokenDTO object itself never matches a key.
            var consultationKey = tokenCode + "_" + WSConnectTypeEnum.ConsultationSecondWindow.ToString("D");
            if (_leaves.TryGetValue(consultationKey, out var consultationLeaf))
            {
                await consultationLeaf.SendAsync(message);
            }
        }
        // NOTE(review): as in the original, this branch does not return, so the message is
        // also sent on the main connection below. Confirm that this double delivery is intended.
    }

    if (_leaves.TryGetValue(tokenCode, out var mainLeaf))
    {
        // Report the real outcome so a failed send lets the caller try the next token.
        return await mainLeaf.SendAsync(message);
    }

    Logger.WriteLineWarn($"Not found connection for token {tokenCode}");
    return false;
}
/// <summary>
/// Resolves the tokens of the client named in the request and enqueues the message once per
/// distinct token; returns as soon as the messages are handed to the queue manager.
/// </summary>
/// <param name="request">Carries the client id, the message and the target connection type.</param>
/// <returns>Always true once every message has been enqueued.</returns>
public async Task<bool> PostMessageAsync(SendNotificationRequest request)
{
    var lookup = new GetTokensWithClientIdRequest() { ClientId = request.ClientId };
    var recipients = GetDistinctTokenInfos(await _authenticationService.GetTokensWithClientIdAsync(lookup));

    foreach (var recipient in recipients)
    {
        Logger.WriteLineInfo($"NotificationService PostMessageAsync, account name:{recipient.AccountName}, token: {recipient.Code}, wsConnectType{request.WSConnectType},message:{request.Message}");

        // An empty queue id is passed here, unlike the broadcast path which passes the request's MsgQueueId.
        var queued = new PostMessageParam(recipient.Code, request.Message, request.WSConnectType);
        _notifcationQueueManager.Post(string.Empty, queued);
    }

    return true;
}
/// <summary>
/// Enqueues the message once per distinct token in <paramref name="tokens"/>; returns as soon
/// as the messages are handed to the queue manager.
/// </summary>
/// <param name="tokens">Target tokens; may be null or empty.</param>
/// <param name="messageData">Payload to deliver.</param>
/// <param name="wsConnectType">Connection type the message is aimed at.</param>
/// <returns>Always true once every message has been enqueued.</returns>
public async Task<bool> PostMessageAsync(IList<TokenDTO> tokens, object messageData, WSConnectTypeEnum wsConnectType)
{
    var recipients = GetDistinctTokenInfos(tokens);

    foreach (var recipient in recipients)
    {
        Logger.WriteLineInfo($"NotificationService PostMessageAsync, account name:{recipient.AccountName}, token: {recipient.Code},wsConnectType{wsConnectType}, message:{messageData}");

        // An empty queue id is passed here, unlike the broadcast path which passes the request's MsgQueueId.
        var queued = new PostMessageParam(recipient.Code, messageData, wsConnectType);
        _notifcationQueueManager.Post(string.Empty, queued);
    }

    return true;
}
/// <summary>
/// Synchronous send routine handed to the <c>NotificationQueueManager</c> constructor;
/// delivers one queued message to the connections registered for its token.
/// </summary>
/// <remarks>
/// If a second-window connection of the requested type exists it is used; otherwise the main
/// connection is used. For the remote-connect type the message is sent on the second window
/// (or the consultation window as a fallback) and then on the main connection as well.
/// </remarks>
/// <param name="param">Token, message and connection type of the queued message.</param>
/// <returns>false only when an exception was thrown; true otherwise, even when no connection was found.</returns>
private bool SendMessage(PostMessageParam param)
{
    try
    {
        Logger.WriteLineInfo($"SendMessage start, Token:{param.Token},WSConnectType:{param.WSConnectType},Message:{param.Message}");

        var connectType = param.WSConnectType;
        var tokenCode = param.Token;
        var payload = param.Message;

        // Second-window connections are registered under "token_type".
        string LeafKey(WSConnectTypeEnum kind) => tokenCode + "_" + kind.ToString("D");

        var prefersSecondWindow = connectType == WSConnectTypeEnum.ConsultationSecondWindow
            || connectType == WSConnectTypeEnum.EducationSecondWindow
            || connectType == WSConnectTypeEnum.AppletAPI;

        if (prefersSecondWindow)
        {
            // A successful second-window send ends the delivery; a failure falls back to the main connection.
            if (_leaves.TryGetValue(LeafKey(connectType), out var secondWindowLeaf) && secondWindowLeaf.Send(payload))
            {
                return true;
            }
        }
        else if (connectType == WSConnectTypeEnum.RemoteConnectSecondWindow)
        {
            if (_leaves.TryGetValue(LeafKey(connectType), out var remoteLeaf))
            {
                remoteLeaf.Send(payload);
            }
            else if (_leaves.TryGetValue(LeafKey(WSConnectTypeEnum.ConsultationSecondWindow), out var consultationLeaf))
            {
                consultationLeaf.Send(payload);
            }
        }

        if (!_leaves.TryGetValue(tokenCode, out var mainLeaf))
        {
            Logger.WriteLineWarn($"Not found connection for token {tokenCode} when post message");
        }
        else
        {
            mainLeaf.Send(payload);
        }

        return true;
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"post message ex:{ex}");
        return false;
    }
}
/// <summary>
/// Opens a dedicated message queue for a module.
/// </summary>
/// <param name="module">Module name used as the prefix of the generated queue id.</param>
/// <returns>
/// The new queue id (<c>module_GUID</c>, GUID in upper-case "N" format) when the queue was
/// added; otherwise the literal "DefaultMessageQueue".
/// </returns>
public string OpenNotifyQueueAsync(string module)
{
    var uniquePart = Guid.NewGuid().ToString("N").ToUpper();
    var msgQueueId = $"{module}_{uniquePart}";

    if (_notifcationQueueManager.TryAdd(msgQueueId))
    {
        return msgQueueId;
    }

    // TODO: decide what should be returned when adding the queue fails.
    return "DefaultMessageQueue";
}
/// <summary>
/// Resolves the tokens of all clients named in the request and enqueues the message once per
/// distinct token on the request's queue; returns as soon as the messages are enqueued.
/// </summary>
/// <param name="request">Carries the client ids, queue id, message and target connection type.</param>
/// <returns>Always true once every message has been enqueued.</returns>
public async Task<bool> BroadcastMessageAsync(BroadcastNotificationRequest request)
{
    var lookup = new GetTokenWithClientIdsRequest() { ClientIds = request.ClientIds.ToList() };
    var recipients = GetDistinctTokenInfos(await _authenticationService.GetTokenWithClientIdsAsync(lookup));

    foreach (var recipient in recipients)
    {
        Logger.WriteLineInfo($"NotificationService BroadcastMessageAsync, account name:{recipient.AccountName}, token: {recipient.Code}, msgQueueId:{request.MsgQueueId},wsConnectType:{request.WSConnectType}, message:{request.Message}");

        var queued = new PostMessageParam(recipient.Code, request.Message, request.WSConnectType);
        _notifcationQueueManager.Post(request.MsgQueueId, queued);
    }

    return true;
}
/// <summary>
/// Enqueues the message once per distinct token in <paramref name="tokens"/> on the request's
/// queue; returns as soon as the messages are enqueued.
/// </summary>
/// <param name="tokens">Target tokens; may be null or empty.</param>
/// <param name="request">Carries the queue id, message and target connection type.</param>
/// <returns>Always true once every message has been enqueued.</returns>
public async Task<bool> BroadcastMessageAsync(IList<TokenDTO> tokens, BroadcastNotificationRequest request)
{
    var recipients = GetDistinctTokenInfos(tokens);

    foreach (var recipient in recipients)
    {
        Logger.WriteLineInfo($"NotificationService BroadcastMessageAsync, account name:{recipient.AccountName}, token: {recipient.Code}, msgQueueId:{request.MsgQueueId},wsConnectType:{request.WSConnectType}, message:{request.Message}");

        var queued = new PostMessageParam(recipient.Code, request.Message, request.WSConnectType);
        _notifcationQueueManager.Post(request.MsgQueueId, queued);
    }

    return true;
}
/// <summary>
/// Removes a WebSocket user: unregisters the main connection of a token together with its
/// remote-connect, consultation and education second-window connections.
/// </summary>
/// <remarks>
/// Keys that are not registered are simply skipped. The removed connections are only dropped
/// from the registry; they are not closed here.
/// NOTE(review): an AppletAPI connection for the same token is not removed — confirm that is intended.
/// </remarks>
/// <param name="tokenCode">Token code whose connections are unregistered.</param>
/// <returns>Always true.</returns>
public async Task<bool> RemoveToken(string tokenCode)
{
    var secondWindowTypes = new[]
    {
        WSConnectTypeEnum.RemoteConnectSecondWindow,
        WSConnectTypeEnum.ConsultationSecondWindow,
        WSConnectTypeEnum.EducationSecondWindow,
    };

    // TryRemove alone is atomic. The former ContainsKey pre-check added a second lookup and
    // could report a spurious failure when another thread removed the key in between.
    foreach (var type in secondWindowTypes)
    {
        _leaves.TryRemove(tokenCode + "_" + type.ToString("D"), out _);
    }
    _leaves.TryRemove(tokenCode, out _);

    return await Task.FromResult(true);
}
/// <summary>
/// Removes a WebSocket user: unregisters exactly one connection, identified by its registry key.
/// </summary>
/// <remarks>
/// The removed connection is only dropped from the registry; it is not closed here.
/// </remarks>
/// <param name="tokenCode">Registry key of the connection to remove.</param>
/// <returns>true when a connection was registered under the key and has been removed; otherwise false.</returns>
public async Task<bool> RemoveSingleTokenAsync(string tokenCode)
{
    // TryRemove already returns false for a missing key, so no ContainsKey pre-check is needed.
    var removed = _leaves.TryRemove(tokenCode, out _);
    return await Task.FromResult(removed);
}
/// <summary>
/// Closes a dedicated message queue.
/// </summary>
/// <param name="queueId">Id of the queue to remove.</param>
/// <returns>The result of the queue manager's <c>TryRemove</c> call.</returns>
public bool CloseNotifyQueueAsync(string queueId) => _notifcationQueueManager.TryRemove(queueId);
/// <summary>
/// Stops the server by stopping the HTTP listener. Safe to call when
/// <see cref="Start"/> has never been called, in which case it does nothing.
/// </summary>
public void Stop()
{
    // The listener is only created in Start(); guard against Stop() being called first.
    _httpListener?.Stop();
}
/// <summary>
/// Returns the given tokens with duplicate codes removed, keeping the first token seen for
/// each code and preserving the input order.
/// </summary>
/// <param name="tokenInfos">Tokens to de-duplicate; may be null or empty. Null elements are skipped.</param>
/// <returns>A new list; empty when <paramref name="tokenInfos"/> is null or empty.</returns>
private IList<TokenDTO> GetDistinctTokenInfos(IList<TokenDTO> tokenInfos)
{
    var distinctTokens = new List<TokenDTO>();
    if (tokenInfos == null)
    {
        return distinctTokens;
    }

    // Single pass with a set of already-seen codes, instead of re-scanning the input list
    // once per distinct code.
    var seenCodes = new HashSet<string>();
    foreach (var tokenInfo in tokenInfos)
    {
        if (tokenInfo != null && seenCodes.Add(tokenInfo.Code))
        {
            distinctTokens.Add(tokenInfo);
        }
    }
    return distinctTokens;
}
/// <summary>
/// One queued notification: the token it is addressed to, the payload and the connection
/// type it is aimed at.
/// </summary>
internal class PostMessageParam
{
    /// <summary>
    /// Creates a queued notification.
    /// </summary>
    /// <param name="token">Token code of the recipient.</param>
    /// <param name="message">Payload supplied by the caller.</param>
    /// <param name="wsConnectType">Connection type the message is aimed at.</param>
    public PostMessageParam(string token, object message, WSConnectTypeEnum wsConnectType)
    {
        Token = token;
        Message = message;
        WSConnectType = wsConnectType;
    }

    /// <summary>
    /// Token code of the recipient; used to look up the registered connection(s).
    /// </summary>
    public string Token { get; }

    /// <summary>
    /// The request message from the caller.
    /// </summary>
    public object Message { get; }

    /// <summary>
    /// Connection type the message is aimed at.
    /// </summary>
    public WSConnectTypeEnum WSConnectType { get; set; }
}
/// <summary>
/// One registered client connection: wraps a <see cref="WebSocket"/> in a
/// <c>WebSocketIO</c> and sends messages over it as binary frames.
/// </summary>
private class WebScoketLeaf
{
    private readonly WebSocketIO _webSocketIO;

    /// <summary>
    /// The token this connection was created with.
    /// </summary>
    public string ClientToken { get; }

    /// <summary>
    /// Wraps an accepted web socket.
    /// </summary>
    /// <param name="webSocket">The accepted socket.</param>
    /// <param name="token">Token stored in <see cref="ClientToken"/>.</param>
    public WebScoketLeaf(WebSocket webSocket, string token)
    {
        _webSocketIO = new WebSocketIO(webSocket);
        ClientToken = token;
    }

    /// <summary>
    /// Sends a message asynchronously.
    /// </summary>
    /// <param name="message">The message to send to the client.</param>
    /// <returns>true when the send completed; false when the socket is disconnected or the send threw.</returns>
    public async Task<bool> SendAsync(object message)
    {
        if (!_webSocketIO.IsDisconnected)
        {
            Logger.WriteLineInfo($"SendAsync start,message:{message}");

            // Building the buffer stays outside the try block: only failures of the
            // send itself are reported as false.
            var payload = new BufferAdapter(message).GetMessageBuffer();
            try
            {
                await _webSocketIO.SendAsync(payload, WebSocketMessageType.Binary);
                Logger.WriteLineInfo($"SendAsync end, message:{message}");
                return true;
            }
            catch (Exception sendError)
            {
                Logger.WriteLineWarn($"SendAsync error,maybe sendWindows closed:ex:{sendError}");
                return false;
            }
        }

        Logger.WriteLineWarn($"WebSocket is closed,message:{message}");
        return false;
    }

    /// <summary>
    /// Sends a message synchronously.
    /// </summary>
    /// <param name="message">The message to send to the client.</param>
    /// <returns>true when the send completed; false when the socket is disconnected or the send threw.</returns>
    public bool Send(object message)
    {
        if (!_webSocketIO.IsDisconnected)
        {
            Logger.WriteLineInfo($"Send start, message:{message}");

            // Building the buffer stays outside the try block: only failures of the
            // send itself are reported as false.
            var payload = new BufferAdapter(message).GetMessageBuffer();
            try
            {
                _webSocketIO.Send(payload, WebSocketMessageType.Binary);
                Logger.WriteLineInfo($"Send end, message:{message}");
                return true;
            }
            catch (Exception sendError)
            {
                Logger.WriteLineWarn($"Send error,maybe sendWindows closed:ex:{sendError}");
                return false;
            }
        }

        Logger.WriteLineWarn($"WebSocket is closed,message:{message}");
        return false;
    }

    /// <summary>
    /// Closes the connection by disposing the underlying <c>WebSocketIO</c>.
    /// </summary>
    public void Close() => _webSocketIO.Dispose();
}
- }
- }
|