Skip to content

Async BumpTransactionEventHandler #3752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

209 changes: 209 additions & 0 deletions lightning/src/events/bump_transaction/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// This file is Copyright its original authors, visible in version control
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It seems more sensible to move bump_transaction.rs to bump_transaction/mod.rs to keep it and sync.rs together.

Copy link
Contributor Author

@joostjager joostjager Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Although so many mod.rs files with actual code make it a bit harder to navigate. Not sure if rust has a way of keeping files in the same sub dir and still have a descriptive file name, without making it a sub module. And which is also in line with project conventions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way to do that would be to have a separate module and re-export from the top-level one. I'm generally not a fan of that unless we have a good reason (eg the module is way too large), which this doesn't feel like.

// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! This module provides synchronous wrappers around [`BumpTransactionEventHandler`] and related types.

use core::future::Future;
use core::ops::Deref;
use core::task;

use crate::chain::chaininterface::BroadcasterInterface;
use crate::chain::ClaimId;
use crate::prelude::*;
use crate::sign::SignerProvider;
use crate::sync::Arc;
use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync};
use crate::util::logger::Logger;

use bitcoin::{Psbt, ScriptBuf, Transaction, TxOut};

use super::BumpTransactionEvent;
use super::{
BumpTransactionEventHandler, CoinSelection, CoinSelectionSource, Input, Utxo, Wallet,
WalletSource,
};

/// A synchronous version of the [`WalletSource`] trait.
pub trait WalletSourceSync {
/// A synchronous version of [`WalletSource::list_confirmed_utxos`].
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()>;
/// A synchronous version of [`WalletSource::get_change_script`].
fn get_change_script(&self) -> Result<ScriptBuf, ()>;
/// A Synchronous version of [`WalletSource::sign_psbt`].
fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()>;
}

pub(crate) struct WalletSourceSyncWrapper<T: Deref>(T)
where
T::Target: WalletSourceSync;

impl<T: Deref> WalletSource for WalletSourceSyncWrapper<T>
where
T::Target: WalletSourceSync,
{
fn list_confirmed_utxos<'a>(&'a self) -> AsyncResult<'a, Vec<Utxo>> {
let utxos = self.0.list_confirmed_utxos();
Box::pin(async move { utxos })
}

fn get_change_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf> {
let script = self.0.get_change_script();
Box::pin(async move { script })
}

fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction> {
let signed_psbt = self.0.sign_psbt(psbt);
Box::pin(async move { signed_psbt })
}
}

/// A synchronous wrapper around [`Wallet`] to be used in contexts where async is not available.
pub struct WalletSync<W: Deref + MaybeSync + MaybeSend, L: Deref + MaybeSync + MaybeSend>
where
W::Target: WalletSourceSync + MaybeSend,
L::Target: Logger + MaybeSend,
{
wallet: Wallet<Arc<WalletSourceSyncWrapper<W>>, L>,
}

impl<W: Deref + MaybeSync + MaybeSend, L: Deref + MaybeSync + MaybeSend> WalletSync<W, L>
where
W::Target: WalletSourceSync + MaybeSend,
L::Target: Logger + MaybeSend,
{
/// Constructs a new [`WalletSync`] instance.
pub fn new(source: W, logger: L) -> Self {
Self { wallet: Wallet::new(Arc::new(WalletSourceSyncWrapper(source)), logger) }
}
}

impl<W: Deref + MaybeSync + MaybeSend, L: Deref + MaybeSync + MaybeSend> CoinSelectionSourceSync
for WalletSync<W, L>
where
W::Target: WalletSourceSync + MaybeSend + MaybeSync,
L::Target: Logger + MaybeSend + MaybeSync,
{
fn select_confirmed_utxos(
&self, claim_id: ClaimId, must_spend: Vec<Input>, must_pay_to: &[TxOut],
target_feerate_sat_per_1000_weight: u32,
) -> Result<CoinSelection, ()> {
let mut fut = self.wallet.select_confirmed_utxos(
claim_id,
must_spend,
must_pay_to,
target_feerate_sat_per_1000_weight,
);
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
unreachable!(
"Wallet::select_confirmed_utxos should not be pending in a sync context"
);
},
}
}

fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()> {
let mut fut = self.wallet.sign_psbt(psbt);
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
unreachable!("Wallet::sign_psbt should not be pending in a sync context");
},
}
}
}

/// A synchronous version of the [`CoinSelectionSource`] trait.
pub trait CoinSelectionSourceSync {
/// A synchronous version of [`CoinSelectionSource::select_confirmed_utxos`].
fn select_confirmed_utxos(
&self, claim_id: ClaimId, must_spend: Vec<Input>, must_pay_to: &[TxOut],
target_feerate_sat_per_1000_weight: u32,
) -> Result<CoinSelection, ()>;

/// A synchronous version of [`CoinSelectionSource::sign_psbt`].
fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()>;
}

struct CoinSelectionSourceSyncWrapper<T: Deref>(T)
where
T::Target: CoinSelectionSourceSync;

impl<T: Deref> CoinSelectionSource for CoinSelectionSourceSyncWrapper<T>
where
T::Target: CoinSelectionSourceSync,
{
fn select_confirmed_utxos<'a>(
&'a self, claim_id: ClaimId, must_spend: Vec<Input>, must_pay_to: &'a [TxOut],
target_feerate_sat_per_1000_weight: u32,
) -> AsyncResult<'a, CoinSelection> {
let coins = self.0.select_confirmed_utxos(
claim_id,
must_spend,
must_pay_to,
target_feerate_sat_per_1000_weight,
);
Box::pin(async move { coins })
}

fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction> {
let psbt = self.0.sign_psbt(psbt);
Box::pin(async move { psbt })
}
}

/// A synchronous wrapper around [`BumpTransactionEventHandler`] to be used in contexts where async is not available.
pub struct BumpTransactionEventHandlerSync<B: Deref, C: Deref, SP: Deref, L: Deref>
where
B::Target: BroadcasterInterface,
C::Target: CoinSelectionSourceSync,
SP::Target: SignerProvider,
L::Target: Logger,
{
bump_transaction_event_handler:
Arc<BumpTransactionEventHandler<B, Arc<CoinSelectionSourceSyncWrapper<C>>, SP, L>>,
}

impl<B: Deref, C: Deref, SP: Deref, L: Deref> BumpTransactionEventHandlerSync<B, C, SP, L>
where
B::Target: BroadcasterInterface,
C::Target: CoinSelectionSourceSync,
SP::Target: SignerProvider,
L::Target: Logger,
{
/// Constructs a new instance of [`BumpTransactionEventHandlerSync`].
pub fn new(broadcaster: B, utxo_source: C, signer_provider: SP, logger: L) -> Self {
let bump_transaction_event_handler = Arc::new(BumpTransactionEventHandler::new(
broadcaster,
Arc::new(CoinSelectionSourceSyncWrapper(utxo_source)),
signer_provider,
logger,
));
Self { bump_transaction_event_handler }
}

/// Handles all variants of [`BumpTransactionEvent`].
pub fn handle_event(&self, event: &BumpTransactionEvent) {
let mut fut = Box::pin(self.bump_transaction_event_handler.handle_event(event));
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
// In a sync context, we can't wait for the future to complete.
unreachable!("BumpTransactionEventHandlerSync::handle_event should not be pending in a sync context");
},
}
}
}
2 changes: 1 addition & 1 deletion lightning/src/ln/async_signer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use bitcoin::transaction::Version;

use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
use crate::chain::ChannelMonitorUpdateStatus;
use crate::events::bump_transaction::WalletSource;
use crate::events::bump_transaction::sync::WalletSourceSync;
use crate::events::{ClosureReason, Event};
use crate::ln::chan_utils::ClosingTransaction;
use crate::ln::channel::DISCONNECT_PEER_AWAITING_RESPONSE_TICKS;
Expand Down
14 changes: 8 additions & 6 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Confirm, Listen, Watch
use crate::chain::channelmonitor::ChannelMonitor;
use crate::chain::transaction::OutPoint;
use crate::events::{ClaimedHTLC, ClosureReason, Event, HTLCHandlingFailureType, PaidBolt12Invoice, PathFailure, PaymentFailureReason, PaymentPurpose};
use crate::events::bump_transaction::{BumpTransactionEvent, BumpTransactionEventHandler, Wallet, WalletSource};
use crate::events::bump_transaction::{BumpTransactionEvent};
use crate::events::bump_transaction::sync::{BumpTransactionEventHandlerSync, WalletSourceSync, WalletSync};
use crate::ln::types::ChannelId;
use crate::types::features::ChannelTypeFeatures;
use crate::types::payment::{PaymentPreimage, PaymentHash, PaymentSecret};
Expand Down Expand Up @@ -472,9 +473,9 @@ pub struct Node<'chan_man, 'node_cfg: 'chan_man, 'chan_mon_cfg: 'node_cfg> {
pub connect_style: Rc<RefCell<ConnectStyle>>,
pub override_init_features: Rc<RefCell<Option<InitFeatures>>>,
pub wallet_source: Arc<test_utils::TestWalletSource>,
pub bump_tx_handler: BumpTransactionEventHandler<
pub bump_tx_handler: BumpTransactionEventHandlerSync<
&'chan_mon_cfg test_utils::TestBroadcaster,
Arc<Wallet<Arc<test_utils::TestWalletSource>, &'chan_mon_cfg test_utils::TestLogger>>,
Arc<WalletSync<Arc<test_utils::TestWalletSource>, &'chan_mon_cfg test_utils::TestLogger>>,
&'chan_mon_cfg test_utils::TestKeysInterface,
&'chan_mon_cfg test_utils::TestLogger,
>,
Expand Down Expand Up @@ -3424,6 +3425,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
);
let gossip_sync = P2PGossipSync::new(cfgs[i].network_graph.as_ref(), None, cfgs[i].logger);
let wallet_source = Arc::new(test_utils::TestWalletSource::new(SecretKey::from_slice(&[i as u8 + 1; 32]).unwrap()));
let wallet = Arc::new(WalletSync::new(wallet_source.clone(), cfgs[i].logger));
nodes.push(Node{
chain_source: cfgs[i].chain_source, tx_broadcaster: cfgs[i].tx_broadcaster,
fee_estimator: cfgs[i].fee_estimator, router: &cfgs[i].router,
Expand All @@ -3435,9 +3437,9 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
blocks: Arc::clone(&cfgs[i].tx_broadcaster.blocks),
connect_style: Rc::clone(&connect_style),
override_init_features: Rc::clone(&cfgs[i].override_init_features),
wallet_source: Arc::clone(&wallet_source),
bump_tx_handler: BumpTransactionEventHandler::new(
cfgs[i].tx_broadcaster, Arc::new(Wallet::new(Arc::clone(&wallet_source), cfgs[i].logger)),
wallet_source: wallet_source.clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely prefer to leave Arc::clones over .clone()s. They're way more readable :)

Copy link
Contributor Author

@joostjager joostjager Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel it. Maybe that feeling needs time to develop 😅

Copy link
Contributor

@tnull tnull Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel it. Maybe that feeling needs time to develop 😅

Maybe. FWIW, I share the preference for Arc::clone, especially as it highlights that it's not a deep (i.e. costly) clone, but just cloning a reference..

bump_tx_handler: BumpTransactionEventHandlerSync::new(
cfgs[i].tx_broadcaster, wallet,
&cfgs[i].keys_manager, cfgs[i].logger,
),
})
Expand Down
3 changes: 2 additions & 1 deletion lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::chain::channelmonitor::{
};
use crate::chain::transaction::OutPoint;
use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch};
use crate::events::bump_transaction::WalletSource;
use crate::events::{
ClosureReason, Event, FundingInfo, HTLCHandlingFailureType, PathFailure, PaymentFailureReason,
PaymentPurpose,
Expand Down Expand Up @@ -1474,6 +1473,8 @@ pub fn claim_htlc_outputs() {
// This is a regression test for https://github.com/lightningdevkit/rust-lightning/issues/3537.
#[xtest(feature = "_externalize_tests")]
pub fn test_multiple_package_conflicts() {
use crate::events::bump_transaction::sync::WalletSourceSync;

let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let mut user_cfg = test_default_channel_config();
Expand Down
3 changes: 2 additions & 1 deletion lightning/src/ln/monitor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@

//! Further functional tests which test blockchain reorganizations.

use crate::events::bump_transaction::sync::WalletSourceSync;
use crate::sign::{ecdsa::EcdsaChannelSigner, OutputSpender, SignerProvider, SpendableOutputDescriptor};
use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS,LATENCY_GRACE_PERIOD_BLOCKS, COUNTERPARTY_CLAIMABLE_WITHIN_BLOCKS_PINNABLE, Balance, BalanceSource, ChannelMonitorUpdateStep};
use crate::chain::transaction::OutPoint;
use crate::chain::chaininterface::{ConfirmationTarget, LowerBoundedFeeEstimator, compute_feerate_sat_per_1000_weight};
use crate::events::bump_transaction::{BumpTransactionEvent, WalletSource};
use crate::events::bump_transaction::{BumpTransactionEvent};
use crate::events::{Event, ClosureReason, HTLCHandlingFailureType};
use crate::ln::channel;
use crate::ln::types::ChannelId;
Expand Down
20 changes: 20 additions & 0 deletions lightning/src/util/async_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,24 @@ pub(crate) fn dummy_waker() -> Waker {
}

/// A type alias for a future that returns a result of type T.
#[cfg(feature = "std")]
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;
#[cfg(not(feature = "std"))]
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a>>;

// Marker trait to optionally implement `Sync` under std.
#[cfg(feature = "std")]
pub use core::marker::Sync as MaybeSync;

#[cfg(not(feature = "std"))]
pub trait MaybeSync {}
#[cfg(not(feature = "std"))]
impl<T> MaybeSync for T where T: ?Sized {}

// Marker trait to optionally implement `Send` under std.
#[cfg(feature = "std")]
pub use core::marker::Send as MaybeSend;
#[cfg(not(feature = "std"))]
pub trait MaybeSend {}
#[cfg(not(feature = "std"))]
impl<T> MaybeSend for T where T: ?Sized {}
20 changes: 10 additions & 10 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use crate::chain::channelmonitor::{
};
use crate::chain::transaction::OutPoint;
use crate::chain::WatchedOutput;
use crate::events::bump_transaction::{Utxo, WalletSource};
use crate::events::bump_transaction::sync::WalletSourceSync;
use crate::events::bump_transaction::Utxo;
#[cfg(any(test, feature = "_externalize_tests"))]
use crate::ln::chan_utils::CommitmentTransaction;
use crate::ln::channel_state::ChannelDetails;
Expand Down Expand Up @@ -85,7 +86,6 @@ use crate::io;
use crate::prelude::*;
use crate::sign::{EntropySource, NodeSigner, RandomBytes, Recipient, SignerProvider};
use crate::sync::{Arc, Mutex};
use core::cell::RefCell;
use core::mem;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::time::Duration;
Expand Down Expand Up @@ -1874,36 +1874,36 @@ impl Drop for TestScorer {

pub struct TestWalletSource {
secret_key: SecretKey,
utxos: RefCell<Vec<Utxo>>,
utxos: Mutex<Vec<Utxo>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need this in the sync code it seems, unfortunately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is a show stopper for the whole sync wrapping concept. I hope not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TheBlueMatt what do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its okay. Its certainly not ideal but it only kicks on with std, and the alternative is macro-ing the whole code and duplicating it all, which I'm not really sure is a better answer. Of course once Rust has proper impl Trait returns from trait methods we can drop this stuff, but it sounds like thats still a bit off :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, the Maybe* markers only require this in std.

Of course once Rust has proper impl Trait returns from trait methods we can drop this stuff, but it sounds like thats still a bit off :/

For background knowledge - what exactly can we drop then?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it sounds like thats still a bit off :/

IIUC, that would be RPITIT that has be stabilized in rust-lang/rust#115822 for version 1.75?

Meaning it could be literally months away, once Debian Trixie ships, which is on rustc 1.85?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly, that doesn't suffice. The simple RPITIT that was stabilized only allows you to return an impl Trait for a specific Trait, and any specific call to a trait method that returns impl Trait "gets" a type back that is only Trait. What we want here is flexibility - based on what trait is being called, we want the caller to "get" a type back that is concretely the thing that its getting. IOW if we're returning a Future that happens to be Send + Sync, the caller knows that and can use them as Send + Sync. The version stabilized strips that from the type :(

secp: Secp256k1<bitcoin::secp256k1::All>,
}

impl TestWalletSource {
pub fn new(secret_key: SecretKey) -> Self {
Self { secret_key, utxos: RefCell::new(Vec::new()), secp: Secp256k1::new() }
Self { secret_key, utxos: Mutex::new(Vec::new()), secp: Secp256k1::new() }
}

pub fn add_utxo(&self, outpoint: bitcoin::OutPoint, value: Amount) -> TxOut {
let public_key = bitcoin::PublicKey::new(self.secret_key.public_key(&self.secp));
let utxo = Utxo::new_p2pkh(outpoint, value, &public_key.pubkey_hash());
self.utxos.borrow_mut().push(utxo.clone());
self.utxos.lock().unwrap().push(utxo.clone());
utxo.output
}

pub fn add_custom_utxo(&self, utxo: Utxo) -> TxOut {
let output = utxo.output.clone();
self.utxos.borrow_mut().push(utxo);
self.utxos.lock().unwrap().push(utxo);
output
}

pub fn remove_utxo(&self, outpoint: bitcoin::OutPoint) {
self.utxos.borrow_mut().retain(|utxo| utxo.outpoint != outpoint);
self.utxos.lock().unwrap().retain(|utxo| utxo.outpoint != outpoint);
}
}

impl WalletSource for TestWalletSource {
impl WalletSourceSync for TestWalletSource {
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
Ok(self.utxos.borrow().clone())
Ok(self.utxos.lock().unwrap().clone())
}

fn get_change_script(&self) -> Result<ScriptBuf, ()> {
Expand All @@ -1913,7 +1913,7 @@ impl WalletSource for TestWalletSource {

fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()> {
let mut tx = psbt.extract_tx_unchecked_fee_rate();
let utxos = self.utxos.borrow();
let utxos = self.utxos.lock().unwrap();
for i in 0..tx.input.len() {
if let Some(utxo) =
utxos.iter().find(|utxo| utxo.outpoint == tx.input[i].previous_output)
Expand Down
Loading