Skip to content

Commit 270ecbc

Browse files
committed
sweepbatcher: notify caller about confirmations
Add fields ConfChan and ConfErrChan to SpendNotifier type which is a part of SweepRequest passed to AddSweep method. This is needed to reuse confirmation notifications on the calling side the same way it is done for spending notifications.
1 parent 007108c commit 270ecbc

File tree

3 files changed

+232
-6
lines changed

3 files changed

+232
-6
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -881,8 +881,8 @@ func (b *batch) Run(ctx context.Context) error {
881881
return fmt.Errorf("handleSpend error: %w", err)
882882
}
883883

884-
case <-b.confChan:
885-
if err := b.handleConf(runCtx); err != nil {
884+
case conf := <-b.confChan:
885+
if err := b.handleConf(runCtx, conf); err != nil {
886886
return fmt.Errorf("handleConf error: %w", err)
887887
}
888888

@@ -1736,6 +1736,8 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
17361736
}
17371737

17381738
case err := <-errChan:
1739+
b.writeToConfErrChan(ctx, err)
1740+
17391741
b.writeToErrChan(fmt.Errorf("confirmations "+
17401742
"monitoring error: %w", err))
17411743

@@ -1908,11 +1910,40 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19081910

19091911
// handleConf handles a confirmation notification. This is the final step of the
19101912
// batch. Here we signal to the batcher that this batch was completed.
1911-
func (b *batch) handleConf(ctx context.Context) error {
1913+
func (b *batch) handleConf(ctx context.Context,
1914+
conf *chainntnfs.TxConfirmation) error {
1915+
19121916
b.Infof("confirmed in txid %s", b.batchTxid)
19131917
b.state = Confirmed
19141918

1915-
return b.store.ConfirmBatch(ctx, b.id)
1919+
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
1920+
return fmt.Errorf("failed to store confirmed state: %w", err)
1921+
}
1922+
1923+
// Send the confirmation to all the notifiers.
1924+
for _, s := range b.sweeps {
1925+
// If the sweep's notifier is empty then this means that a swap
1926+
// is not waiting to read an update from it, so we can skip
1927+
// the notification part.
1928+
if s.notifier == nil || s.notifier.ConfChan == nil {
1929+
continue
1930+
}
1931+
1932+
select {
1933+
// Try to write the confirmation to the notification
1934+
// channel.
1935+
case s.notifier.ConfChan <- conf:
1936+
1937+
// If a quit signal was provided by the swap,
1938+
// continue.
1939+
case <-s.notifier.QuitChan:
1940+
1941+
// If the context was canceled, stop.
1942+
case <-ctx.Done():
1943+
}
1944+
}
1945+
1946+
return nil
19161947
}
19171948

19181949
// isComplete returns true if the batch is completed. This method is used by the
@@ -2053,6 +2084,44 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
20532084
}
20542085
}
20552086

2087+
// writeToConfErrChan sends an error to confirmation error channels of all the
2088+
// sweeps.
2089+
func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) {
2090+
done, err := b.scheduleNextCall()
2091+
if err != nil {
2092+
done()
2093+
2094+
return
2095+
}
2096+
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
2097+
for _, s := range b.sweeps {
2098+
// If the sweep's notifier is empty then this means that a swap
2099+
// is not waiting to read an update from it, so we can skip
2100+
// the notification part.
2101+
if s.notifier == nil || s.notifier.ConfErrChan == nil {
2102+
continue
2103+
}
2104+
2105+
notifiers = append(notifiers, s.notifier)
2106+
}
2107+
done()
2108+
2109+
for _, notifier := range notifiers {
2110+
select {
2111+
// Try to write the error to the notification
2112+
// channel.
2113+
case notifier.ConfErrChan <- confErr:
2114+
2115+
// If a quit signal was provided by the swap,
2116+
// continue.
2117+
case <-notifier.QuitChan:
2118+
2119+
// If the context was canceled, stop.
2120+
case <-ctx.Done():
2121+
}
2122+
}
2123+
}
2124+
20562125
func (b *batch) persistSweep(ctx context.Context, sweep sweep,
20572126
completed bool) error {
20582127

sweepbatcher/sweep_batcher.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/loop/loopdb"
2121
"github.com/lightninglabs/loop/swap"
2222
"github.com/lightninglabs/loop/utils"
23+
"github.com/lightningnetwork/lnd/chainntnfs"
2324
"github.com/lightningnetwork/lnd/clock"
2425
"github.com/lightningnetwork/lnd/input"
2526
"github.com/lightningnetwork/lnd/lntypes"
@@ -230,6 +231,14 @@ type SpendNotifier struct {
230231
// SpendErrChan is a channel where spend errors are received.
231232
SpendErrChan chan<- error
232233

234+
// ConfChan is a channel where the confirmation details are received.
235+
// This channel is optional.
236+
ConfChan chan<- *chainntnfs.TxConfirmation
237+
238+
// ConfErrChan is a channel where confirmation errors are received.
239+
// This channel is optional.
240+
ConfErrChan chan<- error
241+
233242
// QuitChan is a channel that can be closed to stop the notifier.
234243
QuitChan <-chan bool
235244
}
@@ -937,7 +946,9 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
937946
}
938947

939948
// monitorSpendAndNotify monitors the spend of a specific outpoint and writes
940-
// the response back to the response channel.
949+
// the response back to the response channel. It is called if the batch is fully
950+
// confirmed and we just need to deliver the data back to the caller though
951+
// SpendNotifier.
941952
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
942953
parentBatchID int32, notifier *SpendNotifier) error {
943954

@@ -995,6 +1006,15 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
9951006
select {
9961007
// Try to write the update to the notification channel.
9971008
case notifier.SpendChan <- spendDetail:
1009+
err := b.monitorConfAndNotify(
1010+
ctx, sweep, notifier, spendTx,
1011+
)
1012+
if err != nil {
1013+
b.writeToErrChan(
1014+
ctx, fmt.Errorf("monitor conf "+
1015+
"failed: %w", err),
1016+
)
1017+
}
9981018

9991019
// If a quit signal was provided by the swap, continue.
10001020
case <-notifier.QuitChan:
@@ -1038,6 +1058,78 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
10381058
return nil
10391059
}
10401060

1061+
// monitorConfAndNotify monitors the confirmation of a specific transaction and
1062+
// writes the response back to the response channel. It is called if the batch
1063+
// is fully confirmed and we just need to deliver the data back to the caller
1064+
// though SpendNotifier.
1065+
func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep,
1066+
notifier *SpendNotifier, spendTx *wire.MsgTx) error {
1067+
1068+
// If confirmation notifications were not requested, stop.
1069+
if notifier.ConfChan == nil && notifier.ConfErrChan == nil {
1070+
return nil
1071+
}
1072+
1073+
batchTxid := spendTx.TxHash()
1074+
1075+
if len(spendTx.TxOut) != 1 {
1076+
return fmt.Errorf("unexpected number of outputs in batch: %d, "+
1077+
"want %d", len(spendTx.TxOut), 1)
1078+
}
1079+
batchPkScript := spendTx.TxOut[0].PkScript
1080+
1081+
reorgChan := make(chan struct{})
1082+
1083+
confCtx, cancel := context.WithCancel(ctx)
1084+
1085+
confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
1086+
confCtx, &batchTxid, batchPkScript, batchConfHeight,
1087+
sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan),
1088+
)
1089+
if err != nil {
1090+
cancel()
1091+
return err
1092+
}
1093+
1094+
b.wg.Add(1)
1095+
go func() {
1096+
defer cancel()
1097+
defer b.wg.Done()
1098+
1099+
select {
1100+
case conf := <-confChan:
1101+
if notifier.ConfChan != nil {
1102+
select {
1103+
case notifier.ConfChan <- conf:
1104+
case <-notifier.QuitChan:
1105+
case <-ctx.Done():
1106+
}
1107+
}
1108+
1109+
case err := <-errChan:
1110+
if notifier.ConfErrChan != nil {
1111+
select {
1112+
case notifier.ConfErrChan <- err:
1113+
case <-notifier.QuitChan:
1114+
case <-ctx.Done():
1115+
}
1116+
}
1117+
1118+
b.writeToErrChan(ctx, fmt.Errorf("confirmations "+
1119+
"monitoring error: %w", err))
1120+
1121+
case <-reorgChan:
1122+
// A re-org has been detected, but the batch is fully
1123+
// confirmed and this is unexpected. Crash the batcher.
1124+
b.writeToErrChan(ctx, fmt.Errorf("unexpected reorg"))
1125+
1126+
case <-ctx.Done():
1127+
}
1128+
}()
1129+
1130+
return nil
1131+
}
1132+
10411133
func (b *Batcher) writeToErrChan(ctx context.Context, err error) {
10421134
select {
10431135
case b.errChan <- err:

sweepbatcher/sweep_batcher_test.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,9 +887,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
887887

888888
// Deliver sweep request to batcher.
889889
spendChan := make(chan *SpendDetail, 1)
890+
confErrChan := make(chan error)
890891
notifier = &SpendNotifier{
891892
SpendChan: spendChan,
892893
SpendErrChan: make(chan error, 1),
894+
ConfErrChan: confErrChan,
893895
QuitChan: make(chan bool, 1),
894896
}
895897
sweepReq1.Notifier = notifier
@@ -968,6 +970,10 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
968970
// Emulate a confirmation error.
969971
confReg.ErrChan <- testError
970972

973+
// Make sure the notifier gets the confirmation error.
974+
confErr := <-confErrChan
975+
require.ErrorIs(t, confErr, testError)
976+
971977
// Wait for the batcher to crash because of the confirmation error.
972978
runErr = <-runErrChan
973979
require.ErrorIs(t, runErr, testError)
@@ -986,9 +992,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
986992

987993
// Deliver sweep request to batcher.
988994
spendChan = make(chan *SpendDetail, 1)
995+
confChan := make(chan *chainntnfs.TxConfirmation)
989996
notifier = &SpendNotifier{
990997
SpendChan: spendChan,
991998
SpendErrChan: make(chan error, 1),
999+
ConfChan: confChan,
9921000
QuitChan: make(chan bool, 1),
9931001
}
9941002
sweepReq1.Notifier = notifier
@@ -1043,9 +1051,15 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10431051

10441052
// We mock the tx confirmation notification.
10451053
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
1046-
Tx: spendingTx,
1054+
BlockHeight: 604,
1055+
Tx: spendingTx,
10471056
}
10481057

1058+
// Make sure the notifier gets a confirmation notification.
1059+
conf := <-confChan
1060+
require.Equal(t, uint32(604), conf.BlockHeight)
1061+
require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash())
1062+
10491063
// Eventually the batch receives the confirmation notification and
10501064
// confirms itself.
10511065
require.Eventually(t, func() bool {
@@ -1060,9 +1074,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10601074
// Now emulate adding the sweep again after it was fully confirmed.
10611075
// This triggers another code path (monitorSpendAndNotify).
10621076
spendChan = make(chan *SpendDetail, 1)
1077+
confChan = make(chan *chainntnfs.TxConfirmation)
10631078
notifier = &SpendNotifier{
10641079
SpendChan: spendChan,
10651080
SpendErrChan: make(chan error, 1),
1081+
ConfChan: confChan,
10661082
QuitChan: make(chan bool, 1),
10671083
}
10681084
sweepReq1.Notifier = notifier
@@ -1079,6 +1095,18 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10791095
require.Equal(t, spendingTxHash, spending.Tx.TxHash())
10801096
require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion)
10811097

1098+
// We mock the tx confirmation notification.
1099+
<-lnd.RegisterConfChannel
1100+
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
1101+
BlockHeight: 604,
1102+
Tx: spendingTx,
1103+
}
1104+
1105+
// Make sure the notifier gets a confirmation notification.
1106+
conf = <-confChan
1107+
require.Equal(t, uint32(604), conf.BlockHeight)
1108+
require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash())
1109+
10821110
// Now check what happens in case of a spending error.
10831111
spendErrChan = make(chan error, 1)
10841112
notifier = &SpendNotifier{
@@ -1103,6 +1131,43 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
11031131
// Wait for the batcher to crash because of the spending error.
11041132
runErr = <-runErrChan
11051133
require.ErrorIs(t, runErr, testError)
1134+
1135+
// Now launch the batcher again.
1136+
batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1137+
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
1138+
batcherStore, sweepStore)
1139+
go func() {
1140+
runErrChan <- batcher.Run(ctx)
1141+
}()
1142+
1143+
// Now check what happens in case of a confirmation error.
1144+
confErrChan = make(chan error, 1)
1145+
notifier = &SpendNotifier{
1146+
SpendChan: make(chan *SpendDetail, 1),
1147+
SpendErrChan: make(chan error, 1),
1148+
ConfErrChan: confErrChan,
1149+
QuitChan: make(chan bool, 1),
1150+
}
1151+
sweepReq1.Notifier = notifier
1152+
require.NoError(t, batcher.AddSweep(&sweepReq1))
1153+
1154+
// Expect a spending registration.
1155+
<-lnd.RegisterSpendChannel
1156+
1157+
// We notify the spend.
1158+
lnd.SpendChannel <- spendDetail
1159+
1160+
// We mock the tx confirmation error notification.
1161+
confReg = <-lnd.RegisterConfChannel
1162+
confReg.ErrChan <- testError
1163+
1164+
// Make sure the notifier gets the confirmation error.
1165+
confErr = <-confErrChan
1166+
require.ErrorIs(t, confErr, testError)
1167+
1168+
// Wait for the batcher to crash because of the confirmation error.
1169+
runErr = <-runErrChan
1170+
require.ErrorIs(t, runErr, testError)
11061171
}
11071172

11081173
// wrappedLogger implements btclog.Logger, recording last debug message format.

0 commit comments

Comments
 (0)