|
| 1 | +package quamina |
| 2 | + |
| 3 | +// CoreMatcher represents an automaton that allows matching sequences of name/value field pairs against |
| 4 | +// patterns, which are combinations of field names and lists of allowed field values. |
| 5 | +// The field names are called "Paths" because they encode, in a jsonpath-ish style, the pathSegments from the |
| 6 | +// root of an incoming object to the leaf field. |
| 7 | +// Since the order of fields is generally not significant in encoded data objects, the fields are sorted |
| 8 | +// by name before constructing the automaton, and so are the incoming field lists to be matched, allowing |
| 9 | +// the automaton to work. |
| 10 | + |
| 11 | +import ( |
| 12 | + "errors" |
| 13 | + "sort" |
| 14 | + "sync" |
| 15 | +) |
| 16 | + |
| 17 | +// CoreMatcher uses a finite automaton to implement the MatchesForJSONEvent and MatchesForFields functions. |
| 18 | +// startState is the start of the automaton: AddPattern adds transitions beginning there and every match starts there. |
| 19 | +// namesUsed is the set of field names that are used in any of the patterns that this automaton encodes. Typically, |
| 20 | +// patterns only consider a subset of the fields in an incoming data object, and there is no reason to consider |
| 21 | +// fields that do not appear in patterns when matching; IsNameUsed answers that question. presumedExistFalseMatches holds each X added with an exists:false pattern; these are reported as matches unless an event records a failure for them. lock is held by AddPattern so that only one update runs at a time. |
| 22 | +type CoreMatcher struct { |
| 23 | + startState *fieldMatcher |
| 24 | + namesUsed map[string]bool |
| 25 | + presumedExistFalseMatches *matchSet |
| 26 | + lock sync.Mutex |
| 27 | +} |
| 28 | + |
| 29 | +// X is the caller-chosen value that AddPattern associates with a pattern and that the MatchesFor* functions return when that pattern matches; it can be anything, and should eventually be a generic? |
| 30 | +type X interface{} |
| 31 | + |
| 32 | +func NewCoreMatcher() *CoreMatcher { |
| 33 | + m := CoreMatcher{} |
| 34 | + m.startState = newFieldMatcher() |
| 35 | + m.namesUsed = make(map[string]bool) |
| 36 | + m.presumedExistFalseMatches = newMatchSet() |
| 37 | + return &m |
| 38 | +} |
| 39 | + |
| 40 | +// AddPattern - the patternBytes is a JSON object. The X is what the matcher returns to indicate that the |
| 41 | +// provided pattern has been matched. In many applications it might be a string which is the pattern's name. |
| 42 | +func (m *CoreMatcher) AddPattern(x X, patternJSON string) error { |
| 43 | + patternFields, patternNamesUsed, err := patternFromJSON([]byte(patternJSON)) |
| 44 | + if err != nil { |
| 45 | + return err |
| 46 | + } |
| 47 | + |
| 48 | + sort.Slice(patternFields, func(i, j int) bool { return patternFields[i].path < patternFields[j].path }) |
| 49 | + |
| 50 | + // only one thread can be updating at a time |
| 51 | + // NOTE: threads can be calling MatchesFor* functions at any time as we update the automaton. The goal is to |
| 52 | + // maintain consistency during updates, in the sense that a pattern that has been matching events will not |
| 53 | + // stop working during an update. |
| 54 | + // The matcher contains several map[this]that maps but Go maps aren't thread-safe. This could be solved |
| 55 | + // with a straightforward mutex or the fancy sync.Map, but I succumbed to premature optimization and decided |
| 56 | + // I didn't want any of that stuff in the Match* path. So in each case the map (or map-like structure in |
| 57 | + // smallDfaTable) is copied, the copy updated, then the whole map updated atomically in the containing structure |
| 58 | + // see: https://medium.com/@deckarep/the-new-kid-in-town-gos-sync-map-de24a6bf7c2c |
| 59 | + m.lock.Lock() |
| 60 | + defer m.lock.Unlock() |
| 61 | + |
| 62 | + // this is klunky and slow but don't want any locks in the read path |
| 63 | + newNamesUsed := make(map[string]bool) |
| 64 | + for k := range m.namesUsed { |
| 65 | + newNamesUsed[k] = true |
| 66 | + } |
| 67 | + for used := range patternNamesUsed { |
| 68 | + newNamesUsed[used] = true |
| 69 | + } |
| 70 | + m.namesUsed = newNamesUsed |
| 71 | + |
| 72 | + states := []*fieldMatcher{m.startState} |
| 73 | + for _, field := range patternFields { |
| 74 | + var nextStates []*fieldMatcher |
| 75 | + for _, state := range states { |
| 76 | + ns := state.addTransition(field) |
| 77 | + |
| 78 | + // special handling for exists:false, in which case there's only one next state |
| 79 | + if field.vals[0].vType == existsFalseType { |
| 80 | + ns[0].existsFalseFailures.addX(x) |
| 81 | + m.presumedExistFalseMatches.addX(x) |
| 82 | + } |
| 83 | + nextStates = append(nextStates, ns...) |
| 84 | + } |
| 85 | + states = nextStates |
| 86 | + } |
| 87 | + |
| 88 | + // "states" now holds the set of terminal states arrived at by matching each field in the pattern, |
| 89 | + // so update the matches value to indicate this (skipping those that are only there to serve |
| 90 | + // exists:false processing) |
| 91 | + for _, endState := range states { |
| 92 | + if !endState.existsFalseFailures.contains(x) { |
| 93 | + endState.matches = append(endState.matches, x) |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + return err |
| 98 | +} |
| 99 | + |
| 100 | +func (m *CoreMatcher) DeletePattern(_ X) error { |
| 101 | + return errors.New("operation not supported") |
| 102 | +} |
| 103 | + |
| 104 | +// MatchesForJSONEvent calls the flattener to pull the fields out of the event and |
| 105 | +// hands over to MatchesForFields |
| 106 | +func (m *CoreMatcher) MatchesForJSONEvent(event []byte) ([]X, error) { |
| 107 | + fields, err := NewFJ(m).Flatten(event) |
| 108 | + if err != nil { |
| 109 | + return nil, err |
| 110 | + } |
| 111 | + matches := m.MatchesForFields(fields) |
| 112 | + return matches, nil |
| 113 | +} |
| 114 | + |
| 115 | +// MatchesForFields takes a list of Field structures and sorts them by pathname; the fields in a pattern to |
| 116 | +// matched are similarly sorted; thus running an automaton over them works |
| 117 | +func (m *CoreMatcher) MatchesForFields(fields []Field) []X { |
| 118 | + sort.Slice(fields, func(i, j int) bool { return string(fields[i].Path) < string(fields[j].Path) }) |
| 119 | + return m.matchesForSortedFields(fields).matches() |
| 120 | +} |
| 121 | + |
| 122 | +// proposedTransition represents a suggestion that the name/value pair at fields[fieldIndex] might allow a transition |
| 123 | +// out of the state held in matcher; matchesForSortedFields keeps a work list of these and tries each one with transitionOn |
| 124 | +type proposedTransition struct { |
| 125 | + matcher *fieldMatcher |
| 126 | + fieldIndex int |
| 127 | +} |
| 128 | + |
| 129 | +// matchesForSortedFields runs the provided list of name/value pairs against the automaton and returns |
| 130 | +// a possibly-empty list of the patterns that match |
| 131 | +func (m *CoreMatcher) matchesForSortedFields(fields []Field) *matchSet { |
| 132 | + |
| 133 | + failedExistsFalseMatches := newMatchSet() |
| 134 | + matches := newMatchSet() |
| 135 | + |
| 136 | + // The idea is that we add potential field transitions to the proposals list; any time such a transition |
| 137 | + // succeeds, i.e. matches a particular field and moves to a new state, we propose transitions from that |
| 138 | + // state on all the following fields in the event |
| 139 | + // Start by giving each field a chance to match against the start state. Doing it by pre-allocating the |
| 140 | + // proposals and filling in their values is observably faster than the more idiomatic append() |
| 141 | + proposals := make([]proposedTransition, len(fields)) |
| 142 | + for i := range fields { |
| 143 | + proposals[i].fieldIndex = i |
| 144 | + proposals[i].matcher = m.startState |
| 145 | + } |
| 146 | + |
| 147 | + // as long as there are still potential transitions |
| 148 | + for len(proposals) > 0 { |
| 149 | + |
| 150 | + // go slices could usefully have a "pop" primitive |
| 151 | + lastEl := len(proposals) - 1 |
| 152 | + proposal := proposals[lastEl] |
| 153 | + proposals = proposals[0:lastEl] |
| 154 | + |
| 155 | + // generate the possibly-empty list of transitions from state on the name/value pair |
| 156 | + nextStates := proposal.matcher.transitionOn(&fields[proposal.fieldIndex]) |
| 157 | + |
| 158 | + // for each state in the set of transitions from the proposed state |
| 159 | + for _, nextState := range nextStates { |
| 160 | + |
| 161 | + // if arriving at this state means we've matched one or more patterns, record that fact |
| 162 | + for _, nextMatch := range nextState.matches { |
| 163 | + matches.addX(nextMatch) |
| 164 | + } |
| 165 | + |
| 166 | + // have we invalidated a presumed exists:false pattern? |
| 167 | + for existsMatch := range nextState.existsFalseFailures.set { |
| 168 | + failedExistsFalseMatches.addX(existsMatch) |
| 169 | + } |
| 170 | + |
| 171 | + // for each state we've transitioned to, give each subsequent field a chance to |
| 172 | + // transition on it, assuming it's not in an object that's in a different element |
| 173 | + // of the same array |
| 174 | + for nextIndex := proposal.fieldIndex + 1; nextIndex < len(fields); nextIndex++ { |
| 175 | + if noArrayTrailConflict(fields[proposal.fieldIndex].ArrayTrail, fields[nextIndex].ArrayTrail) { |
| 176 | + proposals = append(proposals, proposedTransition{fieldIndex: nextIndex, matcher: nextState}) |
| 177 | + } |
| 178 | + } |
| 179 | + } |
| 180 | + } |
| 181 | + for presumedExistsFalseMatch := range m.presumedExistFalseMatches.set { |
| 182 | + if !failedExistsFalseMatches.contains(presumedExistsFalseMatch) { |
| 183 | + matches.addX(presumedExistsFalseMatch) |
| 184 | + } |
| 185 | + } |
| 186 | + return matches |
| 187 | +} |
| 188 | + |
| 189 | +// Arrays are invisible in the automaton. That is to say, if an event has |
| 190 | +// { "a": [ 1, 2, 3 ] } |
| 191 | +// Then the fields will be a/1, a/2, and a/3 |
| 192 | +// Same for {"a": [[1, 2], 3]} or any other permutation |
| 193 | +// So if you have {"a": [ { "b": 1, "c": 2}, {"b": 3, "c": 4}] } |
| 194 | +// then a pattern like { "a": { "b": 1, "c": 4 } } would match. |
| 195 | +// To prevent that from happening, each ArrayPos contains two numbers; the first identifies the array in |
| 196 | +// the event that this name/val occurred in, the second the position in the array. We don't allow |
| 197 | +// transitioning between field values that occur in different positions in the same array. |
| 198 | +// See the arrays_test unit test for more examples. |
| 199 | +func noArrayTrailConflict(from []ArrayPos, to []ArrayPos) bool { |
| 200 | + for _, fromAPos := range from { |
| 201 | + for _, toAPos := range to { |
| 202 | + if fromAPos.Array == toAPos.Array && fromAPos.Pos != toAPos.Pos { |
| 203 | + return false |
| 204 | + } |
| 205 | + } |
| 206 | + } |
| 207 | + return true |
| 208 | +} |
| 209 | + |
| 210 | +func (m *CoreMatcher) IsNameUsed(label []byte) bool { |
| 211 | + _, ok := m.namesUsed[string(label)] |
| 212 | + return ok |
| 213 | +} |
0 commit comments