@@ -15,10 +15,8 @@ use sc_client_api::StorageChangeSet;
15
15
use sp_core:: storage:: StorageKey ;
16
16
use sp_core:: { blake2_128, twox_128} ;
17
17
18
- use futures:: { StreamExt , TryStreamExt } ;
19
- use jsonrpc_core:: futures:: {
20
- future:: Future as Future01 , sink:: Sink as Sink01 , stream, stream:: Stream as Stream01 ,
21
- } ;
18
+ use futures:: { future, StreamExt , TryStreamExt } ;
19
+ use jsonrpc_core:: futures:: { future:: Future , sink:: Sink , stream, stream:: Stream } ;
22
20
use jsonrpc_pubsub:: { manager:: SubscriptionManager , typed:: Subscriber , SubscriptionId } ;
23
21
use log:: warn;
24
22
@@ -220,13 +218,16 @@ where
220
218
} ;
221
219
222
220
let stream = stream
223
- . map ( |( _block, changes) | Ok :: < _ , ( ) > ( get_geode_state ( changes) ) )
221
+ . filter_map ( move |( _block, changes) | match get_geode_state ( changes) {
222
+ Ok ( state) => future:: ready ( Some ( Ok :: < _ , ( ) > ( Ok ( state) ) ) ) ,
223
+ Err ( _) => future:: ready ( None ) ,
224
+ } )
224
225
. compat ( ) ;
225
226
226
227
self . manager . add ( subscriber, |sink| {
227
- let stream = stream. map ( |res| Ok ( res) ) ;
228
228
sink. sink_map_err ( |e| warn ! ( "Error sending notifications: {:?}" , e) )
229
229
. send_all ( stream:: iter_result ( vec ! [ Ok ( initial) ] ) . chain ( stream) )
230
+ // we ignore the resulting Stream (if the first stream is over we are unsubscribed)
230
231
. map ( |_| ( ) )
231
232
} ) ;
232
233
}
@@ -259,22 +260,22 @@ fn blake2_128_concat(d: &[u8]) -> Vec<u8> {
259
260
v
260
261
}
261
262
262
- fn get_geode_state ( changes : StorageChangeSet ) -> GeodeState {
263
+ fn get_geode_state ( changes : StorageChangeSet ) -> Result < GeodeState > {
263
264
for ( _, _, data) in changes. iter ( ) {
264
265
match data {
265
266
Some ( data) => {
266
267
let mut value: & [ u8 ] = & data. 0 . clone ( ) ;
267
- match GeodeState :: decode ( & mut value) {
268
- Ok ( state ) => {
269
- return state;
268
+ match Geode :: < AccountId , Hash > :: decode ( & mut value) {
269
+ Ok ( geode ) => {
270
+ return Ok ( geode . state ) ;
270
271
}
271
- Err ( _) => warn ! ( "unable to decode GeodeState " ) ,
272
+ Err ( _) => warn ! ( "unable to decode Geode " ) ,
272
273
}
273
274
}
274
275
None => warn ! ( "empty change set" ) ,
275
276
} ;
276
277
}
277
- GeodeState :: Null
278
+ Err ( Error :: internal_error ( ) )
278
279
}
279
280
280
281
fn client_err ( _: sp_blockchain:: Error ) -> Error {
0 commit comments