From 58d4969b29bc0989f021f061b1822af9b0d578d7 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 12:02:23 -0300 Subject: [PATCH 01/26] sweepbatcher: mark channels receive- or send-only --- sweepbatcher/sweep_batcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 03b697b19..6ab67035d 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -307,13 +307,13 @@ type SpendDetail struct { // that the sweep was successful. type SpendNotifier struct { // SpendChan is a channel where the spend details are received. - SpendChan chan *SpendDetail + SpendChan chan<- *SpendDetail // SpendErrChan is a channel where spend errors are received. - SpendErrChan chan error + SpendErrChan chan<- error // QuitChan is a channel that can be closed to stop the notifier. - QuitChan chan bool + QuitChan <-chan bool } var ( From 8860f7b3b1f3b3806b06b9690a5c5764b2730c1f Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 12:34:42 -0300 Subject: [PATCH 02/26] sweepbatcher: unblock sending if notifier quits --- sweepbatcher/sweep_batcher.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 6ab67035d..c5498c46c 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1176,7 +1176,15 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, } select { + // Try to write the update to the notification + // channel. case notifier.SpendChan <- spendDetail: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. case <-ctx.Done(): } @@ -1184,7 +1192,15 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, case err := <-spendErr: select { + // Try to write the error to the notification + // channel. case notifier.SpendErrChan <- err: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. case <-ctx.Done(): } @@ -1194,9 +1210,11 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, return + // If a quit signal was provided by the swap, continue. case <-notifier.QuitChan: return + // If the context was canceled, stop. case <-ctx.Done(): return } From f4dfda5e999d711c3a7d083690d675cad8b16a95 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 12:36:03 -0300 Subject: [PATCH 03/26] sweepbatcher: send spend error to notifier --- sweepbatcher/sweep_batch.go | 39 +++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index a437461ad..15e7d006c 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1834,6 +1834,8 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { return case err := <-spendErr: + b.writeToSpendErrChan(ctx, err) + b.writeToErrChan( fmt.Errorf("spend error: %w", err), ) @@ -2285,6 +2287,43 @@ func (b *batch) writeToErrChan(err error) { } } +// writeToSpendErrChan sends an error to spend error channels of all the sweeps. +func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { + done, err := b.scheduleNextCall() + if err != nil { + done() + + return + } + notifiers := make([]*SpendNotifier, 0, len(b.sweeps)) + for _, s := range b.sweeps { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it, so we can skip + // the notification part. + if s.notifier == nil || s.notifier.SpendErrChan == nil { + continue + } + + notifiers = append(notifiers, s.notifier) + } + done() + + for _, notifier := range notifiers { + select { + // Try to write the error to the notification + // channel. + case notifier.SpendErrChan <- spendErr: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + } +} + func (b *batch) persistSweep(ctx context.Context, sweep sweep, completed bool) error { From e22bc9e363dddc6cbc3a805e96da71d6ffd667ff Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 15:33:05 -0300 Subject: [PATCH 04/26] test: fix error messages --- test/context.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/context.go b/test/context.go index 5fae59357..943a1dee6 100644 --- a/test/context.go +++ b/test/context.go @@ -61,7 +61,7 @@ func (ctx *Context) NotifySpend(tx *wire.MsgTx, inputIndex uint32) { SpenderInputIndex: inputIndex, }: case <-time.After(Timeout): - ctx.T.Fatalf("htlc spend not consumed") + ctx.T.Fatalf("spend not consumed") } } @@ -74,7 +74,7 @@ func (ctx *Context) NotifyConf(tx *wire.MsgTx) { Tx: tx, }: case <-time.After(Timeout): - ctx.T.Fatalf("htlc spend not consumed") + ctx.T.Fatalf("confirmation not consumed") } } @@ -86,7 +86,7 @@ func (ctx *Context) AssertRegisterSpendNtfn(script []byte) { case spendIntent := <-ctx.Lnd.RegisterSpendChannel: require.Equal( ctx.T, script, spendIntent.PkScript, - "server not listening for published htlc script", + "server not listening for published script", ) case <-time.After(Timeout): @@ -134,7 +134,7 @@ func (ctx *Context) AssertRegisterConf(expectTxHash bool, confs int32) *ConfRegi require.Equal(ctx.T, confs, confIntent.NumConfs) case <-time.After(Timeout): - ctx.T.Fatalf("htlc confirmed not subscribed to") + ctx.T.Fatalf("tx confirmed not subscribed to") } return confIntent @@ -249,7 +249,7 @@ func (ctx *Context) GetOutputIndex(tx *wire.MsgTx, } } - ctx.T.Fatal("htlc not present in tx") + ctx.T.Fatal("the output not present in tx") return 0 } From b6381a90bf3f6cb1ca74d86af81695b4c527b821 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 18:07:13 -0300 Subject: [PATCH 05/26] sweepbatcher: fix mock TotalSweptAmount Forgot to return the calculated total value. --- sweepbatcher/store_mock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index 90896aac1..0f6c5e3cb 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -212,5 +212,5 @@ func (s *StoreMock) TotalSweptAmount(ctx context.Context, batchID int32) ( } } - return 0, nil + return total, nil } From a8995c6ef45bcc05643ebb70e05b9a2e5bba7e02 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 18:30:35 -0300 Subject: [PATCH 06/26] sweepbatcher: fix mistake in batch reading from DB --- sweepbatcher/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 01b9e74a9..bb245e5fb 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -259,7 +259,7 @@ func convertBatchRow(row sqlc.SweepBatch) *dbBatch { } if row.Confirmed { - batch.State = batchOpen + batch.State = batchConfirmed } if row.BatchTxID.Valid { From af80791b7c285e3d9d66f751ce5ba120b6212593 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 18:40:36 -0300 Subject: [PATCH 07/26] test/chainnotifier_mock: support errors Support sending errors to error channels returned by RegisterSpendNtfn and RegisterConfirmationsNtfn. --- test/chainnotifier_mock.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index 3889ce066..96e72f23f 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -36,6 +36,7 @@ type SpendRegistration struct { Outpoint *wire.OutPoint PkScript []byte HeightHint int32 + ErrChan chan<- error } // ConfRegistration contains registration details. @@ -45,18 +46,24 @@ type ConfRegistration struct { HeightHint int32 NumConfs int32 ConfChan chan *chainntnfs.TxConfirmation + ErrChan chan<- error } func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( chan *chainntnfs.SpendDetail, chan error, error) { - c.lnd.RegisterSpendChannel <- &SpendRegistration{ + spendErrChan := make(chan error, 1) + + reg := &SpendRegistration{ HeightHint: heightHint, Outpoint: outpoint, PkScript: pkScript, + ErrChan: spendErrChan, } + c.lnd.RegisterSpendChannel <- reg + spendChan := make(chan *chainntnfs.SpendDetail, 1) errChan := make(chan error, 1) @@ -70,6 +77,13 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, case spendChan <- m: case <-ctx.Done(): } + + case err := <-spendErrChan: + select { + case errChan <- err: + case <-ctx.Done(): + } + case <-ctx.Done(): } }() @@ -129,12 +143,15 @@ func (c *mockChainNotifier) RegisterConfirmationsNtfn(ctx context.Context, opts ...lndclient.NotifierOption) (chan *chainntnfs.TxConfirmation, chan error, error) { + confErrChan := make(chan error, 1) + reg := &ConfRegistration{ PkScript: pkScript, TxID: txid, HeightHint: heightHint, NumConfs: numConfs, ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + ErrChan: confErrChan, } c.Lock() @@ -169,6 +186,13 @@ func (c *mockChainNotifier) RegisterConfirmationsNtfn(ctx context.Context, } } c.Unlock() + + case err := <-confErrChan: + select { + case errChan <- err: + case <-ctx.Done(): + } + case <-ctx.Done(): } }() From b9f958b9911b3fa0bae6182e686ac35301645990 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 18:43:21 -0300 Subject: [PATCH 08/26] sweepbatcher: cancel spendCtx after processing Function monitorSpendAndNotify used to cancel the context passed to RegisterSpendNtfn right after starting the goroutine processing results. Spend notifications were missed. Now the context is canceled when the goroutine finishes. --- sweepbatcher/sweep_batcher.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index c5498c46c..c8ee03b7a 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1128,11 +1128,12 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, parentBatchID int32, notifier *SpendNotifier) error { spendCtx, cancel := context.WithCancel(ctx) - defer cancel() // Then we get the total amount that was swept by the batch. totalSwept, err := b.store.TotalSweptAmount(ctx, parentBatchID) if err != nil { + cancel() + return err } @@ -1141,11 +1142,14 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, sweep.initiationHeight, ) if err != nil { + cancel() + return err } b.wg.Add(1) go func() { + defer cancel() defer b.wg.Done() infof("Batcher monitoring spend for swap %x", sweep.swapHash[:6]) From ebd972e0dc1abb769b40e449171d5cf39a08d3c1 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 18:48:34 -0300 Subject: [PATCH 09/26] sweepbatcher: remove unneded for loops The loop always had exactly one iteration. --- sweepbatcher/sweep_batch.go | 79 ++++++++++-------------- sweepbatcher/sweep_batcher.go | 112 ++++++++++++++++------------------ 2 files changed, 86 insertions(+), 105 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 15e7d006c..ee6236590 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1822,29 +1822,22 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { b.Infof("monitoring spend for outpoint %s", primarySweep.outpoint.String()) - for { + select { + case spend := <-spendChan: select { - case spend := <-spendChan: - select { - case b.spendChan <- spend: - - case <-ctx.Done(): - } + case b.spendChan <- spend: - return + case <-ctx.Done(): + } - case err := <-spendErr: - b.writeToSpendErrChan(ctx, err) + case err := <-spendErr: + b.writeToSpendErrChan(ctx, err) - b.writeToErrChan( - fmt.Errorf("spend error: %w", err), - ) - - return + b.writeToErrChan( + fmt.Errorf("spend error: %w", err), + ) - case <-ctx.Done(): - return - } + case <-ctx.Done(): } }() @@ -1878,39 +1871,31 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { defer cancel() defer b.wg.Done() - for { + select { + case conf := <-confChan: select { - case conf := <-confChan: - select { - case b.confChan <- conf: - - case <-ctx.Done(): - } - - return - - case err := <-errChan: - b.writeToErrChan(fmt.Errorf("confirmations "+ - "monitoring error: %w", err)) - - return - - case <-reorgChan: - // A re-org has been detected. We set the batch - // state back to open since our batch - // transaction is no longer present in any - // block. We can accept more sweeps and try to - // publish new transactions, at this point we - // need to monitor again for a new spend. - select { - case b.reorgChan <- struct{}{}: - case <-ctx.Done(): - } - return + case b.confChan <- conf: + + case <-ctx.Done(): + } + case err := <-errChan: + b.writeToErrChan(fmt.Errorf("confirmations "+ + "monitoring error: %w", err)) + + case <-reorgChan: + // A re-org has been detected. We set the batch + // state back to open since our batch + // transaction is no longer present in any + // block. We can accept more sweeps and try to + // publish new transactions, at this point we + // need to monitor again for a new spend. + select { + case b.reorgChan <- struct{}{}: case <-ctx.Done(): - return } + + case <-ctx.Done(): } }() diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index c8ee03b7a..f87abcaf1 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1154,74 +1154,70 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, infof("Batcher monitoring spend for swap %x", sweep.swapHash[:6]) - for { - select { - case spend := <-spendChan: - spendTx := spend.SpendingTx - // Calculate the fee portion that each sweep - // should pay for the batch. - feePortionPerSweep, roundingDifference := - getFeePortionForSweep( - spendTx, len(spendTx.TxIn), - totalSwept, - ) - - onChainFeePortion := getFeePortionPaidBySweep( - spendTx, feePortionPerSweep, - roundingDifference, sweep, + select { + case spend := <-spendChan: + spendTx := spend.SpendingTx + // Calculate the fee portion that each sweep should pay + // for the batch. + feePortionPerSweep, roundingDifference := + getFeePortionForSweep( + spendTx, len(spendTx.TxIn), + totalSwept, ) - // Notify the requester of the spend - // with the spend details, including the fee - // portion for this particular sweep. - spendDetail := &SpendDetail{ - Tx: spendTx, - OnChainFeePortion: onChainFeePortion, - } - - select { - // Try to write the update to the notification - // channel. - case notifier.SpendChan <- spendDetail: - - // If a quit signal was provided by the swap, - // continue. - case <-notifier.QuitChan: - - // If the context was canceled, stop. - case <-ctx.Done(): - } - - return - - case err := <-spendErr: - select { - // Try to write the error to the notification - // channel. - case notifier.SpendErrChan <- err: - - // If a quit signal was provided by the swap, - // continue. - case <-notifier.QuitChan: - - // If the context was canceled, stop. - case <-ctx.Done(): - } - - b.writeToErrChan( - ctx, fmt.Errorf("spend error: %w", err), - ) + onChainFeePortion := getFeePortionPaidBySweep( + spendTx, feePortionPerSweep, + roundingDifference, sweep, + ) - return + // Notify the requester of the spend with the spend + // details, including the fee portion for this + // particular sweep. + spendDetail := &SpendDetail{ + Tx: spendTx, + OnChainFeePortion: onChainFeePortion, + } + + select { + // Try to write the update to the notification channel. + case notifier.SpendChan <- spendDetail: // If a quit signal was provided by the swap, continue. case <-notifier.QuitChan: - return // If the context was canceled, stop. case <-ctx.Done(): - return } + + return + + case err := <-spendErr: + select { + // Try to write the error to the notification + // channel. + case notifier.SpendErrChan <- err: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + + b.writeToErrChan( + ctx, fmt.Errorf("spend error: %w", err), + ) + + return + + // If a quit signal was provided by the swap, continue. + case <-notifier.QuitChan: + return + + // If the context was canceled, stop. + case <-ctx.Done(): + return } }() From 7647526c6f343361bbce6efd7563f847e5ca046c Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 20:42:14 -0300 Subject: [PATCH 10/26] sweepbatcher: store batch status before monitoring If monitorConfirmations fails, we still want to persist the state to DB. --- sweepbatcher/sweep_batch.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index ee6236590..cdc0d4451 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -2099,16 +2099,20 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { "purged swaps: %v, purged groups: %v", confirmedSweeps, purgedSweeps, purgedSwaps, len(purgeList)) - err = b.monitorConfirmations(ctx) - if err != nil { - return err - } - // We are no longer able to accept new sweeps, so we mark the batch as // closed and persist on storage. b.state = Closed - return b.persist(ctx) + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + err = b.monitorConfirmations(ctx) + if err != nil { + return fmt.Errorf("monitorConfirmations failed: %w", err) + } + + return nil } // handleConf handles a confirmation notification. This is the final step of the From 14a171d37635d3fef10243a816b0a8cf3f28db8b Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 20:44:30 -0300 Subject: [PATCH 11/26] sweepbatcher: align dbBatch type with DB schema Previously, dbBatch had a State field (enum: Open, Closed, Confirmed), but in the database it is represented as a boolean Confirmed. The Closed state was stored the same way as Open. This wasn't an issue in practice, since an Open batch is quickly transitioned to Closed after startup. However, the in-memory mock stores plain dbBatch instances, leading to inconsistent behavior between the mock and the real DB-backed store. This commit updates dbBatch to match the database representation by replacing the State field with a Confirmed boolean. --- sweepbatcher/store.go | 23 ++++++----------------- sweepbatcher/store_mock.go | 6 +++--- sweepbatcher/sweep_batch.go | 27 +++++++-------------------- sweepbatcher/sweep_batcher.go | 35 +++++++++++++---------------------- 4 files changed, 29 insertions(+), 62 deletions(-) diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index bb245e5fb..1b87cde8b 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -213,8 +213,8 @@ type dbBatch struct { // ID is the unique identifier of the batch. ID int32 - // State is the current state of the batch. - State string + // Confirmed is set when the batch is fully confirmed. + Confirmed bool // BatchTxid is the txid of the batch transaction. BatchTxid chainhash.Hash @@ -255,11 +255,8 @@ type dbSweep struct { // convertBatchRow converts a batch row from db to a sweepbatcher.Batch struct. func convertBatchRow(row sqlc.SweepBatch) *dbBatch { batch := dbBatch{ - ID: row.ID, - } - - if row.Confirmed { - batch.State = batchConfirmed + ID: row.ID, + Confirmed: row.Confirmed, } if row.BatchTxID.Valid { @@ -288,7 +285,7 @@ func convertBatchRow(row sqlc.SweepBatch) *dbBatch { // it into the database. func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { args := sqlc.InsertBatchParams{ - Confirmed: false, + Confirmed: batch.Confirmed, BatchTxID: sql.NullString{ Valid: true, String: batch.BatchTxid.String(), @@ -305,10 +302,6 @@ func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { MaxTimeoutDistance: batch.MaxTimeoutDistance, } - if batch.State == batchConfirmed { - args.Confirmed = true - } - return args } @@ -317,7 +310,7 @@ func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { func batchToUpdateArgs(batch dbBatch) sqlc.UpdateBatchParams { args := sqlc.UpdateBatchParams{ ID: batch.ID, - Confirmed: false, + Confirmed: batch.Confirmed, BatchTxID: sql.NullString{ Valid: true, String: batch.BatchTxid.String(), @@ -333,10 +326,6 @@ func batchToUpdateArgs(batch dbBatch) sqlc.UpdateBatchParams { }, } - if batch.State == batchConfirmed { - args.Confirmed = true - } - return args } diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index 0f6c5e3cb..d5a3ffbc1 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -36,7 +36,7 @@ func (s *StoreMock) FetchUnconfirmedSweepBatches(ctx context.Context) ( result := []*dbBatch{} for _, batch := range s.batches { - if batch.State != "confirmed" { + if !batch.Confirmed { result = append(result, &batch) } } @@ -91,7 +91,7 @@ func (s *StoreMock) ConfirmBatch(ctx context.Context, id int32) error { return errors.New("batch not found") } - batch.State = "confirmed" + batch.Confirmed = true s.batches[batch.ID] = batch return nil @@ -201,7 +201,7 @@ func (s *StoreMock) TotalSweptAmount(ctx context.Context, batchID int32) ( return 0, errors.New("batch not found") } - if batch.State != batchConfirmed && batch.State != batchClosed { + if !batch.Confirmed { return 0, nil } diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index cdc0d4451..a54d634c3 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -135,7 +135,9 @@ const ( Open batchState = 0 // Closed is the state in which the batch is no longer able to accept - // new sweeps. + // new sweeps. NOTE: this state exists only in-memory. In the database + // it is stored as Open and converted to Closed after a spend + // notification arrives (quickly after start of Batch.Run). Closed batchState = 1 // Confirmed is the state in which the batch transaction has reached the @@ -870,8 +872,8 @@ func (b *batch) Run(ctx context.Context) error { // completes. timerChan := clock.TickAfter(b.cfg.batchPublishDelay) - b.Infof("started, primary %s, total sweeps %d", - b.primarySweepID, len(b.sweeps)) + b.Infof("started, primary %s, total sweeps %d, state: %d", + b.primarySweepID, len(b.sweeps), b.state) for { // If the batch is not empty, find earliest initialDelay. @@ -2180,7 +2182,7 @@ func (b *batch) persist(ctx context.Context) error { bch := &dbBatch{} bch.ID = b.id - bch.State = stateEnumToString(b.state) + bch.Confirmed = b.state == Confirmed if b.batchTxid != nil { bch.BatchTxid = *b.batchTxid @@ -2239,7 +2241,7 @@ func (b *batch) getBatchDestAddr(ctx context.Context) (btcutil.Address, error) { func (b *batch) insertAndAcquireID(ctx context.Context) (int32, error) { bch := &dbBatch{} - bch.State = stateEnumToString(b.state) + bch.Confirmed = b.state == Confirmed bch.MaxTimeoutDistance = b.cfg.maxTimeoutDistance id, err := b.store.InsertSweepBatch(ctx, bch) @@ -2340,18 +2342,3 @@ func clampBatchFee(fee btcutil.Amount, return fee } - -func stateEnumToString(state batchState) string { - switch state { - case Open: - return batchOpen - - case Closed: - return batchClosed - - case Confirmed: - return batchConfirmed - } - - return "" -} diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index f87abcaf1..9e3a7ab6e 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -31,18 +31,6 @@ const ( // of sweeps that can appear in the same batch. defaultMaxTimeoutDistance = 288 - // batchOpen is the string representation of the state of a batch that - // is open. - batchOpen = "open" - - // batchClosed is the string representation of the state of a batch - // that is closed. - batchClosed = "closed" - - // batchConfirmed is the string representation of the state of a batch - // that is confirmed. - batchConfirmed = "confirmed" - // defaultMainnetPublishDelay is the default publish delay that is used // for mainnet. defaultMainnetPublishDelay = 5 * time.Second @@ -760,7 +748,7 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { "sweep %x: %w", sweep.swapHash[:6], err) } - if parentBatch.State == batchConfirmed { + if parentBatch.Confirmed { fullyConfirmed = true } } @@ -844,7 +832,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, if completed && *notifier != (SpendNotifier{}) { // The parent batch is indeed confirmed, meaning it is complete // and we won't be able to attach this sweep to it. - if parentBatch.State == batchConfirmed { + if parentBatch.Confirmed { return b.monitorSpendAndNotify( ctx, sweep, parentBatch.ID, notifier, ) @@ -1093,15 +1081,18 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, batch := batch{} batch.id = bch.ID - switch bch.State { - case batchOpen: - batch.state = Open - - case batchClosed: - batch.state = Closed - - case batchConfirmed: + if bch.Confirmed { batch.state = Confirmed + } else { + // We don't store Closed state separately in DB. + // If the batch is closed (included into a block, but + // not fully confirmed), it is now considered Open + // again. It will receive a spending notification as + // soon as it starts, so it is not an issue. If a sweep + // manages to be added during this time, it will be + // detected as missing when analyzing the spend + // notification and will be added to new batch. + batch.state = Open } batch.batchTxid = &bch.BatchTxid From 2de4c8ac8e27207781c6a8c654eb12efb342273a Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 18:46:35 -0300 Subject: [PATCH 12/26] sweepbatcher: test spending notification and error --- sweepbatcher/sweep_batcher_test.go | 244 ++++++++++++++++++++++++++--- 1 file changed, 220 insertions(+), 24 deletions(-) diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index cd5bab01c..13188ff54 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -762,9 +762,9 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, batcherStore, sweepStore) + runErrChan := make(chan error) go func() { - err := batcher.Run(ctx) - checkBatcherError(t, err) + runErrChan <- batcher.Run(ctx) }() // Create a sweep request. @@ -772,13 +772,24 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, Hash: chainhash.Hash{1, 1}, Index: 1, } + const ( + inputValue = 111 + outputValue = 50 + fee = inputValue - outputValue + ) + spendErrChan := make(chan error, 1) + notifier := &SpendNotifier{ + SpendChan: make(chan *SpendDetail, 1), + SpendErrChan: spendErrChan, + QuitChan: make(chan bool, 1), + } sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, Inputs: []Input{{ - Value: 111, + Value: inputValue, Outpoint: op1, }}, - Notifier: &dummyNotifier, + Notifier: notifier, } const initiationHeight = 550 @@ -786,7 +797,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, swap1 := &loopdb.LoopOutContract{ SwapContract: loopdb.SwapContract{ CltvExpiry: 111, - AmountRequested: 111, + AmountRequested: inputValue, ProtocolVersion: loopdb.ProtocolVersionMuSig2, HtlcKeys: htlcKeys, InitiationHeight: initiationHeight, @@ -806,33 +817,27 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // When batch is successfully created it will execute it's first step, // which leads to a spend monitor of the primary sweep. - <-lnd.RegisterSpendChannel + spendReg := <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + <-lnd.TxPublishChannel // Eventually request will be consumed and a new batch will spin up. + var primarySweepID wire.OutPoint require.Eventually(t, func() bool { - return batcher.numBatches(ctx) == 1 - }, test.Timeout, eventuallyCheckFrequency) + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } - // Find the batch and assign it to a local variable for easier access. - batch := &batch{} - for _, btch := range getBatches(ctx, batcher) { - btch.testRunInEventLoop(ctx, func() { - if btch.primarySweepID == op1 { - batch = btch - } - }) - } + primarySweepID = batch.snapshot(ctx).primarySweepID - require.Eventually(t, func() bool { // Batch should have the sweep stored. return batch.numSweeps(ctx) == 1 }, test.Timeout, eventuallyCheckFrequency) // The primary sweep id should be that of the first inserted sweep. - require.Equal(t, batch.primarySweepID, op1) - - // Wait for tx to be published. - <-lnd.TxPublishChannel + require.Equal(t, primarySweepID, op1) err = lnd.NotifyHeight(601) require.NoError(t, err) @@ -840,7 +845,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // After receiving a height notification the batch will step again, // leading to a new spend monitoring. require.Eventually(t, func() bool { - batch := batch.snapshot(ctx) + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) return batch.currentHeight == 601 }, test.Timeout, eventuallyCheckFrequency) @@ -848,6 +857,58 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Wait for tx to be published. <-lnd.TxPublishChannel + // Emulate spend error. + testError := errors.New("test error") + spendReg.ErrChan <- testError + + // Make sure the caller of AddSweep got the spending error. + notifierErr := <-spendErrChan + require.Error(t, notifierErr) + require.ErrorIs(t, notifierErr, testError) + + // Wait for the batcher to crash because of the spending error. + runErr := <-runErrChan + require.ErrorIs(t, runErr, testError) + + // Now launch the batcher again. + batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + runErrChan <- batcher.Run(ctx) + }() + + // When batch is successfully created it will execute it's first step, + // which leads to a spend monitor of the primary sweep. + <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Deliver sweep request to batcher. + spendChan := make(chan *SpendDetail, 1) + notifier = &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Wait for the notifier to be installed. + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) + + sweep := batch.sweeps[batch.primarySweepID] + + return sweep.notifier != nil && + sweep.notifier.SpendChan == spendChan + }, test.Timeout, eventuallyCheckFrequency) + // Create the spending tx that will trigger the spend monitor of the // batch. spendingTx := &wire.MsgTx{ @@ -861,6 +922,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, }, TxOut: []*wire.TxOut{ { + Value: outputValue, PkScript: []byte{3, 2, 1}, }, }, @@ -879,6 +941,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // We notify the spend. lnd.SpendChannel <- spendDetail + // Make sure the notifier got a proper spending notification. + spending := <-spendChan + require.Equal(t, spendingTxHash, spending.Tx.TxHash()) + require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + // After receiving the spend, the batch is now monitoring for confs. confReg := <-lnd.RegisterConfChannel @@ -889,7 +956,84 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // The batch should eventually read the spend notification and progress // its state to closed. require.Eventually(t, func() bool { - batch := batch.snapshot(ctx) + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) + + return batch.state == Closed + }, test.Timeout, eventuallyCheckFrequency) + + // Emulate a confirmation error. + confReg.ErrChan <- testError + + // Wait for the batcher to crash because of the confirmation error. + runErr = <-runErrChan + require.ErrorIs(t, runErr, testError) + + // Now launch the batcher again. + batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + runErrChan <- batcher.Run(ctx) + }() + + // When batch is successfully created it will execute it's first step, + // which leads to a spend monitor of the primary sweep. + <-lnd.RegisterSpendChannel + + // Deliver sweep request to batcher. + spendChan = make(chan *SpendDetail, 1) + notifier = &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Wait for tx to be published. A closed batch is stored in DB as Open. + <-lnd.TxPublishChannel + + // Wait for the notifier to be installed. + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) + + sweep := batch.sweeps[batch.primarySweepID] + + return sweep.notifier != nil && + sweep.notifier.SpendChan == spendChan + }, test.Timeout, eventuallyCheckFrequency) + + // We notify the spend. + lnd.SpendChannel <- spendDetail + + // Make sure the notifier got a proper spending notification. + spending = <-spendChan + require.Equal(t, spendingTxHash, spending.Tx.TxHash()) + require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + + // After receiving the spend, the batch is now monitoring for confs. + confReg = <-lnd.RegisterConfChannel + + // Make sure the confirmation has proper height hint. It should pass + // the swap initiation height, not the current height. + require.Equal(t, int32(initiationHeight), confReg.HeightHint) + + // The batch should eventually read the spend notification and progress + // its state to closed. + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) return batch.state == Closed }, test.Timeout, eventuallyCheckFrequency) @@ -905,8 +1049,60 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Eventually the batch receives the confirmation notification and // confirms itself. require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + return batch.isComplete() }, test.Timeout, eventuallyCheckFrequency) + + // Now emulate adding the sweep again after it was fully confirmed. + // This triggers another code path (monitorSpendAndNotify). + spendChan = make(chan *SpendDetail, 1) + notifier = &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Expect a spending registration. + <-lnd.RegisterSpendChannel + + // We notify the spend. + lnd.SpendChannel <- spendDetail + + // Now expect the notifier to produce the spending details. + spending = <-spendChan + require.Equal(t, spendingTxHash, spending.Tx.TxHash()) + require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + + // Now check what happens in case of a spending error. + spendErrChan = make(chan error, 1) + notifier = &SpendNotifier{ + SpendChan: make(chan *SpendDetail, 1), + SpendErrChan: spendErrChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Expect a spending registration. + spendReg = <-lnd.RegisterSpendChannel + + // Emulate spend error. + spendReg.ErrChan <- testError + + // Make sure the caller of AddSweep got the spending error. + notifierErr = <-spendErrChan + require.Error(t, notifierErr) + require.ErrorIs(t, notifierErr, testError) + + // Wait for the batcher to crash because of the spending error. + runErr = <-runErrChan + require.ErrorIs(t, runErr, testError) } // wrappedLogger implements btclog.Logger, recording last debug message format. From 8bb5401a81e058231a0be738cbb788a39ee238d5 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 8 Apr 2025 23:15:23 -0300 Subject: [PATCH 13/26] sweepbatcher: notify caller about confirmations Add fields ConfChan and ConfErrChan to SpendNotifier type which is a part of SweepRequest passed to AddSweep method. This is needed to reuse confirmation notifications on the calling side the same way it is done for spending notifications. --- sweepbatcher/sweep_batch.go | 72 ++++++++++++++++++++++- sweepbatcher/sweep_batcher.go | 94 +++++++++++++++++++++++++++++- sweepbatcher/sweep_batcher_test.go | 67 ++++++++++++++++++++- 3 files changed, 230 insertions(+), 3 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index a54d634c3..6103d07dc 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1882,6 +1882,8 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { } case err := <-errChan: + b.writeToConfErrChan(ctx, err) + b.writeToErrChan(fmt.Errorf("confirmations "+ "monitoring error: %w", err)) @@ -2159,7 +2161,37 @@ func (b *batch) handleConf(ctx context.Context, b.Infof("confirmed in txid %s", b.batchTxid) b.state = Confirmed - return b.store.ConfirmBatch(ctx, b.id) + if err := b.store.ConfirmBatch(ctx, b.id); err != nil { + return fmt.Errorf("failed to store confirmed state: %w", err) + } + + // Send the confirmation to all the notifiers. + for _, s := range b.sweeps { + // If the sweep's notifier is empty then this means that + // a swap is not waiting to read an update from it, so + // we can skip the notification part. + if s.notifier == nil || s.notifier.ConfChan == nil { + continue + } + + // Notify the caller in a goroutine to avoid possible dead-lock. + go func(notifier *SpendNotifier) { + // Note that we don't unblock on ctx, because it will + // expire soon, when batch.Run completes. The caller is + // responsible to consume ConfChan or close QuitChan. + select { + // Try to write the confirmation to the notification + // channel. + case notifier.ConfChan <- conf: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + } + }(s.notifier) + } + + return nil } // isComplete returns true if the batch is completed. This method is used by the @@ -2315,6 +2347,44 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { } } +// writeToConfErrChan sends an error to confirmation error channels of all the +// sweeps. +func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) { + done, err := b.scheduleNextCall() + if err != nil { + done() + + return + } + notifiers := make([]*SpendNotifier, 0, len(b.sweeps)) + for _, s := range b.sweeps { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it, so we can skip + // the notification part. + if s.notifier == nil || s.notifier.ConfErrChan == nil { + continue + } + + notifiers = append(notifiers, s.notifier) + } + done() + + for _, notifier := range notifiers { + select { + // Try to write the error to the notification + // channel. + case notifier.ConfErrChan <- confErr: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + } +} + func (b *batch) persistSweep(ctx context.Context, sweep sweep, completed bool) error { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 9e3a7ab6e..89668f3da 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -20,6 +20,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lntypes" @@ -300,6 +301,14 @@ type SpendNotifier struct { // SpendErrChan is a channel where spend errors are received. SpendErrChan chan<- error + // ConfChan is a channel where the confirmation details are received. + // This channel is optional. + ConfChan chan<- *chainntnfs.TxConfirmation + + // ConfErrChan is a channel where confirmation errors are received. + // This channel is optional. + ConfErrChan chan<- error + // QuitChan is a channel that can be closed to stop the notifier. QuitChan <-chan bool } @@ -1114,7 +1123,9 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, } // monitorSpendAndNotify monitors the spend of a specific outpoint and writes -// the response back to the response channel. +// the response back to the response channel. It is called if the batch is fully +// confirmed and we just need to deliver the data back to the caller though +// SpendNotifier. func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, parentBatchID int32, notifier *SpendNotifier) error { @@ -1172,6 +1183,15 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, select { // Try to write the update to the notification channel. case notifier.SpendChan <- spendDetail: + err := b.monitorConfAndNotify( + ctx, sweep, notifier, spendTx, + ) + if err != nil { + b.writeToErrChan( + ctx, fmt.Errorf("monitor conf "+ + "failed: %w", err), + ) + } // If a quit signal was provided by the swap, continue. case <-notifier.QuitChan: @@ -1215,6 +1235,78 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, return nil } +// monitorConfAndNotify monitors the confirmation of a specific transaction and +// writes the response back to the response channel. It is called if the batch +// is fully confirmed and we just need to deliver the data back to the caller +// though SpendNotifier. +func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep, + notifier *SpendNotifier, spendTx *wire.MsgTx) error { + + // If confirmation notifications were not requested, stop. + if notifier.ConfChan == nil && notifier.ConfErrChan == nil { + return nil + } + + batchTxid := spendTx.TxHash() + + if len(spendTx.TxOut) != 1 { + return fmt.Errorf("unexpected number of outputs in batch: %d, "+ + "want %d", len(spendTx.TxOut), 1) + } + batchPkScript := spendTx.TxOut[0].PkScript + + reorgChan := make(chan struct{}) + + confCtx, cancel := context.WithCancel(ctx) + + confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( + confCtx, &batchTxid, batchPkScript, batchConfHeight, + sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan), + ) + if err != nil { + cancel() + return err + } + + b.wg.Add(1) + go func() { + defer cancel() + defer b.wg.Done() + + select { + case conf := <-confChan: + if notifier.ConfChan != nil { + select { + case notifier.ConfChan <- conf: + case <-notifier.QuitChan: + case <-ctx.Done(): + } + } + + case err := <-errChan: + if notifier.ConfErrChan != nil { + select { + case notifier.ConfErrChan <- err: + case <-notifier.QuitChan: + case <-ctx.Done(): + } + } + + b.writeToErrChan(ctx, fmt.Errorf("confirmations "+ + "monitoring error: %w", err)) + + case <-reorgChan: + // A re-org has been detected, but the batch is fully + // confirmed and this is unexpected. Crash the batcher. + b.writeToErrChan(ctx, fmt.Errorf("unexpected reorg")) + + case <-ctx.Done(): + } + }() + + return nil +} + func (b *Batcher) writeToErrChan(ctx context.Context, err error) { select { case b.errChan <- err: diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 13188ff54..409b653af 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -887,9 +887,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Deliver sweep request to batcher. spendChan := make(chan *SpendDetail, 1) + confErrChan := make(chan error) notifier = &SpendNotifier{ SpendChan: spendChan, SpendErrChan: make(chan error, 1), + ConfErrChan: confErrChan, QuitChan: make(chan bool, 1), } sweepReq1.Notifier = notifier @@ -968,6 +970,10 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Emulate a confirmation error. confReg.ErrChan <- testError + // Make sure the notifier gets the confirmation error. + confErr := <-confErrChan + require.ErrorIs(t, confErr, testError) + // Wait for the batcher to crash because of the confirmation error. runErr = <-runErrChan require.ErrorIs(t, runErr, testError) @@ -986,9 +992,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Deliver sweep request to batcher. spendChan = make(chan *SpendDetail, 1) + confChan := make(chan *chainntnfs.TxConfirmation) notifier = &SpendNotifier{ SpendChan: spendChan, SpendErrChan: make(chan error, 1), + ConfChan: confChan, QuitChan: make(chan bool, 1), } sweepReq1.Notifier = notifier @@ -1043,9 +1051,15 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // We mock the tx confirmation notification. lnd.ConfChannel <- &chainntnfs.TxConfirmation{ - Tx: spendingTx, + BlockHeight: 604, + Tx: spendingTx, } + // Make sure the notifier gets a confirmation notification. + conf := <-confChan + require.Equal(t, uint32(604), conf.BlockHeight) + require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash()) + // Eventually the batch receives the confirmation notification and // confirms itself. require.Eventually(t, func() bool { @@ -1060,9 +1074,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Now emulate adding the sweep again after it was fully confirmed. // This triggers another code path (monitorSpendAndNotify). spendChan = make(chan *SpendDetail, 1) + confChan = make(chan *chainntnfs.TxConfirmation) notifier = &SpendNotifier{ SpendChan: spendChan, SpendErrChan: make(chan error, 1), + ConfChan: confChan, QuitChan: make(chan bool, 1), } sweepReq1.Notifier = notifier @@ -1079,6 +1095,18 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, require.Equal(t, spendingTxHash, spending.Tx.TxHash()) require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + // We mock the tx confirmation notification. + <-lnd.RegisterConfChannel + lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + BlockHeight: 604, + Tx: spendingTx, + } + + // Make sure the notifier gets a confirmation notification. + conf = <-confChan + require.Equal(t, uint32(604), conf.BlockHeight) + require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash()) + // Now check what happens in case of a spending error. spendErrChan = make(chan error, 1) notifier = &SpendNotifier{ @@ -1103,6 +1131,43 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Wait for the batcher to crash because of the spending error. runErr = <-runErrChan require.ErrorIs(t, runErr, testError) + + // Now launch the batcher again. + batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + runErrChan <- batcher.Run(ctx) + }() + + // Now check what happens in case of a confirmation error. + confErrChan = make(chan error, 1) + notifier = &SpendNotifier{ + SpendChan: make(chan *SpendDetail, 1), + SpendErrChan: make(chan error, 1), + ConfErrChan: confErrChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Expect a spending registration. + <-lnd.RegisterSpendChannel + + // We notify the spend. + lnd.SpendChannel <- spendDetail + + // We mock the tx confirmation error notification. + confReg = <-lnd.RegisterConfChannel + confReg.ErrChan <- testError + + // Make sure the notifier gets the confirmation error. + confErr = <-confErrChan + require.ErrorIs(t, confErr, testError) + + // Wait for the batcher to crash because of the confirmation error. + runErr = <-runErrChan + require.ErrorIs(t, runErr, testError) } // wrappedLogger implements btclog.Logger, recording last debug message format. From cd29f433ea3368c7e8b42e1ac48a5b8f17c230d5 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 28 Apr 2025 02:07:40 -0300 Subject: [PATCH 14/26] test/chainnotifier: send to specific spend reg This is needed to have multiple spending registrations running and to send a notification to a specific spending registration. --- test/chainnotifier_mock.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index 96e72f23f..a4fae0e77 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -33,10 +33,11 @@ func (c *mockChainNotifier) RawClientWithMacAuth( // SpendRegistration contains registration details. type SpendRegistration struct { - Outpoint *wire.OutPoint - PkScript []byte - HeightHint int32 - ErrChan chan<- error + Outpoint *wire.OutPoint + PkScript []byte + HeightHint int32 + SpendChannel chan<- *chainntnfs.SpendDetail + ErrChan chan<- error } // ConfRegistration contains registration details. @@ -53,13 +54,15 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( chan *chainntnfs.SpendDetail, chan error, error) { + spendChan0 := make(chan *chainntnfs.SpendDetail) spendErrChan := make(chan error, 1) reg := &SpendRegistration{ - HeightHint: heightHint, - Outpoint: outpoint, - PkScript: pkScript, - ErrChan: spendErrChan, + HeightHint: heightHint, + Outpoint: outpoint, + PkScript: pkScript, + SpendChannel: spendChan0, + ErrChan: spendErrChan, } c.lnd.RegisterSpendChannel <- reg @@ -78,6 +81,12 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, case <-ctx.Done(): } + case m := <-spendChan0: + select { + case spendChan <- m: + case <-ctx.Done(): + } + case err := <-spendErrChan: select { case errChan <- err: From 12d3cc04bc581a02e25fd7425d288e1b57fcd0c8 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sat, 26 Apr 2025 01:01:39 -0300 Subject: [PATCH 15/26] sweepbatcher: re-add sweeps after fully confirmed In case of a reorg sweeps should not go to another batch but stay in the current batch until it is fully confirmed. Only after that the remaining sweeps are re-added to another batch. Field sweep.completed is now set to true only for fully-confirmed sweeps. --- sweepbatcher/store.go | 2 +- sweepbatcher/sweep_batch.go | 239 ++++++++++--------- sweepbatcher/sweep_batcher.go | 23 +- sweepbatcher/sweep_batcher_presigned_test.go | 98 +++++++- sweepbatcher/sweep_batcher_test.go | 15 +- 5 files changed, 234 insertions(+), 143 deletions(-) diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 1b87cde8b..db41287bc 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -248,7 +248,7 @@ type dbSweep struct { // Amount is the amount of the sweep. Amount btcutil.Amount - // Completed indicates whether this sweep is completed. + // Completed indicates whether this sweep is fully-confirmed. Completed bool } diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 6103d07dc..c54269e77 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1939,7 +1939,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { var ( txHash = spendTx.TxHash() - purgeList = make([]SweepRequest, 0, len(b.sweeps)) notifyList = make([]sweep, 0, len(b.sweeps)) ) b.batchTxid = &txHash @@ -1949,7 +1948,106 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.Warnf("transaction %v has no outputs", txHash) } - // Determine if we should use presigned mode for the batch. + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + + // As a previous version of the batch transaction may get confirmed, + // which does not contain the latest sweeps, we need to detect the + // sweeps that did not make it to the confirmed transaction and feed + // them back to the batcher. This will ensure that the sweeps will enter + // a new batch instead of remaining dangling. + var ( + totalSweptAmt btcutil.Amount + confirmedSweeps = []wire.OutPoint{} + ) + for _, sweep := range b.sweeps { + // Skip sweeps that were not included into the confirmed tx. + _, found := confirmedSet[sweep.outpoint] + if !found { + continue + } + + totalSweptAmt += sweep.value + notifyList = append(notifyList, sweep) + confirmedSweeps = append(confirmedSweeps, sweep.outpoint) + } + + // Calculate the fee portion that each sweep should pay for the batch. + feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( + spendTx, len(notifyList), totalSweptAmt, + ) + + for _, sweep := range notifyList { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it, so we can skip + // the notification part. + if sweep.notifier == nil || + *sweep.notifier == (SpendNotifier{}) { + + continue + } + + spendDetail := SpendDetail{ + Tx: spendTx, + OnChainFeePortion: getFeePortionPaidBySweep( + spendTx, feePortionPaidPerSweep, + roundingDifference, &sweep, + ), + } + + // Dispatch the sweep notifier, we don't care about the outcome + // of this action so we don't wait for it. + go func() { + // Make sure this context doesn't expire so we + // successfully notify the caller. + ctx := context.WithoutCancel(ctx) + + sweep.notifySweepSpend(ctx, &spendDetail) + }() + } + + b.Infof("spent, confirmed sweeps: %v", confirmedSweeps) + + // We are no longer able to accept new sweeps, so we mark the batch as + // closed and persist on storage. + b.state = Closed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + if err := b.monitorConfirmations(ctx); err != nil { + return fmt.Errorf("monitorConfirmations failed: %w", err) + } + + return nil +} + +// handleConf handles a confirmation notification. This is the final step of the +// batch. Here we signal to the batcher that this batch was completed. +func (b *batch) handleConf(ctx context.Context, + conf *chainntnfs.TxConfirmation) error { + + spendTx := conf.Tx + txHash := spendTx.TxHash() + if b.batchTxid == nil || *b.batchTxid != txHash { + b.Warnf("Mismatch of batch txid: tx in spend notification had "+ + "txid %v, but confirmation notification has txif %v. "+ + "Using the later.", b.batchTxid, txHash) + } + b.batchTxid = &txHash + + b.Infof("confirmed in txid %s", b.batchTxid) + b.state = Confirmed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + // If the batch is in presigned mode, cleanup presignedHelper. presigned, err := b.isPresigned() if err != nil { return fmt.Errorf("failed to determine if the batch %d uses "+ @@ -1967,40 +2065,43 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.id, err) } + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + // As a previous version of the batch transaction may get confirmed, // which does not contain the latest sweeps, we need to detect the // sweeps that did not make it to the confirmed transaction and feed // them back to the batcher. This will ensure that the sweeps will enter // a new batch instead of remaining dangling. var ( - totalSweptAmt btcutil.Amount confirmedSweeps = []wire.OutPoint{} - purgedSweeps = []wire.OutPoint{} - purgedSwaps = []lntypes.Hash{} + purgeList = make([]SweepRequest, 0, len(b.sweeps)) ) for _, sweep := range allSweeps { - found := false - - for _, txIn := range spendTx.TxIn { - if txIn.PreviousOutPoint == sweep.outpoint { - found = true - totalSweptAmt += sweep.value - notifyList = append(notifyList, sweep) - confirmedSweeps = append( - confirmedSweeps, sweep.outpoint, - ) - - break + _, found := confirmedSet[sweep.outpoint] + if found { + // Save the sweep as completed. Note that sweeps are + // marked completed after the batch is marked confirmed + // because the check in handleSweeps checks sweep's + // status first and then checks the batch status. + err := b.persistSweep(ctx, sweep, true) + if err != nil { + return err } + + confirmedSweeps = append( + confirmedSweeps, sweep.outpoint, + ) + + continue } // If the sweep's outpoint was not found in the transaction's // inputs this means it was left out. So we delete it from this // batch and feed it back to the batcher. - if found { - continue - } - newSweep := sweep delete(b.sweeps, sweep.outpoint) @@ -2032,6 +2133,10 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { }) } } + var ( + purgedSweeps = []wire.OutPoint{} + purgedSwaps = []lntypes.Hash{} + ) for _, sweepReq := range purgeList { purgedSwaps = append(purgedSwaps, sweepReq.SwapHash) for _, input := range sweepReq.Inputs { @@ -2039,45 +2144,8 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } } - // Calculate the fee portion that each sweep should pay for the batch. - feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( - spendTx, len(notifyList), totalSweptAmt, - ) - - for _, sweep := range notifyList { - // Save the sweep as completed. - err := b.persistSweep(ctx, sweep, true) - if err != nil { - return err - } - - // If the sweep's notifier is empty then this means that a swap - // is not waiting to read an update from it, so we can skip - // the notification part. - if sweep.notifier == nil || - *sweep.notifier == (SpendNotifier{}) { - - continue - } - - spendDetail := SpendDetail{ - Tx: spendTx, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &sweep, - ), - } - - // Dispatch the sweep notifier, we don't care about the outcome - // of this action so we don't wait for it. - go func() { - // Make sure this context doesn't expire so we - // successfully notify the caller. - ctx := context.WithoutCancel(ctx) - - sweep.notifySweepSpend(ctx, &spendDetail) - }() - } + b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+ + "purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps) // Proceed with purging the sweeps. This will feed the sweeps that // didn't make it to the confirmed batch transaction back to the batcher @@ -2099,50 +2167,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } }() - b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+ - "purged swaps: %v, purged groups: %v", confirmedSweeps, - purgedSweeps, purgedSwaps, len(purgeList)) - - // We are no longer able to accept new sweeps, so we mark the batch as - // closed and persist on storage. - b.state = Closed - - if err := b.persist(ctx); err != nil { - return fmt.Errorf("saving batch failed: %w", err) - } - - err = b.monitorConfirmations(ctx) - if err != nil { - return fmt.Errorf("monitorConfirmations failed: %w", err) - } - - return nil -} - -// handleConf handles a confirmation notification. This is the final step of the -// batch. Here we signal to the batcher that this batch was completed. We also -// cleanup up presigned transactions whose primarySweepID is one of the sweeps -// that were spent and fully confirmed: such a transaction can't be broadcasted -// since it is either in a block or double-spends one of spent outputs. -func (b *batch) handleConf(ctx context.Context, - conf *chainntnfs.TxConfirmation) error { - - spendTx := conf.Tx - txHash := spendTx.TxHash() - if b.batchTxid == nil || *b.batchTxid != txHash { - b.Warnf("Mismatch of batch txid: tx in spend notification had "+ - "txid %v, but confirmation notification has txif %v. "+ - "Using the later.", b.batchTxid, txHash) - } - b.batchTxid = &txHash - - // If the batch is in presigned mode, cleanup presignedHelper. - presigned, err := b.isPresigned() - if err != nil { - return fmt.Errorf("failed to determine if the batch %d uses "+ - "presigned mode: %w", b.id, err) - } - if presigned { b.Infof("Cleaning up presigned store") @@ -2158,13 +2182,6 @@ func (b *batch) handleConf(ctx context.Context, } } - b.Infof("confirmed in txid %s", b.batchTxid) - b.state = Confirmed - - if err := b.store.ConfirmBatch(ctx, b.id); err != nil { - return fmt.Errorf("failed to store confirmed state: %w", err) - } - // Send the confirmation to all the notifiers. for _, s := range b.sweeps { // If the sweep's notifier is empty then this means that diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 89668f3da..82b50a5c9 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -273,7 +273,7 @@ type addSweepsRequest struct { notifier *SpendNotifier // completed is set if the sweep is spent and the spending transaction - // is confirmed. + // is fully confirmed. completed bool // parentBatch is the parent batch of this sweep. It is loaded ony if @@ -777,8 +777,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { } infof("Batcher adding sweep group of %d sweeps with primarySweep %x, "+ - "presigned=%v, completed=%v", len(sweeps), sweep.swapHash[:6], - sweep.presigned, completed) + "presigned=%v, fully_confirmed=%v", len(sweeps), + sweep.swapHash[:6], sweep.presigned, completed) req := &addSweepsRequest{ sweeps: sweeps, @@ -838,14 +838,10 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // If the sweep has already been completed in a confirmed batch then we // can't attach its notifier to the batch as that is no longer running. // Instead we directly detect and return the spend here. - if completed && *notifier != (SpendNotifier{}) { - // The parent batch is indeed confirmed, meaning it is complete - // and we won't be able to attach this sweep to it. - if parentBatch.Confirmed { - return b.monitorSpendAndNotify( - ctx, sweep, parentBatch.ID, notifier, - ) - } + if completed && parentBatch.Confirmed { + return b.monitorSpendAndNotify( + ctx, sweep, parentBatch.ID, notifier, + ) } sweep.notifier = notifier @@ -1129,6 +1125,11 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, parentBatchID int32, notifier *SpendNotifier) error { + // If the caller has not provided a notifier, stop. + if notifier == nil || *notifier == (SpendNotifier{}) { + return nil + } + spendCtx, cancel := context.WithCancel(ctx) // Then we get the total amount that was swept by the batch. diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 86f626cbc..3803ccc60 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -1360,7 +1360,12 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, require.LessOrEqual(t, numConfirmedSwaps, numSwaps) - const sweepsPerSwap = 2 + const ( + sweepsPerSwap = 2 + feeRate = chainfee.SatPerKWeight(10_000) + swapAmount = 3_000_001 + ) + sweepAmounts := []btcutil.Amount{1_000_001, 2_000_000} lnd := test.NewMockLnd() @@ -1370,7 +1375,7 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, customFeeRate := func(_ context.Context, _ lntypes.Hash) (chainfee.SatPerKWeight, error) { - return chainfee.SatPerKWeight(10_000), nil + return feeRate, nil } presignedHelper := newMockPresignedHelper() @@ -1388,12 +1393,17 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, checkBatcherError(t, err) }() + swapHashes := make([]lntypes.Hash, numSwaps) + groups := make([][]Input, numSwaps) txs := make([]*wire.MsgTx, numSwaps) allOps := make([]wire.OutPoint, 0, numSwaps*sweepsPerSwap) + spendChans := make([]<-chan *SpendDetail, numSwaps) + confChans := make([]<-chan *chainntnfs.TxConfirmation, numSwaps) for i := range numSwaps { // Create a swap of sweepsPerSwap sweeps. swapHash := lntypes.Hash{byte(i + 1)} + swapHashes[i] = swapHash ops := make([]wire.OutPoint, sweepsPerSwap) group := make([]Input, sweepsPerSwap) for j := range sweepsPerSwap { @@ -1405,15 +1415,16 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, group[j] = Input{ Outpoint: ops[j], - Value: btcutil.Amount(1_000_000 * (j + 1)), + Value: sweepAmounts[j], } } + groups[i] = group // Create a swap in DB. swap := &loopdb.LoopOutContract{ SwapContract: loopdb.SwapContract{ CltvExpiry: 111, - AmountRequested: 3_000_000, + AmountRequested: swapAmount, ProtocolVersion: loopdb.ProtocolVersionMuSig2, HtlcKeys: htlcKeys, @@ -1440,11 +1451,24 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, ) require.NoError(t, err) + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + spendChans[i] = spendChan + confChan := make(chan *chainntnfs.TxConfirmation, 1) + confChans[i] = confChan + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + // Add the sweep, triggering the publish attempt. require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ SwapHash: swapHash, Inputs: group, - Notifier: &dummyNotifier, + Notifier: notifier, })) // For the first group it should register for the sweep's spend @@ -1543,6 +1567,13 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, SpendingHeight: int32(601 + numSwaps + 1), } lnd.SpendChannel <- spendDetail + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + spend := <-spendChans[i] + require.Equal(t, txHash, spend.Tx.TxHash()) + } + <-lnd.RegisterConfChannel require.NoError(t, lnd.NotifyHeight( int32(601+numSwaps+1+batchConfHeight), @@ -1554,12 +1585,18 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, // CleanupTransactions is called here. <-presignedHelper.cleanupCalled - // If all the swaps were confirmed, stop. - if numConfirmedSwaps == numSwaps { - return + // Increasing block height caused the second batch to re-publish. + if online && numConfirmedSwaps < numSwaps { + <-lnd.TxPublishChannel + } + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + conf := <-confChans[i] + require.Equal(t, txHash, conf.Tx.TxHash()) } - if !online { + if !online && numConfirmedSwaps != numSwaps { // If the sweeps are offline, the missing sweeps in the // confirmed transaction should be re-added to the batcher as // new batch. The groups are added incrementally, so we need @@ -1568,6 +1605,47 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, <-lnd.TxPublishChannel } + // Now make sure that a correct spend and conf contification is sent if + // AddSweep is called after confirming the sweeps. + for i := range numConfirmedSwaps { + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + confChan := make(chan *chainntnfs.TxConfirmation) + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + + // Add the sweep, triggering the publish attempt. + require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ + SwapHash: swapHashes[i], + Inputs: groups[i], + Notifier: notifier, + })) + + spendReg := <-lnd.RegisterSpendChannel + spendReg.SpendChannel <- spendDetail + + spend := <-spendChan + require.Equal(t, txHash, spend.Tx.TxHash()) + + <-lnd.RegisterConfChannel + lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + Tx: tx, + } + + conf := <-confChan + require.Equal(t, tx.TxHash(), conf.Tx.TxHash()) + } + + // If all the swaps were confirmed, stop. + if numConfirmedSwaps == numSwaps { + return + } + // Wait to new batch to appear and to have the expected size. wantSize := (numSwaps - numConfirmedSwaps) * sweepsPerSwap if online { @@ -1675,11 +1753,13 @@ func TestPresigned(t *testing.T) { testPurging(3, 1, false) testPurging(3, 2, false) testPurging(5, 2, false) + testPurging(5, 3, false) // Test cases in which the sweeps are online. testPurging(2, 1, true) testPurging(3, 1, true) testPurging(3, 2, true) testPurging(5, 2, true) + testPurging(5, 3, true) }) } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 409b653af..b6247be5b 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -2302,22 +2302,15 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, return b.state == Closed }, test.Timeout, eventuallyCheckFrequency) - // Since second batch was created we check that it registered for its - // primary sweep's spend. - <-lnd.RegisterSpendChannel - - // While handling the spend notification the batch should detect that - // some sweeps did not appear in the spending tx, therefore it redirects - // them back to the batcher and the batcher inserts them in a new batch. - require.Eventually(t, func() bool { - return batcher.numBatches(ctx) == 2 - }, test.Timeout, eventuallyCheckFrequency) - // We mock the confirmation notification. lnd.ConfChannel <- &chainntnfs.TxConfirmation{ Tx: spendingTx, } + // Since second batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + // Wait for tx to be published. // Here is a race condition, which is unlikely to cause a crash: if we // wait for publish tx before sending a conf notification (previous From 63fd680c8bccadf9f07e030fd3b1628f66af467d Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sun, 27 Apr 2025 01:08:41 -0300 Subject: [PATCH 16/26] sweepbatcher: fix OnChainFeePortion values There were two mistakes. In case of a swap with multiple sweeps only the fee of the first sweep of a swap was accounted. Rounding diff (the remainder) was attributed to all the sweeps rather than to the first (primary) sweep of the batch. The sweep to attribute the remainder was chosen by comparing SignatureScript which is always empty. New approach is to find the primary sweep and to compare its outpoint directly. --- sweepbatcher/sweep_batch.go | 42 +++++++++++++++----- sweepbatcher/sweep_batcher.go | 32 +++++++++++---- sweepbatcher/sweep_batcher_presigned_test.go | 22 ++++++++++ 3 files changed, 78 insertions(+), 18 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index c54269e77..41c88d626 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1923,12 +1923,12 @@ func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int, } // getFeePortionPaidBySweep returns the fee portion that the sweep should pay -// for the batch transaction. If the sweep is the first sweep in the batch, it +// for the batch transaction. If the sweep is the primary sweep in the batch, it // pays the rounding difference. -func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, - roundingDiff btcutil.Amount, sweep *sweep) btcutil.Amount { +func getFeePortionPaidBySweep(feePortionPerSweep, roundingDiff btcutil.Amount, + primary bool) btcutil.Amount { - if bytes.Equal(spendTx.TxIn[0].SignatureScript, sweep.htlc.SigScript) { + if primary { return feePortionPerSweep + roundingDiff } @@ -1980,22 +1980,42 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { spendTx, len(notifyList), totalSweptAmt, ) + // Calculate fees per swaps. Only the first sweep in a swap has a + // notifier, so we calculate total fee per swap and send it to a sweep + // having that swap and a notifier. + swap2fee := make(map[lntypes.Hash]btcutil.Amount) + for _, sweep := range notifyList { + primary := sweep.outpoint == b.primarySweepID + + swap2fee[sweep.swapHash] += getFeePortionPaidBySweep( + feePortionPaidPerSweep, roundingDifference, primary, + ) + } + + // Now send notifications to notifiers. for _, sweep := range notifyList { // If the sweep's notifier is empty then this means that a swap - // is not waiting to read an update from it, so we can skip - // the notification part. + // is not waiting to read an update from it or this is not the + // first sweep in a swap, so we can skip the notification part. if sweep.notifier == nil || *sweep.notifier == (SpendNotifier{}) { continue } + // Make sure there is only one sweep with a notifier per swap + // hash, otherwise our fee calculation is incorrect. + fee, has := swap2fee[sweep.swapHash] + if !has { + return fmt.Errorf("no fee for swap %v; maybe "+ + "multiple sweeps with a notifier per swap?", + sweep.swapHash) + } + delete(swap2fee, sweep.swapHash) + spendDetail := SpendDetail{ - Tx: spendTx, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &sweep, - ), + Tx: spendTx, + OnChainFeePortion: fee, } // Dispatch the sweep notifier, we don't care about the outcome diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 82b50a5c9..cca273d05 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -840,7 +840,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // Instead we directly detect and return the spend here. if completed && parentBatch.Confirmed { return b.monitorSpendAndNotify( - ctx, sweep, parentBatch.ID, notifier, + ctx, sweeps, parentBatch.ID, notifier, ) } @@ -1122,7 +1122,7 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, // the response back to the response channel. It is called if the batch is fully // confirmed and we just need to deliver the data back to the caller though // SpendNotifier. -func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, +func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweeps []*sweep, parentBatchID int32, notifier *SpendNotifier) error { // If the caller has not provided a notifier, stop. @@ -1140,6 +1140,17 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, return err } + // Find the primarySweepID. + dbSweeps, err := b.store.FetchBatchSweeps(ctx, parentBatchID) + if err != nil { + cancel() + + return err + } + primarySweepID := dbSweeps[0].Outpoint + + sweep := sweeps[0] + spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( spendCtx, &sweep.outpoint, sweep.htlc.PkScript, sweep.initiationHeight, @@ -1160,6 +1171,7 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, select { case spend := <-spendChan: spendTx := spend.SpendingTx + // Calculate the fee portion that each sweep should pay // for the batch. feePortionPerSweep, roundingDifference := @@ -1168,17 +1180,23 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, totalSwept, ) - onChainFeePortion := getFeePortionPaidBySweep( - spendTx, feePortionPerSweep, - roundingDifference, sweep, - ) + // Sum onchain fee across all the sweeps of the swap. + var fee btcutil.Amount + for _, s := range sweeps { + isFirst := s.outpoint == primarySweepID + + fee += getFeePortionPaidBySweep( + feePortionPerSweep, roundingDifference, + isFirst, + ) + } // Notify the requester of the spend with the spend // details, including the fee portion for this // particular sweep. spendDetail := &SpendDetail{ Tx: spendTx, - OnChainFeePortion: onChainFeePortion, + OnChainFeePortion: fee, } select { diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 3803ccc60..0464732aa 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -1568,10 +1568,31 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, } lnd.SpendChannel <- spendDetail + // Calculate the expected on-chain fee of the swap. + wantFee := make([]btcutil.Amount, numConfirmedSwaps) + for i := range numConfirmedSwaps { + batchAmount := swapAmount * btcutil.Amount(numConfirmedSwaps) + txFee := batchAmount - btcutil.Amount(tx.TxOut[0].Value) + numConfirmedSweeps := numConfirmedSwaps * sweepsPerSwap + feePerSweep := txFee / btcutil.Amount(numConfirmedSweeps) + roundingDiff := txFee - feePerSweep*btcutil.Amount( + numConfirmedSweeps, + ) + swapFee := feePerSweep * 2 + + // Add rounding difference to the first swap. + if i == 0 { + swapFee += roundingDiff + } + + wantFee[i] = swapFee + } + // Make sure that notifiers of confirmed sweeps received notifications. for i := range numConfirmedSwaps { spend := <-spendChans[i] require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) } <-lnd.RegisterConfChannel @@ -1631,6 +1652,7 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, spend := <-spendChan require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) <-lnd.RegisterConfChannel lnd.ConfChannel <- &chainntnfs.TxConfirmation{ From 5ab9a4bb1678947f9a32853b0d5acde172eb6308 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 30 Apr 2025 00:07:04 -0300 Subject: [PATCH 17/26] loopout: close sweepbatcher quitChan This is needed because sweepbatcher can use this channel in multiple select statements to unblock itself if the caller cancels. --- loopout.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loopout.go b/loopout.go index fde6d2826..9cce79d82 100644 --- a/loopout.go +++ b/loopout.go @@ -1148,7 +1148,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, quitChan := make(chan bool, 1) defer func() { - quitChan <- true + close(quitChan) }() notifier := sweepbatcher.SpendNotifier{ From 655df5c99889342396effc627db3dee411c3595e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 5 May 2025 23:28:23 -0300 Subject: [PATCH 18/26] sweepbatcher: pass utxo to fee provider --- loopout_feerate.go | 4 ++- sweepbatcher/sweep_batcher.go | 6 ++-- sweepbatcher/sweep_batcher_presigned_test.go | 32 ++++++++++---------- sweepbatcher/sweep_batcher_test.go | 12 ++++---- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/loopout_feerate.go b/loopout_feerate.go index 4cebd1e32..4540644c1 100644 --- a/loopout_feerate.go +++ b/loopout_feerate.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" @@ -71,7 +72,8 @@ func newLoopOutSweepFeerateProvider(sweeper sweeper, // GetMinFeeRate returns minimum required feerate for a sweep by swap hash. func (p *loopOutSweepFeerateProvider) GetMinFeeRate(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { _, feeRate, err := p.GetConfTargetAndFeeRate(ctx, swapHash) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index cca273d05..c77d59799 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -201,8 +201,8 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error // FeeRateProvider is a function that returns min fee rate of a batch sweeping // the UTXO of the swap. -type FeeRateProvider func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) +type FeeRateProvider func(ctx context.Context, swapHash lntypes.Hash, + utxo wire.OutPoint) (chainfee.SatPerKWeight, error) // InitialDelayProvider returns the duration after which a newly created batch // is first published. It allows to customize the duration based on total value @@ -1448,7 +1448,7 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, // provided, otherwise use wallet's EstimateFeeRate. var minFeeRate chainfee.SatPerKWeight if b.customFeeRate != nil { - minFeeRate, err = b.customFeeRate(ctx, swapHash) + minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint) if err != nil { return nil, fmt.Errorf("failed to fetch min fee rate "+ "for %x: %w", swapHash[:6], err) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 0464732aa..ebbd3509b 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -254,8 +254,8 @@ func testPresigned_forgotten_presign(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -330,8 +330,8 @@ func testPresigned_input1_offline_then_input2(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -511,8 +511,8 @@ func testPresigned_two_inputs_one_goes_offline(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -647,8 +647,8 @@ func testPresigned_first_publish_fails(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -770,8 +770,8 @@ func testPresigned_locktime(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -854,8 +854,8 @@ func testPresigned_presigned_group(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -1091,8 +1091,8 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -1372,8 +1372,8 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return feeRate, nil } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index b6247be5b..0ad5f64d8 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -408,8 +408,8 @@ func testFeeBumping(t *testing.T, store testStore, // Disable fee bumping, if requested. var opts []BatcherOption if noFeeBumping { - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return test.DefaultMockFee, nil @@ -3689,8 +3689,8 @@ func testSweepFetcher(t *testing.T, store testStore, require.NoError(t, err) store.AssertLoopOutStored() - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return feeRate, nil @@ -4536,8 +4536,8 @@ func testFeeRateGrows(t *testing.T, store testStore, swap2feeRate[swapHash] = rate } - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { swap2feeRateMu.Lock() defer swap2feeRateMu.Unlock() From d87ef030b3bfa54329a58765feaa52c95091ad6c Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 6 May 2025 14:21:39 -0300 Subject: [PATCH 19/26] sweepbatcher: make sure dest pkscript is filled --- sweepbatcher/sweep_batch.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 41c88d626..3cf21f4fd 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1281,6 +1281,10 @@ func constructUnsignedTx(sweeps []sweep, address btcutil.Address, return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript "+ "failed: %w", err) } + if len(batchPkScript) == 0 { + return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript " + + "returned an empty pkScript") + } // Add the output to weight estimates. err = sweeppkg.AddOutputEstimate(&weightEstimate, address) From 63ad638ab37bdc1286f4dea3978179c2ac0480c9 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 6 May 2025 14:25:21 -0300 Subject: [PATCH 20/26] sweepbatcher: simplify presigned/purging test It doesn't need loopdb, so remove that code. --- sweepbatcher/sweep_batcher_presigned_test.go | 58 +++----------------- 1 file changed, 7 insertions(+), 51 deletions(-) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index ebbd3509b..92b10c42c 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -1354,7 +1354,7 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, // to another online batch. In offline case they must are added to a new batch // having valid presigned transactions. func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, - store testStore, batcherStore testBatcherStore, online bool) { + batcherStore testBatcherStore, online bool) { defer test.Guard(t)() @@ -1420,33 +1420,13 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, } groups[i] = group - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: swapAmount, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{byte(i + 1)}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() - // Enable all the sweeps. for _, op := range ops { presignedHelper.SetOutpointOnline(op, true) } // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) @@ -1506,31 +1486,11 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, }, } - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: amount, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{1, 2, 3}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() - // Enable the sweep. presignedHelper.SetOutpointOnline(opx, true) // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) @@ -1757,14 +1717,10 @@ func TestPresigned(t *testing.T) { } t.Run(name, func(t *testing.T) { - runTests(t, func(t *testing.T, store testStore, - batcherStore testBatcherStore) { - - testPresigned_purging( - t, numSwaps, numConfirmedSwaps, - store, batcherStore, online, - ) - }) + testPresigned_purging( + t, numSwaps, numConfirmedSwaps, + NewStoreMock(), online, + ) }) } From 62aae85b4cbb0c2299458223e61c025d8cc31871 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 6 May 2025 14:26:06 -0300 Subject: [PATCH 21/26] sweepbatcher: make sure HTLC.PkScript is filled --- sweepbatcher/sweep_batcher.go | 7 +++++++ sweepbatcher/sweep_batcher_presigned_test.go | 10 ++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index c77d59799..e7fb169db 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1444,6 +1444,13 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, swapHash[:6], err) } + // Make sure that PkScript of the coin is filled. Otherwise + // RegisterSpendNtfn fails. + if len(s.HTLC.PkScript) == 0 { + return nil, fmt.Errorf("sweep data for %x doesn't have "+ + "HTLC.PkScript set", swapHash[:6]) + } + // Find minimum fee rate for the sweep. Use customFeeRate if it is // provided, otherwise use wallet's EstimateFeeRate. var minFeeRate chainfee.SatPerKWeight diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 92b10c42c..4154fa9ff 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lntypes" @@ -232,13 +233,18 @@ func (h *mockPresignedHelper) FetchSweep(_ context.Context, h.mu.Lock() defer h.mu.Unlock() - _, has := h.onlineOutpoints[utxo] + // Find IsPresigned. + _, isPresigned := h.onlineOutpoints[utxo] return &SweepInfo{ // Set Timeout to prevent warning messages about timeout=0. Timeout: sweepTimeout, - IsPresigned: has, + IsPresigned: isPresigned, + + HTLC: swap.Htlc{ + PkScript: []byte{10, 11, 12}, + }, }, nil } From ea312b8f531027ebb8ec0e39edcf23681c8800f2 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 8 May 2025 00:12:30 -0300 Subject: [PATCH 22/26] sweepbatcher/presigned: minRelayFee edge cases Make sure that broadcasted tx has feeRate >= minRelayFee. Make sure that feeRate of broadcasted tx doesn't decrease. --- sweepbatcher/presigned.go | 19 +-- sweepbatcher/sweep_batcher_presigned_test.go | 121 +++++++++++++++++++ 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 0d52bcbfe..ba889d6eb 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -405,9 +405,16 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, } } + // Determine the current minimum relay fee based on our chain backend. + minRelayFee, err := b.wallet.MinRelayFee(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get minRelayFee: %w", err), + false + } + // Cache current height and desired feerate of the batch. currentHeight := b.currentHeight - feeRate := b.rbfCache.FeeRate + feeRate := max(b.rbfCache.FeeRate, minRelayFee) // Append this sweep to an array of sweeps. This is needed to keep the // order of sweeps stored, as iterating the sweeps map does not @@ -445,13 +452,6 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, batchAmt += sweep.value } - // Determine the current minimum relay fee based on our chain backend. - minRelayFee, err := b.wallet.MinRelayFee(ctx) - if err != nil { - return 0, fmt.Errorf("failed to get minRelayFee: %w", err), - false - } - // Get a pre-signed transaction. const loadOnly = false signedTx, err := b.cfg.presignedHelper.SignTx( @@ -506,6 +506,9 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, b.batchTxid = &txHash b.batchPkScript = tx.TxOut[0].PkScript + // Update cached FeeRate not to broadcast a tx with lower feeRate. + b.rbfCache.FeeRate = max(b.rbfCache.FeeRate, signedFeeRate) + return fee, nil, true } diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 4154fa9ff..56d336c6f 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -159,6 +159,11 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, h.mu.Lock() defer h.mu.Unlock() + if feeRate < minRelayFee { + return nil, fmt.Errorf("feeRate (%v) is below minRelayFee (%v)", + feeRate, minRelayFee) + } + // If all the inputs are online and loadOnly is not set, sign this exact // transaction. if offline := h.offlineInputs(tx); len(offline) == 0 && !loadOnly { @@ -492,6 +497,118 @@ func testPresigned_input1_offline_then_input2(t *testing.T, require.NoError(t, err) } +// testPresigned_min_relay_fee tests that online and presigned transactions +// comply with min_relay_fee. +func testPresigned_min_relay_fee(t *testing.T, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const inputAmt = 1_000_000 + + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { + + return chainfee.FeePerKwFloor, nil + } + + presignedHelper := newMockPresignedHelper() + + batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, presignedHelper, + WithCustomFeeRate(customFeeRate), + WithPresignedHelper(presignedHelper)) + go func() { + err := batcher.Run(ctx) + checkBatcherError(t, err) + }() + + // Set high min_relay_fee. + lnd.SetMinRelayFee(400) + + // Create the first sweep. + swapHash1 := lntypes.Hash{1, 1, 1} + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + sweepReq1 := SweepRequest{ + SwapHash: swapHash1, + Inputs: []Input{{ + Value: inputAmt, + Outpoint: op1, + }}, + Notifier: &dummyNotifier, + } + + // Enable the input and presign. + presignedHelper.SetOutpointOnline(op1, true) + err := batcher.PresignSweepsGroup( + ctx, []Input{{Outpoint: op1, Value: inputAmt}}, + sweepTimeout, destAddr, + ) + require.NoError(t, err) + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Since a batch was created we check that it registered for its primary + // sweep's spend. + <-lnd.RegisterSpendChannel + + // Wait for a transactions to be published. + tx := <-lnd.TxPublishChannel + gotFeeRate := presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // The only difference of tx2 is a higher lock_time. + lnd.SetMinRelayFee(300) + require.NoError(t, lnd.NotifyHeight(601)) + tx2 := <-lnd.TxPublishChannel + require.Equal(t, tx.TxOut[0].Value, tx2.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + require.Equal(t, uint32(601), tx2.LockTime) + + // Set a higher min_relay_fee, turn off the client and try presigned tx. + lnd.SetMinRelayFee(500) + presignedHelper.SetOutpointOnline(op1, false) + + // Check fee rate of the presigned tx broadcasted. + require.NoError(t, lnd.NotifyHeight(602)) + tx = <-lnd.TxPublishChannel + gotFeeRate = presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx.LockTime) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // It should re-broadcast the same presigned tx. + lnd.SetMinRelayFee(450) + require.NoError(t, lnd.NotifyHeight(603)) + tx2 = <-lnd.TxPublishChannel + require.Equal(t, tx.TxHash(), tx2.TxHash()) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx2.LockTime) + + // Even if the client is back online, fee rate doesn't decrease. + presignedHelper.SetOutpointOnline(op1, true) + require.NoError(t, lnd.NotifyHeight(604)) + tx3 := <-lnd.TxPublishChannel + require.Equal(t, tx2.TxOut[0].Value, tx3.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx3, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + require.Equal(t, uint32(604), tx3.LockTime) +} + // testPresigned_two_inputs_one_goes_offline tests presigned mode for the // following scenario: two online inputs are added, then one of them goes // offline, then feerate grows and a presigned transaction is used. @@ -1690,6 +1807,10 @@ func TestPresigned(t *testing.T) { testPresigned_input1_offline_then_input2(t, NewStoreMock()) }) + t.Run("min_relay_fee", func(t *testing.T) { + testPresigned_min_relay_fee(t, NewStoreMock()) + }) + t.Run("two_inputs_one_goes_offline", func(t *testing.T) { testPresigned_two_inputs_one_goes_offline(t, NewStoreMock()) }) From b339904ba4db3d271d599e27a2babdf51d3d0cd0 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 14 May 2025 21:40:42 -0300 Subject: [PATCH 23/26] sweepbatcher: remove method Presign Method Presign is not as reliable as SignTx, because it checks transaction by txid and can miss for example if LockTime is different. SignTx can do everything Presign was used for. --- sweepbatcher/presigned.go | 28 ++++++------ sweepbatcher/presigned_test.go | 20 ++++++--- sweepbatcher/sweep_batch.go | 2 +- sweepbatcher/sweep_batcher.go | 11 +---- sweepbatcher/sweep_batcher_presigned_test.go | 46 ++++---------------- 5 files changed, 39 insertions(+), 68 deletions(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index ba889d6eb..7b309470e 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -36,13 +36,7 @@ func (b *batch) ensurePresigned(ctx context.Context, newSweeps []*sweep, // presignedTxChecker has methods to check if the inputs are presigned. type presignedTxChecker interface { destPkScripter - - // SignTx signs an unsigned transaction or returns a pre-signed tx. - // It is only called with loadOnly=true by ensurePresigned. - SignTx(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount, - minRelayFee, feeRate chainfee.SatPerKWeight, - loadOnly bool) (*wire.MsgTx, error) + presigner } // ensurePresigned checks that there is a presigned transaction spending the @@ -287,11 +281,12 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // presigner tries to presign a batch transaction. type presigner interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error + // SignTx signs an unsigned transaction or returns a pre-signed tx. + // It is only called with loadOnly=true by ensurePresigned. + SignTx(ctx context.Context, primarySweepID wire.OutPoint, + tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) } // presign tries to presign batch sweep transactions of the sweeps. It signs @@ -370,7 +365,14 @@ func presign(ctx context.Context, presigner presigner, destAddr btcutil.Address, } // Try to presign this transaction. - err = presigner.Presign(ctx, primarySweepID, tx, batchAmt) + const ( + loadOnly = false + minRelayFee = chainfee.AbsoluteFeePerKwFloor + ) + _, err = presigner.SignTx( + ctx, primarySweepID, tx, batchAmt, minRelayFee, fr, + loadOnly, + ) if err != nil { return fmt.Errorf("failed to presign unsigned tx %v "+ "for feeRate %v: %w", tx.TxHash(), fr, err) diff --git a/sweepbatcher/presigned_test.go b/sweepbatcher/presigned_test.go index 60f287764..9f0d8b29f 100644 --- a/sweepbatcher/presigned_test.go +++ b/sweepbatcher/presigned_test.go @@ -553,24 +553,30 @@ type mockPresigner struct { failAt int } -// Presign memorizes the value of the output and fails if the number of +// SignTx memorizes the value of the output and fails if the number of // calls previously made is failAt. -func (p *mockPresigner) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { +func (p *mockPresigner) SignTx(ctx context.Context, + primarySweepID wire.OutPoint, tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) { + + if ctx.Err() != nil { + return nil, ctx.Err() + } if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) } if len(p.outputs)+1 == p.failAt { - return fmt.Errorf("test error in Presign") + return nil, fmt.Errorf("test error in SignTx") } p.outputs = append(p.outputs, btcutil.Amount(tx.TxOut[0].Value)) p.lockTimes = append(p.lockTimes, tx.LockTime) - return nil + return tx, nil } // TestPresign checks that function presign presigns correct set of transactions diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 3cf21f4fd..e791d1593 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -481,7 +481,7 @@ func (b *batch) Errorf(format string, params ...interface{}) { // checkSweepToAdd checks if a sweep can be added or updated in the batch. The // caller must lock the event loop using scheduleNextCall. The function returns // if the sweep already exists in the batch. If presigned mode is enabled, the -// result depends on the outcome of the method presignedHelper.Presign for a +// result depends on the outcome of the method presignedHelper.SignTx for a // non-empty batch. For an empty batch, the input needs to pass // PresignSweepsGroup. func (b *batch) checkSweepToAdd(_ context.Context, sweep *sweep) (bool, error) { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index e7fb169db..dd3f4aaca 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -158,18 +158,11 @@ type SignMuSig2 func(ctx context.Context, muSig2Version input.MuSig2Version, // fails (e.g. because one of the inputs is offline), an input can't be added to // a batch. type PresignedHelper interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. The implementation should - // first check if this transaction already exists in the store to skip - // cosigning if possible. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error - // DestPkScript returns destination pkScript used by the sweep batch // with the primary outpoint specified. Returns an error, if such tx // doesn't exist. If there are many such transactions, returns any of // pkScript's; all of them should have the same destination pkScript. + // TODO: embed this data into SweepInfo. DestPkScript(ctx context.Context, primarySweepID wire.OutPoint) ([]byte, error) @@ -905,7 +898,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // spinUpNewBatch creates new batch, starts it and adds the sweeps to it. If // presigned mode is enabled, the result also depends on outcome of -// presignedHelper.Presign. +// presignedHelper.SignTx. func (b *Batcher) spinUpNewBatch(ctx context.Context, sweeps []*sweep) error { // Spin up a fresh batch. newBatch, err := b.spinUpBatch(ctx) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 56d336c6f..edf1fa9bf 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -96,42 +96,6 @@ func (h *mockPresignedHelper) getTxFeerate(tx *wire.MsgTx, return chainfee.NewSatPerKWeight(fee, weight) } -// Presign tries to presign the transaction. It succeeds if all the inputs -// are online. In case of success it adds the transaction to presignedBatches. -func (h *mockPresignedHelper) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { - - h.mu.Lock() - defer h.mu.Unlock() - - // Check if such a transaction already exists. This is not only an - // optimization, but also enables re-adding multiple groups if sweeps - // are offline. - wantTxHash := tx.TxHash() - for _, candidate := range h.presignedBatches[primarySweepID] { - if candidate.TxHash() == wantTxHash { - return nil - } - } - - if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) - } - - if offline := h.offlineInputs(tx); len(offline) != 0 { - return fmt.Errorf("some inputs of tx are offline: %v", offline) - } - - tx = tx.Copy() - h.sign(tx) - h.presignedBatches[primarySweepID] = append( - h.presignedBatches[primarySweepID], tx, - ) - - return nil -} - // DestPkScript returns destination pkScript used in presigned tx sweeping // these inputs. func (h *mockPresignedHelper) DestPkScript(ctx context.Context, @@ -164,6 +128,11 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, feeRate, minRelayFee) } + if !hasInput(tx, primarySweepID) { + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) + } + // If all the inputs are online and loadOnly is not set, sign this exact // transaction. if offline := h.offlineInputs(tx); len(offline) == 0 && !loadOnly { @@ -205,7 +174,8 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, } if bestTx == nil { - return nil, fmt.Errorf("no such presigned tx found") + return nil, fmt.Errorf("some outpoint is offline and no " + + "suitable presigned tx found") } return bestTx.Copy(), nil @@ -1025,7 +995,7 @@ func testPresigned_presigned_group(t *testing.T, // An attempt to presign must fail. err = batcher.PresignSweepsGroup(ctx, group1, sweepTimeout, destAddr) - require.ErrorContains(t, err, "some inputs of tx are offline") + require.ErrorContains(t, err, "some outpoint is offline") // Enable both outpoints. presignedHelper.SetOutpointOnline(op2, true) From 846246e79ec7821346444f224d5a3f76391f1226 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 14 May 2025 23:31:07 -0300 Subject: [PATCH 24/26] sweepbatcher: format pkscript as hex --- sweepbatcher/presigned.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 7b309470e..2d0658274 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -3,6 +3,7 @@ package sweepbatcher import ( "bytes" "context" + "encoding/hex" "fmt" "github.com/btcsuite/btcd/blockchain" @@ -602,8 +603,9 @@ func CheckSignedTx(unsignedTx, signedTx *wire.MsgTx, inputAmt btcutil.Amount, unsignedOut := unsignedTx.TxOut[0] signedOut := signedTx.TxOut[0] if !bytes.Equal(unsignedOut.PkScript, signedOut.PkScript) { - return fmt.Errorf("mismatch of output pkScript: %v, %v", - unsignedOut.PkScript, signedOut.PkScript) + return fmt.Errorf("mismatch of output pkScript: %s, %s", + hex.EncodeToString(unsignedOut.PkScript), + hex.EncodeToString(signedOut.PkScript)) } // Find the feerate of signedTx. From 32cc0907d78e1220178ba704035fcac6145fbd1e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 14 May 2025 23:36:15 -0300 Subject: [PATCH 25/26] sweepbatcher: more logging in PresignSweepsGroup --- sweepbatcher/sweep_batcher.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index dd3f4aaca..ff014fbac 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -2,6 +2,7 @@ package sweepbatcher import ( "context" + "encoding/hex" "errors" "fmt" "strings" @@ -12,6 +13,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/btcsuite/btcwallet/chain" @@ -683,7 +685,14 @@ func (b *Batcher) PresignSweepsGroup(ctx context.Context, inputs []Input, if err != nil { return fmt.Errorf("failed to get nextBlockFeeRate: %w", err) } - infof("PresignSweepsGroup: nextBlockFeeRate is %v", nextBlockFeeRate) + destPkscript, err := txscript.PayToAddrScript(destAddress) + if err != nil { + return fmt.Errorf("txscript.PayToAddrScript failed: %w", err) + } + infof("PresignSweepsGroup: nextBlockFeeRate is %v, inputs: %v, "+ + "destAddress: %v, destPkscript: %v sweepTimeout: %d", + nextBlockFeeRate, inputs, destAddress, + hex.EncodeToString(destPkscript), sweepTimeout) sweeps := make([]sweep, len(inputs)) for i, input := range inputs { From 381ab936670f237862a1714eec079915850251e1 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 15 May 2025 00:37:13 -0300 Subject: [PATCH 26/26] sweepbatcher: fix a bug in dest address selection For presigned possible remaining groups, the destination address of the current batch was used instead of the destination address of an expected future batch. TODO: reproduce in unit test "purged". For this, each swap should have a separate destination address. --- sweepbatcher/presigned.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 2d0658274..9db0d56ca 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -246,7 +246,7 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // Cache the destination address. destAddr, err := getPresignedSweepsDestAddr( - ctx, b.cfg.presignedHelper, b.primarySweepID, + ctx, b.cfg.presignedHelper, primarySweepID, b.cfg.chainParams, ) if err != nil {