Skip to content

Commit d7295a6

Browse files
committed
Pruner: A wapper to support DelPattern
1 parent ac81886 commit d7295a6

File tree

7 files changed

+875
-0
lines changed

7 files changed

+875
-0
lines changed

pruner/doc.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package pruner
2+
3+
// Support for removing a pattern from a matcher
4+
//
5+
// The core quamina.Matcher doesn't currently support deleting
6+
// patterns. Some of the contemplated implementations of would
7+
// probably be pretty difficult. At least one approach is pretty
8+
// easy: Wrap the current matcher to filter removed patterns from
9+
// match results and periodically rebuild the matcher from scrach with
10+
// the live patterns. More specifically:
11+
//
12+
// 1. Remember patterns that have been added
13+
// 2. Remember patterns that have been removed (implicitly)
14+
// 3. Filter MatchedFor...() results to remove any removed patterns
15+
// 4. Support rebuilding the matcher state periodically with only the
16+
// live patterns.
17+
// 5. Maintain some statistics to help decide when to rebuild
18+
//
19+
// The implementation of the set of live patterns is pluggable via a
20+
// Go interface (State). The default implementation is a
21+
// `map[quamina.X]string (MemState)`. Other implementations can
22+
// provide persistence.
23+
//
24+
// By default, rebuilding is triggered automatically (synchronously
25+
// currently) during mutations. The code also supports pluggable
26+
// rebuilding policies, but those features are not currently exposed.
27+
// A Rebuild method is available for the application to force a
28+
// rebuild.

pruner/pruner.go

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
package pruner
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
quamina "quamina/lib"
8+
)
9+
10+
// Stats reports basic counts to aid in deciding when to Rebuild.
11+
type Stats struct {
12+
// Some of these values are ints instead of uints because Go
13+
// likes to print uints in hex, and I'd like to see some
14+
// temporary logging output in decimal. ToDo: back to uints?
15+
16+
// Live the count of total live patterns.
17+
Live int
18+
19+
// Added is the count of total patterns added.
20+
Added int
21+
22+
// Deleted is the count of the patterns removed.
23+
Deleted int
24+
25+
// Emitted is the count to total patterns found since the last
26+
// rebuild.
27+
Emitted int64
28+
29+
// Filtered is the count of pattners that have been removed
30+
// from MatchFor results (since the last rebuild) because
31+
// their patterns had been removed.
32+
Filtered int64
33+
34+
// LastRebuilt is the time the last rebuild started.
35+
LastRebuilt time.Time
36+
37+
// RebuildDuration is the duration of the last rebuild.
38+
RebuildDuration time.Duration
39+
40+
// RebuildPurged is the count of patterns removed during
41+
// rebuild.
42+
RebuildPurged int
43+
}
44+
45+
// Matcher provides DelPattern on top of quamina.Matcher.
46+
//
47+
// Matcher maintains the set of live patterns, and it will rebuild the
48+
// underlying matcher synchronously (currently) periodically sduring
49+
// standard operations (AddPattern, DelPattern, MatchesForJSONEvent).
50+
//
51+
// Roughly speaking, the current rebuild policy automatically rebuilds
52+
// the index when the ratio of filtered patterns to emitted patterns
53+
// exceeds 0.2 (and if there's been some traffic).
54+
//
55+
// An application can call Rebuild to force a rebuild at any any. See
56+
// Stats() to obtain some useful statistics about the matcher.
57+
//
58+
// Eventually automatically-invoked rebuild policies might be
59+
// pluggable.
60+
type Matcher struct {
61+
// Matcher is the underlying matcher that does the hard work.
62+
//
63+
// Matcher should maybe not be public.
64+
*quamina.Matcher
65+
66+
// live is live set of patterns.
67+
live State
68+
69+
stats Stats
70+
71+
// rebuildTrigger, if not nil, determines when a mutation
72+
// triggers a rebuild.
73+
//
74+
// If nil, no automatic Rebuild is ever triggered.
75+
rebuildTrigger rebuildTrigger
76+
77+
// lock protectes the pointer the underlying Matcher as well as stats.
78+
//
79+
// The Matcher pointer is updated after a successful Rebuild.
80+
// Stats are updated by Add, Del, and Rebuild.
81+
lock sync.RWMutex
82+
}
83+
84+
var defaultRebuildTrigger = newTooMuchFiltering(0.2, 1000)
85+
86+
// tooMuchFiltering is the standard rebuildTrigger, which will fire
87+
// when:
88+
//
89+
// MinAction is less than the sum of counts of found and filtered
90+
// patterns and
91+
//
92+
// FilteredToEmitted is greater than the ratio of counts of filtered
93+
// and found patterns.
94+
//
95+
// defaultRebuildTrigger provides the default trigger policy used by
96+
// NewMatcher.
97+
type tooMuchFiltering struct {
98+
FilteredToEmitted float64
99+
MinAction int64
100+
}
101+
102+
func newTooMuchFiltering(ratio float64, min int64) *tooMuchFiltering {
103+
return &tooMuchFiltering{
104+
FilteredToEmitted: ratio,
105+
MinAction: min,
106+
}
107+
}
108+
109+
func (t *tooMuchFiltering) Rebuild(added bool, s *Stats) bool {
110+
if added {
111+
// No need to think when we're adding a pattern since
112+
// that operation cannot result in an increase of
113+
// filtered patterns -- I think.
114+
return false
115+
}
116+
117+
if s.Emitted+s.Filtered < t.MinAction {
118+
return false
119+
}
120+
121+
if s.Emitted == 0 {
122+
return true
123+
}
124+
125+
var (
126+
numerator = float64(s.Filtered)
127+
denominator = float64(s.Emitted)
128+
ratio = numerator / denominator
129+
)
130+
131+
return t.FilteredToEmitted < ratio
132+
}
133+
134+
// DisableRebuild will prevent any automatic rebuilds.
135+
func (m *Matcher) DisableRebuild() {
136+
m.lock.Lock()
137+
m.rebuildTrigger = nil
138+
m.lock.Unlock()
139+
}
140+
141+
// rebuildTrigger provides a way to control when rebuilds are
142+
// automatically triggered during standard operations.
143+
//
144+
// Currently an AddPattern, DelPattern, or MatchesForJSONEvent can
145+
// trigger a rebuild. When a rebuild is triggered, it's executed
146+
// synchronous, the the Add/Del method doesn't return until the
147+
// rebuild is complete.
148+
type rebuildTrigger interface {
149+
// Rebuild should return true to trigger a rebuild.
150+
//
151+
// This method is called by AddPattern and DelPattern. added
152+
// is true when called by AddPattern; false otherwise. These
153+
// methods do not return until the rebuild is complete, so
154+
// beware.
155+
Rebuild(added bool, s *Stats) bool
156+
}
157+
158+
// NewMatcher does what you'd expect.
159+
//
160+
// The State defaults to MemState.
161+
func NewMatcher(s State) *Matcher {
162+
if s == nil {
163+
s = NewMemState()
164+
}
165+
return &Matcher{
166+
Matcher: quamina.NewMatcher(),
167+
live: s,
168+
rebuildTrigger: defaultRebuildTrigger,
169+
}
170+
}
171+
172+
// maybeRebuild calls rebuildTrigger (if not) and then calls rebuild()
173+
// if that trigger said to do that. If rebuildTrigger is nil, no
174+
// rebuild is executed.
175+
//
176+
// This method assumes the caller has a write lock.
177+
func (m *Matcher) maybeRebuild(added bool) error {
178+
if m.rebuildTrigger == nil {
179+
return nil
180+
}
181+
if m.rebuildTrigger.Rebuild(added, &m.stats) {
182+
return m.rebuild(added)
183+
}
184+
185+
return nil
186+
}
187+
188+
// AddPattern calls the underlying AddPattern method and then maybe
189+
// rebuilds the index (if the AddPattern succeeded).
190+
func (m *Matcher) AddPattern(x quamina.X, pat string) error {
191+
var err error
192+
193+
// Do we m.live.Add first, do we m.Matcher.AddPattern first?
194+
if err = m.Matcher.AddPattern(x, pat); err == nil {
195+
m.lock.Lock()
196+
m.stats.Added++
197+
m.stats.Live++
198+
m.maybeRebuild(true)
199+
m.lock.Unlock()
200+
err = m.live.Add(x, pat)
201+
// ToDo: Contemplate what do to about an error here
202+
// (or if we got an error from AddPattern after we did
203+
// live.Add.
204+
}
205+
206+
return err
207+
}
208+
209+
// MatchesForJSONEvent calls the underlying MatchesForJSONEvent method
210+
// and then maybe rebuilds the index.
211+
func (m *Matcher) MatchesForJSONEvent(event []byte) ([]quamina.X, error) {
212+
xs, err := m.Matcher.MatchesForJSONEvent(event)
213+
if err != nil {
214+
return nil, err
215+
}
216+
217+
// Delove any X that isn't in the live set.
218+
219+
acc := make([]quamina.X, 0, len(xs))
220+
221+
// We're updating stats.Filtered, so we need a write lock. If
222+
// we forget about stats.Filtered, we can live with only a
223+
// read lock here.
224+
var emitted, filtered int64
225+
for _, x := range xs {
226+
var have string
227+
if have, err = m.live.Get(x); err != nil {
228+
break
229+
}
230+
if have == "" {
231+
filtered++
232+
continue
233+
}
234+
acc = append(acc, x)
235+
emitted++
236+
}
237+
238+
if err != nil {
239+
return nil, err
240+
}
241+
242+
m.lock.Lock()
243+
m.stats.Filtered += filtered
244+
m.stats.Emitted += emitted
245+
m.maybeRebuild(false)
246+
m.lock.Unlock()
247+
248+
return acc, nil
249+
}
250+
251+
// DelPattern "removes" the pattern from the index and maybe rebuilds
252+
// the index.
253+
func (m *Matcher) DelPattern(x quamina.X) (bool, error) {
254+
had, err := m.live.Del(x)
255+
if err == nil {
256+
if had {
257+
m.lock.Lock()
258+
m.stats.Deleted++
259+
m.stats.Live--
260+
m.maybeRebuild(false)
261+
m.lock.Unlock()
262+
}
263+
}
264+
265+
return had, err
266+
}
267+
268+
// Rebuild rebuilds the matcher state based on only live patterns.
269+
//
270+
// If calling fearlessly, then the old matcher is released before
271+
// building the new one.
272+
//
273+
// This method resets the Stats.
274+
func (m *Matcher) Rebuild(fearlessly bool) error {
275+
m.lock.Lock()
276+
err := m.rebuild(fearlessly)
277+
m.lock.Unlock()
278+
return err
279+
}
280+
281+
// rebuild is Rebuild but assumes having the lock.
282+
func (m *Matcher) rebuild(fearlessly bool) error {
283+
// We assume we have the lock.
284+
285+
// Nothing fancy here now.
286+
287+
var (
288+
then = time.Now()
289+
m1 = quamina.NewMatcher()
290+
)
291+
292+
if fearlessly {
293+
// Let the GC reduce heap requirements?
294+
m.Matcher = nil
295+
}
296+
297+
count := 0
298+
err := m.live.Iterate(func(x quamina.X, p string) error {
299+
err := m1.AddPattern(x, p)
300+
if err == nil {
301+
count++
302+
}
303+
return err
304+
})
305+
306+
if err == nil {
307+
m.Matcher = m1
308+
m.stats.RebuildPurged = m.stats.Deleted
309+
m.stats.Live = count
310+
m.stats.Added = 0
311+
m.stats.Deleted = 0
312+
m.stats.Filtered = 0
313+
m.stats.LastRebuilt = then
314+
m.stats.RebuildDuration = time.Now().Sub(then)
315+
}
316+
317+
return err
318+
}
319+
320+
// Stats returns some statistics that might be helpful to rebuild
321+
// policies.
322+
func (m *Matcher) Stats() Stats {
323+
m.lock.RLock()
324+
s := m.stats // Copies
325+
m.lock.RUnlock()
326+
return s
327+
}

0 commit comments

Comments
 (0)