123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523 |
- using System.Threading.Tasks;
- using WingServerCommon.Service;
- using WingServerCommon.Config;
- using WingServerCommon.Config.Parameters;
- using WingInterfaceLibrary.Interface;
- using WingInterfaceLibrary.Interface.DBInterface;
- using WingInterfaceLibrary.Request.Notification;
- using System.Collections.Generic;
- using WingInterfaceLibrary.Enum;
- using WingInterfaceLibrary.Notifications;
- using WingServerCommon.Log;
- using WingInterfaceLibrary.DB.Request;
- using System;
- using WingInterfaceLibrary.DTO.Message;
- using WingInterfaceLibrary.Request.Authentication;
- using System.Linq;
- using Newtonsoft.Json.Linq;
- using WingServerCommon.Interfaces.Cache;
- using WingServerCommon.Utilities;
- using WingInterfaceLibrary.Request.Lock;
- using WingInterfaceLibrary.Internal.Interface;
- using WingInterfaceLibrary.Internal.Request;
- using WingInterfaceLibrary.Request;
- namespace WingNotificationModule.Service
- {
- /// <summary>
- /// 通知服务
- /// </summary>
- public class NotificationService : JsonRpcService, INotificationService
- {
// Created once in the constructor from the "Notification" configuration section.
private readonly NotificationServer _notificationServer;

// Service proxies below are resolved in Load via GetProxy<T>().
private IAuthenticationService _authenticationService;
private IStorageDBService _storageDBService;
private IMessageDBService _messageDBService;
private IUserDBService _userDBService;
private IMasterInteractionCenterService _masterInteractionCenterService;

// Read from configuration in the constructor and never reassigned afterwards.
private readonly string _serverID;
private readonly string _serverHost;

// Chosen in Load: either the parameterless LockManager or one that delegates to _lockService.
private LockManager _lockManager;
private ILockService _lockService;
/// <summary>
/// Reads the host, server ID and notification settings from configuration and
/// constructs the notification server (it is started later, in Load).
/// </summary>
public NotificationService()
{
    _serverHost = ConfigurationManager.Host;
    _serverID = ConfigurationManager.GetParammeter<StringParameter>("General", "ServerID").Value;

    var hostParameter = ConfigurationManager.GetParammeter<StringParameter>("Notification", "Host");
    var queueLimit = ConfigurationManager.GetParammeter<IntParameter>("Notification", "MaxQueueCount").Value;

    _notificationServer = new NotificationServer(hostParameter.Value, queueLimit);
}
/// <summary>
/// Forwards a customer error code (cast to its integer value) and a message to ThrowRpcException.
/// </summary>
/// <param name="customerRpcCodeEnum">Customer-level RPC error code.</param>
/// <param name="msg">Error message passed along with the code.</param>
private void ThrowCustomerException(CustomerRpcCode customerRpcCodeEnum, string msg)
    => ThrowRpcException((int)customerRpcCodeEnum, msg);
/// <summary>
/// Resolves the service proxies used by this service, starts the notification server
/// and selects the lock manager implementation based on the configured services.
/// </summary>
/// <param name="jsonRpcClientPool">Client pool passed through to the base implementation.</param>
public override void Load(JsonRpcClientPool jsonRpcClientPool)
{
    base.Load(jsonRpcClientPool);

    _authenticationService = GetProxy<IAuthenticationService>();
    _masterInteractionCenterService = GetProxy<IMasterInteractionCenterService>();
    _messageDBService = GetProxy<IMessageDBService>();
    _userDBService = GetProxy<IUserDBService>();
    _notificationServer.Start(_authenticationService);
    _storageDBService = GetProxy<IStorageDBService>();
    _lockService = GetProxy<ILockService>();

    var inProcessServices = ConfigurationManager.GetParammeter<StringParameter>("Services", "InProcess").Value;
    var remoteServices = ConfigurationManager.GetParammeter<StringParameter>("Services", "Remote").Value;
    var lockServiceConfigured = inProcessServices.Contains("LockService") || remoteServices.Contains("LockService");

    // With a LockService listed in either setting, locking is delegated to it through
    // ApplyLock/ReleaseLock; otherwise the parameterless LockManager is used.
    _lockManager = lockServiceConfigured
        ? new LockManager(ApplyLock, ReleaseLock)
        : new LockManager();
}
/// <summary>
/// Synchronously asks the lock service to apply a lock for the given key.
/// </summary>
/// <param name="lockKey">Key identifying the lock to apply.</param>
/// <returns>The IsSuccess flag of the lock service response; false if the call throws (the error is logged).</returns>
private bool ApplyLock(string lockKey)
{
    try
    {
        var applyRequest = new ApplyLockRequest { LockKey = lockKey };
        // Blocks on the task: this method is handed to LockManager as a synchronous callback.
        return _lockService.ApplyLockAsync(applyRequest).Result.IsSuccess;
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService ApplyLock err, lock key:{lockKey}, ex:{ex}");
        return false;
    }
}
/// <summary>
/// Synchronously asks the lock service to release the lock with the given unique code.
/// </summary>
/// <param name="lockUniqueCode">Unique code of the lock to release.</param>
/// <returns>The IsSuccess flag of the lock service response; false if the call throws (the error is logged).</returns>
private bool ReleaseLock(string lockUniqueCode)
{
    try
    {
        var releaseRequest = new ReleaseLockRequest { LockUniqueCode = lockUniqueCode };
        // Blocks on the task: this method is handed to LockManager as a synchronous callback.
        return _lockService.ReleaseLockAsync(releaseRequest).Result.IsSuccess;
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService ReleaseLock err, release key:{lockUniqueCode}, ex:{ex}");
        return false;
    }
}
/// <summary>
/// Stores the notification, sends it to the recipient's connections on this server and
/// waits for the send to finish; tokens logged in on other servers are forwarded to the master.
/// </summary>
/// <param name="request">Send-notification request.</param>
/// <returns>The local send result; false if storing the message fails or an exception occurs.</returns>
/// <value>true</value>
public async Task<bool> SendMessageAsync(SendNotificationRequest request)
{
    try
    {
        // The message record is written first; nothing is delivered when that fails.
        if (!await AddMessageInfoAsync(request))
        {
            return false;
        }

        // Use the tokens supplied by the caller, otherwise look them up by client id.
        IList<TokenDTO> tokens = request.Tokens;
        if (tokens == null || tokens.Count == 0)
        {
            tokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
        }

        var tokensOnThisServer = tokens.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();
        var delivered = await _notificationServer.SendMessageAsync(tokensOnThisServer, request);

        var tokensElsewhere = tokens.Where(t => t.LoginServer != ConfigurationManager.Host).ToList();
        if (tokensElsewhere.Count > 0)
        {
            var forwarded = new SyncNotification()
            {
                Clients = tokensElsewhere
                    .Select(t => new NotificationClientInfo()
                    {
                        ClientId = t.ClientId,
                        LoginServerUrl = t.LoginServer
                    })
                    .ToList(),
                Message = request.Message,
                WSConnectType = request.WSConnectType
            };
            await SyncToMasterAsync(SyncTypeEnum.Notification, forwarded);
        }

        return delivered;
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService SendMessageAsync RelevanceCode: {request.RelevanceCode}, err: {ex}");
        return false;
    }
}
/// <summary>
/// Stores the notification and posts it to the notification server for the recipient's
/// connections on this server; tokens logged in on other servers are forwarded to the master.
/// </summary>
/// <param name="request">Send-notification request.</param>
/// <returns>The local post result; false if storing the message fails or an exception occurs.</returns>
/// <value>true</value>
public async Task<bool> PostMessageAsync(SendNotificationRequest request)
{
    try
    {
        // The message record is written first; nothing is posted when that fails.
        if (!await AddMessageInfoAsync(request))
        {
            return false;
        }

        // Use the tokens supplied by the caller, otherwise look them up by client id.
        IList<TokenDTO> tokens = request.Tokens;
        if (tokens == null || tokens.Count == 0)
        {
            tokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
        }

        var tokensOnThisServer = tokens.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();
        var posted = await _notificationServer.PostMessageAsync(tokensOnThisServer, request.Message, request.WSConnectType);

        var tokensElsewhere = tokens.Where(t => t.LoginServer != ConfigurationManager.Host).ToList();
        if (tokensElsewhere.Count > 0)
        {
            var forwarded = new SyncNotification()
            {
                Clients = tokensElsewhere
                    .Select(t => new NotificationClientInfo()
                    {
                        ClientId = t.ClientId,
                        LoginServerUrl = t.LoginServer
                    })
                    .ToList(),
                Message = request.Message,
                WSConnectType = request.WSConnectType
            };
            await SyncToMasterAsync(SyncTypeEnum.Notification, forwarded);
        }

        return posted;
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService PostMessageAsync RelevanceCode: {request.RelevanceCode}, err: {ex}");
        return false;
    }
}
/// <summary>
/// Stores a broadcast notification, broadcasts it to the recipients connected to this server
/// and waits for completion; when the request asks for it, tokens logged in on other servers
/// are forwarded to the master.
/// </summary>
/// <param name="request">Broadcast-notification request.</param>
/// <returns>The local broadcast result; false if storing the message fails or an exception occurs.</returns>
/// <value>true</value>
public async Task<bool> BroadcastMessageAsync(BroadcastNotificationRequest request)
{
    try
    {
        // The message record is written first; nothing is broadcast when that fails.
        if (!await AddMessageInfoAsync(null, request))
        {
            return false;
        }

        var tokens = await _authenticationService.GetTokenWithClientIdsAsync(new GetTokenWithClientIdsRequest() { ClientIds = request.ClientIds.ToList() });
        var tokensOnThisServer = tokens.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();
        var broadcasted = await _notificationServer.BroadcastMessageAsync(tokensOnThisServer, request);

        // Cross-server forwarding is opt-in for broadcasts.
        if (!request.IsNeedSyn)
        {
            return broadcasted;
        }

        var tokensElsewhere = tokens.Where(t => t.LoginServer != ConfigurationManager.Host).ToList();
        if (tokensElsewhere.Count > 0)
        {
            var forwarded = new SyncNotification()
            {
                Clients = tokensElsewhere
                    .Select(t => new NotificationClientInfo()
                    {
                        ClientId = t.ClientId,
                        LoginServerUrl = t.LoginServer
                    })
                    .ToList(),
                Message = request.Message,
                WSConnectType = request.WSConnectType
            };
            await SyncToMasterAsync(SyncTypeEnum.Notification, forwarded);
        }

        return broadcasted;
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService BroadcastMessageAsync RelevanceCode: {request.RelevanceCode}, err: {ex}");
        return false;
    }
}
/// <summary>
/// Forwards a notification to the master interaction center service when the
/// configuration reports a distributed deployment; otherwise does nothing.
/// Failures are logged and never propagated to the caller.
/// </summary>
/// <param name="syncType">Sync type placed on the forwarded request.</param>
/// <param name="message">Notification payload (target clients, message, connection type).</param>
/// <returns>A task that completes when the forward attempt has finished.</returns>
private async Task SyncToMasterAsync(SyncTypeEnum syncType, SyncNotification message)
{
    try
    {
        if (!ConfigurationManager.IsDistributed)
        {
            return;
        }

        var accepted = await _masterInteractionCenterService.SyncReceiveSlaveServiceDataAsync(new SyncReceiveServiceDataRequest
        {
            SyncService = SyncServiceEnum.Notification,
            SyncType = syncType,
            SourceUrl = ConfigurationManager.Host,
            ServerID = _serverID,
            ServiceDataJson = "",
            Notification = message
        });

        if (!accepted)
        {
            Logger.WriteLineWarn($"NotificationService SyncToMasterAsync failed, syncType:{syncType.ToString()}");
        }
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService SyncToMasterAsync err, syncType:{syncType.ToString()}, err: {ex}");
    }
}
/// <summary>
/// Opens a notify queue on the notification server for the requested module and logs the outcome.
/// </summary>
/// <param name="request">Request carrying the module to open a queue for.</param>
/// <returns>The queue id returned by the notification server.</returns>
public async Task<string> OpenNotifyQueueAsync(OpenNotifyQueueRequest request)
{
    var queueId = _notificationServer.OpenNotifyQueueAsync(request.Module);

    Logger.WriteLineInfo($"NotificationService OpenNotifyQueueAsync, module:{request.Module}, msgQueueId:{queueId}");

    return queueId;
}
/// <summary>
/// Closes the notify queue with the given id on the notification server and logs the call.
/// </summary>
/// <param name="request">Request carrying the id of the queue to close.</param>
/// <returns>The result reported by the notification server.</returns>
public async Task<bool> CloseNotifyQueueAsync(CloseNotifyQueueRequest request)
{
    var closed = _notificationServer.CloseNotifyQueueAsync(request.MsgQueueId);

    Logger.WriteLineInfo($"NotificationService CloseNotifyQueueAsync, msgQueueId:{request.MsgQueueId}");

    return closed;
}
/// <summary>
/// Asks the notification server to remove the given token.
/// </summary>
/// <param name="request">Request carrying the token to remove.</param>
/// <returns>The result reported by the notification server.</returns>
public async Task<bool> RemoveToken(TokenRequest request)
{
    return await _notificationServer.RemoveToken(request.Token);
}
/// <summary>
/// Asks the notification server to remove the single connection identified by the given token.
/// </summary>
/// <param name="request">Request carrying the token to remove.</param>
/// <returns>The result reported by the notification server.</returns>
public async Task<bool> RemoveSingleTokenAsync(TokenRequest request)
{
    return await _notificationServer.RemoveSingleTokenAsync(request.Token);
}
/// <summary>
/// Builds the message record for a single-recipient or a broadcast notification and
/// stores it through the message DB service.
/// </summary>
/// <param name="request">Single-recipient request; used whenever it is not null.</param>
/// <param name="broadcastRequest">Broadcast request; used only when <paramref name="request"/> is null.</param>
/// <returns>
/// The result of the DB insert. Returns false when both arguments are null or when any
/// exception occurs; the exception is caught and logged here and never reaches the caller.
/// </returns>
private async Task<bool> AddMessageInfoAsync(SendNotificationRequest request, BroadcastNotificationRequest broadcastRequest = null)
{
    try
    {
        if (request == null && broadcastRequest == null)
        {
            ThrowCustomerException(CustomerRpcCode.NotificationParamError, "Notification param error");
        }

        AddMessagesDBRequest req = null;
        if (request != null)
        {
            IList<TokenDTO> tokens;
            if (request.Tokens != null && request.Tokens.Count > 0)
            {
                tokens = request.Tokens;
            }
            else
            {
                tokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
            }
            var tokenInfo = tokens.FirstOrDefault() ?? new TokenDTO();

            // The receiver type must come from `request`: broadcastRequest is null on this
            // path, and reading it made every send to a recipient without a token fail.
            var accountName = ResolveAccountName(tokenInfo, request.ClientId, request.ReceiverType);

            var clientInfos = new List<ClientInfoDTO>()
            {
                new ClientInfoDTO()
                {
                    ClientId = request.ClientId,
                    Name = accountName,
                    IsReaded = false
                }
            };
            JObject jsonObj = JObject.Parse(request.JsonMessage);
            req = new AddMessagesDBRequest()
            {
                Code = jsonObj["Code"].ToString(),
                NotificationType = request.NotificationType,
                Content = request.JsonMessage,
                ServerHost = ConfigurationManager.Host,
                NotifyTime = DateTime.UtcNow,
                ReceiverType = request.ReceiverType,
                TransactionType = request.TransactionType,
                RelevanceCode = request.RelevanceCode,
                ClientInfos = clientInfos
            };
        }
        else
        {
            var clientInfos = new List<ClientInfoDTO>();
            if (broadcastRequest.ClientIds?.Count > 0)
            {
                foreach (var clientId in broadcastRequest.ClientIds)
                {
                    var curTokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = clientId });
                    var curTokenInfo = curTokens.FirstOrDefault() ?? new TokenDTO();
                    clientInfos.Add(new ClientInfoDTO()
                    {
                        ClientId = clientId,
                        Name = ResolveAccountName(curTokenInfo, clientId, broadcastRequest.ReceiverType),
                        IsReaded = false
                    });
                }
            }
            JObject jsonObj = JObject.Parse(broadcastRequest.JsonMessage);
            req = new AddMessagesDBRequest()
            {
                Code = jsonObj["Code"].ToString(),
                NotificationType = broadcastRequest.NotificationType,
                Content = broadcastRequest.JsonMessage,
                ServerHost = ConfigurationManager.Host,
                NotifyTime = DateTime.UtcNow,
                ReceiverType = broadcastRequest.ReceiverType,
                TransactionType = broadcastRequest.TransactionType,
                RelevanceCode = broadcastRequest.RelevanceCode,
                ClientInfos = clientInfos
            };
        }

        return await _messageDBService.AddMessageInfoDBAsync(req);
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService AddMessageInfoAsync err, {ex}");
        return false;
    }
}

/// <summary>
/// Picks the display name stored with a message recipient: the token's account name when the
/// token carries a client id, otherwise a name looked up in the user or device cache.
/// </summary>
/// <param name="tokenInfo">First token of the recipient, or an empty TokenDTO when there is none.</param>
/// <param name="clientId">Recipient id used as the cache lookup code.</param>
/// <param name="receiverType">Selects the user cache (Client) or the device cache (Device).</param>
/// <returns>The resolved name, or an empty string when nothing matches.</returns>
private static string ResolveAccountName(TokenDTO tokenInfo, string clientId, ApplicantTypeEnum receiverType)
{
    if (!string.IsNullOrEmpty(tokenInfo.ClientId))
    {
        return tokenInfo.AccountName;
    }

    if (receiverType == ApplicantTypeEnum.Client)
    {
        var userList = CacheMaintenance.Instance.Get<IUserInfoManager>().Where(x => x.Code == clientId)?.ToList() ?? new List<CacheUserDTO>();
        if (userList.Count > 0)
        {
            var userInfoDTO = userList.FirstOrDefault() ?? new CacheUserDTO();
            // Prefer the full name; fall back to the user name when it is empty.
            return !string.IsNullOrEmpty(userInfoDTO.FullName) ? userInfoDTO.FullName : userInfoDTO.UserName;
        }
    }
    else if (receiverType == ApplicantTypeEnum.Device)
    {
        var deviceList = CacheMaintenance.Instance.Get<IDeviceInfosManager>().Where(x => x.Code == clientId)?.ToList() ?? new List<CacheDeviceDTO>();
        if (deviceList.Count > 0)
        {
            var deviceInfoDTO = deviceList.FirstOrDefault() ?? new CacheDeviceDTO();
            return deviceInfoDTO.Name;
        }
    }

    return "";
}
/// <summary>
/// Handles a notification forwarded from another server: selects the clients whose login
/// server is this host, looks up their tokens and posts the message to them.
/// </summary>
/// <param name="request">Sync request carrying the forwarded notification.</param>
/// <returns>The post result; false if an exception occurs (the error is logged).</returns>
/// <show>false</show>
public async Task<bool> ReceiveSyncServerMessageAsync(SyncReceiveServiceDataRequest request)
{
    try
    {
        Logger.WriteLineInfo($"Notification ReceiveSyncServerMessageAsync, source url:{request.SourceUrl}, syncType:{request.SyncType.ToString()},wsConnectType{request.Notification.WSConnectType}");

        var notification = request.Notification;
        var clientIdsOnThisServer = notification.Clients
            .Where(c => c.LoginServerUrl == ConfigurationManager.Host)
            .Select(c => c.ClientId)
            .ToList();

        var tokens = await _authenticationService.GetTokenWithClientIdsAsync(new GetTokenWithClientIdsRequest() { ClientIds = clientIdsOnThisServer });
        // Only tokens whose login server is this host are posted to.
        var tokensOnThisServer = tokens.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();

        return await _notificationServer.PostMessageAsync(tokensOnThisServer, notification.Message, notification.WSConnectType);
    }
    catch (Exception ex)
    {
        Logger.WriteLineError($"NotificationService ReceiveSyncServerMessageAsync err, {ex}");
        return false;
    }
}
- }
- }
|