@@ -32,9 +32,87 @@ class Subscription {
32
32
}
33
33
}
34
34
35
- enum LiveQueryClientEvent { CONNECTED , DISCONNECTED }
35
+ enum LiveQueryClientEvent { CONNECTED , DISCONNECTED , USER_DISCONNECTED }
36
36
37
- class Client with WidgetsBindingObserver {
37
+ class LiveQueryReconnectingController with WidgetsBindingObserver {
38
+ // -1 means "do not try to reconnect",
39
+ static const List <int > retryInterval = [0 , 500 , 1000 , 2000 , 5000 , 10000 ];
40
+ static const String DEBUG_TAG = 'LiveQueryReconnectingController' ;
41
+
42
+ final Function _reconnect;
43
+ final Stream <LiveQueryClientEvent > _eventStream;
44
+ final bool debug;
45
+
46
+ int _retryState = 0 ;
47
+ bool _isOnline = false ;
48
+ bool _isConnected = false ;
49
+ bool _userDisconnected = false ;
50
+
51
+ Timer _currentTimer;
52
+
53
+ LiveQueryReconnectingController (
54
+ this ._reconnect, this ._eventStream, this .debug) {
55
+ Connectivity ().onConnectivityChanged.listen ((ConnectivityResult state) {
56
+ if (! _isOnline && state != ConnectivityResult .none) _retryState = 0 ;
57
+ _isOnline = state != ConnectivityResult .none;
58
+ if (debug) print ('$DEBUG_TAG : $state ' );
59
+ _setReconnect ();
60
+ });
61
+ _eventStream.listen ((LiveQueryClientEvent event) {
62
+ switch (event) {
63
+ case LiveQueryClientEvent .CONNECTED :
64
+ _isConnected = true ;
65
+ _retryState = 0 ;
66
+ _userDisconnected = false ;
67
+ break ;
68
+ case LiveQueryClientEvent .DISCONNECTED :
69
+ _isConnected = false ;
70
+ _setReconnect ();
71
+ break ;
72
+ case LiveQueryClientEvent .USER_DISCONNECTED :
73
+ _userDisconnected = true ;
74
+ if (_currentTimer != null ) {
75
+ _currentTimer.cancel ();
76
+ _currentTimer = null ;
77
+ }
78
+ break ;
79
+ }
80
+
81
+ if (debug) print ('$DEBUG_TAG : $event ' );
82
+ });
83
+ WidgetsBinding .instance.addObserver (this );
84
+ }
85
+
86
+ @override
87
+ void didChangeAppLifecycleState (AppLifecycleState state) {
88
+ switch (state) {
89
+ case AppLifecycleState .resumed:
90
+ _setReconnect ();
91
+ break ;
92
+ default :
93
+ break ;
94
+ }
95
+ }
96
+
97
+ void _setReconnect () {
98
+ if (_isOnline &&
99
+ ! _isConnected &&
100
+ _currentTimer == null &&
101
+ ! _userDisconnected &&
102
+ retryInterval[_retryState] >= 0 ) {
103
+ _currentTimer =
104
+ Timer (Duration (milliseconds: retryInterval[_retryState]), () {
105
+ _currentTimer = null ;
106
+ _reconnect ();
107
+ });
108
+ if (debug)
109
+ print ('$DEBUG_TAG : Retrytimer set to ${retryInterval [_retryState ]}ms' );
110
+ if (_retryState < retryInterval.length - 1 ) _retryState++ ;
111
+ }
112
+ }
113
+ }
114
+
115
+ class Client {
38
116
factory Client () => _getInstance ();
39
117
Client ._internal (
40
118
{bool debug, ParseHTTPClient client, bool autoSendSessionId}) {
@@ -57,14 +135,9 @@ class Client with WidgetsBindingObserver {
57
135
} else if (_liveQueryURL.contains ('http' )) {
58
136
_liveQueryURL = _liveQueryURL.replaceAll ('http' , 'ws' );
59
137
}
60
- Connectivity ().onConnectivityChanged
61
- .listen ((ConnectivityResult connectivityResult) {
62
- print ('onConnectivityChanged:$connectivityResult ' );
63
- if (connectivityResult != ConnectivityResult .none) {
64
- reconnect ();
65
- }
66
- });
67
- WidgetsBinding .instance.addObserver (this );
138
+
139
+ reconnectingController = LiveQueryReconnectingController (
140
+ () => reconnect (userInitialized: false ), getClientEventStream, _debug);
68
141
}
69
142
static Client get instance => _getInstance ();
70
143
static Client _instance;
@@ -85,15 +158,15 @@ class Client with WidgetsBindingObserver {
85
158
bool _sendSessionId;
86
159
WebSocketChannel _channel;
87
160
String _liveQueryURL;
88
- bool _userDisconnected = false ;
89
161
bool _connecting = false ;
90
162
StreamController <LiveQueryClientEvent > _clientEventStreamController;
91
163
Stream <LiveQueryClientEvent > _clientEventStream;
164
+ LiveQueryReconnectingController reconnectingController;
92
165
93
166
final Map <int , Subscription > _requestSubScription = < int , Subscription > {};
94
167
95
- Future <void > reconnect () async {
96
- await _connect ();
168
+ Future <void > reconnect ({ bool userInitialized = false } ) async {
169
+ await _connect (userInitialized : userInitialized );
97
170
_connectLiveQuery ();
98
171
}
99
172
@@ -104,19 +177,7 @@ class Client with WidgetsBindingObserver {
104
177
return WebSocket .connecting;
105
178
}
106
179
107
- @override
108
- void didChangeAppLifecycleState (AppLifecycleState state) {
109
- super .didChangeAppLifecycleState (state);
110
- switch (state) {
111
- case AppLifecycleState .resumed:
112
- reconnect ();
113
- break ;
114
- default :
115
- break ;
116
- }
117
- }
118
-
119
- Future <dynamic > disconnect () async {
180
+ Future <dynamic > disconnect ({bool userInitialized = false }) async {
120
181
if (_webSocket != null && _webSocket.readyState == WebSocket .open) {
121
182
if (_debug) {
122
183
print ('$_printConstLiveQuery : Socket closed' );
@@ -134,13 +195,16 @@ class Client with WidgetsBindingObserver {
134
195
_requestSubScription.values.toList ().forEach ((Subscription subcription) {
135
196
subcription._enabled = false ;
136
197
});
137
- _userDisconnected = true ;
138
198
_connecting = false ;
199
+ if (userInitialized)
200
+ _clientEventStreamController.sink
201
+ .add (LiveQueryClientEvent .USER_DISCONNECTED );
139
202
}
140
203
141
204
Future <Subscription > subscribe (QueryBuilder query) async {
142
205
if (_webSocket == null ) {
143
- await reconnect ();
206
+ await _clientEventStream.any ((LiveQueryClientEvent event) =>
207
+ event == LiveQueryClientEvent .CONNECTED );
144
208
}
145
209
final int requestId = _requestIdGenerator ();
146
210
final Subscription subscription = Subscription (query, requestId);
@@ -173,17 +237,17 @@ class Client with WidgetsBindingObserver {
173
237
return _requestIdCount++ ;
174
238
}
175
239
176
- Future <dynamic > _connect () async {
240
+ Future <dynamic > _connect ({ bool userInitialized = false } ) async {
177
241
if (_connecting) {
178
242
print ('already connecting' );
179
243
return Future <void >.value (null );
180
244
}
181
- await disconnect ();
245
+ await disconnect (userInitialized : userInitialized );
182
246
_connecting = true ;
183
- _userDisconnected = false ;
184
247
185
248
try {
186
249
_webSocket = await WebSocket .connect (_liveQueryURL);
250
+ _connecting = false ;
187
251
if (_webSocket != null && _webSocket.readyState == WebSocket .open) {
188
252
if (_debug) {
189
253
print ('$_printConstLiveQuery : Socket opened' );
@@ -200,18 +264,12 @@ class Client with WidgetsBindingObserver {
200
264
}, onDone: () {
201
265
_clientEventStreamController.sink
202
266
.add (LiveQueryClientEvent .DISCONNECTED );
203
- if (! _userDisconnected) {
204
- reconnect ();
205
- }
206
267
if (_debug) {
207
268
print ('$_printConstLiveQuery : Done' );
208
269
}
209
270
}, onError: (Object error) {
210
271
_clientEventStreamController.sink
211
272
.add (LiveQueryClientEvent .DISCONNECTED );
212
- if (! _userDisconnected) {
213
- reconnect ();
214
- }
215
273
if (_debug) {
216
274
print (
217
275
'$_printConstLiveQuery : Error: ${error .runtimeType .toString ()}' );
@@ -220,6 +278,7 @@ class Client with WidgetsBindingObserver {
220
278
ParseApiRQ .liveQuery, _debug, 'IOWebSocketChannel' ));
221
279
});
222
280
} on Exception catch (e) {
281
+ _connecting = false ;
223
282
_clientEventStreamController.sink.add (LiveQueryClientEvent .DISCONNECTED );
224
283
if (_debug) {
225
284
print ('$_printConstLiveQuery : Error: ${e .toString ()}' );
@@ -299,7 +358,6 @@ class Client with WidgetsBindingObserver {
299
358
final Map <String , dynamic > actionData = jsonDecode (message);
300
359
Subscription subscription;
301
360
if (actionData.containsKey ('op' ) && actionData['op' ] == 'connected' ) {
302
- _connecting = false ;
303
361
print ('ReSubScription:$_requestSubScription ' );
304
362
_requestSubScription.values.toList ().forEach ((Subscription subcription) {
305
363
_subscribeLiveQuery (subcription);
0 commit comments