import 'dart:async';

import 'package:fis_common/event/event_type.dart';
import 'package:fis_common/logger/logger.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as statuses;

import 'core/interface/connection.dart';

/// Exception emitted through [WsConnection.exceptionOccurred] when the
/// connection fails or its stream reports an error.
class WsConnectionException implements Exception {
  /// The underlying error object that was caught or reported.
  final dynamic message;

  WsConnectionException(this.message);
}

/// WebSocket connection with automatic keep-alive reconnection.
///
/// After [connect] has been called the connection is in "keep alive" mode:
/// when the socket is closed or fails, a reconnect is attempted after a
/// one-second delay, up to [C_ERR_RETRY_LIMIT] consecutive attempts. Calling
/// [close] leaves keep-alive mode and cancels any pending reconnect.
class WsConnection implements IConnection {
  /// Maximum number of consecutive automatic reconnect attempts.
  ///
  /// The attempt counter is reset whenever a connection is established and
  /// whenever [connect] is called explicitly.
  static const C_ERR_RETRY_LIMIT = 5;

  /// Connection request timeout in milliseconds (10 s).
  ///
  /// NOTE(review): not referenced inside this class; the handshake wait in
  /// [_waitConnectWithTimeout] uses its own 3-second default.
  static const C_CONNECT_REQ_TIMEOUT = 10 * 1000;

  ConnectionStatus _status = ConnectionStatus.connectedClosed;
  int _errorRetryCount = 0;
  bool _keepAlive = false;
  // Set when the stream reports an error; cleared at the start of a connect.
  bool _hasError = false;

  /// Host address (parsed as a URI when connecting).
  final String host;

  WebSocketChannel? _channel;
  Timer? _timer;

  WsConnection(this.host) {
    exceptionOccurred = FEventHandler();
    statusChanged = FEventHandler();
    messageReceived = FEventHandler();
  }

  /// Current connection status.
  ConnectionStatus get status => _status;

  /// Whether a channel exists and the status is
  /// [ConnectionStatus.connected].
  bool get isConnected =>
      _channel != null && status == ConnectionStatus.connected;

  /// Whether the connection should be re-established when it drops.
  bool get isKeepAlive => _keepAlive;

  @override
  late final FEventHandler exceptionOccurred;

  @override
  late final FEventHandler messageReceived;

  @override
  late final FEventHandler statusChanged;

  /// Opens the connection to [host] and enables keep-alive mode.
  ///
  /// Completes with `true` when the connection is established and with
  /// `false` when the handshake times out, fails, or is superseded by a
  /// [close] call. A failure (other than a timeout) also schedules an
  /// automatic reconnect.
  @override
  Future connect() async {
    // A caller-initiated connect starts with a fresh retry budget.
    _errorRetryCount = 0;
    return _doConnect();
  }

  /// Performs one connection attempt; shared by [connect] and
  /// [_tryReconnect] so that automatic retries do not reset the counter.
  Future<bool> _doConnect() async {
    logger.i("WsConnection is trying connect...");
    _hasError = false;
    _keepAlive = true;
    try {
      final uri = Uri.parse(host);
      final channel = WebSocketChannel.connect(uri);
      _channel = channel;

      final connectResult = await _waitConnectWithTimeout(channel);
      if (!identical(_channel, channel)) {
        // close() (or another connect) replaced the channel while the
        // handshake was pending; this attempt is obsolete.
        _discardChannel(channel);
        return false;
      }
      if (!connectResult) {
        // Do not leave a half-open channel behind after a timeout.
        _discardChannel(channel);
        _updateStatus(ConnectionStatus.connectFailed);
        return false;
      }

      _errorRetryCount = 0;
      _updateStatus(ConnectionStatus.connected);
      logger.i("The ws connection is connected.");
      _startCheckConnection();
      // Normal closure is detected by the polling timer, so no onDone
      // handler is needed here.
      channel.stream.listen(
        (data) => messageReceived.emit(this, data),
        onError: (Object error) {
          _hasError = true;
          _handleCancelOnError(error);
        },
        cancelOnError: true,
      );
    } catch (e) {
      logger.e("WsConnection connect error.", e);
      _updateStatus(ConnectionStatus.connectFailed);
      _handleCancelOnError(e);
      return false;
    }
    return true;
  }

  /// Waits for [channel] to become ready.
  ///
  /// Completes with `true` once the channel is ready, or with `false` when
  /// [timeLimit] elapses first. Errors from the handshake are propagated.
  Future<bool> _waitConnectWithTimeout(
    WebSocketChannel channel, {
    Duration timeLimit = const Duration(seconds: 3),
  }) async {
    Future<bool> waitReady() async {
      await channel.ready;
      return true;
    }

    return waitReady().timeout(timeLimit, onTimeout: () => false);
  }

  /// Closes [channel] without waiting and forgets it if it is the current
  /// one.
  void _discardChannel(WebSocketChannel channel) {
    if (identical(_channel, channel)) {
      _channel = null;
    }
    // Fire-and-forget: the close future may not complete for a channel that
    // never finished connecting, and its errors are of no interest here.
    channel.sink.close().catchError((Object _) {});
  }

  /// Closes the connection and leaves keep-alive mode.
  ///
  /// Always completes with `true`.
  @override
  Future close() async {
    logger.i("WsConnection is closing...");
    _keepAlive = false;
    _stopCheckConnection();

    final channel = _channel;
    _channel = null;
    if (channel != null && channel.closeCode == null) {
      await channel.sink.close(statuses.normalClosure);
    }

    _updateStatus(ConnectionStatus.connectedClosed);
    logger.i("The ws connection is disconnected.");
    return true;
  }

  /// Stores [value] as the current status and notifies [statusChanged]
  /// listeners when it differs from the previous status.
  void _updateStatus(ConnectionStatus value) {
    if (value == _status) return;
    _status = value;
    try {
      statusChanged.emit(this, value);
    } catch (e) {
      // A misbehaving listener must not break the connection state machine,
      // but the failure should be visible.
      logger.e("WsConnection status listener error.", e);
    }
  }

  /// Starts polling the channel every 500 ms to detect that it was closed.
  void _startCheckConnection() {
    _timer?.cancel();
    _timer = Timer.periodic(
      const Duration(milliseconds: 500),
      (_) {
        if (isConnected && _channel?.closeCode != null) {
          _onDisconnected();
        }
      },
    );
  }

  /// Stops the close-detection polling timer.
  void _stopCheckConnection() {
    _timer?.cancel();
    _timer = null;
  }

  /// Handles a closed channel detected by the polling timer.
  void _onDisconnected() {
    _updateStatus(ConnectionStatus.connectedClosed);
    _channel = null;
    logger.i("WsConnection done, the ws connection is disconnected.");
    _tryReconnect();
  }

  /// Handles a connection interruption caused by an error.
  Future<void> _handleCancelOnError(Object error) async {
    exceptionOccurred.emit(this, WsConnectionException(error));
    _updateStatus(ConnectionStatus.connectedClosed);
    logger.e("WsConnection error, the ws connection is disconnected.", error);
    _tryReconnect();
  }

  /// Schedules a reconnect attempt when keep-alive mode is active and the
  /// retry limit has not been reached.
  Future<void> _tryReconnect() async {
    _stopCheckConnection();
    if (!_keepAlive) return;

    if (_errorRetryCount >= C_ERR_RETRY_LIMIT) {
      logger.i(
          "WsConnection reached the reconnect limit ($C_ERR_RETRY_LIMIT), giving up.");
      return;
    }

    // Wait 1000 ms before trying to connect again.
    await Future<void>.delayed(const Duration(milliseconds: 1000));

    // close() may have been called while waiting; honour it.
    if (!_keepAlive) return;

    _errorRetryCount++;
    logger.i(
        "WsConnection is now keeping alive and trying to reconnect for $_errorRetryCount time.");
    await _doConnect();
  }
}