Skip to content

Commit 0335103

Browse files
committed
svs: return instance state as binary
1 parent 7b84d03 commit 0335103

File tree

7 files changed

+69
-54
lines changed

7 files changed

+69
-54
lines changed

dv/dv/router.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ func (dv *Router) createPrefixTable() {
343343
dv.pfxSubs = make(map[uint64]enc.Name)
344344

345345
// SVS delivery agent
346-
dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
346+
var err error
347+
dv.pfxSvs, err = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
347348
Name: dv.config.RouterDataPrefix(),
348349
Svs: ndn_sync.SvSyncOpts{
349350
Client: dv.client,
@@ -358,6 +359,9 @@ func (dv *Router) createPrefixTable() {
358359
Threshold: PrefixSnapThreshold,
359360
},
360361
})
362+
if err != nil {
363+
panic(err)
364+
}
361365

362366
// Local prefix table
363367
dv.pfx = table.NewPrefixTable(dv.config, func(w enc.Wire) {

std/examples/svs/alo-history/main.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/named-data/ndnd/std/engine"
1111
"github.com/named-data/ndnd/std/log"
1212
"github.com/named-data/ndnd/std/ndn"
13-
spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
1413
"github.com/named-data/ndnd/std/ndn/svs_ps"
1514
"github.com/named-data/ndnd/std/object"
1615
ndn_sync "github.com/named-data/ndnd/std/sync"
@@ -74,7 +73,7 @@ func main() {
7473
defer client.Stop()
7574

7675
// Create a new SVS ALO instance
77-
svsalo = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
76+
svsalo, err = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
7877
// Name is the name of the node
7978
Name: name,
8079

@@ -95,6 +94,9 @@ func main() {
9594
Threshold: 10,
9695
},
9796
})
97+
if err != nil {
98+
panic(err)
99+
}
98100

99101
// OnError gets called when we get an error from the SVS ALO instance.
100102
svsalo.SetOnError(func(err error) {
@@ -190,30 +192,21 @@ func publish(content []byte) {
190192
commitState(state)
191193
}
192194

193-
func commitState(state *spec_svs.InstanceState) {
195+
func commitState(state enc.Wire) {
194196
// Once a publication is processed, ideally the application should persist
195197
// it's own state and the state of the Sync group *atomically*.
196198
//
197199
// Applications can use their own data structures to store the state.
198-
// In this example, we use the object store to persist the state.
199-
store.Put(group, 0, state.Encode().Join())
200+
// In this example, we use the NDN object store to persist the state.
201+
store.Put(group, 0, state.Join())
200202
}
201203

202-
func readState() *spec_svs.InstanceState {
204+
func readState() enc.Wire {
203205
// Read the state from the object store
204206
// See commitState for more information
205207
stateWire, err := store.Get(group, false)
206208
if err != nil {
207-
log.Error(nil, "Unable to get state from object store", "err", err)
208-
os.Exit(1)
209-
}
210-
if stateWire != nil {
211-
state, err := spec_svs.ParseInstanceState(enc.NewBufferView(stateWire), true)
212-
if err != nil {
213-
log.Error(nil, "Unable to parse state from object store", "err", err)
214-
os.Exit(1)
215-
}
216-
return state
209+
panic("unable to get state (store is broken)")
217210
}
218-
return nil
211+
return enc.Wire{stateWire}
219212
}

std/examples/svs/alo-latest/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func main() {
7171
msgSize := 0
7272

7373
// Create a new SVS ALO instance
74-
svsalo := ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
74+
svsalo, err := ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
7575
// Name is the name of the node
7676
Name: name,
7777

@@ -102,6 +102,9 @@ func main() {
102102
Threshold: 10,
103103
},
104104
})
105+
if err != nil {
106+
panic(err)
107+
}
105108

106109
// Set of publishers that we are subscribed to.
107110
// The value is the number of errors received for the publisher.

std/sync/svs_alo.go

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ type SvsAloOpts struct {
4949
// Snapshot is the snapshot strategy.
5050
Snapshot Snapshot
5151
// InitialState is the initial state of the instance.
52-
InitialState *spec_svs.InstanceState
52+
InitialState enc.Wire
5353

5454
// MaxPipelineSize is the number of objects to fetch
5555
// concurrently for a single publisher (default 10)
5656
MaxPipelineSize uint64
5757
}
5858

5959
// NewSvsALO creates a new SvsALO instance.
60-
func NewSvsALO(opts SvsAloOpts) *SvsALO {
60+
func NewSvsALO(opts SvsAloOpts) (*SvsALO, error) {
6161
if len(opts.Name) == 0 {
6262
panic("Name is required")
6363
}
@@ -89,21 +89,8 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO {
8989
// Read initial state if provided.
9090
s.opts.Svs.OnUpdate = s.onSvsUpdate
9191
if s.opts.InitialState != nil {
92-
if !s.opts.InitialState.Name.Equal(s.opts.Name) {
93-
panic("Name mismatch in provided initial state")
94-
}
95-
s.opts.Svs.BootTime = s.opts.InitialState.BootstrapTime
96-
s.opts.Svs.InitialState = s.opts.InitialState.StateVector
97-
98-
for _, entry := range s.opts.InitialState.StateVector.Entries {
99-
hash := entry.Name.TlvStr()
100-
for _, seqEntry := range entry.SeqNoEntries {
101-
s.state.Set(hash, seqEntry.BootstrapTime, svsDataState{
102-
Known: seqEntry.SeqNo,
103-
Latest: seqEntry.SeqNo,
104-
Pending: seqEntry.SeqNo,
105-
})
106-
}
92+
if err := s.parseInstanceState(s.opts.InitialState); err != nil {
93+
return nil, err
10794
}
10895
}
10996

@@ -131,7 +118,7 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO {
131118
})
132119
}
133120

134-
return s
121+
return s, nil
135122
}
136123

137124
// String is the log identifier.
@@ -194,7 +181,7 @@ func (s *SvsALO) SetOnPublisher(callback func(enc.Name)) {
194181
}
195182

196183
// Publish sends a message to the group
197-
func (s *SvsALO) Publish(content enc.Wire) (enc.Name, *spec_svs.InstanceState, error) {
184+
func (s *SvsALO) Publish(content enc.Wire) (name enc.Name, state enc.Wire, err error) {
198185
s.mutex.Lock()
199186
defer s.mutex.Unlock()
200187

std/sync/svs_alo_data.go

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sync
22

33
import (
4+
"fmt"
45
"slices"
56
"time"
67

@@ -34,7 +35,7 @@ func (s *SvsALO) objectName(node enc.Name, boot uint64, seq uint64) enc.Name {
3435
WithVersion(enc.VersionImmutable)
3536
}
3637

37-
func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, *spec_svs.InstanceState, error) {
38+
func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, enc.Wire, error) {
3839
// This instance owns the underlying SVS instance.
3940
// So we can be sure that the sequence number does not
4041
// change while we hold the lock on this instance.
@@ -231,23 +232,51 @@ func (s *SvsALO) queuePub(pub SvsPub) {
231232
s.outpipe <- pub
232233
}
233234

234-
// instanceState returns the current state of the instance.
235-
func (s *SvsALO) instanceState() *spec_svs.InstanceState {
236-
stateVector := s.state.Encode(func(state svsDataState) uint64 {
237-
return state.Known
238-
})
235+
// queueError queues an error to the application.
236+
func (s *SvsALO) queueError(err error) {
237+
select {
238+
case s.errpipe <- err:
239+
default:
240+
}
241+
}
239242

240-
return &spec_svs.InstanceState{
243+
// instanceState returns the current state of the instance.
244+
func (s *SvsALO) instanceState() enc.Wire {
245+
state := spec_svs.InstanceState{
241246
Name: s.opts.Name,
242247
BootstrapTime: s.BootTime(),
243-
StateVector: stateVector,
248+
StateVector: s.state.Encode(func(state svsDataState) uint64 {
249+
return state.Known
250+
}),
244251
}
252+
return state.Encode()
245253
}
246254

247-
// queueError queues an error to the application.
248-
func (s *SvsALO) queueError(err error) {
249-
select {
250-
case s.errpipe <- err:
251-
default:
255+
// parseInstanceState parses an instance state into the current state.
256+
// Only the constructor should call this function.
257+
func (s *SvsALO) parseInstanceState(wire enc.Wire) error {
258+
initState, err := spec_svs.ParseInstanceState(enc.NewWireView(wire), true)
259+
if err != nil {
260+
return err
261+
}
262+
263+
if !initState.Name.Equal(s.opts.Name) {
264+
return fmt.Errorf("initial state name mismatch: %v != %v", initState.Name, s.opts.Name)
252265
}
266+
267+
s.opts.Svs.BootTime = initState.BootstrapTime
268+
s.opts.Svs.InitialState = initState.StateVector
269+
270+
for _, entry := range initState.StateVector.Entries {
271+
hash := entry.Name.TlvStr()
272+
for _, seqEntry := range entry.SeqNoEntries {
273+
s.state.Set(hash, seqEntry.BootstrapTime, svsDataState{
274+
Known: seqEntry.SeqNo,
275+
Latest: seqEntry.SeqNo,
276+
Pending: seqEntry.SeqNo,
277+
})
278+
}
279+
}
280+
281+
return nil
253282
}

std/sync/svs_pub.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package sync
22

33
import (
44
enc "github.com/named-data/ndnd/std/encoding"
5-
spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
65
)
76

87
// SvsPub is the generic received data publication from SVS.
@@ -20,7 +19,7 @@ type SvsPub struct {
2019
// IsSnapshot is true if this is a snapshot.
2120
IsSnapshot bool
2221
// State is the state after this publication is applied.
23-
State *spec_svs.InstanceState
22+
State enc.Wire
2423

2524
// subcribers is the list of subscribers.
2625
subcribers []func(SvsPub)

std/utils/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func ConvertNonce(nonce []byte) (ret optional.Optional[uint32]) {
3939
return ret
4040
}
4141

42-
// If is the ternary operator
42+
// If is the ternary operator (eager evaluation)
4343
func If[T any](cond bool, t, f T) T {
4444
if cond {
4545
return t

0 commit comments

Comments
 (0)