Skip to content

Commit 457b29e

Browse files
committed
cleanup README and use compressed CityLots
1 parent 8583893 commit 457b29e

File tree

11 files changed

+110
-146
lines changed

11 files changed

+110
-146
lines changed

README.md

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,17 +124,15 @@ but an AND on the field names.
124124

125125
Note that the `shellstyle` Patterns can include only
126126
one `*` character. The architecture probably allows
127-
support for a larger subset of regular expressions
128-
but currently, the testing for just the single -`*`
129-
patterns is a bit lacking.
127+
support for a larger subset of regular expressions,
128+
eventually.
130129

131130
Number matching is weak - the number has to appear
132131
exactly the same in the pattern and the event. I.e.,
133132
Quamina doesn't know that 35, 35.000, and 3.5e1 are the
134-
same number.
135-
136-
There's a fix for this in the code which is commented
137-
out because it causes a significant performance penalty.
133+
same number. There's a fix for this in the code which
134+
is commented out because it causes a
135+
significant performance penalty.
138136

139137
## Flattening and Matching
140138

@@ -222,8 +220,9 @@ The `error` return is used to signal invalid Pattern
222220
structure, which could be bad UTF-8 or malformed JSON
223221
or leaf values which are not provided as arrays.
224222
225-
As many Patterns as desired can be added to a Matcher
226-
but at this time there is no capability of removing any.
223+
As many Patterns as desired can be added to a Matcher.
224+
The `CoreMatcher` type does not support `DeletePattern()`
225+
but `pruner.Matcher` does.
227226
228227
The `AddPattern` call is single-threaded; if multiple
229228
threads call it, they will block and execute sequentially.
@@ -242,15 +241,20 @@ The `[]X` return slice may be empty if none of the Patterns
242241
match the provided Event.
243242

244243
```go
245-
func (m *Matcher) MatchesForFields([]Field) []X
244+
func (m *Matcher) MatchesForFields([]Field) ([]X, error)
246245
```
247246
Performs the functions of `MatchesForJSON` on an
248247
Event which has been flattened into a list of `Field`
249-
instances.
248+
instances. At the moment, `CoreMatcher` only returns
249+
an error if the `[]Field` argument is nil. `pruner.Matcher`
250+
can return an error if it suffers a failure in its
251+
Pattern storage.
250252

251-
`MatchesForJSONEvent` is thread-safe. Many threads may
253+
These matching calls are thread-safe. Many threads may
252254
be executing them concurrently, even while `AddPattern` is
253-
also executing.
255+
also executing. There is a significant performance
256+
penalty if there is a high rate of `AddPattern` in
257+
combination with matching.
254258

255259
```go
256260
func NewFJ(*Matcher) Flattener
@@ -260,7 +264,7 @@ Creates a new JSON-specific Flattener.
260264
func (fj *FJ) Flatten([]byte event) ([]Field, error)
261265
```
262266
Transforms an event, which must be a JSON object
263-
encoded in UTF-8 into a list of `Field` instances.
267+
encoded in UTF-8, into a list of `Field` instances.
264268

265269
```go
266270
func (fj *FJ) FlattenAndMatch([]byte event) ([]X, error)
@@ -305,7 +309,7 @@ colonies before slavery was abolished.
305309

306310
### Credits
307311

308-
@timbray: v1.0 and patches.
312+
@timbray: v0.1 and patches.
309313

310314
@jsmorph: `Pruner` and concurrency testing.
311315

core/benchmarks_test.go

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,50 @@ package core
22

33
import (
44
"bufio"
5+
"compress/gzip"
56
"fmt"
67
"os"
78
"runtime"
89
"runtime/debug"
10+
"sync"
911
"testing"
1012
"time"
1113
)
1214

1315
const oneMeg = 1024 * 1024

// The citylots fixture is large, so it is decompressed once and shared by
// every test in the package. cityLotsLock guards the lazy load of
// cityLotsLines and cityLotsLineCount.
var (
	cityLotsLock      sync.Mutex
	cityLotsLines     [][]byte
	cityLotsLineCount int
)

// getCityLotsLines returns the contents of ../testdata/citylots.jlines.gz as
// one []byte per line, loading and decompressing the file on the first call
// and returning the cached slice afterwards. cityLotsLineCount is set to the
// number of lines as a side effect.
//
// Any failure to open, decompress, or scan the fixture aborts the calling
// test via t.Fatal: continuing would either dereference a nil reader or hand
// the caller a silently truncated data set. Nothing is cached on failure, so
// a later test will retry the load.
func getCityLotsLines(t *testing.T) [][]byte {
	t.Helper()

	cityLotsLock.Lock()
	defer cityLotsLock.Unlock()
	if cityLotsLines != nil {
		return cityLotsLines
	}

	file, err := os.Open("../testdata/citylots.jlines.gz")
	if err != nil {
		t.Fatal("Can't open citylots.jlines.gz: " + err.Error())
	}
	defer func() {
		// Read-only handle: a close error cannot lose data.
		_ = file.Close()
	}()

	zr, err := gzip.NewReader(file)
	if err != nil {
		t.Fatal("Can't open zip reader: " + err.Error())
	}
	defer func() {
		// Scanner errors are reported below; nothing more to learn here.
		_ = zr.Close()
	}()

	scanner := bufio.NewScanner(zr)
	// Individual lines can be far longer than bufio's default token limit.
	scanner.Buffer(make([]byte, oneMeg), oneMeg)

	// Accumulate locally and publish only after a clean scan, so that a
	// mid-file error never leaves a partial fixture in the cache.
	var lines [][]byte
	for scanner.Scan() {
		// Text() copies, which is required because the scanner reuses its buffer.
		lines = append(lines, []byte(scanner.Text()))
	}
	if err := scanner.Err(); err != nil {
		t.Fatal("Scanner error: " + err.Error())
	}

	cityLotsLines = lines
	cityLotsLineCount = len(lines)
	return cityLotsLines
}
48+
1549
func TestCRANLEIGH(t *testing.T) {
1650

1751
jCranleigh := `{ "type": "Feature", "properties": { "MAPBLKLOT": "7222001", "BLKLOT": "7222001", "BLOCK_NUM": "7222", "LOT_NUM": "001", "FROM_ST": "1", "TO_ST": "1", "STREET": "CRANLEIGH", "ST_TYPE": "DR", "ODD_EVEN": "O" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -122.472773074480756, 37.73439178240811, 0.0 ], [ -122.47278111723567, 37.73451247621523, 0.0 ], [ -122.47242608711845, 37.73452184591072, 0.0 ], [ -122.472418368113281, 37.734401143064396, 0.0 ], [ -122.472773074480756, 37.73439178240811, 0.0 ] ] ] } }`
@@ -55,14 +89,6 @@ const thresholdPerformance = 1.0
5589
// that it uses geometry/co-ordinates, which will force the fj flattener to process the big arrays of numbers in
5690
// each line. A high proportion of typical Quamina workloads should run faster.
5791
func TestCityLots(t *testing.T) {
58-
file, err := os.Open("../testdata/citylots.jlines")
59-
if err != nil {
60-
t.Error("Can't open file: " + err.Error())
61-
}
62-
defer func(file *os.File) {
63-
_ = file.Close()
64-
}(file)
65-
6692
patterns := []string{
6793
`{ "properties": { "STREET": [ "CRANLEIGH" ] } }`,
6894
`{ "properties": { "STREET": [ "17TH" ], "ODD_EVEN": [ "E"] } }`,
@@ -82,10 +108,7 @@ func TestCityLots(t *testing.T) {
82108
"0011008": 1,
83109
}
84110

85-
scanner := bufio.NewScanner(file)
86-
buf := make([]byte, oneMeg)
87-
scanner.Buffer(buf, oneMeg)
88-
111+
var err error
89112
m := NewCoreMatcher()
90113
for i := range names {
91114
err = m.AddPattern(names[i], patterns[i])
@@ -96,20 +119,13 @@ func TestCityLots(t *testing.T) {
96119
fj := NewFJ(m)
97120
results := make(map[X]int)
98121

99-
lineCount := 0
100-
var lines [][]byte
101-
for scanner.Scan() {
102-
lineCount++
103-
lines = append(lines, []byte(scanner.Text()))
104-
}
105-
lineCount = 0
122+
lines := getCityLotsLines(t)
106123
before := time.Now()
107124
for _, line := range lines {
108125
matches, err := fj.FlattenAndMatch(line)
109126
if err != nil {
110127
t.Error("Matches4JSON: " + err.Error())
111128
}
112-
lineCount++
113129
for _, match := range matches {
114130
count, ok := results[match]
115131
if !ok {
@@ -121,7 +137,7 @@ func TestCityLots(t *testing.T) {
121137
fmt.Println()
122138

123139
elapsed := float64(time.Since(before).Milliseconds())
124-
perSecond := float64(lineCount) / (elapsed / 1000.0)
140+
perSecond := float64(cityLotsLineCount) / (elapsed / 1000.0)
125141
fmt.Printf("%.2f matches/second\n\n", perSecond)
126142

127143
if perSecond < thresholdPerformance {
@@ -135,11 +151,6 @@ func TestCityLots(t *testing.T) {
135151
t.Errorf(message1 + message2)
136152
}
137153

138-
err = scanner.Err()
139-
if err != nil {
140-
t.Error("Scanner error: " + err.Error())
141-
}
142-
143154
if len(results) != len(wanted) {
144155
t.Errorf("got %d results, wanted %d", len(results), len(wanted))
145156
}
@@ -185,18 +196,7 @@ func TestMySoftwareHatesMe(t *testing.T) {
185196

186197
// exercise shellstyle matching a little, is much faster than TestCityLots because it's only working with one field
187198
func TestBigShellStyle(t *testing.T) {
188-
file, err := os.Open("../testdata/citylots.jlines")
189-
if err != nil {
190-
t.Error("Can't open file: " + err.Error())
191-
}
192-
defer func(file *os.File) {
193-
_ = file.Close()
194-
}(file)
195-
196-
scanner := bufio.NewScanner(file)
197-
buf := make([]byte, oneMeg)
198-
scanner.Buffer(buf, oneMeg)
199-
199+
lines := getCityLotsLines(t)
200200
m := NewCoreMatcher()
201201

202202
wanted := map[X]int{
@@ -231,12 +231,6 @@ func TestBigShellStyle(t *testing.T) {
231231
*/
232232
fmt.Println(matcherStats(m))
233233

234-
lineCount := 0
235-
var lines [][]byte
236-
for scanner.Scan() {
237-
lineCount++
238-
lines = append(lines, []byte(scanner.Text()))
239-
}
240234
lCounts := make(map[X]int)
241235
before := time.Now()
242236
fj := NewFJ(m)
@@ -256,7 +250,7 @@ func TestBigShellStyle(t *testing.T) {
256250
}
257251
}
258252
elapsed := float64(time.Since(before).Milliseconds())
259-
perSecond := float64(lineCount) / (elapsed / 1000.0)
253+
perSecond := float64(cityLotsLineCount) / (elapsed / 1000.0)
260254
fmt.Printf("%.2f matches/second with letter patterns\n\n", perSecond)
261255

262256
for k, wc := range wanted {

core/concurrency_test.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package core
22

33
import (
4-
"bufio"
54
"fmt"
65
"math/rand"
7-
"os"
86
"testing"
97
"time"
108
)
@@ -39,13 +37,7 @@ func TestConcurrency(t *testing.T) {
3937
// I was initially surprised that adding 860 or so changes to the automaton while it's running doesn't seem to
4038
// cause any decrease in performance. But I guess it splits out very cleanly onto another core and really
4139
// doesn't steal any resources from the thread doing the Match calls
42-
file, err := os.Open("../testdata/citylots.jlines")
43-
if err != nil {
44-
t.Error("Can't open file: " + err.Error())
45-
}
46-
defer func(file *os.File) {
47-
_ = file.Close()
48-
}(file)
40+
lines := getCityLotsLines(t)
4941

5042
patterns := []string{
5143
`{ "properties": { "STREET": [ "CRANLEIGH" ] } }`,
@@ -69,10 +61,7 @@ func TestConcurrency(t *testing.T) {
6961
"0011008": 1,
7062
}
7163

72-
scanner := bufio.NewScanner(file)
73-
buf := make([]byte, oneMeg)
74-
scanner.Buffer(buf, oneMeg)
75-
64+
var err error
7665
m := NewCoreMatcher()
7766
for i := range names {
7867
err = m.AddPattern(names[i], patterns[i])
@@ -82,14 +71,8 @@ func TestConcurrency(t *testing.T) {
8271
}
8372
results := make(map[X]int)
8473

85-
lineCount := 0
86-
var lines [][]byte
87-
for scanner.Scan() {
88-
lineCount++
89-
lines = append(lines, []byte(scanner.Text()))
90-
}
9174
use37 := true
92-
lineCount = 0
75+
lineCount := 0
9376
before := time.Now()
9477
ch := make(chan string, 1000)
9578
sent := 0
@@ -117,11 +100,6 @@ func TestConcurrency(t *testing.T) {
117100
perSecond := float64(lineCount) / (elapsed / 1000.0)
118101
fmt.Printf("\n%.2f matches/second with updates\n\n", perSecond)
119102

120-
err = scanner.Err()
121-
if err != nil {
122-
t.Error("Scanner error: " + err.Error())
123-
}
124-
125103
if len(results) != len(wanted) {
126104
t.Errorf("got %d results, wanted %d", len(results), len(wanted))
127105
}

core/core_matcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type coreStart struct {
3333
}
3434

3535
// X for anything, should eventually be a generic?
36-
type X interface{}
36+
type X any
3737

3838
func NewCoreMatcher() *CoreMatcher {
3939
m := CoreMatcher{}

core/fj.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type FJ struct {
2828
}
2929

3030
// Reset an FJ struct so it can be re-used and won't need to be reconstructed for each event to be flattened
31-
func (fj *FJ) Reset() {
31+
func (fj *FJ) reset() {
3232
fj.eventIndex = 0
3333
fj.fields = fj.fields[:0]
3434
fj.skipping = 0
@@ -80,7 +80,7 @@ func (fj *FJ) Flatten(event []byte) ([]Field, error) {
8080
if fj.cleanSheet {
8181
fj.cleanSheet = false
8282
} else {
83-
fj.Reset()
83+
fj.reset()
8484
}
8585
if len(event) == 0 {
8686
return nil, fj.error("empty event")
@@ -471,7 +471,7 @@ func (fj *FJ) readStringValue() ([]byte, error) {
471471
} else if ch == '\\' {
472472
val, err := fj.readStringValWithEscapes(valStart)
473473
return val, err
474-
} else if ch <= 0x1f || ch >= byte(ByteCeiling) {
474+
} else if ch <= 0x1f || ch >= byte(byteCeiling) {
475475
return nil, fj.error(fmt.Sprintf("illegal UTF-8 byte %x in string value", ch))
476476
}
477477
if fj.step() != nil {
@@ -501,7 +501,7 @@ func (fj *FJ) readStringValWithEscapes(nameStart int) ([]byte, error) {
501501
return nil, err
502502
}
503503
val = append(val, unescaped...)
504-
} else if ch <= 0x1f || ch >= byte(ByteCeiling) {
504+
} else if ch <= 0x1f || ch >= byte(byteCeiling) {
505505
return nil, fj.error(fmt.Sprintf("illegal UTF-8 byte %x in string value", ch))
506506
} else {
507507
val = append(val, ch)
@@ -529,7 +529,7 @@ func (fj *FJ) readMemberName() ([]byte, error) {
529529
} else if ch == '\\' {
530530
name, err := fj.readMemberNameWithEscapes(nameStart)
531531
return name, err
532-
} else if ch <= 0x1f || ch >= byte(ByteCeiling) {
532+
} else if ch <= 0x1f || ch >= byte(byteCeiling) {
533533
return nil, fj.error(fmt.Sprintf("illegal UTF-8 byte %x in field name", ch))
534534
}
535535
if fj.step() != nil {
@@ -547,7 +547,7 @@ func (fj *FJ) readMemberNameWithEscapes(nameStart int) ([]byte, error) {
547547
if ch == '"' {
548548
fj.eventIndex = from
549549
return memberName, nil
550-
} else if ch <= 0x1f || ch >= byte(ByteCeiling) {
550+
} else if ch <= 0x1f || ch >= byte(byteCeiling) {
551551
return nil, fj.error(fmt.Sprintf("illegal UTF-8 byte %x in field name", ch))
552552
} else if ch == '\\' {
553553
var unescaped []byte

core/flattener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,5 @@ type Field struct {
3535
type Flattener interface {
3636
Flatten(event []byte) ([]Field, error)
3737
FlattenAndMatch(event []byte) ([]X, error)
38-
Reset()
38+
reset()
3939
}

0 commit comments

Comments
 (0)