Skip to content

Commit 6498676

Browse files
authored
🔀 Merge pull request #144 from perun-network/143-settle-implicit-register
Fixes #143 settle implicit register
2 parents 24e9f01 + 90b99ee commit 6498676

File tree

5 files changed

+84
-20
lines changed

5 files changed

+84
-20
lines changed

client/adjudicate.go

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/pkg/errors"
2121

2222
"perun.network/go-perun/channel"
23+
"perun.network/go-perun/log"
2324
"perun.network/go-perun/pkg/sync"
2425
"perun.network/go-perun/wire"
2526
)
@@ -75,7 +76,7 @@ func (c *Channel) Watch(h AdjudicatorEventHandler) error {
7576

7677
// If local version greater than backend version, register local state.
7778
if e.Version() < c.State().Version {
78-
if err := c.Register(ctx); err != nil {
79+
if err := c.registerDispute(ctx); err != nil {
7980
return errors.WithMessage(err, "registering")
8081
}
8182
}
@@ -90,17 +91,17 @@ func (c *Channel) Watch(h AdjudicatorEventHandler) error {
9091
return errors.WithMessage(err, "subscription closed")
9192
}
9293

93-
// Register registers the channel and all its relatives on the adjudicator.
94+
// registerDispute registers a dispute for the channel and all its relatives.
9495
//
9596
// Returns TxTimedoutError when the program times out waiting for a transaction
9697
// to be mined.
9798
// Returns ChainNotReachableError if the connection to the blockchain network
9899
// fails when sending a transaction to / reading from the blockchain.
99-
func (c *Channel) Register(ctx context.Context) error {
100+
func (c *Channel) registerDispute(ctx context.Context) error {
100101
// If this is not the root, go up one level.
101102
// Once we are at the root, we register the whole channel tree together.
102103
if c.parent != nil {
103-
return c.parent.Register(ctx)
104+
return c.parent.registerDispute(ctx)
104105
}
105106

106107
// Lock machines of channel and all subchannels recursively.
@@ -133,13 +134,18 @@ func (c *Channel) Register(ctx context.Context) error {
133134
return nil
134135
}
135136

136-
// ProgressBy progresses the channel state in the adjudicator backend.
137+
// ForceUpdate enforces a channel update on the adjudicator.
137138
//
138139
// Returns TxTimedoutError when the program times out waiting for a transaction
139140
// to be mined.
140141
// Returns ChainNotReachableError if the connection to the blockchain network
141142
// fails when sending a transaction to / reading from the blockchain.
142-
func (c *Channel) ProgressBy(ctx context.Context, update func(*channel.State)) error {
143+
func (c *Channel) ForceUpdate(ctx context.Context, update func(*channel.State)) error {
144+
err := c.ensureRegistered(ctx)
145+
if err != nil {
146+
return err
147+
}
148+
143149
// Lock machine
144150
if !c.machMtx.TryLockCtx(ctx) {
145151
return errors.Errorf("locking machine mutex in time: %v", ctx.Err())
@@ -182,6 +188,13 @@ func (c *Channel) ProgressBy(ctx context.Context, update func(*channel.State)) e
182188
// Returns ChainNotReachableError if the connection to the blockchain network
183189
// fails when sending a transaction to / reading from the blockchain.
184190
func (c *Channel) Settle(ctx context.Context, secondary bool) (err error) {
191+
if !c.State().IsFinal {
192+
err := c.ensureRegistered(ctx)
193+
if err != nil {
194+
return err
195+
}
196+
}
197+
185198
// Lock machines of channel and all subchannels recursively.
186199
l, err := c.tryLockRecursive(ctx)
187200
defer l.Unlock()
@@ -396,3 +409,55 @@ func (c *Channel) subChannelStateMap() (states channel.StateMap, err error) {
396409
})
397410
return
398411
}
412+
413+
// ensureRegistered ensures that the channel is registered.
414+
func (c *Channel) ensureRegistered(ctx context.Context) error {
415+
phase := c.Phase()
416+
if phase == channel.Registered || phase == channel.Progressed {
417+
return nil
418+
}
419+
420+
registeredEvents := make(chan *channel.RegisteredEvent)
421+
422+
// Start event subscription.
423+
sub, err := c.adjudicator.Subscribe(ctx, c.Params())
424+
if err != nil {
425+
return errors.WithMessage(err, "subscribing to adjudicator events")
426+
}
427+
go waitForRegisteredEvent(sub, registeredEvents)
428+
429+
// Register.
430+
err = c.registerDispute(ctx)
431+
if err != nil {
432+
return errors.WithMessage(err, "registering")
433+
}
434+
435+
select {
436+
case e := <-registeredEvents:
437+
err = e.Timeout().Wait(ctx)
438+
if err != nil {
439+
return err
440+
}
441+
case <-ctx.Done():
442+
return ctx.Err()
443+
}
444+
return nil
445+
}
446+
447+
// waitForRegisteredEvent waits until a RegisteredEvent has been observed.
448+
func waitForRegisteredEvent(sub channel.AdjudicatorSubscription, events chan<- *channel.RegisteredEvent) {
449+
defer func() {
450+
err := sub.Close()
451+
if err != nil {
452+
log.Warn("Subscription closed with error:", err)
453+
}
454+
}()
455+
456+
// Scan for registered event.
457+
for e := sub.Next(); e != nil; e = sub.Next() {
458+
if e, ok := e.(*channel.RegisteredEvent); ok {
459+
events <- e
460+
return
461+
}
462+
}
463+
}

client/test/channel.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,6 @@ func (ch *paymentChannel) settleImpl(secondary bool) {
184184
ctx, cancel := context.WithTimeout(context.Background(), ch.r.timeout)
185185
defer cancel()
186186

187-
if ch.IsLedgerChannel() || !ch.State().IsFinal {
188-
assert.NoError(ch.Register(ctx))
189-
}
190-
191187
assert.NoError(ch.Settle(ctx, secondary))
192188
ch.assertBals(ch.State())
193189

client/test/progression.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,8 @@ func (r *Paul) exec(_cfg ExecConfig, ch *paymentChannel) {
9191

9292
r.waitStage() // wait for setup complete
9393

94-
// register
95-
assert.NoError(ch.Register(ctx), "registering")
96-
regEvent := <-r.registered
97-
98-
assert.NoError(regEvent.Timeout().Wait(ctx), "waiting for refutation timeout")
99-
10094
// progress
101-
assert.NoError(ch.ProgressBy(ctx, func(s *channel.State) {
95+
assert.NoError(ch.ForceUpdate(ctx, func(s *channel.State) {
10296
bal := func(user channel.Index) int64 {
10397
return s.Balances[assetIdx][user].Int64()
10498
}
@@ -168,7 +162,7 @@ func (r *Paula) exec(_cfg ExecConfig, ch *paymentChannel, _ *acceptNextPropHandl
168162
r.t.Logf("%v received progression confirmation 1", r.setup.Name)
169163

170164
// we progress
171-
assert.NoError(ch.ProgressBy(ctx, func(s *channel.State) {
165+
assert.NoError(ch.ForceUpdate(ctx, func(s *channel.State) {
172166
bal := func(user channel.Index) int64 {
173167
return s.Balances[assetIdx][user].Int64()
174168
}

client/testchannel.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
package client
1616

17-
import "perun.network/go-perun/channel"
17+
import (
18+
"context"
19+
20+
"perun.network/go-perun/channel"
21+
)
1822

1923
// TestChannel grants access to the `AdjudicatorRequest` which is otherwise
2024
// hidden by `Channel`. Behaves like a `Channel` in all other cases.
@@ -33,3 +37,8 @@ func NewTestChannel(c *Channel) *TestChannel {
3337
func (c *TestChannel) AdjudicatorReq() channel.AdjudicatorReq {
3438
return c.machine.AdjudicatorReq()
3539
}
40+
41+
// Register exposes dispute registration.
42+
func (c *TestChannel) Register(ctx context.Context) error {
43+
return c.Channel.registerDispute(ctx)
44+
}

client/virtual_channel_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestVirtualChannelsDispute(t *testing.T) {
8080
chs := []*client.Channel{vct.chAliceIngrid, vct.chIngridAlice, vct.chBobIngrid, vct.chIngridBob}
8181
// Register the channels in a random order.
8282
for _, i := range rand.Perm(len(chs)) {
83-
err := chs[i].Register(ctx)
83+
err := client.NewTestChannel(chs[i]).Register(ctx)
8484
assert.NoErrorf(err, "register channel: %d", i)
8585
}
8686

0 commit comments

Comments
 (0)