diff --git a/lib/src/network/parse_live_query.dart b/lib/src/network/parse_live_query.dart index 0c33147d6..bd569ca69 100644 --- a/lib/src/network/parse_live_query.dart +++ b/lib/src/network/parse_live_query.dart @@ -2,13 +2,38 @@ import 'dart:convert'; import 'dart:io'; import 'package:web_socket_channel/io.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:connectivity/connectivity.dart'; import '../../parse_server_sdk.dart'; enum LiveQueryEvent { create, enter, update, leave, delete, error } -class LiveQuery { - LiveQuery({bool debug, ParseHTTPClient client, bool autoSendSessionId}) { +const String _printConstLiveQuery = 'LiveQuery: '; + +class Subscription { + Subscription(this.query, this.requestId); + QueryBuilder query; + int requestId; + bool _enabled = false; + final List _liveQueryEvent = [ + 'create', + 'enter', + 'update', + 'leave', + 'delete', + 'error' + ]; + Map eventCallbacks = {}; + void on(LiveQueryEvent op, Function callback) { + eventCallbacks[_liveQueryEvent[op.index]] = callback; + } +} + +class Client { + factory Client() => _getInstance(); + Client._internal( + {bool debug, ParseHTTPClient client, bool autoSendSessionId}) { _client = client ?? ParseHTTPClient( sendSessionId: @@ -18,57 +43,114 @@ class LiveQuery { _debug = isDebugEnabled(objectLevelDebug: debug); _sendSessionId = autoSendSessionId ?? ParseCoreData().autoSendSessionId ?? true; + _liveQueryURL = _client.data.liveQueryURL; + if (_liveQueryURL.contains('https')) { + _liveQueryURL = _liveQueryURL.replaceAll('https', 'wss'); + } else if (_liveQueryURL.contains('http')) { + _liveQueryURL = _liveQueryURL.replaceAll('http', 'ws'); + } + Connectivity() + .onConnectivityChanged + .listen((ConnectivityResult connectivityResult) { + print('onConnectivityChanged:$connectivityResult'); + if (connectivityResult != ConnectivityResult.none) { + reconnect(); + } + }); + } + static Client get instance => _getInstance(); + static Client _instance; + static Client _getInstance( + {bool debug, ParseHTTPClient client, bool autoSendSessionId}) { + _instance ??= Client._internal( + debug: debug, client: client, autoSendSessionId: autoSendSessionId); + return _instance; } WebSocket _webSocket; ParseHTTPClient _client; bool _debug; bool _sendSessionId; - IOWebSocketChannel _channel; - Map _connectMessage; - Map _subscribeMessage; - Map _unsubscribeMessage; - Map eventCallbacks = {}; - int _requestIdCount = 1; - final List _liveQueryEvent = [ - 'create', - 'enter', - 'update', - 'leave', - 'delete', - 'error' - ]; - final String _printConstLiveQuery = 'LiveQuery: '; + WebSocketChannel _channel; + String _liveQueryURL; + bool _userDisconnected = false; + bool _connecting = false; - int _requestIdGenerator() { - return _requestIdCount++; + final Map _requestSubScription = {}; + + Future reconnect() async { + await _connect(); + _connectLiveQuery(); } - // ignore: always_specify_types - Future subscribe(QueryBuilder query) async { - String _liveQueryURL = _client.data.liveQueryURL; - if (_liveQueryURL.contains('https')) { - _liveQueryURL = _liveQueryURL.replaceAll('https', 'wss'); - } else if (_liveQueryURL.contains('http')) { - _liveQueryURL = _liveQueryURL.replaceAll('http', 'ws'); + Future disconnect() async { + if (_webSocket != null && _webSocket.readyState == WebSocket.open) { + if (_debug) { + print('$_printConstLiveQuery: Socket closed'); + } + await _webSocket.close(); + _webSocket = null; + } + if (_channel != null && _channel.sink != null) { + if (_debug) { + print('$_printConstLiveQuery: close'); + } + await _channel.sink.close(); + _channel = null; } + _requestSubScription.values.toList().forEach((Subscription subcription) { + subcription._enabled = false; + }); + _userDisconnected = true; + _connecting = false; + } - final String _className = query.object.parseClassName; - final List keysToReturn = query.limiters['keys']?.split(','); - query.limiters.clear(); //Remove limits in LiveQuery - final String _where = query.buildQuery().replaceAll('where=', ''); + Future subscribe(QueryBuilder query) async { + if (_webSocket == null) { + await reconnect(); + } + final int requestId = _requestIdGenerator(); + final Subscription subscription = Subscription(query, requestId); + _requestSubScription[requestId] = subscription; + //After a client connects to the LiveQuery server, + //it can send a subscribe message to subscribe a ParseQuery. + _subscribeLiveQuery(subscription); + return subscription; + } - //Convert where condition to Map - Map _whereMap = Map(); - if (_where != '') { - _whereMap = json.decode(_where); + void unSubscribe(Subscription subscription) { + //Mount message for Unsubscribe + final Map unsubscribeMessage = { + 'op': 'unsubscribe', + 'requestId': subscription.requestId, + }; + if (_channel != null && _channel.sink != null) { + if (_debug) { + print('$_printConstLiveQuery: UnsubscribeMessage: $unsubscribeMessage'); + } + _channel.sink.add(jsonEncode(unsubscribeMessage)); + subscription._enabled = false; + _requestSubScription.remove(subscription.requestId); } + } - final int requestId = _requestIdGenerator(); + static int _requestIdCount = 1; + + int _requestIdGenerator() { + return _requestIdCount++; + } + + Future _connect() async { + if (_connecting) { + print('already connecting'); + return Future.value(null); + } + await disconnect(); + _connecting = true; + _userDisconnected = false; try { _webSocket = await WebSocket.connect(_liveQueryURL); - if (_webSocket != null && _webSocket.readyState == WebSocket.open) { if (_debug) { print('$_printConstLiveQuery: Socket opened'); @@ -76,121 +158,175 @@ class LiveQuery { } else { if (_debug) { print('$_printConstLiveQuery: Error when connection client'); - return Future.value(null); } + return Future.value(null); } - _channel = IOWebSocketChannel(_webSocket); _channel.stream.listen((dynamic message) { - if (_debug) { - print('$_printConstLiveQuery: Listen: $message'); - } - - final Map actionData = jsonDecode(message); - - if (eventCallbacks.containsKey(actionData['op'])) { - if (actionData.containsKey('object')) { - final Map map = actionData['object']; - final String className = map['className']; - if (className == '_User') { - eventCallbacks[actionData['op']]( - ParseUser(null, null, null).fromJson(map)); - } else { - eventCallbacks[actionData['op']]( - ParseObject(className).fromJson(map)); - } - } else { - eventCallbacks[actionData['op']](actionData); - } - } + _handleMessage(message); }, onDone: () { + if (!_userDisconnected) { + reconnect(); + } if (_debug) { print('$_printConstLiveQuery: Done'); } }, onError: (Object error) { + if (!_userDisconnected) { + reconnect(); + } if (_debug) { print( '$_printConstLiveQuery: Error: ${error.runtimeType.toString()}'); } - return Future.value(handleException( - Exception(error), ParseApiRQ.liveQuery, _debug, _className)); + return Future.value(handleException(Exception(error), + ParseApiRQ.liveQuery, _debug, 'IOWebSocketChannel')); }); - - //The connect message is sent from a client to the LiveQuery server. - //It should be the first message sent from a client after the WebSocket connection is established. - _connectMessage = { - 'op': 'connect', - 'applicationId': _client.data.applicationId - }; - if (_sendSessionId && _client.data.sessionId != null) { - _connectMessage['sessionToken'] = _client.data.sessionId; + } on Exception catch (e) { + if (_debug) { + print('$_printConstLiveQuery: Error: ${e.toString()}'); } + return handleException(e, ParseApiRQ.liveQuery, _debug, 'LiveQuery'); + } + } - if (_client.data.clientKey != null) - _connectMessage['clientKey'] = _client.data.clientKey; - if (_client.data.masterKey != null) - _connectMessage['masterKey'] = _client.data.masterKey; + void _connectLiveQuery() { + if (_channel == null || _channel.sink == null) { + return; + } + //The connect message is sent from a client to the LiveQuery server. + //It should be the first message sent from a client after the WebSocket connection is established. + final Map connectMessage = { + 'op': 'connect', + 'applicationId': _client.data.applicationId + }; - if (_debug) { - print('$_printConstLiveQuery: ConnectMessage: $_connectMessage'); - } - _channel.sink.add(jsonEncode(_connectMessage)); - - //After a client connects to the LiveQuery server, - //it can send a subscribe message to subscribe a ParseQuery. - _subscribeMessage = { - 'op': 'subscribe', - 'requestId': requestId, - 'query': { - 'className': _className, - 'where': _whereMap, - if (keysToReturn != null && keysToReturn.isNotEmpty) - 'fields': keysToReturn - } - }; - if (_sendSessionId && _client.data.sessionId != null) { - _subscribeMessage['sessionToken'] = _client.data.sessionId; - } + if (_sendSessionId && _client.data.sessionId != null) { + connectMessage['sessionToken'] = _client.data.sessionId; + } - if (_debug) { - print('$_printConstLiveQuery: SubscribeMessage: $_subscribeMessage'); - } + if (_client.data.clientKey != null) + connectMessage['clientKey'] = _client.data.clientKey; + if (_client.data.masterKey != null) + connectMessage['masterKey'] = _client.data.masterKey; - _channel.sink.add(jsonEncode(_subscribeMessage)); + if (_debug) { + print('$_printConstLiveQuery: ConnectMessage: $connectMessage'); + } + _channel.sink.add(jsonEncode(connectMessage)); + } - //Mount message for Unsubscribe - _unsubscribeMessage = { - 'op': 'unsubscribe', - 'requestId': requestId, - }; - } on Exception catch (e) { - if (_debug) { - print('$_printConstLiveQuery: Error: ${e.toString()}'); + void _subscribeLiveQuery(Subscription subscription) { + if (subscription._enabled) { + return; + } + subscription._enabled = true; + QueryBuilder query = subscription.query; + final List keysToReturn = query.limiters['keys']?.split(','); + query.limiters.clear(); //Remove limits in LiveQuery + final String _where = query.buildQuery().replaceAll('where=', ''); + + //Convert where condition to Map + Map _whereMap = Map(); + if (_where != '') { + _whereMap = json.decode(_where); + } + + final Map subscribeMessage = { + 'op': 'subscribe', + 'requestId': subscription.requestId, + 'query': { + 'className': query.object.parseClassName, + 'where': _whereMap, + if (keysToReturn != null && keysToReturn.isNotEmpty) + 'fields': keysToReturn } - return handleException(e, ParseApiRQ.liveQuery, _debug, _className); + }; + if (_sendSessionId && _client.data.sessionId != null) { + subscribeMessage['sessionToken'] = _client.data.sessionId; } - } - void on(LiveQueryEvent op, Function callback) { - eventCallbacks[_liveQueryEvent[op.index]] = callback; + if (_debug) { + print('$_printConstLiveQuery: SubscribeMessage: $subscribeMessage'); + } + + _channel.sink.add(jsonEncode(subscribeMessage)); } - Future unSubscribe() async { - if (_channel != null) { - if (_channel.sink != null) { - if (_debug) { - print( - '$_printConstLiveQuery: UnsubscribeMessage: $_unsubscribeMessage'); - } - _channel.sink.add(jsonEncode(_unsubscribeMessage)); - await _channel.sink.close(); - } + void _handleMessage(String message) { + if (_debug) { + print('$_printConstLiveQuery: Listen: $message'); } - if (_webSocket != null && _webSocket.readyState == WebSocket.open) { - if (_debug) { - print('$_printConstLiveQuery: Socket closed'); + + final Map actionData = jsonDecode(message); + Subscription subscription; + if (actionData.containsKey('op') && actionData['op'] == 'connected') { + _connecting = false; + print('ReSubScription:$_requestSubScription'); + _requestSubScription.values.toList().forEach((Subscription subcription) { + _subscribeLiveQuery(subcription); + }); + return; + } + if (actionData.containsKey('requestId')) { + subscription = _requestSubScription[actionData['requestId']]; + } + if (subscription == null) { + return; + } + if (subscription.eventCallbacks.containsKey(actionData['op'])) { + if (actionData.containsKey('object')) { + final Map map = actionData['object']; + final String className = map['className']; + if (className == '_User') { + subscription.eventCallbacks[actionData['op']]( + ParseUser(null, null, null).fromJson(map)); + } else { + subscription.eventCallbacks[actionData['op']]( + ParseObject(className).fromJson(map)); + } + } else { + subscription.eventCallbacks[actionData['op']](actionData); } - await _webSocket.close(); } } } + +class LiveQuery { + LiveQuery({bool debug, ParseHTTPClient client, bool autoSendSessionId}) { + _client = client ?? + ParseHTTPClient( + sendSessionId: + autoSendSessionId ?? ParseCoreData().autoSendSessionId, + securityContext: ParseCoreData().securityContext); + + _debug = isDebugEnabled(objectLevelDebug: debug); + _sendSessionId = + autoSendSessionId ?? ParseCoreData().autoSendSessionId ?? true; + this.client = Client._getInstance( + client: _client, debug: _debug, autoSendSessionId: _sendSessionId); + } + + ParseHTTPClient _client; + bool _debug; + bool _sendSessionId; + Subscription _latestSubscription; + Client client; + + // ignore: always_specify_types + @deprecated + Future subscribe(QueryBuilder query) async { + _latestSubscription = await client.subscribe(query); + return _latestSubscription; + } + + @deprecated + Future unSubscribe() async { + client.unSubscribe(_latestSubscription); + } + + @deprecated + void on(LiveQueryEvent op, Function callback) { + _latestSubscription.on(op, callback); + } +} diff --git a/pubspec.yaml b/pubspec.yaml index f73e13abe..8c996dd60 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -14,6 +14,7 @@ dependencies: # Networking web_socket_channel: ^1.0.13 + connectivity: ^0.4.4 #Database sembast: ^2.0.1