NotificationService.cs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. using System.Threading.Tasks;
  2. using WingServerCommon.Service;
  3. using WingServerCommon.Config;
  4. using WingServerCommon.Config.Parameters;
  5. using WingInterfaceLibrary.Interface;
  6. using WingInterfaceLibrary.Interface.DBInterface;
  7. using WingInterfaceLibrary.Request.Notification;
  8. using System.Collections.Generic;
  9. using WingInterfaceLibrary.Enum;
  10. using WingInterfaceLibrary.Notifications;
  11. using WingServerCommon.Log;
  12. using WingInterfaceLibrary.DB.Request;
  13. using System;
  14. using WingInterfaceLibrary.DTO.Message;
  15. using WingInterfaceLibrary.Request.Authentication;
  16. using System.Linq;
  17. using Newtonsoft.Json.Linq;
  18. using WingServerCommon.Interfaces.Cache;
  19. using WingServerCommon.Utilities;
  20. using WingInterfaceLibrary.Request.Lock;
  21. using WingInterfaceLibrary.Internal.Interface;
  22. using WingInterfaceLibrary.Internal.Request;
  23. using WingInterfaceLibrary.Request;
  24. namespace WingNotificationModule.Service
  25. {
  26. /// <summary>
  27. /// 通知服务
  28. /// </summary>
  29. public class NotificationService : JsonRpcService, INotificationService
  30. {
  // Local notification server: created in the constructor, started in Load.
  private NotificationServer _notificationServer;

  // Service proxies, all obtained in Load through GetProxy<T>().
  private IAuthenticationService _authenticationService;
  private IStorageDBService _storageDBService;
  private IMessageDBService _messageDBService;
  private IUserDBService _userDBService;
  private IMasterInteractionCenterService _masterInteractionCenterService;
  private ILockService _lockService;

  // Identity of this server as read from configuration in the constructor.
  private string _serverID;
  private string _serverHost;

  // Created in Load, either stand-alone or wired to ApplyLock / ReleaseLock.
  private LockManager _lockManager;

  /// <summary>
  /// Reads this server's host and id plus the "Notification" settings from configuration
  /// and creates the notification server (it is started later, in Load).
  /// </summary>
  public NotificationService()
  {
      _serverHost = ConfigurationManager.Host;
      _serverID = ConfigurationManager.GetParammeter<StringParameter>("General", "ServerID").Value;

      var notificationHost = ConfigurationManager.GetParammeter<StringParameter>("Notification", "Host");
      var queueLimit = ConfigurationManager.GetParammeter<IntParameter>("Notification", "MaxQueueCount").Value;
      _notificationServer = new NotificationServer(notificationHost.Value, queueLimit);
  }
  /// <summary>
  /// Forwards to ThrowRpcException, passing the integer value of the given customer RPC code.
  /// </summary>
  /// <param name="customerRpcCodeEnum">Customer RPC code to report.</param>
  /// <param name="msg">Message to report with the code.</param>
  private void ThrowCustomerException(CustomerRpcCode customerRpcCodeEnum, string msg)
      => ThrowRpcException((int)customerRpcCodeEnum, msg);
  /// <summary>
  /// Runs the base load, resolves the service proxies, starts the notification server
  /// and creates the lock manager.
  /// </summary>
  /// <param name="jsonRpcClientPool">Client pool passed through to the base implementation.</param>
  public override void Load(JsonRpcClientPool jsonRpcClientPool)
  {
      base.Load(jsonRpcClientPool);

      _authenticationService = GetProxy<IAuthenticationService>();
      _masterInteractionCenterService = GetProxy<IMasterInteractionCenterService>();
      _messageDBService = GetProxy<IMessageDBService>();
      _userDBService = GetProxy<IUserDBService>();
      // The notification server is started with the authentication proxy resolved above.
      _notificationServer.Start(_authenticationService);
      _storageDBService = GetProxy<IStorageDBService>();
      _lockService = GetProxy<ILockService>();

      var inProcessServices = ConfigurationManager.GetParammeter<StringParameter>("Services", "InProcess").Value;
      var remoteServices = ConfigurationManager.GetParammeter<StringParameter>("Services", "Remote").Value;

      // The lock callbacks are only wired in when "LockService" is listed in either setting.
      var lockServiceListed = inProcessServices.Contains("LockService") || remoteServices.Contains("LockService");
      _lockManager = lockServiceListed
          ? new LockManager(ApplyLock, ReleaseLock)
          : new LockManager();
  }
  /// <summary>
  /// Lock-manager callback: asks the lock service to apply a lock for the given key.
  /// Blocks on the asynchronous call because the callback itself is synchronous.
  /// </summary>
  /// <param name="lockKey">Key of the lock to apply.</param>
  /// <returns>The service's IsSuccess flag; false when the call throws (the error is logged).</returns>
  private bool ApplyLock(string lockKey)
  {
      try
      {
          var applyRequest = new ApplyLockRequest { LockKey = lockKey };
          return _lockService.ApplyLockAsync(applyRequest).Result.IsSuccess;
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService ApplyLock err, lock key:{lockKey}, ex:{ex}");
          return false;
      }
  }
  /// <summary>
  /// Lock-manager callback: asks the lock service to release the lock with the given unique code.
  /// Blocks on the asynchronous call because the callback itself is synchronous.
  /// </summary>
  /// <param name="lockUniqueCode">Unique code of the lock to release.</param>
  /// <returns>The service's IsSuccess flag; false when the call throws (the error is logged).</returns>
  private bool ReleaseLock(string lockUniqueCode)
  {
      try
      {
          var releaseRequest = new ReleaseLockRequest { LockUniqueCode = lockUniqueCode };
          return _lockService.ReleaseLockAsync(releaseRequest).Result.IsSuccess;
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService ReleaseLock err, release key:{lockUniqueCode}, ex:{ex}");
          return false;
      }
  }
  /// <summary>
  /// Stores the notification, then sends it to the recipient's tokens that are logged in on
  /// this server and awaits that send. Tokens logged in on other servers are handed to
  /// SyncToMasterAsync for forwarding.
  /// </summary>
  /// <param name="request">Send-notification request.</param>
  /// <returns>
  /// Result of the local send; false when the message could not be stored or an exception
  /// occurred (the exception is logged).
  /// </returns>
  /// <value>true</value>
  public async Task<bool> SendMessageAsync(SendNotificationRequest request)
  {
      try
      {
          var stored = await AddMessageInfoAsync(request);
          if (!stored)
          {
              return false;
          }

          // Prefer the tokens supplied by the caller; otherwise look them up by client id.
          IList<TokenDTO> recipients = request.Tokens;
          if (recipients == null || recipients.Count == 0)
          {
              recipients = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
          }

          var tokensOnThisServer = recipients.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();
          var delivered = await _notificationServer.SendMessageAsync(tokensOnThisServer, request);

          var tokensElsewhere = recipients.Where(t => t.LoginServer != ConfigurationManager.Host).ToList();
          if (tokensElsewhere.Count > 0)
          {
              var remoteClients = new List<NotificationClientInfo>();
              foreach (var token in tokensElsewhere)
              {
                  remoteClients.Add(new NotificationClientInfo()
                  {
                      ClientId = token.ClientId,
                      LoginServerUrl = token.LoginServer
                  });
              }

              var forwarded = new SyncNotification()
              {
                  Clients = remoteClients,
                  Message = request.Message,
                  WSConnectType = request.WSConnectType
              };
              await SyncToMasterAsync(SyncTypeEnum.Notification, forwarded);
          }

          // Only the local send decides the return value.
          return delivered;
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService SendMessageAsync RelevanceCode: {request.RelevanceCode}, err: {ex}");
          return false;
      }
  }
  /// <summary>
  /// Stores the notification, then posts it to the send queue for the recipient's tokens
  /// that are logged in on this server. Tokens logged in on other servers are handed to
  /// SyncToMasterAsync for forwarding.
  /// </summary>
  /// <param name="request">Send-notification request.</param>
  /// <returns>
  /// Result of the local post; false when the message could not be stored or an exception
  /// occurred (the exception is logged).
  /// </returns>
  /// <value>true</value>
  public async Task<bool> PostMessageAsync(SendNotificationRequest request)
  {
      try
      {
          var stored = await AddMessageInfoAsync(request);
          if (!stored)
          {
              return false;
          }

          // Prefer the tokens supplied by the caller; otherwise look them up by client id.
          IList<TokenDTO> recipients = request.Tokens;
          if (recipients == null || recipients.Count == 0)
          {
              recipients = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
          }

          var tokensOnThisServer = recipients.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();
          var queued = await _notificationServer.PostMessageAsync(tokensOnThisServer, request.Message, request.WSConnectType);

          var tokensElsewhere = recipients.Where(t => t.LoginServer != ConfigurationManager.Host).ToList();
          if (tokensElsewhere.Count > 0)
          {
              var remoteClients = new List<NotificationClientInfo>();
              foreach (var token in tokensElsewhere)
              {
                  remoteClients.Add(new NotificationClientInfo()
                  {
                      ClientId = token.ClientId,
                      LoginServerUrl = token.LoginServer
                  });
              }

              var forwarded = new SyncNotification()
              {
                  Clients = remoteClients,
                  Message = request.Message,
                  WSConnectType = request.WSConnectType
              };
              await SyncToMasterAsync(SyncTypeEnum.Notification, forwarded);
          }

          // Only the local post decides the return value.
          return queued;
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService PostMessageAsync RelevanceCode: {request.RelevanceCode}, err: {ex}");
          return false;
      }
  }
  /// <summary>
  /// Stores the broadcast, then broadcasts it to the tokens of the requested clients that are
  /// logged in on this server and awaits that broadcast. When the request asks for
  /// synchronisation (IsNeedSyn), tokens logged in on other servers are handed to
  /// SyncToMasterAsync for forwarding.
  /// </summary>
  /// <param name="request">Broadcast-notification request.</param>
  /// <returns>
  /// Result of the local broadcast; false when the message could not be stored or an
  /// exception occurred (the exception is logged).
  /// </returns>
  /// <value>true</value>
  public async Task<bool> BroadcastMessageAsync(BroadcastNotificationRequest request)
  {
      try
      {
          var stored = await AddMessageInfoAsync(null, request);
          if (!stored)
          {
              return false;
          }

          var recipients = await _authenticationService.GetTokenWithClientIdsAsync(new GetTokenWithClientIdsRequest() { ClientIds = request.ClientIds.ToList() });
          var tokensOnThisServer = recipients.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();
          var delivered = await _notificationServer.BroadcastMessageAsync(tokensOnThisServer, request);

          if (request.IsNeedSyn)
          {
              var tokensElsewhere = recipients.Where(t => t.LoginServer != ConfigurationManager.Host).ToList();
              if (tokensElsewhere.Count > 0)
              {
                  var remoteClients = new List<NotificationClientInfo>();
                  foreach (var token in tokensElsewhere)
                  {
                      remoteClients.Add(new NotificationClientInfo()
                      {
                          ClientId = token.ClientId,
                          LoginServerUrl = token.LoginServer
                      });
                  }

                  var forwarded = new SyncNotification()
                  {
                      Clients = remoteClients,
                      Message = request.Message,
                      WSConnectType = request.WSConnectType
                  };
                  await SyncToMasterAsync(SyncTypeEnum.Notification, forwarded);
              }
          }

          // Only the local broadcast decides the return value.
          return delivered;
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService BroadcastMessageAsync RelevanceCode: {request.RelevanceCode}, err: {ex}");
          return false;
      }
  }
  /// <summary>
  /// Forwards a notification to the master server when the deployment is distributed;
  /// does nothing otherwise. Failures are logged and never propagated to the caller.
  /// </summary>
  /// <param name="syncType">Sync type placed on the forwarded request.</param>
  /// <param name="message">Notification payload (target clients, message, connection type).</param>
  /// <returns>A task that completes once the forwarding attempt has finished.</returns>
  private async Task SyncToMasterAsync(SyncTypeEnum syncType, SyncNotification message)
  {
      try
      {
          if (!ConfigurationManager.IsDistributed)
          {
              return;
          }

          var forwardRequest = new SyncReceiveServiceDataRequest
          {
              SyncService = SyncServiceEnum.Notification,
              SyncType = syncType,
              SourceUrl = ConfigurationManager.Host,
              ServerID = _serverID,
              ServiceDataJson = "",
              Notification = message
          };

          var accepted = await _masterInteractionCenterService.SyncReceiveSlaveServiceDataAsync(forwardRequest);
          if (!accepted)
          {
              Logger.WriteLineWarn($"NotificationService SyncToMasterAsync failed, syncType:{syncType.ToString()}");
          }
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService SyncToMasterAsync err, syncType:{syncType.ToString()}, err: {ex}");
      }
  }
  /// <summary>
  /// Opens a notify queue for the requested module on the notification server and logs the result.
  /// </summary>
  /// <param name="request">Request carrying the module to open a queue for.</param>
  /// <returns>The message queue id returned by the notification server.</returns>
  public async Task<string> OpenNotifyQueueAsync(OpenNotifyQueueRequest request)
  {
      // NOTE(review): the method is async but contains no await; the server call's result is used directly.
      var queueId = _notificationServer.OpenNotifyQueueAsync(request.Module);

      Logger.WriteLineInfo($"NotificationService OpenNotifyQueueAsync, module:{request.Module}, msgQueueId:{queueId}");

      return queueId;
  }
  /// <summary>
  /// Closes the notify queue with the requested id on the notification server and logs the call.
  /// </summary>
  /// <param name="request">Request carrying the id of the queue to close.</param>
  /// <returns>The result returned by the notification server.</returns>
  public async Task<bool> CloseNotifyQueueAsync(CloseNotifyQueueRequest request)
  {
      // NOTE(review): the method is async but contains no await; the server call's result is used directly.
      var closed = _notificationServer.CloseNotifyQueueAsync(request.MsgQueueId);

      Logger.WriteLineInfo($"NotificationService CloseNotifyQueueAsync, msgQueueId:{request.MsgQueueId}");

      return closed;
  }
  /// <summary>
  /// Removes the websocket user identified by the request's token from the notification server.
  /// </summary>
  /// <param name="request">Request carrying the token.</param>
  /// <returns>The result reported by the notification server.</returns>
  public async Task<bool> RemoveToken(TokenRequest request)
  {
      return await _notificationServer.RemoveToken(request.Token);
  }
  /// <summary>
  /// Removes the single websocket connection identified by the request's token
  /// from the notification server.
  /// </summary>
  /// <param name="request">Request carrying the token.</param>
  /// <returns>The result reported by the notification server.</returns>
  public async Task<bool> RemoveSingleTokenAsync(TokenRequest request)
  {
      return await _notificationServer.RemoveSingleTokenAsync(request.Token);
  }
  /// <summary>
  /// Persists a notification record through the message DB service, either for a single
  /// recipient (<paramref name="request"/>) or for a broadcast
  /// (<paramref name="broadcastRequest"/>).
  /// </summary>
  /// <param name="request">Single-recipient request; pass null to store a broadcast.</param>
  /// <param name="broadcastRequest">Broadcast request; only used when <paramref name="request"/> is null.</param>
  /// <returns>
  /// The result of the DB service call; false when both arguments are null or when any
  /// step throws (the exception is logged, never propagated).
  /// </returns>
  private async Task<bool> AddMessageInfoAsync(SendNotificationRequest request, BroadcastNotificationRequest broadcastRequest = null)
  {
      try
      {
          if (request == null && broadcastRequest == null)
          {
              // Any exception raised here is caught below, so callers simply observe false.
              ThrowCustomerException(CustomerRpcCode.NotificationParamError, "Notification param error");
          }

          AddMessagesDBRequest dbRequest;
          if (request != null)
          {
              // Prefer the tokens supplied by the caller; otherwise look them up by client id.
              IList<TokenDTO> tokens = request.Tokens;
              if (tokens == null || tokens.Count == 0)
              {
                  tokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = request.ClientId });
              }
              var tokenInfo = tokens.FirstOrDefault() ?? new TokenDTO();

              // Fix: the receiver type must come from the single-recipient request. The previous
              // code read broadcastRequest.ReceiverType here, but broadcastRequest is null on this
              // path, so storing a message for a client without a live token always failed.
              var accountName = ResolveAccountName(tokenInfo, request.ClientId, request.ReceiverType);

              var clientInfos = new List<ClientInfoDTO>()
              {
                  new ClientInfoDTO()
                  {
                      ClientId = request.ClientId,
                      Name = accountName,
                      IsReaded = false
                  }
              };

              // The record code is taken from the "Code" property of the JSON message.
              JObject jsonObj = JObject.Parse(request.JsonMessage);
              dbRequest = new AddMessagesDBRequest()
              {
                  Code = jsonObj["Code"].ToString(),
                  NotificationType = request.NotificationType,
                  Content = request.JsonMessage,
                  ServerHost = ConfigurationManager.Host,
                  NotifyTime = DateTime.UtcNow,
                  ReceiverType = request.ReceiverType,
                  TransactionType = request.TransactionType,
                  RelevanceCode = request.RelevanceCode,
                  ClientInfos = clientInfos
              };
          }
          else
          {
              var clientInfos = new List<ClientInfoDTO>();
              if (broadcastRequest.ClientIds?.Count > 0)
              {
                  foreach (var clientId in broadcastRequest.ClientIds)
                  {
                      var curTokens = await _authenticationService.GetTokensWithClientIdAsync(new GetTokensWithClientIdRequest() { ClientId = clientId });
                      var curTokenInfo = curTokens.FirstOrDefault() ?? new TokenDTO();

                      clientInfos.Add(new ClientInfoDTO()
                      {
                          ClientId = clientId,
                          Name = ResolveAccountName(curTokenInfo, clientId, broadcastRequest.ReceiverType),
                          IsReaded = false
                      });
                  }
              }

              // The record code is taken from the "Code" property of the JSON message.
              JObject jsonObj = JObject.Parse(broadcastRequest.JsonMessage);
              dbRequest = new AddMessagesDBRequest()
              {
                  Code = jsonObj["Code"].ToString(),
                  NotificationType = broadcastRequest.NotificationType,
                  Content = broadcastRequest.JsonMessage,
                  ServerHost = ConfigurationManager.Host,
                  NotifyTime = DateTime.UtcNow,
                  ReceiverType = broadcastRequest.ReceiverType,
                  TransactionType = broadcastRequest.TransactionType,
                  RelevanceCode = broadcastRequest.RelevanceCode,
                  ClientInfos = clientInfos
              };
          }

          return await _messageDBService.AddMessageInfoDBAsync(dbRequest);
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService AddMessageInfoAsync err, {ex}");
          return false;
      }
  }

  /// <summary>
  /// Determines the display name stored with a recipient: the token's account name when the
  /// token carries a client id, otherwise a name looked up in the user or device cache
  /// according to the receiver type.
  /// </summary>
  /// <param name="tokenInfo">First token of the recipient, or an empty token when none exists.</param>
  /// <param name="clientId">Recipient id used as the cache lookup code.</param>
  /// <param name="receiverType">Selects the user cache (Client) or the device cache (Device).</param>
  /// <returns>The resolved name; an empty string when no cache entry matches.</returns>
  private string ResolveAccountName(TokenDTO tokenInfo, string clientId, ApplicantTypeEnum receiverType)
  {
      if (!string.IsNullOrEmpty(tokenInfo.ClientId))
      {
          return tokenInfo.AccountName;
      }

      var accountName = "";
      if (receiverType == ApplicantTypeEnum.Client)
      {
          var userList = CacheMaintenance.Instance.Get<IUserInfoManager>().Where(x => x.Code == clientId)?.ToList() ?? new List<CacheUserDTO>();
          if (userList.Count > 0)
          {
              var userInfoDTO = userList.FirstOrDefault() ?? new CacheUserDTO();
              // Prefer the full name and fall back to the user name.
              accountName = !string.IsNullOrEmpty(userInfoDTO.FullName)
                  ? userInfoDTO.FullName
                  : userInfoDTO.UserName;
          }
      }
      else if (receiverType == ApplicantTypeEnum.Device)
      {
          var deviceList = CacheMaintenance.Instance.Get<IDeviceInfosManager>().Where(x => x.Code == clientId)?.ToList() ?? new List<CacheDeviceDTO>();
          if (deviceList.Count > 0)
          {
              var deviceInfoDTO = deviceList.FirstOrDefault() ?? new CacheDeviceDTO();
              accountName = deviceInfoDTO.Name;
          }
      }

      return accountName;
  }
  /// <summary>
  /// Handles a notification forwarded from another server: picks the target clients whose
  /// login server is this host, fetches their tokens and posts the message to the tokens
  /// logged in on this server.
  /// </summary>
  /// <param name="request">Sync request carrying the forwarded notification.</param>
  /// <returns>Result of the local post; false when an exception occurred (it is logged).</returns>
  /// <show>false</show>
  public async Task<bool> ReceiveSyncServerMessageAsync(SyncReceiveServiceDataRequest request)
  {
      try
      {
          Logger.WriteLineInfo($"Notification ReceiveSyncServerMessageAsync, source url:{request.SourceUrl}, syncType:{request.SyncType.ToString()},wsConnectType{request.Notification.WSConnectType}");

          var notification = request.Notification;
          var clientIdsOnThisServer = notification.Clients
              .Where(c => c.LoginServerUrl == ConfigurationManager.Host)
              .Select(c => c.ClientId)
              .ToList();

          var fetchedTokens = await _authenticationService.GetTokenWithClientIdsAsync(new GetTokenWithClientIdsRequest() { ClientIds = clientIdsOnThisServer });
          // A client may also hold tokens on other servers; keep only those logged in here.
          var tokensOnThisServer = fetchedTokens.Where(t => t.LoginServer == ConfigurationManager.Host).ToList();

          return await _notificationServer.PostMessageAsync(tokensOnThisServer, notification.Message, notification.WSConnectType);
      }
      catch (Exception ex)
      {
          Logger.WriteLineError($"NotificationService ReceiveSyncServerMessageAsync err, {ex}");
          return false;
      }
  }
  505. }
  506. }