Skip to content

Commit 128b389

Browse files
committed
introduce ignoreValidity in SVS related logic
1 parent 35530e2 commit 128b389

File tree

8 files changed

+200
-143
lines changed

8 files changed

+200
-143
lines changed

std/ndn/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ type ConsumeExtArgs struct {
125125
OnProgress func(status ConsumeState)
126126
// NoMetadata disables fetching RDR metadata (advanced usage).
127127
NoMetadata bool
128+
// IgnoreValidity ignores validity period in the validation chain
129+
IgnoreValidity optional.Optional[bool]
128130
}
129131

130132
// ExpressRArgs are the arguments for the express retry API.

std/object/client_consume.go

Lines changed: 56 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,31 @@ func (c *Client) consumeObject(state *ConsumeState) {
6363
// if metadata fetching is disabled, just attempt to fetch one segment
6464
// with the prefix, then get the versioned name from the segment.
6565
if state.args.NoMetadata {
66-
c.fetchDataByPrefix(name, state.args.TryStore, func(data ndn.Data, err error) {
67-
if err != nil {
68-
state.finalizeError(err)
69-
return
70-
}
71-
meta, err := extractSegMetadata(data)
66+
c.fetchDataByPrefix(name, state.args.TryStore, state.args.IgnoreValidity.GetOr(false),
67+
func(data ndn.Data, err error) {
68+
if err != nil {
69+
state.finalizeError(err)
70+
return
71+
}
72+
meta, err := extractSegMetadata(data)
73+
if err != nil {
74+
state.finalizeError(err)
75+
return
76+
}
77+
c.consumeObjectWithMeta(state, meta)
78+
})
79+
return
80+
}
81+
82+
// fetch RDR metadata for this object
83+
c.fetchMetadata(name, state.args.TryStore, state.args.IgnoreValidity.GetOr(false),
84+
func(meta *rdr.MetaData, err error) {
7285
if err != nil {
7386
state.finalizeError(err)
7487
return
7588
}
7689
c.consumeObjectWithMeta(state, meta)
7790
})
78-
return
79-
}
80-
81-
// fetch RDR metadata for this object
82-
c.fetchMetadata(name, state.args.TryStore, func(meta *rdr.MetaData, err error) {
83-
if err != nil {
84-
state.finalizeError(err)
85-
return
86-
}
87-
c.consumeObjectWithMeta(state, meta)
88-
})
8991
return
9092
}
9193

@@ -104,6 +106,7 @@ func (c *Client) consumeObjectWithMeta(state *ConsumeState, meta *rdr.MetaData)
104106
func (c *Client) fetchMetadata(
105107
name enc.Name,
106108
tryStore bool,
109+
ignoreValidity bool,
107110
callback func(meta *rdr.MetaData, err error),
108111
) {
109112
log.Debug(c, "Fetching object metadata", "name", name)
@@ -126,25 +129,29 @@ func (c *Client) fetchMetadata(
126129
callback(nil, fmt.Errorf("%w: fetch metadata failed with result: %s", ndn.ErrNetwork, args.Result))
127130
return
128131
}
129-
130-
c.Validate(args.Data, args.SigCovered, func(valid bool, err error) {
131-
// validate with trust config
132-
if !valid {
133-
callback(nil, fmt.Errorf("%w: validate metadata failed: %w", ndn.ErrSecurity, err))
134-
return
135-
}
136-
137-
// parse metadata
138-
metadata, err := rdr.ParseMetaData(enc.NewWireView(args.Data.Content()), false)
139-
if err != nil {
140-
callback(nil, fmt.Errorf("%w: failed to parse object metadata: %w", ndn.ErrProtocol, err))
141-
return
142-
}
143-
144-
// clone fields for lifetime
145-
metadata.Name = metadata.Name.Clone()
146-
metadata.FinalBlockID = slices.Clone(metadata.FinalBlockID)
147-
callback(metadata, nil)
132+
c.ValidateExt(ndn.ValidateExtArgs{
133+
Data: args.Data,
134+
SigCovered: args.SigCovered,
135+
IgnoreValidity: optional.Some(ignoreValidity),
136+
Callback: func(valid bool, err error) {
137+
// validate with trust config
138+
if !valid {
139+
callback(nil, fmt.Errorf("%w: validate metadata failed: %w", ndn.ErrSecurity, err))
140+
return
141+
}
142+
143+
// parse metadata
144+
metadata, err := rdr.ParseMetaData(enc.NewWireView(args.Data.Content()), false)
145+
if err != nil {
146+
callback(nil, fmt.Errorf("%w: failed to parse object metadata: %w", ndn.ErrProtocol, err))
147+
return
148+
}
149+
150+
// clone fields for lifetime
151+
metadata.Name = metadata.Name.Clone()
152+
metadata.FinalBlockID = slices.Clone(metadata.FinalBlockID)
153+
callback(metadata, nil)
154+
},
148155
})
149156
},
150157
})
@@ -154,6 +161,7 @@ func (c *Client) fetchMetadata(
154161
func (c *Client) fetchDataByPrefix(
155162
name enc.Name,
156163
tryStore bool,
164+
ignoreValidity bool,
157165
callback func(data ndn.Data, err error),
158166
) {
159167
log.Debug(c, "Fetching data with prefix", "name", name)
@@ -176,14 +184,18 @@ func (c *Client) fetchDataByPrefix(
176184
callback(nil, fmt.Errorf("%w: fetch by prefix failed with result: %s", ndn.ErrNetwork, args.Result))
177185
return
178186
}
179-
180-
c.Validate(args.Data, args.SigCovered, func(valid bool, err error) {
181-
if !valid {
182-
callback(nil, fmt.Errorf("%w: validate by prefix failed: %w", ndn.ErrSecurity, err))
183-
return
184-
}
185-
186-
callback(args.Data, nil)
187+
c.ValidateExt(ndn.ValidateExtArgs{
188+
Data: args.Data,
189+
SigCovered: args.SigCovered,
190+
IgnoreValidity: optional.Some(ignoreValidity),
191+
Callback: func(valid bool, err error) {
192+
if !valid {
193+
callback(nil, fmt.Errorf("%w: validate by prefix failed: %w", ndn.ErrSecurity, err))
194+
return
195+
}
196+
197+
callback(args.Data, nil)
198+
},
187199
})
188200
},
189201
})

std/object/client_consume_seg.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,17 @@ func (s *rrSegFetcher) handleResult(args ndn.ExpressCallbackArgs, state *Consume
283283
// It is necessary that this function be called only from one goroutine - the engine.
284284
// The notable exception here is when there is a timeout, which has a separate goroutine.
285285
func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) {
286-
s.client.Validate(args.Data, args.SigCovered, func(valid bool, err error) {
287-
if !valid {
288-
state.finalizeError(fmt.Errorf("%w: validate seg failed: %w", ndn.ErrSecurity, err))
289-
} else {
290-
s.handleValidatedData(args, state)
291-
}
286+
s.client.ValidateExt(ndn.ValidateExtArgs{
287+
Data: args.Data,
288+
SigCovered: args.SigCovered,
289+
IgnoreValidity: state.args.IgnoreValidity,
290+
Callback: func(valid bool, err error) {
291+
if !valid {
292+
state.finalizeError(fmt.Errorf("%w: validate seg failed: %w", ndn.ErrSecurity, err))
293+
} else {
294+
s.handleValidatedData(args, state)
295+
}
296+
},
292297
})
293298
}
294299

std/sync/snapshot_node_history.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/named-data/ndnd/std/log"
99
"github.com/named-data/ndnd/std/ndn"
1010
"github.com/named-data/ndnd/std/ndn/svs_ps"
11+
"github.com/named-data/ndnd/std/types/optional"
1112
)
1213

1314
const SnapHistoryIndexFreshness = time.Millisecond * 10
@@ -43,6 +44,8 @@ type SnapshotNodeHistory struct {
4344

4445
// In Repo mode, all snapshots are fetched automtically for persistence.
4546
IsRepo bool
47+
// IgnoreValidity ignores validity period in the validation chain
48+
IgnoreValidity optional.Optional[bool]
4649
// repoKnown is the known snapshot sequence number.
4750
repoKnown SvMap[uint64]
4851

@@ -155,8 +158,12 @@ func (s *SnapshotNodeHistory) idxName(node enc.Name, boot uint64) enc.Name {
155158

156159
// fetchIndex fetches the latest index for a remote node.
157160
func (s *SnapshotNodeHistory) fetchIndex(node enc.Name, boot uint64, known uint64) {
158-
s.Client.Consume(s.idxName(node, boot), func(cstate ndn.ConsumeState) {
159-
go s.handleIndex(node, boot, known, cstate)
161+
s.Client.ConsumeExt(ndn.ConsumeExtArgs{
162+
Name: s.idxName(node, boot),
163+
IgnoreValidity: s.IgnoreValidity,
164+
Callback: func(cstate ndn.ConsumeState) {
165+
go s.handleIndex(node, boot, known, cstate)
166+
},
160167
})
161168
}
162169

@@ -199,7 +206,11 @@ func (s *SnapshotNodeHistory) handleIndex(node enc.Name, boot uint64, known uint
199206
snapC := make(chan ndn.ConsumeState)
200207

201208
snapName := s.snapName(node, boot).WithVersion(seqNo)
202-
s.Client.Consume(snapName, func(cstate ndn.ConsumeState) { snapC <- cstate })
209+
s.Client.ConsumeExt(ndn.ConsumeExtArgs{
210+
Name: snapName,
211+
IgnoreValidity: s.IgnoreValidity,
212+
Callback: func(cstate ndn.ConsumeState) { snapC <- cstate },
213+
})
203214

204215
scstate := <-snapC
205216
if err := scstate.Error(); err != nil {

std/sync/snapshot_node_latest.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
enc "github.com/named-data/ndnd/std/encoding"
88
"github.com/named-data/ndnd/std/log"
99
"github.com/named-data/ndnd/std/ndn"
10+
"github.com/named-data/ndnd/std/types/optional"
1011
)
1112

1213
// SnapshotNodeLatest is a snapshot strategy that takes a snapshot of the
@@ -32,6 +33,8 @@ type SnapshotNodeLatest struct {
3233
SnapMe func(enc.Name) (enc.Wire, error)
3334
// Threshold is the number of updates before a snapshot is taken.
3435
Threshold uint64
36+
// IgnoreValidity ignores validity period in the validation chain
37+
IgnoreValidity optional.Optional[bool]
3538

3639
// pss is the struct from the svs layer.
3740
pss snapPsState
@@ -113,15 +116,19 @@ func (s *SnapshotNodeLatest) snapName(node enc.Name, boot uint64) enc.Name {
113116
// fetchSnap fetches the latest snapshot for a remote node.
114117
func (s *SnapshotNodeLatest) fetchSnap(node enc.Name, boot uint64) {
115118
// Discover the latest snapshot
116-
s.Client.Consume(s.snapName(node, boot), func(cstate ndn.ConsumeState) {
117-
if cstate.Error() != nil {
118-
// Do not try too fast in case NFD returns NACK
119-
time.AfterFunc(2*time.Second, func() {
119+
s.Client.ConsumeExt(ndn.ConsumeExtArgs{
120+
Name: s.snapName(node, boot),
121+
IgnoreValidity: s.IgnoreValidity,
122+
Callback: func(cstate ndn.ConsumeState) {
123+
if cstate.Error() != nil {
124+
// Do not try too fast in case NFD returns NACK
125+
time.AfterFunc(2*time.Second, func() {
126+
s.handleSnap(node, boot, cstate)
127+
})
128+
} else {
120129
s.handleSnap(node, boot, cstate)
121-
})
122-
} else {
123-
s.handleSnap(node, boot, cstate)
124-
}
130+
}
131+
},
125132
})
126133
}
127134

std/sync/svs.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type SvSyncOpts struct {
6969

7070
// Passive mode does not send sign Sync Interests
7171
Passive bool
72+
// IgnoreValidity ignores validity period in the validation chain
73+
IgnoreValidity optional.Optional[bool]
7274
}
7375

7476
type SvSyncUpdate struct {
@@ -511,24 +513,29 @@ func (s *SvSync) onSyncData(dataWire enc.Wire) {
511513
}
512514

513515
// Validate signature
514-
s.o.Client.Validate(data, sigCov, func(valid bool, err error) {
515-
if !valid || err != nil {
516-
log.Warn(s, "SvSync failed to validate signature", "name", data.Name(), "valid", valid, "err", err)
517-
return
518-
}
516+
s.o.Client.ValidateExt(ndn.ValidateExtArgs{
517+
Data: data,
518+
SigCovered: sigCov,
519+
IgnoreValidity: s.o.IgnoreValidity,
520+
Callback: func(valid bool, err error) {
521+
if !valid || err != nil {
522+
log.Warn(s, "SvSync failed to validate signature", "name", data.Name(), "valid", valid, "err", err)
523+
return
524+
}
519525

520-
// Decode state vector
521-
svWire := data.Content().Join()
522-
params, err := spec_svs.ParseSvsData(enc.NewBufferView(svWire), false)
523-
if err != nil || params.StateVector == nil {
524-
log.Warn(s, "onSyncInterest failed to parse StateVec", "err", err)
525-
return
526-
}
526+
// Decode state vector
527+
svWire := data.Content().Join()
528+
params, err := spec_svs.ParseSvsData(enc.NewBufferView(svWire), false)
529+
if err != nil || params.StateVector == nil {
530+
log.Warn(s, "onSyncInterest failed to parse StateVec", "err", err)
531+
return
532+
}
527533

528-
s.recvSv <- svSyncRecvSvArgs{
529-
sv: params.StateVector,
530-
data: dataWire,
531-
}
534+
s.recvSv <- svSyncRecvSvArgs{
535+
sv: params.StateVector,
536+
data: dataWire,
537+
}
538+
},
532539
})
533540
}
534541

std/sync/svs_alo.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,14 @@ func NewSvsALO(opts SvsAloOpts) (*SvsALO, error) {
140140
}, s.state)
141141
}
142142

143+
// Overeide IgnoreValidity from SVS (incorect but practical)
144+
if latest, ok := s.opts.Snapshot.(*SnapshotNodeLatest); ok {
145+
latest.IgnoreValidity = s.opts.Svs.IgnoreValidity
146+
}
147+
if history, ok := s.opts.Snapshot.(*SnapshotNodeHistory); ok {
148+
history.IgnoreValidity = s.opts.Svs.IgnoreValidity
149+
}
150+
143151
return s, nil
144152
}
145153

0 commit comments

Comments
 (0)