123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- import 'dart:async';
- import 'package:fis_common/event/event_type.dart';
- import 'package:fis_common/logger/logger.dart';
- import 'package:web_socket_channel/web_socket_channel.dart';
- import 'package:web_socket_channel/status.dart' as statuses;
- import 'core/interface/connection.dart';
- class WsConnectionException implements Exception {
- final dynamic message;
- WsConnectionException(this.message);
- }
- /// WebScoket连接
- class WsConnection implements IConnection {
- /// 错误断开重试次数
- static const C_ERR_RETRY_LIMIT = 5;
- /// 连接请求超时时间,10s
- static const C_CONNECT_REQ_TIMEOUT = 10 * 1000;
- ConnectionStatus _status = ConnectionStatus.connectedClosed;
- int _errorRetryCount = 0;
- bool _keepAlive = false;
- bool _hasError = false;
- /// 主机地址
- final String host;
- WebSocketChannel? _channel;
- Timer? _timer;
- WsConnection(this.host) {
- exceptionOccurred = FEventHandler<Exception>();
- statusChanged = FEventHandler<ConnectionStatus>();
- messageReceived = FEventHandler<dynamic>();
- }
- /// 连接状态
- ConnectionStatus get status => _status;
- /// 是否已连接
- bool get isConnected =>
- _channel != null && status == ConnectionStatus.connected;
- bool get isKeepAlive => _keepAlive;
- @override
- late final FEventHandler<Exception> exceptionOccurred;
- @override
- late final FEventHandler<dynamic> messageReceived;
- @override
- late final FEventHandler<ConnectionStatus> statusChanged;
- @override
- Future<bool> connect() async {
- logger.i("WsConnection is trying connect...");
- _hasError = false;
- _keepAlive = true;
- try {
- final uri = Uri.parse(host);
- _channel = WebSocketChannel.connect(uri);
- final connectResult = await _waitConnectWithTimeout(_channel!);
- if (!connectResult) {
- _updateStatus(ConnectionStatus.connectFailed);
- return false;
- }
- _updateStatus(ConnectionStatus.connected);
- logger.i("The ws connection is connected.");
- _startCheckConnection();
- _channel!.stream.listen(
- (data) {
- messageReceived.emit(this, data);
- },
- onError: (error) {
- _hasError = true;
- _handleCancelOnError(error);
- },
- cancelOnError: true,
- onDone: () {},
- );
- } catch (e) {
- logger.e("WsConnection connect error.", e);
- _updateStatus(ConnectionStatus.connectFailed);
- _handleCancelOnError(e);
- return false;
- }
- return true;
- }
- Future<bool> _waitConnectWithTimeout(
- WebSocketChannel channel, {
- Duration timeLimit = const Duration(seconds: 3),
- }) async {
- Future<bool> wrapperFn() async {
- await channel.ready;
- return true;
- }
- final result = await wrapperFn().timeout(timeLimit, onTimeout: () => false);
- return result;
- }
- @override
- Future<bool> close() async {
- logger.i("WsConnection is closing...");
- _keepAlive = false;
- _stopCheckConnection();
- if (_channel != null) {
- if (_channel!.closeCode == null) {
- await _channel!.sink.close(statuses.normalClosure);
- }
- _channel = null;
- }
- _updateStatus(ConnectionStatus.connectedClosed);
- logger.i("The ws connection is disconnected.");
- return true;
- }
- void _updateStatus(ConnectionStatus value) {
- try {
- if (value != _status) {
- _status = value;
- statusChanged?.emit(this, value);
- }
- } catch (e) {
- //
- }
- }
- void _startCheckConnection() {
- _timer?.cancel();
- _timer = Timer.periodic(
- const Duration(milliseconds: 500),
- (timer) {
- if (isConnected) {
- if (_channel!.closeCode != null) {
- _onDisconnected();
- }
- }
- },
- );
- }
- void _stopCheckConnection() {
- _timer?.cancel();
- _timer = null;
- }
- void _onDisconnected() {
- _updateStatus(ConnectionStatus.connectedClosed);
- _channel = null;
- logger.i("WsConnection done, the ws connection is disconnected.");
- _tryReconnect();
- }
- /// 处理因错误导致的连接中断
- void _handleCancelOnError(error) async {
- exceptionOccurred?.emit(this, WsConnectionException(error));
- _updateStatus(ConnectionStatus.connectedClosed);
- logger.e("WsConnection error, the ws connection is disconnected.", error);
- _tryReconnect();
- }
- Future<void> _tryReconnect() async {
- _stopCheckConnection();
- if (_keepAlive) {
- // 延迟1000ms后尝试再次连接
- await Future.delayed(const Duration(milliseconds: 1000));
- logger.i(
- "WsConnection is now keeping alive and trying to reconnect for $_errorRetryCount time.");
- connect();
- _errorRetryCount++;
- }
- }
- }
|