@@ -1475,7 +1475,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1475
1475
let networks = self . message_handler . chan_handler . get_chain_hashes ( ) ;
1476
1476
let resp = msgs:: Init { features, networks, remote_network_address : filter_addresses ( peer. their_socket_address . clone ( ) ) } ;
1477
1477
self . enqueue_message ( peer, & resp) ;
1478
- peer. awaiting_pong_timer_tick_intervals = 0 ;
1479
1478
} ,
1480
1479
NextNoiseStep :: ActThree => {
1481
1480
let their_node_id = try_potential_handleerror ! ( peer,
@@ -1488,7 +1487,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1488
1487
let networks = self . message_handler . chan_handler . get_chain_hashes ( ) ;
1489
1488
let resp = msgs:: Init { features, networks, remote_network_address : filter_addresses ( peer. their_socket_address . clone ( ) ) } ;
1490
1489
self . enqueue_message ( peer, & resp) ;
1491
- peer. awaiting_pong_timer_tick_intervals = 0 ;
1492
1490
} ,
1493
1491
NextNoiseStep :: NoiseComplete => {
1494
1492
if peer. pending_read_is_header {
@@ -1681,6 +1679,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1681
1679
return Err ( PeerHandleError { } . into ( ) ) ;
1682
1680
}
1683
1681
1682
+ peer_lock. awaiting_pong_timer_tick_intervals = 0 ;
1684
1683
peer_lock. their_features = Some ( msg. features ) ;
1685
1684
return Ok ( None ) ;
1686
1685
} else if peer_lock. their_features . is_none ( ) {
@@ -2674,7 +2673,7 @@ mod tests {
2674
2673
use crate :: ln:: ChannelId ;
2675
2674
use crate :: ln:: features:: { InitFeatures , NodeFeatures } ;
2676
2675
use crate :: ln:: peer_channel_encryptor:: PeerChannelEncryptor ;
2677
- use crate :: ln:: peer_handler:: { CustomMessageHandler , PeerManager , MessageHandler , SocketDescriptor , IgnoringMessageHandler , filter_addresses} ;
2676
+ use crate :: ln:: peer_handler:: { CustomMessageHandler , PeerManager , MessageHandler , SocketDescriptor , IgnoringMessageHandler , filter_addresses, ErroringMessageHandler , MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER } ;
2678
2677
use crate :: ln:: { msgs, wire} ;
2679
2678
use crate :: ln:: msgs:: { LightningError , SocketAddress } ;
2680
2679
use crate :: util:: test_utils;
@@ -3216,6 +3215,105 @@ mod tests {
3216
3215
assert ! ( peers[ 0 ] . read_event( & mut fd_a, & b_data) . is_err( ) ) ;
3217
3216
}
3218
3217
3218
+ #[ test]
3219
+ fn test_inbound_conn_handshake_complete_awaiting_pong ( ) {
3220
+ // Test that we do not disconnect an outbound peer after the noise handshake completes due
3221
+ // to a pong timeout for a ping that was never sent if a timer tick fires after we send act
3222
+ // two of the noise handshake along with our init message but before we receive their init
3223
+ // message.
3224
+ let logger = test_utils:: TestLogger :: new ( ) ;
3225
+ let node_signer_a = test_utils:: TestNodeSigner :: new ( SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ) ;
3226
+ let node_signer_b = test_utils:: TestNodeSigner :: new ( SecretKey :: from_slice ( & [ 43 ; 32 ] ) . unwrap ( ) ) ;
3227
+ let peer_a = PeerManager :: new ( MessageHandler {
3228
+ chan_handler : ErroringMessageHandler :: new ( ) ,
3229
+ route_handler : IgnoringMessageHandler { } ,
3230
+ onion_message_handler : IgnoringMessageHandler { } ,
3231
+ custom_message_handler : IgnoringMessageHandler { } ,
3232
+ } , 0 , & [ 0 ; 32 ] , & logger, & node_signer_a) ;
3233
+ let peer_b = PeerManager :: new ( MessageHandler {
3234
+ chan_handler : ErroringMessageHandler :: new ( ) ,
3235
+ route_handler : IgnoringMessageHandler { } ,
3236
+ onion_message_handler : IgnoringMessageHandler { } ,
3237
+ custom_message_handler : IgnoringMessageHandler { } ,
3238
+ } , 0 , & [ 1 ; 32 ] , & logger, & node_signer_b) ;
3239
+
3240
+ let a_id = node_signer_a. get_node_id ( Recipient :: Node ) . unwrap ( ) ;
3241
+ let mut fd_a = FileDescriptor {
3242
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
3243
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
3244
+ } ;
3245
+ let mut fd_b = FileDescriptor {
3246
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
3247
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
3248
+ } ;
3249
+
3250
+ // Exchange messages with both peers until they both complete the init handshake.
3251
+ let act_one = peer_b. new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
3252
+ peer_a. new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
3253
+
3254
+ assert_eq ! ( peer_a. read_event( & mut fd_a, & act_one) . unwrap( ) , false ) ;
3255
+ peer_a. process_events ( ) ;
3256
+
3257
+ let act_two = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
3258
+ assert_eq ! ( peer_b. read_event( & mut fd_b, & act_two) . unwrap( ) , false ) ;
3259
+ peer_b. process_events ( ) ;
3260
+
3261
+ // Calling this here triggers the race on inbound connections.
3262
+ peer_b. timer_tick_occurred ( ) ;
3263
+
3264
+ let act_three_with_init_b = fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
3265
+ assert ! ( !peer_a. peers. read( ) . unwrap( ) . get( & fd_a) . unwrap( ) . lock( ) . unwrap( ) . handshake_complete( ) ) ;
3266
+ assert_eq ! ( peer_a. read_event( & mut fd_a, & act_three_with_init_b) . unwrap( ) , false ) ;
3267
+ peer_a. process_events ( ) ;
3268
+ assert ! ( peer_a. peers. read( ) . unwrap( ) . get( & fd_a) . unwrap( ) . lock( ) . unwrap( ) . handshake_complete( ) ) ;
3269
+
3270
+ let init_a = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
3271
+ assert ! ( !init_a. is_empty( ) ) ;
3272
+
3273
+ assert ! ( !peer_b. peers. read( ) . unwrap( ) . get( & fd_b) . unwrap( ) . lock( ) . unwrap( ) . handshake_complete( ) ) ;
3274
+ assert_eq ! ( peer_b. read_event( & mut fd_b, & init_a) . unwrap( ) , false ) ;
3275
+ peer_b. process_events ( ) ;
3276
+ assert ! ( peer_b. peers. read( ) . unwrap( ) . get( & fd_b) . unwrap( ) . lock( ) . unwrap( ) . handshake_complete( ) ) ;
3277
+
3278
+ // Make sure we're still connected.
3279
+ assert_eq ! ( peer_b. peers. read( ) . unwrap( ) . len( ) , 1 ) ;
3280
+
3281
+ // B should send a ping on the first timer tick after `handshake_complete`.
3282
+ assert ! ( fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) . is_empty( ) ) ;
3283
+ peer_b. timer_tick_occurred ( ) ;
3284
+ peer_b. process_events ( ) ;
3285
+ assert ! ( !fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) . is_empty( ) ) ;
3286
+
3287
+ let mut send_warning = || {
3288
+ {
3289
+ let peers = peer_a. peers . read ( ) . unwrap ( ) ;
3290
+ let mut peer_b = peers. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) ;
3291
+ peer_a. enqueue_message ( & mut peer_b, & msgs:: WarningMessage {
3292
+ channel_id : ChannelId ( [ 0 ; 32 ] ) ,
3293
+ data : "no disconnect plz" . to_string ( ) ,
3294
+ } ) ;
3295
+ }
3296
+ peer_a. process_events ( ) ;
3297
+ let msg = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
3298
+ assert ! ( !msg. is_empty( ) ) ;
3299
+ assert_eq ! ( peer_b. read_event( & mut fd_b, & msg) . unwrap( ) , false ) ;
3300
+ peer_b. process_events ( ) ;
3301
+ } ;
3302
+
3303
+ // Fire more ticks until we reach the pong timeout. We send any message except pong to
3304
+ // pretend the connection is still alive.
3305
+ send_warning ( ) ;
3306
+ for _ in 0 ..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER {
3307
+ peer_b. timer_tick_occurred ( ) ;
3308
+ send_warning ( ) ;
3309
+ }
3310
+ assert_eq ! ( peer_b. peers. read( ) . unwrap( ) . len( ) , 1 ) ;
3311
+
3312
+ // One more tick should enforce the pong timeout.
3313
+ peer_b. timer_tick_occurred ( ) ;
3314
+ assert_eq ! ( peer_b. peers. read( ) . unwrap( ) . len( ) , 0 ) ;
3315
+ }
3316
+
3219
3317
#[ test]
3220
3318
fn test_filter_addresses ( ) {
3221
3319
// Tests the filter_addresses function.
0 commit comments