diff --git a/loopout.go b/loopout.go index fde6d2826..9cce79d82 100644 --- a/loopout.go +++ b/loopout.go @@ -1148,7 +1148,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, quitChan := make(chan bool, 1) defer func() { - quitChan <- true + close(quitChan) }() notifier := sweepbatcher.SpendNotifier{ diff --git a/loopout_feerate.go b/loopout_feerate.go index 4cebd1e32..4540644c1 100644 --- a/loopout_feerate.go +++ b/loopout_feerate.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" @@ -71,7 +72,8 @@ func newLoopOutSweepFeerateProvider(sweeper sweeper, // GetMinFeeRate returns minimum required feerate for a sweep by swap hash. func (p *loopOutSweepFeerateProvider) GetMinFeeRate(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { _, feeRate, err := p.GetConfTargetAndFeeRate(ctx, swapHash) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 0d52bcbfe..9db0d56ca 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -3,6 +3,7 @@ package sweepbatcher import ( "bytes" "context" + "encoding/hex" "fmt" "github.com/btcsuite/btcd/blockchain" @@ -36,13 +37,7 @@ func (b *batch) ensurePresigned(ctx context.Context, newSweeps []*sweep, // presignedTxChecker has methods to check if the inputs are presigned. type presignedTxChecker interface { destPkScripter - - // SignTx signs an unsigned transaction or returns a pre-signed tx. - // It is only called with loadOnly=true by ensurePresigned. - SignTx(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount, - minRelayFee, feeRate chainfee.SatPerKWeight, - loadOnly bool) (*wire.MsgTx, error) + presigner } // ensurePresigned checks that there is a presigned transaction spending the @@ -251,7 +246,7 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // Cache the destination address. destAddr, err := getPresignedSweepsDestAddr( - ctx, b.cfg.presignedHelper, b.primarySweepID, + ctx, b.cfg.presignedHelper, primarySweepID, b.cfg.chainParams, ) if err != nil { @@ -287,11 +282,12 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // presigner tries to presign a batch transaction. type presigner interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error + // SignTx signs an unsigned transaction or returns a pre-signed tx. + // It is only called with loadOnly=true by ensurePresigned. + SignTx(ctx context.Context, primarySweepID wire.OutPoint, + tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) } // presign tries to presign batch sweep transactions of the sweeps. It signs @@ -370,7 +366,14 @@ func presign(ctx context.Context, presigner presigner, destAddr btcutil.Address, } // Try to presign this transaction. - err = presigner.Presign(ctx, primarySweepID, tx, batchAmt) + const ( + loadOnly = false + minRelayFee = chainfee.AbsoluteFeePerKwFloor + ) + _, err = presigner.SignTx( + ctx, primarySweepID, tx, batchAmt, minRelayFee, fr, + loadOnly, + ) if err != nil { return fmt.Errorf("failed to presign unsigned tx %v "+ "for feeRate %v: %w", tx.TxHash(), fr, err) @@ -405,9 +408,16 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, } } + // Determine the current minimum relay fee based on our chain backend. + minRelayFee, err := b.wallet.MinRelayFee(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get minRelayFee: %w", err), + false + } + // Cache current height and desired feerate of the batch. currentHeight := b.currentHeight - feeRate := b.rbfCache.FeeRate + feeRate := max(b.rbfCache.FeeRate, minRelayFee) // Append this sweep to an array of sweeps. This is needed to keep the // order of sweeps stored, as iterating the sweeps map does not @@ -445,13 +455,6 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, batchAmt += sweep.value } - // Determine the current minimum relay fee based on our chain backend. - minRelayFee, err := b.wallet.MinRelayFee(ctx) - if err != nil { - return 0, fmt.Errorf("failed to get minRelayFee: %w", err), - false - } - // Get a pre-signed transaction. const loadOnly = false signedTx, err := b.cfg.presignedHelper.SignTx( @@ -506,6 +509,9 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, b.batchTxid = &txHash b.batchPkScript = tx.TxOut[0].PkScript + // Update cached FeeRate not to broadcast a tx with lower feeRate. + b.rbfCache.FeeRate = max(b.rbfCache.FeeRate, signedFeeRate) + return fee, nil, true } @@ -597,8 +603,9 @@ func CheckSignedTx(unsignedTx, signedTx *wire.MsgTx, inputAmt btcutil.Amount, unsignedOut := unsignedTx.TxOut[0] signedOut := signedTx.TxOut[0] if !bytes.Equal(unsignedOut.PkScript, signedOut.PkScript) { - return fmt.Errorf("mismatch of output pkScript: %v, %v", - unsignedOut.PkScript, signedOut.PkScript) + return fmt.Errorf("mismatch of output pkScript: %s, %s", + hex.EncodeToString(unsignedOut.PkScript), + hex.EncodeToString(signedOut.PkScript)) } // Find the feerate of signedTx. diff --git a/sweepbatcher/presigned_test.go b/sweepbatcher/presigned_test.go index 60f287764..9f0d8b29f 100644 --- a/sweepbatcher/presigned_test.go +++ b/sweepbatcher/presigned_test.go @@ -553,24 +553,30 @@ type mockPresigner struct { failAt int } -// Presign memorizes the value of the output and fails if the number of +// SignTx memorizes the value of the output and fails if the number of // calls previously made is failAt. -func (p *mockPresigner) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { +func (p *mockPresigner) SignTx(ctx context.Context, + primarySweepID wire.OutPoint, tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) { + + if ctx.Err() != nil { + return nil, ctx.Err() + } if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) } if len(p.outputs)+1 == p.failAt { - return fmt.Errorf("test error in Presign") + return nil, fmt.Errorf("test error in SignTx") } p.outputs = append(p.outputs, btcutil.Amount(tx.TxOut[0].Value)) p.lockTimes = append(p.lockTimes, tx.LockTime) - return nil + return tx, nil } // TestPresign checks that function presign presigns correct set of transactions diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 01b9e74a9..db41287bc 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -213,8 +213,8 @@ type dbBatch struct { // ID is the unique identifier of the batch. ID int32 - // State is the current state of the batch. - State string + // Confirmed is set when the batch is fully confirmed. + Confirmed bool // BatchTxid is the txid of the batch transaction. BatchTxid chainhash.Hash @@ -248,18 +248,15 @@ type dbSweep struct { // Amount is the amount of the sweep. Amount btcutil.Amount - // Completed indicates whether this sweep is completed. + // Completed indicates whether this sweep is fully-confirmed. Completed bool } // convertBatchRow converts a batch row from db to a sweepbatcher.Batch struct. func convertBatchRow(row sqlc.SweepBatch) *dbBatch { batch := dbBatch{ - ID: row.ID, - } - - if row.Confirmed { - batch.State = batchOpen + ID: row.ID, + Confirmed: row.Confirmed, } if row.BatchTxID.Valid { @@ -288,7 +285,7 @@ func convertBatchRow(row sqlc.SweepBatch) *dbBatch { // it into the database. func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { args := sqlc.InsertBatchParams{ - Confirmed: false, + Confirmed: batch.Confirmed, BatchTxID: sql.NullString{ Valid: true, String: batch.BatchTxid.String(), @@ -305,10 +302,6 @@ func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { MaxTimeoutDistance: batch.MaxTimeoutDistance, } - if batch.State == batchConfirmed { - args.Confirmed = true - } - return args } @@ -317,7 +310,7 @@ func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { func batchToUpdateArgs(batch dbBatch) sqlc.UpdateBatchParams { args := sqlc.UpdateBatchParams{ ID: batch.ID, - Confirmed: false, + Confirmed: batch.Confirmed, BatchTxID: sql.NullString{ Valid: true, String: batch.BatchTxid.String(), @@ -333,10 +326,6 @@ func batchToUpdateArgs(batch dbBatch) sqlc.UpdateBatchParams { }, } - if batch.State == batchConfirmed { - args.Confirmed = true - } - return args } diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index 90896aac1..d5a3ffbc1 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -36,7 +36,7 @@ func (s *StoreMock) FetchUnconfirmedSweepBatches(ctx context.Context) ( result := []*dbBatch{} for _, batch := range s.batches { - if batch.State != "confirmed" { + if !batch.Confirmed { result = append(result, &batch) } } @@ -91,7 +91,7 @@ func (s *StoreMock) ConfirmBatch(ctx context.Context, id int32) error { return errors.New("batch not found") } - batch.State = "confirmed" + batch.Confirmed = true s.batches[batch.ID] = batch return nil @@ -201,7 +201,7 @@ func (s *StoreMock) TotalSweptAmount(ctx context.Context, batchID int32) ( return 0, errors.New("batch not found") } - if batch.State != batchConfirmed && batch.State != batchClosed { + if !batch.Confirmed { return 0, nil } @@ -212,5 +212,5 @@ func (s *StoreMock) TotalSweptAmount(ctx context.Context, batchID int32) ( } } - return 0, nil + return total, nil } diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index a437461ad..e791d1593 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -135,7 +135,9 @@ const ( Open batchState = 0 // Closed is the state in which the batch is no longer able to accept - // new sweeps. + // new sweeps. NOTE: this state exists only in-memory. In the database + // it is stored as Open and converted to Closed after a spend + // notification arrives (quickly after start of Batch.Run). Closed batchState = 1 // Confirmed is the state in which the batch transaction has reached the @@ -479,7 +481,7 @@ func (b *batch) Errorf(format string, params ...interface{}) { // checkSweepToAdd checks if a sweep can be added or updated in the batch. The // caller must lock the event loop using scheduleNextCall. The function returns // if the sweep already exists in the batch. If presigned mode is enabled, the -// result depends on the outcome of the method presignedHelper.Presign for a +// result depends on the outcome of the method presignedHelper.SignTx for a // non-empty batch. For an empty batch, the input needs to pass // PresignSweepsGroup. func (b *batch) checkSweepToAdd(_ context.Context, sweep *sweep) (bool, error) { @@ -870,8 +872,8 @@ func (b *batch) Run(ctx context.Context) error { // completes. timerChan := clock.TickAfter(b.cfg.batchPublishDelay) - b.Infof("started, primary %s, total sweeps %d", - b.primarySweepID, len(b.sweeps)) + b.Infof("started, primary %s, total sweeps %d, state: %d", + b.primarySweepID, len(b.sweeps), b.state) for { // If the batch is not empty, find earliest initialDelay. @@ -1279,6 +1281,10 @@ func constructUnsignedTx(sweeps []sweep, address btcutil.Address, return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript "+ "failed: %w", err) } + if len(batchPkScript) == 0 { + return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript " + + "returned an empty pkScript") + } // Add the output to weight estimates. err = sweeppkg.AddOutputEstimate(&weightEstimate, address) @@ -1822,27 +1828,22 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { b.Infof("monitoring spend for outpoint %s", primarySweep.outpoint.String()) - for { + select { + case spend := <-spendChan: select { - case spend := <-spendChan: - select { - case b.spendChan <- spend: + case b.spendChan <- spend: - case <-ctx.Done(): - } - - return + case <-ctx.Done(): + } - case err := <-spendErr: - b.writeToErrChan( - fmt.Errorf("spend error: %w", err), - ) + case err := <-spendErr: + b.writeToSpendErrChan(ctx, err) - return + b.writeToErrChan( + fmt.Errorf("spend error: %w", err), + ) - case <-ctx.Done(): - return - } + case <-ctx.Done(): } }() @@ -1876,39 +1877,33 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { defer cancel() defer b.wg.Done() - for { + select { + case conf := <-confChan: select { - case conf := <-confChan: - select { - case b.confChan <- conf: - - case <-ctx.Done(): - } - - return - - case err := <-errChan: - b.writeToErrChan(fmt.Errorf("confirmations "+ - "monitoring error: %w", err)) - - return - - case <-reorgChan: - // A re-org has been detected. We set the batch - // state back to open since our batch - // transaction is no longer present in any - // block. We can accept more sweeps and try to - // publish new transactions, at this point we - // need to monitor again for a new spend. - select { - case b.reorgChan <- struct{}{}: - case <-ctx.Done(): - } - return + case b.confChan <- conf: + + case <-ctx.Done(): + } + case err := <-errChan: + b.writeToConfErrChan(ctx, err) + + b.writeToErrChan(fmt.Errorf("confirmations "+ + "monitoring error: %w", err)) + + case <-reorgChan: + // A re-org has been detected. We set the batch + // state back to open since our batch + // transaction is no longer present in any + // block. We can accept more sweeps and try to + // publish new transactions, at this point we + // need to monitor again for a new spend. + select { + case b.reorgChan <- struct{}{}: case <-ctx.Done(): - return } + + case <-ctx.Done(): } }() @@ -1932,12 +1927,12 @@ func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int, } // getFeePortionPaidBySweep returns the fee portion that the sweep should pay -// for the batch transaction. If the sweep is the first sweep in the batch, it +// for the batch transaction. If the sweep is the primary sweep in the batch, it // pays the rounding difference. -func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, - roundingDiff btcutil.Amount, sweep *sweep) btcutil.Amount { +func getFeePortionPaidBySweep(feePortionPerSweep, roundingDiff btcutil.Amount, + primary bool) btcutil.Amount { - if bytes.Equal(spendTx.TxIn[0].SignatureScript, sweep.htlc.SigScript) { + if primary { return feePortionPerSweep + roundingDiff } @@ -1948,7 +1943,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { var ( txHash = spendTx.TxHash() - purgeList = make([]SweepRequest, 0, len(b.sweeps)) notifyList = make([]sweep, 0, len(b.sweeps)) ) b.batchTxid = &txHash @@ -1958,7 +1952,126 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.Warnf("transaction %v has no outputs", txHash) } - // Determine if we should use presigned mode for the batch. + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + + // As a previous version of the batch transaction may get confirmed, + // which does not contain the latest sweeps, we need to detect the + // sweeps that did not make it to the confirmed transaction and feed + // them back to the batcher. This will ensure that the sweeps will enter + // a new batch instead of remaining dangling. + var ( + totalSweptAmt btcutil.Amount + confirmedSweeps = []wire.OutPoint{} + ) + for _, sweep := range b.sweeps { + // Skip sweeps that were not included into the confirmed tx. + _, found := confirmedSet[sweep.outpoint] + if !found { + continue + } + + totalSweptAmt += sweep.value + notifyList = append(notifyList, sweep) + confirmedSweeps = append(confirmedSweeps, sweep.outpoint) + } + + // Calculate the fee portion that each sweep should pay for the batch. + feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( + spendTx, len(notifyList), totalSweptAmt, + ) + + // Calculate fees per swaps. Only the first sweep in a swap has a + // notifier, so we calculate total fee per swap and send it to a sweep + // having that swap and a notifier. + swap2fee := make(map[lntypes.Hash]btcutil.Amount) + for _, sweep := range notifyList { + primary := sweep.outpoint == b.primarySweepID + + swap2fee[sweep.swapHash] += getFeePortionPaidBySweep( + feePortionPaidPerSweep, roundingDifference, primary, + ) + } + + // Now send notifications to notifiers. + for _, sweep := range notifyList { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it or this is not the + // first sweep in a swap, so we can skip the notification part. + if sweep.notifier == nil || + *sweep.notifier == (SpendNotifier{}) { + + continue + } + + // Make sure there is only one sweep with a notifier per swap + // hash, otherwise our fee calculation is incorrect. + fee, has := swap2fee[sweep.swapHash] + if !has { + return fmt.Errorf("no fee for swap %v; maybe "+ + "multiple sweeps with a notifier per swap?", + sweep.swapHash) + } + delete(swap2fee, sweep.swapHash) + + spendDetail := SpendDetail{ + Tx: spendTx, + OnChainFeePortion: fee, + } + + // Dispatch the sweep notifier, we don't care about the outcome + // of this action so we don't wait for it. + go func() { + // Make sure this context doesn't expire so we + // successfully notify the caller. + ctx := context.WithoutCancel(ctx) + + sweep.notifySweepSpend(ctx, &spendDetail) + }() + } + + b.Infof("spent, confirmed sweeps: %v", confirmedSweeps) + + // We are no longer able to accept new sweeps, so we mark the batch as + // closed and persist on storage. + b.state = Closed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + if err := b.monitorConfirmations(ctx); err != nil { + return fmt.Errorf("monitorConfirmations failed: %w", err) + } + + return nil +} + +// handleConf handles a confirmation notification. This is the final step of the +// batch. Here we signal to the batcher that this batch was completed. +func (b *batch) handleConf(ctx context.Context, + conf *chainntnfs.TxConfirmation) error { + + spendTx := conf.Tx + txHash := spendTx.TxHash() + if b.batchTxid == nil || *b.batchTxid != txHash { + b.Warnf("Mismatch of batch txid: tx in spend notification had "+ + "txid %v, but confirmation notification has txif %v. "+ + "Using the later.", b.batchTxid, txHash) + } + b.batchTxid = &txHash + + b.Infof("confirmed in txid %s", b.batchTxid) + b.state = Confirmed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + // If the batch is in presigned mode, cleanup presignedHelper. presigned, err := b.isPresigned() if err != nil { return fmt.Errorf("failed to determine if the batch %d uses "+ @@ -1976,40 +2089,43 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.id, err) } + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + // As a previous version of the batch transaction may get confirmed, // which does not contain the latest sweeps, we need to detect the // sweeps that did not make it to the confirmed transaction and feed // them back to the batcher. This will ensure that the sweeps will enter // a new batch instead of remaining dangling. var ( - totalSweptAmt btcutil.Amount confirmedSweeps = []wire.OutPoint{} - purgedSweeps = []wire.OutPoint{} - purgedSwaps = []lntypes.Hash{} + purgeList = make([]SweepRequest, 0, len(b.sweeps)) ) for _, sweep := range allSweeps { - found := false - - for _, txIn := range spendTx.TxIn { - if txIn.PreviousOutPoint == sweep.outpoint { - found = true - totalSweptAmt += sweep.value - notifyList = append(notifyList, sweep) - confirmedSweeps = append( - confirmedSweeps, sweep.outpoint, - ) - - break + _, found := confirmedSet[sweep.outpoint] + if found { + // Save the sweep as completed. Note that sweeps are + // marked completed after the batch is marked confirmed + // because the check in handleSweeps checks sweep's + // status first and then checks the batch status. + err := b.persistSweep(ctx, sweep, true) + if err != nil { + return err } + + confirmedSweeps = append( + confirmedSweeps, sweep.outpoint, + ) + + continue } // If the sweep's outpoint was not found in the transaction's // inputs this means it was left out. So we delete it from this // batch and feed it back to the batcher. - if found { - continue - } - newSweep := sweep delete(b.sweeps, sweep.outpoint) @@ -2041,6 +2157,10 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { }) } } + var ( + purgedSweeps = []wire.OutPoint{} + purgedSwaps = []lntypes.Hash{} + ) for _, sweepReq := range purgeList { purgedSwaps = append(purgedSwaps, sweepReq.SwapHash) for _, input := range sweepReq.Inputs { @@ -2048,45 +2168,8 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } } - // Calculate the fee portion that each sweep should pay for the batch. - feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( - spendTx, len(notifyList), totalSweptAmt, - ) - - for _, sweep := range notifyList { - // Save the sweep as completed. - err := b.persistSweep(ctx, sweep, true) - if err != nil { - return err - } - - // If the sweep's notifier is empty then this means that a swap - // is not waiting to read an update from it, so we can skip - // the notification part. - if sweep.notifier == nil || - *sweep.notifier == (SpendNotifier{}) { - - continue - } - - spendDetail := SpendDetail{ - Tx: spendTx, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &sweep, - ), - } - - // Dispatch the sweep notifier, we don't care about the outcome - // of this action so we don't wait for it. - go func() { - // Make sure this context doesn't expire so we - // successfully notify the caller. - ctx := context.WithoutCancel(ctx) - - sweep.notifySweepSpend(ctx, &spendDetail) - }() - } + b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+ + "purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps) // Proceed with purging the sweeps. This will feed the sweeps that // didn't make it to the confirmed batch transaction back to the batcher @@ -2108,46 +2191,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } }() - b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+ - "purged swaps: %v, purged groups: %v", confirmedSweeps, - purgedSweeps, purgedSwaps, len(purgeList)) - - err = b.monitorConfirmations(ctx) - if err != nil { - return err - } - - // We are no longer able to accept new sweeps, so we mark the batch as - // closed and persist on storage. - b.state = Closed - - return b.persist(ctx) -} - -// handleConf handles a confirmation notification. This is the final step of the -// batch. Here we signal to the batcher that this batch was completed. We also -// cleanup up presigned transactions whose primarySweepID is one of the sweeps -// that were spent and fully confirmed: such a transaction can't be broadcasted -// since it is either in a block or double-spends one of spent outputs. -func (b *batch) handleConf(ctx context.Context, - conf *chainntnfs.TxConfirmation) error { - - spendTx := conf.Tx - txHash := spendTx.TxHash() - if b.batchTxid == nil || *b.batchTxid != txHash { - b.Warnf("Mismatch of batch txid: tx in spend notification had "+ - "txid %v, but confirmation notification has txif %v. "+ - "Using the later.", b.batchTxid, txHash) - } - b.batchTxid = &txHash - - // If the batch is in presigned mode, cleanup presignedHelper. - presigned, err := b.isPresigned() - if err != nil { - return fmt.Errorf("failed to determine if the batch %d uses "+ - "presigned mode: %w", b.id, err) - } - if presigned { b.Infof("Cleaning up presigned store") @@ -2163,10 +2206,33 @@ func (b *batch) handleConf(ctx context.Context, } } - b.Infof("confirmed in txid %s", b.batchTxid) - b.state = Confirmed + // Send the confirmation to all the notifiers. + for _, s := range b.sweeps { + // If the sweep's notifier is empty then this means that + // a swap is not waiting to read an update from it, so + // we can skip the notification part. + if s.notifier == nil || s.notifier.ConfChan == nil { + continue + } + + // Notify the caller in a goroutine to avoid possible dead-lock. + go func(notifier *SpendNotifier) { + // Note that we don't unblock on ctx, because it will + // expire soon, when batch.Run completes. The caller is + // responsible to consume ConfChan or close QuitChan. + select { + // Try to write the confirmation to the notification + // channel. + case notifier.ConfChan <- conf: - return b.store.ConfirmBatch(ctx, b.id) + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + } + }(s.notifier) + } + + return nil } // isComplete returns true if the batch is completed. This method is used by the @@ -2189,7 +2255,7 @@ func (b *batch) persist(ctx context.Context) error { bch := &dbBatch{} bch.ID = b.id - bch.State = stateEnumToString(b.state) + bch.Confirmed = b.state == Confirmed if b.batchTxid != nil { bch.BatchTxid = *b.batchTxid @@ -2248,7 +2314,7 @@ func (b *batch) getBatchDestAddr(ctx context.Context) (btcutil.Address, error) { func (b *batch) insertAndAcquireID(ctx context.Context) (int32, error) { bch := &dbBatch{} - bch.State = stateEnumToString(b.state) + bch.Confirmed = b.state == Confirmed bch.MaxTimeoutDistance = b.cfg.maxTimeoutDistance id, err := b.store.InsertSweepBatch(ctx, bch) @@ -2285,6 +2351,81 @@ func (b *batch) writeToErrChan(err error) { } } +// writeToSpendErrChan sends an error to spend error channels of all the sweeps. +func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { + done, err := b.scheduleNextCall() + if err != nil { + done() + + return + } + notifiers := make([]*SpendNotifier, 0, len(b.sweeps)) + for _, s := range b.sweeps { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it, so we can skip + // the notification part. + if s.notifier == nil || s.notifier.SpendErrChan == nil { + continue + } + + notifiers = append(notifiers, s.notifier) + } + done() + + for _, notifier := range notifiers { + select { + // Try to write the error to the notification + // channel. + case notifier.SpendErrChan <- spendErr: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + } +} + +// writeToConfErrChan sends an error to confirmation error channels of all the +// sweeps. +func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) { + done, err := b.scheduleNextCall() + if err != nil { + done() + + return + } + notifiers := make([]*SpendNotifier, 0, len(b.sweeps)) + for _, s := range b.sweeps { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it, so we can skip + // the notification part. + if s.notifier == nil || s.notifier.ConfErrChan == nil { + continue + } + + notifiers = append(notifiers, s.notifier) + } + done() + + for _, notifier := range notifiers { + select { + // Try to write the error to the notification + // channel. + case notifier.ConfErrChan <- confErr: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + } +} + func (b *batch) persistSweep(ctx context.Context, sweep sweep, completed bool) error { @@ -2312,18 +2453,3 @@ func clampBatchFee(fee btcutil.Amount, return fee } - -func stateEnumToString(state batchState) string { - switch state { - case Open: - return batchOpen - - case Closed: - return batchClosed - - case Confirmed: - return batchConfirmed - } - - return "" -} diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 03b697b19..ff014fbac 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -2,6 +2,7 @@ package sweepbatcher import ( "context" + "encoding/hex" "errors" "fmt" "strings" @@ -12,6 +13,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/btcsuite/btcwallet/chain" @@ -20,6 +22,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lntypes" @@ -31,18 +34,6 @@ const ( // of sweeps that can appear in the same batch. defaultMaxTimeoutDistance = 288 - // batchOpen is the string representation of the state of a batch that - // is open. - batchOpen = "open" - - // batchClosed is the string representation of the state of a batch - // that is closed. - batchClosed = "closed" - - // batchConfirmed is the string representation of the state of a batch - // that is confirmed. - batchConfirmed = "confirmed" - // defaultMainnetPublishDelay is the default publish delay that is used // for mainnet. defaultMainnetPublishDelay = 5 * time.Second @@ -169,18 +160,11 @@ type SignMuSig2 func(ctx context.Context, muSig2Version input.MuSig2Version, // fails (e.g. because one of the inputs is offline), an input can't be added to // a batch. type PresignedHelper interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. The implementation should - // first check if this transaction already exists in the store to skip - // cosigning if possible. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error - // DestPkScript returns destination pkScript used by the sweep batch // with the primary outpoint specified. Returns an error, if such tx // doesn't exist. If there are many such transactions, returns any of // pkScript's; all of them should have the same destination pkScript. + // TODO: embed this data into SweepInfo. DestPkScript(ctx context.Context, primarySweepID wire.OutPoint) ([]byte, error) @@ -212,8 +196,8 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error // FeeRateProvider is a function that returns min fee rate of a batch sweeping // the UTXO of the swap. -type FeeRateProvider func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) +type FeeRateProvider func(ctx context.Context, swapHash lntypes.Hash, + utxo wire.OutPoint) (chainfee.SatPerKWeight, error) // InitialDelayProvider returns the duration after which a newly created batch // is first published. It allows to customize the duration based on total value @@ -284,7 +268,7 @@ type addSweepsRequest struct { notifier *SpendNotifier // completed is set if the sweep is spent and the spending transaction - // is confirmed. + // is fully confirmed. completed bool // parentBatch is the parent batch of this sweep. It is loaded ony if @@ -307,13 +291,21 @@ type SpendDetail struct { // that the sweep was successful. type SpendNotifier struct { // SpendChan is a channel where the spend details are received. - SpendChan chan *SpendDetail + SpendChan chan<- *SpendDetail // SpendErrChan is a channel where spend errors are received. - SpendErrChan chan error + SpendErrChan chan<- error + + // ConfChan is a channel where the confirmation details are received. + // This channel is optional. + ConfChan chan<- *chainntnfs.TxConfirmation + + // ConfErrChan is a channel where confirmation errors are received. + // This channel is optional. + ConfErrChan chan<- error // QuitChan is a channel that can be closed to stop the notifier. - QuitChan chan bool + QuitChan <-chan bool } var ( @@ -693,7 +685,14 @@ func (b *Batcher) PresignSweepsGroup(ctx context.Context, inputs []Input, if err != nil { return fmt.Errorf("failed to get nextBlockFeeRate: %w", err) } - infof("PresignSweepsGroup: nextBlockFeeRate is %v", nextBlockFeeRate) + destPkscript, err := txscript.PayToAddrScript(destAddress) + if err != nil { + return fmt.Errorf("txscript.PayToAddrScript failed: %w", err) + } + infof("PresignSweepsGroup: nextBlockFeeRate is %v, inputs: %v, "+ + "destAddress: %v, destPkscript: %v sweepTimeout: %d", + nextBlockFeeRate, inputs, destAddress, + hex.EncodeToString(destPkscript), sweepTimeout) sweeps := make([]sweep, len(inputs)) for i, input := range inputs { @@ -760,7 +759,7 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { "sweep %x: %w", sweep.swapHash[:6], err) } - if parentBatch.State == batchConfirmed { + if parentBatch.Confirmed { fullyConfirmed = true } } @@ -780,8 +779,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { } infof("Batcher adding sweep group of %d sweeps with primarySweep %x, "+ - "presigned=%v, completed=%v", len(sweeps), sweep.swapHash[:6], - sweep.presigned, completed) + "presigned=%v, fully_confirmed=%v", len(sweeps), + sweep.swapHash[:6], sweep.presigned, completed) req := &addSweepsRequest{ sweeps: sweeps, @@ -841,14 +840,10 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // If the sweep has already been completed in a confirmed batch then we // can't attach its notifier to the batch as that is no longer running. // Instead we directly detect and return the spend here. - if completed && *notifier != (SpendNotifier{}) { - // The parent batch is indeed confirmed, meaning it is complete - // and we won't be able to attach this sweep to it. - if parentBatch.State == batchConfirmed { - return b.monitorSpendAndNotify( - ctx, sweep, parentBatch.ID, notifier, - ) - } + if completed && parentBatch.Confirmed { + return b.monitorSpendAndNotify( + ctx, sweeps, parentBatch.ID, notifier, + ) } sweep.notifier = notifier @@ -912,7 +907,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // spinUpNewBatch creates new batch, starts it and adds the sweeps to it. If // presigned mode is enabled, the result also depends on outcome of -// presignedHelper.Presign. +// presignedHelper.SignTx. func (b *Batcher) spinUpNewBatch(ctx context.Context, sweeps []*sweep) error { // Spin up a fresh batch. newBatch, err := b.spinUpBatch(ctx) @@ -1093,15 +1088,18 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, batch := batch{} batch.id = bch.ID - switch bch.State { - case batchOpen: - batch.state = Open - - case batchClosed: - batch.state = Closed - - case batchConfirmed: + if bch.Confirmed { batch.state = Confirmed + } else { + // We don't store Closed state separately in DB. + // If the batch is closed (included into a block, but + // not fully confirmed), it is now considered Open + // again. It will receive a spending notification as + // soon as it starts, so it is not an issue. If a sweep + // manages to be added during this time, it will be + // detected as missing when analyzing the spend + // notification and will be added to new batch. + batch.state = Open } batch.batchTxid = &bch.BatchTxid @@ -1123,83 +1121,207 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, } // monitorSpendAndNotify monitors the spend of a specific outpoint and writes -// the response back to the response channel. -func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, +// the response back to the response channel. It is called if the batch is fully +// confirmed and we just need to deliver the data back to the caller though +// SpendNotifier. +func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweeps []*sweep, parentBatchID int32, notifier *SpendNotifier) error { + // If the caller has not provided a notifier, stop. + if notifier == nil || *notifier == (SpendNotifier{}) { + return nil + } + spendCtx, cancel := context.WithCancel(ctx) - defer cancel() // Then we get the total amount that was swept by the batch. totalSwept, err := b.store.TotalSweptAmount(ctx, parentBatchID) if err != nil { + cancel() + + return err + } + + // Find the primarySweepID. + dbSweeps, err := b.store.FetchBatchSweeps(ctx, parentBatchID) + if err != nil { + cancel() + return err } + primarySweepID := dbSweeps[0].Outpoint + + sweep := sweeps[0] spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( spendCtx, &sweep.outpoint, sweep.htlc.PkScript, sweep.initiationHeight, ) if err != nil { + cancel() + return err } b.wg.Add(1) go func() { + defer cancel() defer b.wg.Done() infof("Batcher monitoring spend for swap %x", sweep.swapHash[:6]) - for { - select { - case spend := <-spendChan: - spendTx := spend.SpendingTx - // Calculate the fee portion that each sweep - // should pay for the batch. - feePortionPerSweep, roundingDifference := - getFeePortionForSweep( - spendTx, len(spendTx.TxIn), - totalSwept, - ) + select { + case spend := <-spendChan: + spendTx := spend.SpendingTx + + // Calculate the fee portion that each sweep should pay + // for the batch. + feePortionPerSweep, roundingDifference := + getFeePortionForSweep( + spendTx, len(spendTx.TxIn), + totalSwept, + ) + + // Sum onchain fee across all the sweeps of the swap. + var fee btcutil.Amount + for _, s := range sweeps { + isFirst := s.outpoint == primarySweepID - onChainFeePortion := getFeePortionPaidBySweep( - spendTx, feePortionPerSweep, - roundingDifference, sweep, + fee += getFeePortionPaidBySweep( + feePortionPerSweep, roundingDifference, + isFirst, ) + } - // Notify the requester of the spend - // with the spend details, including the fee - // portion for this particular sweep. - spendDetail := &SpendDetail{ - Tx: spendTx, - OnChainFeePortion: onChainFeePortion, + // Notify the requester of the spend with the spend + // details, including the fee portion for this + // particular sweep. + spendDetail := &SpendDetail{ + Tx: spendTx, + OnChainFeePortion: fee, + } + + select { + // Try to write the update to the notification channel. + case notifier.SpendChan <- spendDetail: + err := b.monitorConfAndNotify( + ctx, sweep, notifier, spendTx, + ) + if err != nil { + b.writeToErrChan( + ctx, fmt.Errorf("monitor conf "+ + "failed: %w", err), + ) } + // If a quit signal was provided by the swap, continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + + return + + case err := <-spendErr: + select { + // Try to write the error to the notification + // channel. + case notifier.SpendErrChan <- err: + + // If a quit signal was provided by the swap, + // continue. + case <-notifier.QuitChan: + + // If the context was canceled, stop. + case <-ctx.Done(): + } + + b.writeToErrChan( + ctx, fmt.Errorf("spend error: %w", err), + ) + + return + + // If a quit signal was provided by the swap, continue. + case <-notifier.QuitChan: + return + + // If the context was canceled, stop. + case <-ctx.Done(): + return + } + }() + + return nil +} + +// monitorConfAndNotify monitors the confirmation of a specific transaction and +// writes the response back to the response channel. It is called if the batch +// is fully confirmed and we just need to deliver the data back to the caller +// though SpendNotifier. +func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep, + notifier *SpendNotifier, spendTx *wire.MsgTx) error { + + // If confirmation notifications were not requested, stop. + if notifier.ConfChan == nil && notifier.ConfErrChan == nil { + return nil + } + + batchTxid := spendTx.TxHash() + + if len(spendTx.TxOut) != 1 { + return fmt.Errorf("unexpected number of outputs in batch: %d, "+ + "want %d", len(spendTx.TxOut), 1) + } + batchPkScript := spendTx.TxOut[0].PkScript + + reorgChan := make(chan struct{}) + + confCtx, cancel := context.WithCancel(ctx) + + confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( + confCtx, &batchTxid, batchPkScript, batchConfHeight, + sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan), + ) + if err != nil { + cancel() + return err + } + + b.wg.Add(1) + go func() { + defer cancel() + defer b.wg.Done() + + select { + case conf := <-confChan: + if notifier.ConfChan != nil { select { - case notifier.SpendChan <- spendDetail: + case notifier.ConfChan <- conf: + case <-notifier.QuitChan: case <-ctx.Done(): } + } - return - - case err := <-spendErr: + case err := <-errChan: + if notifier.ConfErrChan != nil { select { - case notifier.SpendErrChan <- err: + case notifier.ConfErrChan <- err: + case <-notifier.QuitChan: case <-ctx.Done(): } + } - b.writeToErrChan( - ctx, fmt.Errorf("spend error: %w", err), - ) + b.writeToErrChan(ctx, fmt.Errorf("confirmations "+ + "monitoring error: %w", err)) - return + case <-reorgChan: + // A re-org has been detected, but the batch is fully + // confirmed and this is unexpected. Crash the batcher. + b.writeToErrChan(ctx, fmt.Errorf("unexpected reorg")) - case <-notifier.QuitChan: - return - - case <-ctx.Done(): - return - } + case <-ctx.Done(): } }() @@ -1324,11 +1446,18 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, swapHash[:6], err) } + // Make sure that PkScript of the coin is filled. Otherwise + // RegisterSpendNtfn fails. + if len(s.HTLC.PkScript) == 0 { + return nil, fmt.Errorf("sweep data for %x doesn't have "+ + "HTLC.PkScript set", swapHash[:6]) + } + // Find minimum fee rate for the sweep. Use customFeeRate if it is // provided, otherwise use wallet's EstimateFeeRate. var minFeeRate chainfee.SatPerKWeight if b.customFeeRate != nil { - minFeeRate, err = b.customFeeRate(ctx, swapHash) + minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint) if err != nil { return nil, fmt.Errorf("failed to fetch min fee rate "+ "for %x: %w", swapHash[:6], err) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 86f626cbc..edf1fa9bf 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lntypes" @@ -95,42 +96,6 @@ func (h *mockPresignedHelper) getTxFeerate(tx *wire.MsgTx, return chainfee.NewSatPerKWeight(fee, weight) } -// Presign tries to presign the transaction. It succeeds if all the inputs -// are online. In case of success it adds the transaction to presignedBatches. -func (h *mockPresignedHelper) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { - - h.mu.Lock() - defer h.mu.Unlock() - - // Check if such a transaction already exists. This is not only an - // optimization, but also enables re-adding multiple groups if sweeps - // are offline. - wantTxHash := tx.TxHash() - for _, candidate := range h.presignedBatches[primarySweepID] { - if candidate.TxHash() == wantTxHash { - return nil - } - } - - if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) - } - - if offline := h.offlineInputs(tx); len(offline) != 0 { - return fmt.Errorf("some inputs of tx are offline: %v", offline) - } - - tx = tx.Copy() - h.sign(tx) - h.presignedBatches[primarySweepID] = append( - h.presignedBatches[primarySweepID], tx, - ) - - return nil -} - // DestPkScript returns destination pkScript used in presigned tx sweeping // these inputs. func (h *mockPresignedHelper) DestPkScript(ctx context.Context, @@ -158,6 +123,16 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, h.mu.Lock() defer h.mu.Unlock() + if feeRate < minRelayFee { + return nil, fmt.Errorf("feeRate (%v) is below minRelayFee (%v)", + feeRate, minRelayFee) + } + + if !hasInput(tx, primarySweepID) { + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) + } + // If all the inputs are online and loadOnly is not set, sign this exact // transaction. if offline := h.offlineInputs(tx); len(offline) == 0 && !loadOnly { @@ -199,7 +174,8 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, } if bestTx == nil { - return nil, fmt.Errorf("no such presigned tx found") + return nil, fmt.Errorf("some outpoint is offline and no " + + "suitable presigned tx found") } return bestTx.Copy(), nil @@ -232,13 +208,18 @@ func (h *mockPresignedHelper) FetchSweep(_ context.Context, h.mu.Lock() defer h.mu.Unlock() - _, has := h.onlineOutpoints[utxo] + // Find IsPresigned. + _, isPresigned := h.onlineOutpoints[utxo] return &SweepInfo{ // Set Timeout to prevent warning messages about timeout=0. Timeout: sweepTimeout, - IsPresigned: has, + IsPresigned: isPresigned, + + HTLC: swap.Htlc{ + PkScript: []byte{10, 11, 12}, + }, }, nil } @@ -254,8 +235,8 @@ func testPresigned_forgotten_presign(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -330,8 +311,8 @@ func testPresigned_input1_offline_then_input2(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -486,6 +467,118 @@ func testPresigned_input1_offline_then_input2(t *testing.T, require.NoError(t, err) } +// testPresigned_min_relay_fee tests that online and presigned transactions +// comply with min_relay_fee. +func testPresigned_min_relay_fee(t *testing.T, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const inputAmt = 1_000_000 + + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { + + return chainfee.FeePerKwFloor, nil + } + + presignedHelper := newMockPresignedHelper() + + batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, presignedHelper, + WithCustomFeeRate(customFeeRate), + WithPresignedHelper(presignedHelper)) + go func() { + err := batcher.Run(ctx) + checkBatcherError(t, err) + }() + + // Set high min_relay_fee. + lnd.SetMinRelayFee(400) + + // Create the first sweep. + swapHash1 := lntypes.Hash{1, 1, 1} + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + sweepReq1 := SweepRequest{ + SwapHash: swapHash1, + Inputs: []Input{{ + Value: inputAmt, + Outpoint: op1, + }}, + Notifier: &dummyNotifier, + } + + // Enable the input and presign. + presignedHelper.SetOutpointOnline(op1, true) + err := batcher.PresignSweepsGroup( + ctx, []Input{{Outpoint: op1, Value: inputAmt}}, + sweepTimeout, destAddr, + ) + require.NoError(t, err) + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Since a batch was created we check that it registered for its primary + // sweep's spend. + <-lnd.RegisterSpendChannel + + // Wait for a transactions to be published. + tx := <-lnd.TxPublishChannel + gotFeeRate := presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // The only difference of tx2 is a higher lock_time. + lnd.SetMinRelayFee(300) + require.NoError(t, lnd.NotifyHeight(601)) + tx2 := <-lnd.TxPublishChannel + require.Equal(t, tx.TxOut[0].Value, tx2.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + require.Equal(t, uint32(601), tx2.LockTime) + + // Set a higher min_relay_fee, turn off the client and try presigned tx. + lnd.SetMinRelayFee(500) + presignedHelper.SetOutpointOnline(op1, false) + + // Check fee rate of the presigned tx broadcasted. + require.NoError(t, lnd.NotifyHeight(602)) + tx = <-lnd.TxPublishChannel + gotFeeRate = presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx.LockTime) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // It should re-broadcast the same presigned tx. + lnd.SetMinRelayFee(450) + require.NoError(t, lnd.NotifyHeight(603)) + tx2 = <-lnd.TxPublishChannel + require.Equal(t, tx.TxHash(), tx2.TxHash()) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx2.LockTime) + + // Even if the client is back online, fee rate doesn't decrease. + presignedHelper.SetOutpointOnline(op1, true) + require.NoError(t, lnd.NotifyHeight(604)) + tx3 := <-lnd.TxPublishChannel + require.Equal(t, tx2.TxOut[0].Value, tx3.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx3, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + require.Equal(t, uint32(604), tx3.LockTime) +} + // testPresigned_two_inputs_one_goes_offline tests presigned mode for the // following scenario: two online inputs are added, then one of them goes // offline, then feerate grows and a presigned transaction is used. @@ -511,8 +604,8 @@ func testPresigned_two_inputs_one_goes_offline(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -647,8 +740,8 @@ func testPresigned_first_publish_fails(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -770,8 +863,8 @@ func testPresigned_locktime(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -854,8 +947,8 @@ func testPresigned_presigned_group(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -902,7 +995,7 @@ func testPresigned_presigned_group(t *testing.T, // An attempt to presign must fail. err = batcher.PresignSweepsGroup(ctx, group1, sweepTimeout, destAddr) - require.ErrorContains(t, err, "some inputs of tx are offline") + require.ErrorContains(t, err, "some outpoint is offline") // Enable both outpoints. presignedHelper.SetOutpointOnline(op2, true) @@ -1091,8 +1184,8 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -1354,23 +1447,28 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, // to another online batch. In offline case they must are added to a new batch // having valid presigned transactions. func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, - store testStore, batcherStore testBatcherStore, online bool) { + batcherStore testBatcherStore, online bool) { defer test.Guard(t)() require.LessOrEqual(t, numConfirmedSwaps, numSwaps) - const sweepsPerSwap = 2 + const ( + sweepsPerSwap = 2 + feeRate = chainfee.SatPerKWeight(10_000) + swapAmount = 3_000_001 + ) + sweepAmounts := []btcutil.Amount{1_000_001, 2_000_000} lnd := test.NewMockLnd() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { - return chainfee.SatPerKWeight(10_000), nil + return feeRate, nil } presignedHelper := newMockPresignedHelper() @@ -1388,12 +1486,17 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, checkBatcherError(t, err) }() + swapHashes := make([]lntypes.Hash, numSwaps) + groups := make([][]Input, numSwaps) txs := make([]*wire.MsgTx, numSwaps) allOps := make([]wire.OutPoint, 0, numSwaps*sweepsPerSwap) + spendChans := make([]<-chan *SpendDetail, numSwaps) + confChans := make([]<-chan *chainntnfs.TxConfirmation, numSwaps) for i := range numSwaps { // Create a swap of sweepsPerSwap sweeps. swapHash := lntypes.Hash{byte(i + 1)} + swapHashes[i] = swapHash ops := make([]wire.OutPoint, sweepsPerSwap) group := make([]Input, sweepsPerSwap) for j := range sweepsPerSwap { @@ -1405,29 +1508,10 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, group[j] = Input{ Outpoint: ops[j], - Value: btcutil.Amount(1_000_000 * (j + 1)), + Value: sweepAmounts[j], } } - - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: 3_000_000, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{byte(i + 1)}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() + groups[i] = group // Enable all the sweeps. for _, op := range ops { @@ -1435,16 +1519,29 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, } // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + spendChans[i] = spendChan + confChan := make(chan *chainntnfs.TxConfirmation, 1) + confChans[i] = confChan + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + // Add the sweep, triggering the publish attempt. require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ SwapHash: swapHash, Inputs: group, - Notifier: &dummyNotifier, + Notifier: notifier, })) // For the first group it should register for the sweep's spend @@ -1482,31 +1579,11 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, }, } - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: amount, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{1, 2, 3}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() - // Enable the sweep. presignedHelper.SetOutpointOnline(opx, true) // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) @@ -1543,6 +1620,34 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, SpendingHeight: int32(601 + numSwaps + 1), } lnd.SpendChannel <- spendDetail + + // Calculate the expected on-chain fee of the swap. + wantFee := make([]btcutil.Amount, numConfirmedSwaps) + for i := range numConfirmedSwaps { + batchAmount := swapAmount * btcutil.Amount(numConfirmedSwaps) + txFee := batchAmount - btcutil.Amount(tx.TxOut[0].Value) + numConfirmedSweeps := numConfirmedSwaps * sweepsPerSwap + feePerSweep := txFee / btcutil.Amount(numConfirmedSweeps) + roundingDiff := txFee - feePerSweep*btcutil.Amount( + numConfirmedSweeps, + ) + swapFee := feePerSweep * 2 + + // Add rounding difference to the first swap. + if i == 0 { + swapFee += roundingDiff + } + + wantFee[i] = swapFee + } + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + spend := <-spendChans[i] + require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) + } + <-lnd.RegisterConfChannel require.NoError(t, lnd.NotifyHeight( int32(601+numSwaps+1+batchConfHeight), @@ -1554,12 +1659,18 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, // CleanupTransactions is called here. <-presignedHelper.cleanupCalled - // If all the swaps were confirmed, stop. - if numConfirmedSwaps == numSwaps { - return + // Increasing block height caused the second batch to re-publish. + if online && numConfirmedSwaps < numSwaps { + <-lnd.TxPublishChannel + } + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + conf := <-confChans[i] + require.Equal(t, txHash, conf.Tx.TxHash()) } - if !online { + if !online && numConfirmedSwaps != numSwaps { // If the sweeps are offline, the missing sweeps in the // confirmed transaction should be re-added to the batcher as // new batch. The groups are added incrementally, so we need @@ -1568,6 +1679,48 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, <-lnd.TxPublishChannel } + // Now make sure that a correct spend and conf contification is sent if + // AddSweep is called after confirming the sweeps. + for i := range numConfirmedSwaps { + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + confChan := make(chan *chainntnfs.TxConfirmation) + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + + // Add the sweep, triggering the publish attempt. + require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ + SwapHash: swapHashes[i], + Inputs: groups[i], + Notifier: notifier, + })) + + spendReg := <-lnd.RegisterSpendChannel + spendReg.SpendChannel <- spendDetail + + spend := <-spendChan + require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) + + <-lnd.RegisterConfChannel + lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + Tx: tx, + } + + conf := <-confChan + require.Equal(t, tx.TxHash(), conf.Tx.TxHash()) + } + + // If all the swaps were confirmed, stop. + if numConfirmedSwaps == numSwaps { + return + } + // Wait to new batch to appear and to have the expected size. wantSize := (numSwaps - numConfirmedSwaps) * sweepsPerSwap if online { @@ -1624,6 +1777,10 @@ func TestPresigned(t *testing.T) { testPresigned_input1_offline_then_input2(t, NewStoreMock()) }) + t.Run("min_relay_fee", func(t *testing.T) { + testPresigned_min_relay_fee(t, NewStoreMock()) + }) + t.Run("two_inputs_one_goes_offline", func(t *testing.T) { testPresigned_two_inputs_one_goes_offline(t, NewStoreMock()) }) @@ -1657,14 +1814,10 @@ func TestPresigned(t *testing.T) { } t.Run(name, func(t *testing.T) { - runTests(t, func(t *testing.T, store testStore, - batcherStore testBatcherStore) { - - testPresigned_purging( - t, numSwaps, numConfirmedSwaps, - store, batcherStore, online, - ) - }) + testPresigned_purging( + t, numSwaps, numConfirmedSwaps, + NewStoreMock(), online, + ) }) } @@ -1675,11 +1828,13 @@ func TestPresigned(t *testing.T) { testPurging(3, 1, false) testPurging(3, 2, false) testPurging(5, 2, false) + testPurging(5, 3, false) // Test cases in which the sweeps are online. testPurging(2, 1, true) testPurging(3, 1, true) testPurging(3, 2, true) testPurging(5, 2, true) + testPurging(5, 3, true) }) } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index cd5bab01c..0ad5f64d8 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -408,8 +408,8 @@ func testFeeBumping(t *testing.T, store testStore, // Disable fee bumping, if requested. var opts []BatcherOption if noFeeBumping { - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return test.DefaultMockFee, nil @@ -762,9 +762,9 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, batcherStore, sweepStore) + runErrChan := make(chan error) go func() { - err := batcher.Run(ctx) - checkBatcherError(t, err) + runErrChan <- batcher.Run(ctx) }() // Create a sweep request. @@ -772,13 +772,24 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, Hash: chainhash.Hash{1, 1}, Index: 1, } + const ( + inputValue = 111 + outputValue = 50 + fee = inputValue - outputValue + ) + spendErrChan := make(chan error, 1) + notifier := &SpendNotifier{ + SpendChan: make(chan *SpendDetail, 1), + SpendErrChan: spendErrChan, + QuitChan: make(chan bool, 1), + } sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, Inputs: []Input{{ - Value: 111, + Value: inputValue, Outpoint: op1, }}, - Notifier: &dummyNotifier, + Notifier: notifier, } const initiationHeight = 550 @@ -786,7 +797,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, swap1 := &loopdb.LoopOutContract{ SwapContract: loopdb.SwapContract{ CltvExpiry: 111, - AmountRequested: 111, + AmountRequested: inputValue, ProtocolVersion: loopdb.ProtocolVersionMuSig2, HtlcKeys: htlcKeys, InitiationHeight: initiationHeight, @@ -806,33 +817,27 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // When batch is successfully created it will execute it's first step, // which leads to a spend monitor of the primary sweep. - <-lnd.RegisterSpendChannel + spendReg := <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + <-lnd.TxPublishChannel // Eventually request will be consumed and a new batch will spin up. + var primarySweepID wire.OutPoint require.Eventually(t, func() bool { - return batcher.numBatches(ctx) == 1 - }, test.Timeout, eventuallyCheckFrequency) + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } - // Find the batch and assign it to a local variable for easier access. - batch := &batch{} - for _, btch := range getBatches(ctx, batcher) { - btch.testRunInEventLoop(ctx, func() { - if btch.primarySweepID == op1 { - batch = btch - } - }) - } + primarySweepID = batch.snapshot(ctx).primarySweepID - require.Eventually(t, func() bool { // Batch should have the sweep stored. return batch.numSweeps(ctx) == 1 }, test.Timeout, eventuallyCheckFrequency) // The primary sweep id should be that of the first inserted sweep. - require.Equal(t, batch.primarySweepID, op1) - - // Wait for tx to be published. - <-lnd.TxPublishChannel + require.Equal(t, primarySweepID, op1) err = lnd.NotifyHeight(601) require.NoError(t, err) @@ -840,7 +845,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // After receiving a height notification the batch will step again, // leading to a new spend monitoring. require.Eventually(t, func() bool { - batch := batch.snapshot(ctx) + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) return batch.currentHeight == 601 }, test.Timeout, eventuallyCheckFrequency) @@ -848,6 +857,60 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Wait for tx to be published. <-lnd.TxPublishChannel + // Emulate spend error. + testError := errors.New("test error") + spendReg.ErrChan <- testError + + // Make sure the caller of AddSweep got the spending error. + notifierErr := <-spendErrChan + require.Error(t, notifierErr) + require.ErrorIs(t, notifierErr, testError) + + // Wait for the batcher to crash because of the spending error. + runErr := <-runErrChan + require.ErrorIs(t, runErr, testError) + + // Now launch the batcher again. + batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + runErrChan <- batcher.Run(ctx) + }() + + // When batch is successfully created it will execute it's first step, + // which leads to a spend monitor of the primary sweep. + <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Deliver sweep request to batcher. + spendChan := make(chan *SpendDetail, 1) + confErrChan := make(chan error) + notifier = &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfErrChan: confErrChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Wait for the notifier to be installed. + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) + + sweep := batch.sweeps[batch.primarySweepID] + + return sweep.notifier != nil && + sweep.notifier.SpendChan == spendChan + }, test.Timeout, eventuallyCheckFrequency) + // Create the spending tx that will trigger the spend monitor of the // batch. spendingTx := &wire.MsgTx{ @@ -861,6 +924,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, }, TxOut: []*wire.TxOut{ { + Value: outputValue, PkScript: []byte{3, 2, 1}, }, }, @@ -879,6 +943,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // We notify the spend. lnd.SpendChannel <- spendDetail + // Make sure the notifier got a proper spending notification. + spending := <-spendChan + require.Equal(t, spendingTxHash, spending.Tx.TxHash()) + require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + // After receiving the spend, the batch is now monitoring for confs. confReg := <-lnd.RegisterConfChannel @@ -889,7 +958,90 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // The batch should eventually read the spend notification and progress // its state to closed. require.Eventually(t, func() bool { - batch := batch.snapshot(ctx) + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) + + return batch.state == Closed + }, test.Timeout, eventuallyCheckFrequency) + + // Emulate a confirmation error. + confReg.ErrChan <- testError + + // Make sure the notifier gets the confirmation error. + confErr := <-confErrChan + require.ErrorIs(t, confErr, testError) + + // Wait for the batcher to crash because of the confirmation error. + runErr = <-runErrChan + require.ErrorIs(t, runErr, testError) + + // Now launch the batcher again. + batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + runErrChan <- batcher.Run(ctx) + }() + + // When batch is successfully created it will execute it's first step, + // which leads to a spend monitor of the primary sweep. + <-lnd.RegisterSpendChannel + + // Deliver sweep request to batcher. + spendChan = make(chan *SpendDetail, 1) + confChan := make(chan *chainntnfs.TxConfirmation) + notifier = &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Wait for tx to be published. A closed batch is stored in DB as Open. + <-lnd.TxPublishChannel + + // Wait for the notifier to be installed. + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) + + sweep := batch.sweeps[batch.primarySweepID] + + return sweep.notifier != nil && + sweep.notifier.SpendChan == spendChan + }, test.Timeout, eventuallyCheckFrequency) + + // We notify the spend. + lnd.SpendChannel <- spendDetail + + // Make sure the notifier got a proper spending notification. + spending = <-spendChan + require.Equal(t, spendingTxHash, spending.Tx.TxHash()) + require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + + // After receiving the spend, the batch is now monitoring for confs. + confReg = <-lnd.RegisterConfChannel + + // Make sure the confirmation has proper height hint. It should pass + // the swap initiation height, not the current height. + require.Equal(t, int32(initiationHeight), confReg.HeightHint) + + // The batch should eventually read the spend notification and progress + // its state to closed. + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + batch = batch.snapshot(ctx) return batch.state == Closed }, test.Timeout, eventuallyCheckFrequency) @@ -899,14 +1051,123 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // We mock the tx confirmation notification. lnd.ConfChannel <- &chainntnfs.TxConfirmation{ - Tx: spendingTx, + BlockHeight: 604, + Tx: spendingTx, } + // Make sure the notifier gets a confirmation notification. + conf := <-confChan + require.Equal(t, uint32(604), conf.BlockHeight) + require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash()) + // Eventually the batch receives the confirmation notification and // confirms itself. require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + return batch.isComplete() }, test.Timeout, eventuallyCheckFrequency) + + // Now emulate adding the sweep again after it was fully confirmed. + // This triggers another code path (monitorSpendAndNotify). + spendChan = make(chan *SpendDetail, 1) + confChan = make(chan *chainntnfs.TxConfirmation) + notifier = &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Expect a spending registration. + <-lnd.RegisterSpendChannel + + // We notify the spend. + lnd.SpendChannel <- spendDetail + + // Now expect the notifier to produce the spending details. + spending = <-spendChan + require.Equal(t, spendingTxHash, spending.Tx.TxHash()) + require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion) + + // We mock the tx confirmation notification. + <-lnd.RegisterConfChannel + lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + BlockHeight: 604, + Tx: spendingTx, + } + + // Make sure the notifier gets a confirmation notification. + conf = <-confChan + require.Equal(t, uint32(604), conf.BlockHeight) + require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash()) + + // Now check what happens in case of a spending error. + spendErrChan = make(chan error, 1) + notifier = &SpendNotifier{ + SpendChan: make(chan *SpendDetail, 1), + SpendErrChan: spendErrChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Expect a spending registration. + spendReg = <-lnd.RegisterSpendChannel + + // Emulate spend error. + spendReg.ErrChan <- testError + + // Make sure the caller of AddSweep got the spending error. + notifierErr = <-spendErrChan + require.Error(t, notifierErr) + require.ErrorIs(t, notifierErr, testError) + + // Wait for the batcher to crash because of the spending error. + runErr = <-runErrChan + require.ErrorIs(t, runErr, testError) + + // Now launch the batcher again. + batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + runErrChan <- batcher.Run(ctx) + }() + + // Now check what happens in case of a confirmation error. + confErrChan = make(chan error, 1) + notifier = &SpendNotifier{ + SpendChan: make(chan *SpendDetail, 1), + SpendErrChan: make(chan error, 1), + ConfErrChan: confErrChan, + QuitChan: make(chan bool, 1), + } + sweepReq1.Notifier = notifier + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Expect a spending registration. + <-lnd.RegisterSpendChannel + + // We notify the spend. + lnd.SpendChannel <- spendDetail + + // We mock the tx confirmation error notification. + confReg = <-lnd.RegisterConfChannel + confReg.ErrChan <- testError + + // Make sure the notifier gets the confirmation error. + confErr = <-confErrChan + require.ErrorIs(t, confErr, testError) + + // Wait for the batcher to crash because of the confirmation error. + runErr = <-runErrChan + require.ErrorIs(t, runErr, testError) } // wrappedLogger implements btclog.Logger, recording last debug message format. @@ -2041,22 +2302,15 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, return b.state == Closed }, test.Timeout, eventuallyCheckFrequency) - // Since second batch was created we check that it registered for its - // primary sweep's spend. - <-lnd.RegisterSpendChannel - - // While handling the spend notification the batch should detect that - // some sweeps did not appear in the spending tx, therefore it redirects - // them back to the batcher and the batcher inserts them in a new batch. - require.Eventually(t, func() bool { - return batcher.numBatches(ctx) == 2 - }, test.Timeout, eventuallyCheckFrequency) - // We mock the confirmation notification. lnd.ConfChannel <- &chainntnfs.TxConfirmation{ Tx: spendingTx, } + // Since second batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + // Wait for tx to be published. // Here is a race condition, which is unlikely to cause a crash: if we // wait for publish tx before sending a conf notification (previous @@ -3435,8 +3689,8 @@ func testSweepFetcher(t *testing.T, store testStore, require.NoError(t, err) store.AssertLoopOutStored() - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return feeRate, nil @@ -4282,8 +4536,8 @@ func testFeeRateGrows(t *testing.T, store testStore, swap2feeRate[swapHash] = rate } - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { swap2feeRateMu.Lock() defer swap2feeRateMu.Unlock() diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index 3889ce066..a4fae0e77 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -33,9 +33,11 @@ func (c *mockChainNotifier) RawClientWithMacAuth( // SpendRegistration contains registration details. type SpendRegistration struct { - Outpoint *wire.OutPoint - PkScript []byte - HeightHint int32 + Outpoint *wire.OutPoint + PkScript []byte + HeightHint int32 + SpendChannel chan<- *chainntnfs.SpendDetail + ErrChan chan<- error } // ConfRegistration contains registration details. @@ -45,18 +47,26 @@ type ConfRegistration struct { HeightHint int32 NumConfs int32 ConfChan chan *chainntnfs.TxConfirmation + ErrChan chan<- error } func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( chan *chainntnfs.SpendDetail, chan error, error) { - c.lnd.RegisterSpendChannel <- &SpendRegistration{ - HeightHint: heightHint, - Outpoint: outpoint, - PkScript: pkScript, + spendChan0 := make(chan *chainntnfs.SpendDetail) + spendErrChan := make(chan error, 1) + + reg := &SpendRegistration{ + HeightHint: heightHint, + Outpoint: outpoint, + PkScript: pkScript, + SpendChannel: spendChan0, + ErrChan: spendErrChan, } + c.lnd.RegisterSpendChannel <- reg + spendChan := make(chan *chainntnfs.SpendDetail, 1) errChan := make(chan error, 1) @@ -70,6 +80,19 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, case spendChan <- m: case <-ctx.Done(): } + + case m := <-spendChan0: + select { + case spendChan <- m: + case <-ctx.Done(): + } + + case err := <-spendErrChan: + select { + case errChan <- err: + case <-ctx.Done(): + } + case <-ctx.Done(): } }() @@ -129,12 +152,15 @@ func (c *mockChainNotifier) RegisterConfirmationsNtfn(ctx context.Context, opts ...lndclient.NotifierOption) (chan *chainntnfs.TxConfirmation, chan error, error) { + confErrChan := make(chan error, 1) + reg := &ConfRegistration{ PkScript: pkScript, TxID: txid, HeightHint: heightHint, NumConfs: numConfs, ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + ErrChan: confErrChan, } c.Lock() @@ -169,6 +195,13 @@ func (c *mockChainNotifier) RegisterConfirmationsNtfn(ctx context.Context, } } c.Unlock() + + case err := <-confErrChan: + select { + case errChan <- err: + case <-ctx.Done(): + } + case <-ctx.Done(): } }() diff --git a/test/context.go b/test/context.go index 5fae59357..943a1dee6 100644 --- a/test/context.go +++ b/test/context.go @@ -61,7 +61,7 @@ func (ctx *Context) NotifySpend(tx *wire.MsgTx, inputIndex uint32) { SpenderInputIndex: inputIndex, }: case <-time.After(Timeout): - ctx.T.Fatalf("htlc spend not consumed") + ctx.T.Fatalf("spend not consumed") } } @@ -74,7 +74,7 @@ func (ctx *Context) NotifyConf(tx *wire.MsgTx) { Tx: tx, }: case <-time.After(Timeout): - ctx.T.Fatalf("htlc spend not consumed") + ctx.T.Fatalf("confirmation not consumed") } } @@ -86,7 +86,7 @@ func (ctx *Context) AssertRegisterSpendNtfn(script []byte) { case spendIntent := <-ctx.Lnd.RegisterSpendChannel: require.Equal( ctx.T, script, spendIntent.PkScript, - "server not listening for published htlc script", + "server not listening for published script", ) case <-time.After(Timeout): @@ -134,7 +134,7 @@ func (ctx *Context) AssertRegisterConf(expectTxHash bool, confs int32) *ConfRegi require.Equal(ctx.T, confs, confIntent.NumConfs) case <-time.After(Timeout): - ctx.T.Fatalf("htlc confirmed not subscribed to") + ctx.T.Fatalf("tx confirmed not subscribed to") } return confIntent @@ -249,7 +249,7 @@ func (ctx *Context) GetOutputIndex(tx *wire.MsgTx, } } - ctx.T.Fatal("htlc not present in tx") + ctx.T.Fatal("the output not present in tx") return 0 }