Melon 1 anno fa
parent
commit
6c947732a4

+ 1 - 1
lib/notifications/center.dart

@@ -18,7 +18,7 @@ class WsNotificationCenter extends NotificationCenter {
   }
 
   @override
-  IConnection get connection => _connection!;
+  IConnection? get connection => _connection;
 
   String get host => _host ?? '';
 

+ 52 - 39
lib/notifications/connection.dart

@@ -36,11 +36,9 @@ class WsConnection implements IConnection {
 
   Timer? _timer;
 
-  WsConnection(
-    this.host, {
-    this.exceptionOccurred,
-    this.statusChanged,
-  }) {
+  WsConnection(this.host) {
+    exceptionOccurred = FEventHandler<Exception>();
+    statusChanged = FEventHandler<ConnectionStatus>();
     messageReceived = FEventHandler<dynamic>();
   }
 
@@ -54,55 +52,66 @@ class WsConnection implements IConnection {
   bool get isKeepAlive => _keepAlive;
 
   @override
-  FEventHandler<Exception>? exceptionOccurred;
+  late final FEventHandler<Exception> exceptionOccurred;
 
   @override
   late final FEventHandler<dynamic> messageReceived;
 
   @override
-  FEventHandler<ConnectionStatus>? statusChanged;
+  late final FEventHandler<ConnectionStatus> statusChanged;
 
   @override
   Future<bool> connect() async {
     logger.i("WsConnection is trying connect...");
     _hasError = false;
     _keepAlive = true;
-    final uri = Uri.parse(host);
-    _channel = WebSocketChannel.connect(uri);
-    if (_channel == null) {
-      logger.i("The ws connect failed.");
-      return false;
-    }
-
-    final completer = Completer<bool>();
-    Future.delayed(const Duration(milliseconds: 500), () {
-      if (completer.isCompleted || _hasError) return;
+    try {
+      final uri = Uri.parse(host);
+      _channel = WebSocketChannel.connect(uri);
+
+      final connectResult = await _waitConnectWithTimeout(_channel!);
+      if (!connectResult) {
+        _updateStatus(ConnectionStatus.connectFailed);
+        return false;
+      }
 
       _updateStatus(ConnectionStatus.connected);
-      completer.complete(true);
 
       logger.i("The ws connection is connected.");
       _startCheckConnection();
-    });
 
-    _channel!.stream.listen(
-      (data) {
-        messageReceived.emit(this, data);
-      },
-      onError: (error) {
-        _hasError = true;
-        _handleCancelOnError(error);
-        try {
-          if (!completer.isCompleted) {
-            completer.complete(false);
-          }
-        } catch (e) {}
-      },
-      cancelOnError: true,
-      onDone: () {},
-    );
+      _channel!.stream.listen(
+        (data) {
+          messageReceived.emit(this, data);
+        },
+        onError: (error) {
+          _hasError = true;
+          _handleCancelOnError(error);
+        },
+        cancelOnError: true,
+        onDone: () {},
+      );
+    } catch (e) {
+      logger.e("WsConnection connect error.", e);
+      _updateStatus(ConnectionStatus.connectFailed);
+      _handleCancelOnError(e);
+      return false;
+    }
+
+    return true;
+  }
+
+  Future<bool> _waitConnectWithTimeout(
+    WebSocketChannel channel, {
+    Duration timeLimit = const Duration(seconds: 3),
+  }) async {
+    Future<bool> wrapperFn() async {
+      await channel.ready;
+      return true;
+    }
 
-    return completer.future;
+    final result = await wrapperFn().timeout(timeLimit, onTimeout: () => false);
+    return result;
   }
 
   @override
@@ -123,9 +132,13 @@ class WsConnection implements IConnection {
   }
 
   void _updateStatus(ConnectionStatus value) {
-    if (value != _status) {
-      _status = value;
-      statusChanged?.emit(this, value);
+    try {
+      if (value != _status) {
+        _status = value;
+        statusChanged?.emit(this, value);
+      }
+    } catch (e) {
+      //
     }
   }
 

+ 8 - 6
lib/notifications/core/center.dart

@@ -18,7 +18,7 @@ abstract class NotificationCenter implements IProcessPipeline {
   set isRunning(bool val) => _isRunning = val;
 
   /// 消息连接
-  IConnection get connection;
+  IConnection? get connection;
 
   ///消息处理器供应者
   IHandlerProvider get handlerProvider => _handlerDispatcher;
@@ -29,15 +29,17 @@ abstract class NotificationCenter implements IProcessPipeline {
 
   @override
   void start() async {
-    connection.messageReceived.addListener(_onMessageReceived);
-    connection.connect();
-    isRunning = true;
+    if (connection != null) {
+      connection!.messageReceived.addListener(_onMessageReceived);
+      connection!.connect();
+      isRunning = true;
+    }
   }
 
   @override
   void stop() {
-    connection.messageReceived.removeListener(_onMessageReceived);
-    connection.close();
+    connection?.messageReceived.removeListener(_onMessageReceived);
+    connection?.close();
     isRunning = false;
   }
 

+ 2 - 2
lib/notifications/core/interface/connection.dart

@@ -21,10 +21,10 @@ abstract class IConnection {
   late FEventHandler<dynamic> messageReceived;
 
   /// 发送异常事件
-  FEventHandler<Exception>? exceptionOccurred;
+  late FEventHandler<Exception> exceptionOccurred;
 
   /// 状态变更事件
-  FEventHandler<ConnectionStatus>? statusChanged;
+  late FEventHandler<ConnectionStatus> statusChanged;
 
   /// 连接
   Future<bool> connect();

+ 4 - 4
pubspec.yaml

@@ -14,10 +14,10 @@ dependencies:
     sdk: flutter
 
   fis_common:
-      git:
-        url: http://git.ius.plus:88/Project-Wing/fis_lib_common.git
-        ref: ^1.0.9
-  web_socket_channel: 2.2.0
+    git:
+      url: http://git.ius.plus:88/Project-Wing/fis_lib_common.git
+      ref: ^1.0.9
+  web_socket_channel: 2.3.0
 
 dev_dependencies:
   flutter_test: