File tree 3 files changed +31
-6
lines changed
3 files changed +31
-6
lines changed Original file line number Diff line number Diff line change 20
20
import Session from './session' ;
21
21
import { Pool } from './internal/pool' ;
22
22
import { connect } from "./internal/connector" ;
23
+ import StreamObserver from './internal/stream-observer' ;
23
24
24
25
/**
25
26
* A Driver instance is used for mananging {@link Session}s.
@@ -52,8 +53,9 @@ class Driver {
52
53
*/
53
54
_createConnection ( release ) {
54
55
let sessionId = this . _sessionIdGenerator ++ ;
56
+ let streamObserver = new _ConnectionStreamObserver ( this ) ;
55
57
let conn = connect ( this . _url ) ;
56
- conn . initialize ( this . _userAgent , this . _token ) ;
58
+ conn . initialize ( this . _userAgent , this . _token , streamObserver ) ;
57
59
conn . _id = sessionId ;
58
60
conn . _release = ( ) => release ( conn ) ;
59
61
@@ -121,4 +123,22 @@ class Driver {
121
123
}
122
124
}
123
125
126
+ /** Internal stream observer used for connection state */
127
+ class _ConnectionStreamObserver extends StreamObserver {
128
+ constructor ( driver ) {
129
+ super ( ) ;
130
+ this . _driver = driver ;
131
+ this . _hasFailed = false ;
132
+ }
133
+ onError ( error ) {
134
+ if ( ! this . _hasFailed ) {
135
+ super . onError ( error ) ;
136
+ if ( this . _driver . onError ) {
137
+ this . _driver . onError ( error ) ;
138
+ }
139
+ this . _hasFailed = true ;
140
+ }
141
+ }
142
+ }
143
+
124
144
export default Driver
Original file line number Diff line number Diff line change @@ -55,9 +55,9 @@ class WebSocketChannel {
55
55
}
56
56
} ;
57
57
58
- this . _ws . onerror = ( ) => {
58
+ this . _ws . onerror = ( err ) => {
59
59
if ( self . onerror ) {
60
- self . onerror ( ) ;
60
+ self . onerror ( err ) ;
61
61
}
62
62
}
63
63
}
Original file line number Diff line number Diff line change @@ -207,7 +207,12 @@ class Connection {
207
207
}
208
208
} ;
209
209
210
- this . _ch . onerror = ( ) => self . _isBroken = true ;
210
+ this . _ch . onerror = ( error ) => {
211
+ self . _isBroken = true ;
212
+ if ( this . _currentObserver . onError ) {
213
+ this . _currentObserver . onError ( error ) ;
214
+ }
215
+ }
211
216
212
217
this . _dechunker . onmessage = ( buf ) => {
213
218
self . _handleMessage ( self . _unpacker . unpack ( buf ) ) ;
@@ -266,9 +271,9 @@ class Connection {
266
271
break ;
267
272
case IGNORED :
268
273
try {
269
- if ( this . _errorMsg )
274
+ if ( this . _errorMsg && this . _currentObserver . onError )
270
275
this . _currentObserver . onError ( this . _errorMsg ) ;
271
- else
276
+ else if ( this . _currentObserver . onError )
272
277
this . _currentObserver . onError ( msg ) ;
273
278
} finally {
274
279
this . _currentObserver = this . _pendingObservers . shift ( ) ;
You can’t perform that action at this time.
0 commit comments