소스 검색

主副服务器同步

fly 2 년 전
부모
커밋
4d41c4b282
4개의 변경된 파일201개의 추가작업 그리고 114개의 파일을 삭제
  1. 50 1
      src/InteractionCenter/MasterInteractionCenterService.cs
  2. 63 57
      src/InteractionCenter/MasterInteractionSyncService.cs
  3. 85 54
      src/WingServer.cs
  4. 3 2
      src/appsettings.json

+ 50 - 1
src/InteractionCenter/MasterInteractionCenterService.cs

@@ -4,19 +4,25 @@ using WingInterfaceLibrary.OpLog;
 using System.Threading.Tasks;
 using System.Collections.Generic;
 using WingInterfaceLibrary.Interface.DBInterface;
+using System.Linq;
+using WingServerCommon.Log;
+using CSScriptLib;
+using System.Reflection;
 
 namespace WingCloudServer.InteractionCenter
 {
     public partial class MasterInteractionCenterService : InteractionCenterService, IMasterInteractionCenterService
     {
 
+        private Dictionary<string, string> _serverUrlMap;
+
         public override void Load(JsonRpcClientPool jsonRpcClientPool)
         {
             base.Load(jsonRpcClientPool);
             _opLogDBService = GetProxy<IOpLogDBService>();
             _distributedServerInfoDBService = GetProxy<IDistributedServerInfoDBService>();
             _liveRoomDBService = GetProxy<ILiveRoomDBService>();
-
+            _serverUrlMap = new Dictionary<string, string>();
         }
 
         /// <summary>
@@ -46,11 +52,52 @@ namespace WingCloudServer.InteractionCenter
         /// <returns></returns>
         public async Task<bool> SyncOpLogToMasterAsync(SyncOpLogToMasterRequest request)
         {
+            try
+            {
+                _serverUrlMap.TryAdd(request.SourceUrl, request.ServerID);
+                DynamicAddRemoteService(request.ServerID, request.SourceUrl);
+            }
+            catch (Exception ex)
+            {
+                Logger.WriteLineWarn("TryAdd-AddRemoteService:" + ex.ToString());
+            }
             return await _opLogDBService.SyncOpLogAsync(request);
         }
 
+        public void DynamicAddRemoteService(string serverID, string sourceUrl)
+        {
+            var eval = CSScript.Evaluator.ReferenceDomainAssemblies(DomainAssemblies.AllStaticNonGAC);
+            var str = @"using System;
+                        using System.Threading.Tasks;
+                        using WingServerCommon.Service;
+                        using WingInterfaceLibrary.OpLog;
+                        using System.Collections.Generic;
+                        using JsonRpcLite.Rpc;
+                        using WingInterfaceLibrary.Interface.DBInterface;
+                        using WingCloudServer;
+
+                        public interface IDynamicSlaveService" + serverID + @"
+                        {
+                            Task<bool> DynamicSlaveAsync(SyncReceiveServiceDataRequest request);
+                        }
+
+                        public class DynamicAddRemoteServiceClass
+                        {
+                            public void DynamicAddRemoteMethod(SyncReceiveServiceDataRequest request)
+                            {
+                               WingServer.AddRemoteService<IDynamicSlaveService + " + serverID + @">(IDynamicSlaveService + " + serverID + @", " + sourceUrl + @");
+                            }
+                        }";
+            Assembly compilecode = eval.CompileCode(str);
+            var ps = compilecode.GetType("css_root+DynamicAddRemoteServiceClass");
+            var obj = compilecode.CreateInstance("css_root+DynamicAddRemoteServiceClass");
+            var mes = ps.GetMethod("DynamicAddRemoteMethod");
+            mes.Invoke(obj, new object[] { });
+        }
     }
 
+
+
     public class SlaveInteractionCenterService : InteractionCenterService, ISlaveInteractionCenterService
     {
 
@@ -59,9 +106,11 @@ namespace WingCloudServer.InteractionCenter
             base.Load(jsonRpcClientPool);
             _opLogDBService = GetProxy<IOpLogDBService>();
         }
+
         /// Sychronize Receive ServiceData for slave
         public async Task<bool> SyncReceiveMasterServiceDataAsync(SyncReceiveServiceDataRequest request)
         {
+            Logger.WriteLineInfo("SyncReceiveSlaveServiceDataAsync:" + request.ServiceDataJson + "," + request.ServerID);
             //执行oplogs
             if (request.Oplogs != null && request.Oplogs.Count > 0)
             {

+ 63 - 57
src/InteractionCenter/MasterInteractionSyncService.cs

@@ -1,3 +1,4 @@
+using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Reflection;
@@ -11,6 +12,8 @@ using WingInterfaceLibrary.OpLog;
 using WingInterfaceLibrary.Request.DBRequest;
 using WingInterfaceLibrary.Result;
 using WingServerCommon.Config;
+using WingServerCommon.Config.Parameters;
+using WingServerCommon.Log;
 using WingServerCommon.Service;
 
 namespace WingCloudServer.InteractionCenter
@@ -18,66 +21,62 @@ namespace WingCloudServer.InteractionCenter
     public partial class MasterInteractionCenterService : IMasterInteractionCenterService
     {
 
+        private string _serverID = ConfigurationManager.GetParammeter<StringParameter>("General", "ServerID").Value;
+
         /// <summary>
-        /// 实时同步业务数据到主服务器
+        /// 主服务同步数据到副服务器
         /// </summary>
         /// <param name="request"></param>
         /// <returns></returns>
         public async Task<bool> SyncReceiveSlaveServiceDataAsync(SyncReceiveServiceDataRequest request)
         {
-
-            // //todo 判断是当前服务器并且是主服务器 则只需要发送oplogs到其他副服务器
-
-            // var localServerUrl = ConfigurationManager.Host;
-            // var baseLiveConsultation = JsonConvert.DeserializeObject<BaseLiveConsultationJson>(request.ServiceDataJson);
-            // // 调用manager
-
-            // // 判断是否存在其他服务器的用户
-            // var getLiveRoomByCodeDBRequest = new GetLiveRoomByCodeDBRequest();
-            // getLiveRoomByCodeDBRequest.LiveRoomCode = baseLiveConsultation.RoomCode;
-            // var roomDTO = await _liveRoomDBService.GetLiveRoomByCodeAsync(getLiveRoomByCodeDBRequest);
-            // var users = roomDTO.UserInfos;
-            // //操作人所在服务器
-            // var operatorUser = users.FirstOrDefault(x => x.Code == baseLiveConsultation.OperatorCode);
-            // //连接其他副服务器的用户
-            // var usersConnectOtherSlave = users.Where(x => x.LoginServerHost != operatorUser.LoginServerHost && x.LoginServerHost != localServerUrl);
-            // if (usersConnectOtherSlave != null && usersConnectOtherSlave.Count() > 0)
-            // {
-            //     //获取oplogs
-            //     var getOpLogsByCodesFormMasterRequest = new GetOpLogsByCodesFormMasterRequest();
-            //     getOpLogsByCodesFormMasterRequest.Codes = new List<string> { baseLiveConsultation.ConsultationRecordCode, baseLiveConsultation.RoomCode };
-            //     var oplogs = await _opLogDBService.GetOpLogsByCodesAsync(getOpLogsByCodesFormMasterRequest);
-            //     //通知副服务器
-            //     var otherSlaves = usersConnectOtherSlave.Select(x => x.LoginServerHost).Distinct();
-            //     if (otherSlaves.Count() > 0)
-            //     {
-            //         //动态注册副服务器服务
-            //         var otherSlaveHosts = new List<string>();
-            //         foreach (var item in otherSlaves)
-            //         {
-            //             var slaveHost = item.Replace("http:", "").Replace(".", "").Replace("/", "");
-            //             DynamicSlaveService(request, slaveHost);
-            //         }
-
-            //     }
-            // }
-            // return true;
-
-            //获取oplogs
-
-            //通知副服务器
-
-
-            //动态注册副服务器服务
-            var otherSlaveHosts = new List<string>();
-            var slaveHost = "127001";
-            DynamicSlaveService(request, slaveHost);
-
+            Logger.WriteLineInfo("SyncReceiveSlaveServiceDataAsync:"+request.ServiceDataJson+","+request.ServerID);
+            //判断是当前服务器并且是主服务器 则只需要发送oplogs到其他副服务器
+            if (string.IsNullOrWhiteSpace(request.ServerID))
+            {
+                //调用manager
+            }
+            var localServerUrl = ConfigurationManager.Host;
+            var baseLiveConsultation = JsonConvert.DeserializeObject<BaseLiveConsultationJson>(request.ServiceDataJson);
+            // 判断是否存在其他服务器的用户
+            var getLiveRoomByCodeDBRequest = new GetLiveRoomByCodeDBRequest();
+            getLiveRoomByCodeDBRequest.LiveRoomCode = baseLiveConsultation.RoomCode;
+            var roomDTO = await _liveRoomDBService.GetLiveRoomByCodeAsync(getLiveRoomByCodeDBRequest);
+            var users = roomDTO.UserInfos;
+            //操作人所在服务器
+            var operatorUser = users.FirstOrDefault(x => x.Code == baseLiveConsultation.OperatorCode);
+            //连接其他副服务器的用户
+            var usersConnectOtherSlave = users.Where(x => x.LoginServerHost != operatorUser.LoginServerHost && x.LoginServerHost != localServerUrl);
+            if (usersConnectOtherSlave != null && usersConnectOtherSlave.Count() > 0)
+            {
+                //获取oplogs
+                var getOpLogsByCodesFormMasterRequest = new GetOpLogsByCodesFormMasterRequest();
+                getOpLogsByCodesFormMasterRequest.Codes = new List<string> { baseLiveConsultation.ConsultationRecordCode, baseLiveConsultation.RoomCode };
+                var oplogs = await _opLogDBService.GetOpLogsByCodesAsync(getOpLogsByCodesFormMasterRequest);
+                //通知副服务器
+                var otherSlaves = usersConnectOtherSlave.Select(x => x.LoginServerHost).Distinct();
+                if (otherSlaves.Count() > 0)
+                {
+                    //动态注册副服务器服务
+                    var otherSlaveHosts = new List<string>();
+                    foreach (var item in otherSlaves)
+                    {
+                        try
+                        {
+                            var serverID = _serverUrlMap.FirstOrDefault(x => x.Key == item).Value;
+                            DynamicSlaveService(request, serverID);
+                        }
+                        catch (Exception ex)
+                        {
+                            Logger.WriteLineWarn("SyncReceiveSlaveServiceDataAsync-DynamicSlaveService:" + ex);
+                        }
+                    }
+                }
+            }
             return true;
-
         }
 
-        public void DynamicSlaveService(SyncReceiveServiceDataRequest request, string host)
+        public void DynamicSlaveService(SyncReceiveServiceDataRequest request, string serverID)
         {
             var eval = CSScript.Evaluator.ReferenceDomainAssemblies(DomainAssemblies.AllStaticNonGAC);
             var str = @"using System;
@@ -87,20 +86,27 @@ namespace WingCloudServer.InteractionCenter
                         using System.Collections.Generic;
                         using JsonRpcLite.Rpc;
                         using WingInterfaceLibrary.Interface.DBInterface;
+                        using WingCloudServer;
+
+                        public interface IDynamicSlaveService" + serverID + @"
+                        {
+                            Task<bool> DynamicSlaveAsync(SyncReceiveServiceDataRequest request);
+                        }
 
-                        public class DynamicSlaveServiceClass : JsonRpcService
+                        public class DynamicSlaveServiceClass
                         {
-                            public void DynamicSlaveMethod(SyncReceiveServiceDataRequest request, IEnumerable<string> otherSlaveHosts)
+                            public void DynamicSlaveMethod(SyncReceiveServiceDataRequest request)
                             {
-                                var _dynamicService = GetProxy<IDynamicSlaveService" + host + @">();
-                                _dynamicService.DynamicSlaveAsync(request);
+                                var _dynamicService = WingServer.WingServerGetProxy(IDynamicSlaveService" + serverID + @");
+
+                                (_dynamicService as IDynamicSlaveService" + serverID + @").DynamicSlaveAsync(request);
                             }
                         }";
             Assembly compilecode = eval.CompileCode(str);
             var ps = compilecode.GetType("css_root+DynamicSlaveServiceClass");
             var obj = compilecode.CreateInstance("css_root+DynamicSlaveServiceClass");
             var mes = ps.GetMethod("DynamicSlaveMethod");
-            mes.Invoke(obj, new object[] { request, host });
+            mes.Invoke(obj, new object[] { request });
         }
     }
-}
+}

+ 85 - 54
src/WingServer.cs

@@ -33,6 +33,9 @@ namespace WingCloudServer
         private JsonRpcInProcessEngine _inProcessEngine;
         private JsonRpcHttpServerEngine _jsonRpcHttpServerEngine;
         private Dictionary<Type, object> _rpcServices;
+        private static JsonRpcClientPool _rpcClientPool;
+        private static Dictionary<string, object> _remoteServices;
+
 
         public WingServer(string host)
         {
@@ -75,13 +78,56 @@ namespace WingCloudServer
             _rpcInProcessServer.Stop();
         }
 
+        /// <summary>
+        /// 动态添加远程服务
+        /// </summary>
+        /// <param name="serviceName"></param>
+        /// <param name="url"></param>
+        internal static void AddRemoteService<T>(string serviceName, string url)
+        {
+            try
+            {
+                if (_remoteServices.Any(x => x.Key == serviceName))
+                {
+                    return;
+                }
+                var serviceInfo = new RemoteServiceInfo()
+                {
+                    ServiceName = serviceName,
+                    Url = url
+                };
+                _rpcClientPool.Add(serviceInfo);
+
+                var service = _rpcClientPool.GetJsonRpcClient<T>().CreateProxy<T>();
+                _remoteServices.TryAdd(serviceName, service);
+            }
+            catch (Exception ex)
+            {
+                Logger.WriteLineWarn("AddService err" + ex);
+            }
+        }
+
+        internal static object WingServerGetProxy(string serviceName)
+        {
+            try
+            {
+                var service = _remoteServices.FirstOrDefault(x => x.Key == serviceName);
+                return service.Value;
+            }
+            catch (Exception ex)
+            {
+                Logger.WriteLineWarn("WingServerGetProxy err" + ex);
+            }
+            return null;
+        }
+
         void InitializeServices()
         {
             var folder = Path.Combine(AppDomain.CurrentDomain.BaseDirectory);
             //initiaize rpc client pool
             var remoteRpcHttpServices = LoadRemoteServiceConfig(folder);//new RemoteServiceInfo[0]; 
-            var rpcClientPool = new JsonRpcClientPool(_inProcessEngine);
-            rpcClientPool.Initialize(remoteRpcHttpServices.ToArray());
+            _rpcClientPool = new JsonRpcClientPool(_inProcessEngine);
+            _rpcClientPool.Initialize(remoteRpcHttpServices.ToArray());
             var inProcessServicesString = ConfigurationManager.GetParammeter<StringParameter>("Services", "InProcess").Value;
             var inProcessServices = inProcessServicesString.Split(',');
             var rpcHttpServicesString = ConfigurationManager.GetParammeter<StringParameter>("Services", "JsonRpcHttp").Value;
@@ -116,8 +162,6 @@ namespace WingCloudServer
                 }
             }
 
-            //TestCSScript(rpcClientPool);
-
             //Rpc http service load and register to rpc http server
             if (ConfigurationManager.IsDistributed)
             {
@@ -125,12 +169,12 @@ namespace WingCloudServer
                 {
                     var masterInteractionCenterService = new MasterInteractionCenterService();
                     _rpcHttpServer.RegisterService(typeof(IMasterInteractionCenterService), masterInteractionCenterService);
-                    masterInteractionCenterService.Load(rpcClientPool);
-
+                    masterInteractionCenterService.Load(_rpcClientPool);
                 }
                 else
                 {
                     //TODO determin if we need to assess slave server on master server
+                    RegisterDynamicSlaveInteractionCenterService(_rpcClientPool);
                 }
             }
 
@@ -167,95 +211,82 @@ namespace WingCloudServer
             foreach (var service in _rpcServices)
             {
                 var method = service.Key.GetMethod("Load");
-                method.Invoke(service.Value, new object[] { rpcClientPool });
+                method.Invoke(service.Value, new object[] { _rpcClientPool });
             }
 
-            //load service if distrubuted system
-            if (ConfigurationManager.IsDistributed)
-            {
-                if (ConfigurationManager.IsMaster)
-                {
-                    //_rpcHttpServer.RegisterService(IMaster); TODO
-                }
-                else
-                {
-                    //TODO
-                }
-            }
             //最快服务器
             var vinnoServerService = new VinnoServerService();
             _rpcHttpServer.RegisterService(typeof(WingInterfaceLibrary.Interface.IVinnoServerService), vinnoServerService);
-            vinnoServerService.Load(rpcClientPool);
+            vinnoServerService.Load(_rpcClientPool);
 
             //plugin 
             _tokenVerifyPlugin = new TokenVerifyPlugin();
             _rpcHttpServer.RegisterService(typeof(ITokenVerifyPlugin), _tokenVerifyPlugin);
-            _tokenVerifyPlugin.Load(rpcClientPool);
+            _tokenVerifyPlugin.Load(_rpcClientPool);
             _ipAddressPlugin = new IPAddressPlugin();
             _rpcHttpServer.RegisterService(typeof(IIPAddressPlugin), _ipAddressPlugin);
-            _ipAddressPlugin.Load(rpcClientPool);
+            _ipAddressPlugin.Load(_rpcClientPool);
             _serverListPlugin = new ServerListPlugin();
             _rpcHttpServer.RegisterService(typeof(IServerListPlugin), _serverListPlugin);
-            _serverListPlugin.Load(rpcClientPool);
+            _serverListPlugin.Load(_rpcClientPool);
 
         }
 
-        public void TestCSScript(JsonRpcClientPool rpcClientPool)
+        /// <summary>
+        /// 副服务器动态服务
+        /// </summary>
+        /// <param name="rpcClientPool"></param>
+        public void RegisterDynamicSlaveInteractionCenterService(JsonRpcClientPool rpcClientPool)
         {
-            //接口
-                    var eval = CSScript.Evaluator.ReferenceDomainAssemblies(DomainAssemblies.AllStaticNonGAC);
-                    var host = ConfigurationManager.Host.Replace("http:", "").Replace(".", "").Replace("/", "");
-                    var str = @"
+            var serverID = ConfigurationManager.GetParammeter<StringParameter>("General", "ServerID").Value;
+            //先注册 下面要调用
+            var slaveInteractionCenterService = new SlaveInteractionCenterService();
+            _rpcHttpServer.RegisterService(typeof(ISlaveInteractionCenterService), slaveInteractionCenterService);
+            slaveInteractionCenterService.Load(rpcClientPool);
+
+            var eval = CSScript.Evaluator.ReferenceDomainAssemblies(DomainAssemblies.AllStaticNonGAC);
+            var str = @"
                     using System;
                     using System.Threading.Tasks;
-                    using System;
                     using WingServerCommon.Service;
                     using WingInterfaceLibrary.OpLog;
-                    using System.Threading.Tasks;
                     using System.Collections.Generic;
                     using JsonRpcLite.Rpc;
                     using WingInterfaceLibrary.Interface.DBInterface;
                     using WingInterfaceLibrary.DB.Request;
                                      
-                    public interface ICSScriptTest"+host+@"
+                    public interface IDynamicSlaveService" + serverID + @"
                     {
-                        Task<bool> CSScriptTestAsync();
+                        Task<bool> DynamicSlaveAsync(SyncReceiveServiceDataRequest request);
                     }
-                    public class CSScriptTestService : JsonRpcService,ICSScriptTest"+host+@"
+                    public class DynamicSlaveService : JsonRpcService,IDynamicSlaveService" + serverID + @"
                     {
-                        protected IOpLogDBService OpLogDBService;
-                        protected IDeviceInfoDBService deviceInfoDBService;
+                        protected ISlaveInteractionCenterService _slaveInteractionCenterService;
                         public override void Load(JsonRpcClientPool jsonRpcClientPool)
                         {
                             base.Load(jsonRpcClientPool);
-                            OpLogDBService = GetProxy<IOpLogDBService>();
-                            deviceInfoDBService = GetProxy<IDeviceInfoDBService>();
+                            _slaveInteractionCenterService = GetProxy<ISlaveInteractionCenterService>();
                         }
-                            public async Task<bool> CSScriptTestAsync()
+                        public async Task<bool> DynamicSlaveAsync(SyncReceiveServiceDataRequest request)
                         {
-                            var res = deviceInfoDBService.FindDictionaryItemsAsync(new FindDictionaryItemsDBRequest()).Result;
-                            Console.WriteLine(""flytest""+res.Count);
+                            await _slaveInteractionCenterService.SyncReceiveMasterServiceDataAsync(request);
                             return true;
                         }
                     }
-                    public class Test
+                    public class DynamicMethodClass
                     {
-                        public void Test1(JsonRpcClientPool rpcClientPool,JsonRpcServer _rpcHttpServer){
-                            var csScriptTestService = new CSScriptTestService();
-                            _rpcHttpServer.RegisterService(typeof(ICSScriptTest"+host+@"), csScriptTestService);
-                            csScriptTestService.Load(rpcClientPool);
+                        public void DynamicMethod(JsonRpcClientPool rpcClientPool,JsonRpcServer _rpcHttpServer){
+                            var DynamicSlaveService = new DynamicSlaveService();
+                            _rpcHttpServer.RegisterService(typeof(IDynamicSlaveService" + serverID + @"), DynamicSlaveService);
+                            DynamicSlaveService.Load(rpcClientPool);
                         }
                     }
                     ";
-
-                    Assembly compilecode = eval.CompileCode(str);
-
-
-
-                    var ps = compilecode.GetType("css_root+Test");
-                    var obj = compilecode.CreateInstance("css_root+Test");
-                    var mes = ps.GetMethod("Test1");
-                    mes.Invoke(obj, new object[] { rpcClientPool, _rpcHttpServer });
+            Assembly compilecode = eval.CompileCode(str);
+            var ps = compilecode.GetType("css_root+DynamicMethodClass");
+            var obj = compilecode.CreateInstance("css_root+DynamicMethodClass");
+            var mes = ps.GetMethod("DynamicMethod");
+            mes.Invoke(obj, new object[] { rpcClientPool, _rpcHttpServer });
         }
 
         void LoadDataAfterRegister()

+ 3 - 2
src/appsettings.json

@@ -3,7 +3,8 @@
     "Version": "1.0.0.0",
     "IsDistributed": false,
     "IsMaster": false,
-    "MasterUrl": ""
+    "MasterUrl": "",
+    "ServerID":""
   },
   "Gateway": {
     "Host": "http://127.0.0.1/",
@@ -109,6 +110,6 @@
     "RTCBizid":71678,
     "RTCUseSSL":false,
     "RTCPlayDomain":"liveplay.fis.plus",
-    "RTCInitRoomId":0
+    "RTCInitRoomId":21000
   }
 }