12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- using System;
- using System.Collections.Concurrent;
- using WingServerCommon.Log;
- using WingServerCommon.Utilities.Executors;
namespace WingNotificationModule
{
    internal partial class NotificationServer
    {
        /// <summary>
        /// Keeps a set of message queues, each backed by a <see cref="SequenceExecutor{T}"/> and
        /// looked up by a string id, and routes posted messages to the matching queue.
        /// A queue named <c>DefaultMessageQueue</c> is created up front and receives every
        /// message whose queue id is empty or not registered.
        /// </summary>
        internal class NotificationQueueManager
        {
            private const string DefaultMessageQueueId = "DefaultMessageQueue";

            private readonly ConcurrentDictionary<string, SequenceExecutor<PostMessageParam>> _notifyExecutorCollection = new ConcurrentDictionary<string, SequenceExecutor<PostMessageParam>>();
            private readonly Func<PostMessageParam, bool> _sendMessage;
            private readonly int _maxQueueCount;

            // Serializes the "check capacity, then insert" sequence in TryAdd so that concurrent
            // callers cannot push the number of queues past _maxQueueCount.
            private readonly object _addGate = new object();

            /// <summary>
            /// Creates the manager and registers the default queue.
            /// </summary>
            /// <param name="maxQueueCount">
            /// Upper bound on the number of registered queues. The default queue counts towards it.
            /// </param>
            /// <param name="sendMessage">Callback handed to the executor together with every posted message.</param>
            /// <exception cref="ArgumentNullException"><paramref name="sendMessage"/> is null.</exception>
            public NotificationQueueManager(int maxQueueCount, Func<PostMessageParam, bool> sendMessage)
            {
                // Fail here rather than later, when an executor would be handed a null callback.
                _sendMessage = sendMessage ?? throw new ArgumentNullException(nameof(sendMessage));
                _maxQueueCount = maxQueueCount;
                _notifyExecutorCollection.TryAdd(DefaultMessageQueueId, new SequenceExecutor<PostMessageParam>(DefaultMessageQueueId));
            }

            /// <summary>
            /// Try add a msg pushing queue
            /// </summary>
            /// <param name="msgQueueId">Id of the queue to register.</param>
            /// <returns>
            /// <c>true</c> when a new queue was registered; <c>false</c> when the id is null or
            /// whitespace, the id is already registered, or the queue limit has been reached.
            /// </returns>
            public bool TryAdd(string msgQueueId)
            {
                //TODO define some rules

                // Post() routes null/whitespace ids to the default queue, so a queue registered
                // under such an id could never receive a message; a null key would also make the
                // dictionary throw instead of returning false.
                if (string.IsNullOrWhiteSpace(msgQueueId))
                {
                    return false;
                }

                lock (_addGate)
                {
                    if (_maxQueueCount <= _notifyExecutorCollection.Count)
                    {
                        return false;
                    }

                    // Check before constructing the executor so that a duplicate id does not
                    // create an executor that is immediately thrown away.
                    if (_notifyExecutorCollection.ContainsKey(msgQueueId))
                    {
                        return false;
                    }

                    return _notifyExecutorCollection.TryAdd(msgQueueId, new SequenceExecutor<PostMessageParam>(msgQueueId));
                }
            }

            /// <summary>
            /// Try remove a msg pushing queue
            /// </summary>
            /// <param name="msgQueueId">Id of the queue to unregister.</param>
            /// <returns>
            /// <c>true</c> when the queue was registered and has been removed; <c>false</c> when the
            /// id is null or whitespace, is not registered, or names the default queue.
            /// </returns>
            public bool TryRemove(string msgQueueId)
            {
                //TODO define some rules
                if (string.IsNullOrWhiteSpace(msgQueueId))
                {
                    return false;
                }

                // The default queue is the fallback target of Post(); removing it would make
                // every message without a registered queue disappear.
                if (string.Equals(msgQueueId, DefaultMessageQueueId, StringComparison.Ordinal))
                {
                    return false;
                }

                // NOTE(review): the removed executor is simply dropped here. If SequenceExecutor
                // needs an explicit shutdown, confirm where that is supposed to happen.
                return _notifyExecutorCollection.TryRemove(msgQueueId, out _);
            }

            /// <summary>
            /// Hands a message to the queue registered under <paramref name="msgQueueId"/>, or to
            /// the default queue when the id is null, whitespace, or not registered.
            /// </summary>
            /// <param name="msgQueueId">Id of the target queue; may be null or empty.</param>
            /// <param name="messageParam">Message to hand to the executor.</param>
            public void Post(string msgQueueId, PostMessageParam messageParam)
            {
                Logger.WriteLineInfo($"Post,msgQueueId:{msgQueueId},Token:{messageParam.Token},WSConnectType:{messageParam.WSConnectType},Message:{messageParam.Message}");
                SequenceExecutor<PostMessageParam> postMsgExecutor = null;
                if (!string.IsNullOrWhiteSpace(msgQueueId) && _notifyExecutorCollection.TryGetValue(msgQueueId, out postMsgExecutor) && postMsgExecutor != null)
                {
                    Enqueue(postMsgExecutor, "postMsgExecutor.Add,,", msgQueueId, messageParam);
                }
                else if (_notifyExecutorCollection.TryGetValue(DefaultMessageQueueId, out postMsgExecutor) && postMsgExecutor != null)
                {
                    Enqueue(postMsgExecutor, "postMsgExecutor.Add,", msgQueueId, messageParam);
                }
                else
                {
                    // Not expected, because the default queue is registered in the constructor
                    // and TryRemove refuses to remove it; leave a trace instead of dropping silently.
                    Logger.WriteLineInfo($"Post dropped, no executor found,msgQueueId:{msgQueueId},Token:{messageParam.Token},WSConnectType:{messageParam.WSConnectType},Message:{messageParam.Message}");
                }
            }

            /// <summary>
            /// Adds the message to <paramref name="executor"/> together with the send callback and
            /// logs the executor's state afterwards.
            /// </summary>
            /// <param name="executor">Executor that receives the message.</param>
            /// <param name="logPrefix">Leading text of the log line; differs per calling branch.</param>
            /// <param name="msgQueueId">Queue id as requested by the caller (logged only).</param>
            /// <param name="messageParam">Message to enqueue.</param>
            private void Enqueue(SequenceExecutor<PostMessageParam> executor, string logPrefix, string msgQueueId, PostMessageParam messageParam)
            {
                executor.Add(_sendMessage, messageParam);
                Logger.WriteLineInfo($"{logPrefix} WorkerCount:{executor.WorkerCount},GetExecutingStatus:{executor.GetExecutingStatus},IsClosed:{executor.IsClosed},msgQueueId:{msgQueueId},Token:{messageParam.Token},WSConnectType:{messageParam.WSConnectType},Message:{messageParam.Message}");
            }
        }
    }
}
|