Skip to content

Commit e2380e9

Browse files
committed
core/filtermaps: two dimensional log filter
1 parent ea3b509 commit e2380e9

File tree

10 files changed

+1312
-16
lines changed

10 files changed

+1312
-16
lines changed

core/filtermaps/filtermaps.go

Lines changed: 431 additions & 0 deletions
Large diffs are not rendered by default.

core/filtermaps/indexer.go

Lines changed: 423 additions & 0 deletions
Large diffs are not rendered by default.

core/filtermaps/matcher.go

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
package filtermaps
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/ethereum/go-ethereum/common"
8+
"github.com/ethereum/go-ethereum/core/types"
9+
)
10+
11+
type Backend interface {
12+
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) // returns head lv pointer if blockNumber > head number
13+
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
14+
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
15+
}
16+
17+
func GetPotentialMatches(ctx context.Context, backend Backend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
18+
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
19+
if err != nil {
20+
return nil, err
21+
}
22+
lastIndex, err := backend.GetBlockLvPointer(ctx, lastBlock+1)
23+
if err != nil {
24+
return nil, err
25+
}
26+
firstMap, lastMap := uint32(firstIndex>>logMapsPerEpoch), uint32(lastIndex>>logMapsPerEpoch)
27+
28+
matcher := make(matchSequence, len(topics)+1)
29+
matchAddress := make(matchAny, len(addresses))
30+
for i, address := range addresses {
31+
matchAddress[i] = &singleMatcher{backend: backend, value: addressValue(address)}
32+
}
33+
matcher[0] = newCachedMatcher(matchAddress, firstMap)
34+
for i, topicList := range topics {
35+
matchTopic := make(matchAny, len(topicList))
36+
for j, topic := range topicList {
37+
matchTopic[j] = &singleMatcher{backend: backend, value: topicValue(topic)}
38+
}
39+
matcher[i+1] = newCachedMatcher(matchTopic, firstMap)
40+
}
41+
42+
var logs []*types.Log
43+
for mapIndex := firstMap; mapIndex <= lastMap; mapIndex++ {
44+
//TODO parallelize, check total data size
45+
matches, err := matcher.getMatches(ctx, mapIndex)
46+
if err != nil {
47+
return logs, err
48+
}
49+
matcher.clearUntil(mapIndex)
50+
mlogs, err := getLogsFromMatches(ctx, backend, firstIndex, lastIndex, matches)
51+
if err != nil {
52+
return logs, err
53+
}
54+
logs = append(logs, mlogs...)
55+
}
56+
return logs, nil
57+
}
58+
59+
type matcher interface {
60+
getMatches(ctx context.Context, mapIndex uint32) (potentialMatches, error)
61+
clearUntil(mapIndex uint32)
62+
}
63+
64+
func getLogsFromMatches(ctx context.Context, backend Backend, firstIndex, lastIndex uint64, matches potentialMatches) ([]*types.Log, error) {
65+
var logs []*types.Log
66+
for _, match := range matches {
67+
if match < firstIndex || match > lastIndex {
68+
continue
69+
}
70+
log, err := backend.GetLogByLvIndex(ctx, match)
71+
if err != nil {
72+
return logs, err
73+
}
74+
logs = append(logs, log)
75+
}
76+
return logs, nil
77+
}
78+
79+
// singleMatcher implements matcher
80+
type singleMatcher struct {
81+
backend Backend
82+
value common.Hash
83+
}
84+
85+
func (s *singleMatcher) getMatches(ctx context.Context, mapIndex uint32) (potentialMatches, error) {
86+
filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, s.value))
87+
if err != nil {
88+
return nil, err
89+
}
90+
return filterRow.potentialMatches(mapIndex, s.value), nil
91+
}
92+
93+
func (s *singleMatcher) clearUntil(mapIndex uint32) {}
94+
95+
// matchAny implements matcher
96+
type matchAny []matcher
97+
98+
func (m matchAny) getMatches(ctx context.Context, mapIndex uint32) (potentialMatches, error) {
99+
if len(m) == 0 {
100+
return nil, nil
101+
}
102+
if len(m) == 1 {
103+
return m[0].getMatches(ctx, mapIndex)
104+
}
105+
matches := make([]potentialMatches, len(m))
106+
for i, matcher := range m {
107+
var err error
108+
if matches[i], err = matcher.getMatches(ctx, mapIndex); err != nil {
109+
return nil, err
110+
}
111+
}
112+
return mergeResults(matches), nil
113+
}
114+
115+
func (m matchAny) clearUntil(mapIndex uint32) {
116+
for _, matcher := range m {
117+
matcher.clearUntil(mapIndex)
118+
}
119+
}
120+
121+
func mergeResults(results []potentialMatches) potentialMatches {
122+
if len(results) == 0 {
123+
return nil
124+
}
125+
var sumLen int
126+
for _, res := range results {
127+
if res == nil {
128+
// nil is a wild card; all indices in map range are potential matches
129+
return nil
130+
}
131+
sumLen += len(res)
132+
}
133+
merged := make(potentialMatches, 0, sumLen)
134+
for {
135+
best := -1
136+
for i, res := range results {
137+
if len(res) == 0 {
138+
continue
139+
}
140+
if best < 0 || res[0] < results[best][0] {
141+
best = i
142+
}
143+
}
144+
if best < 0 {
145+
return merged
146+
}
147+
if len(merged) == 0 || results[best][0] > merged[len(merged)-1] {
148+
merged = append(merged, results[best][0])
149+
}
150+
results[best] = results[best][1:]
151+
}
152+
}
153+
154+
// matchSequence implements matcher
155+
type matchSequence []matcher
156+
157+
func (m matchSequence) getMatches(ctx context.Context, mapIndex uint32) (potentialMatches, error) {
158+
if len(m) == 0 {
159+
return nil, nil
160+
}
161+
if len(m) == 1 {
162+
return m[0].getMatches(ctx, mapIndex)
163+
}
164+
base, next, offset := m[:len(m)-1], m[len(m)-1], uint64(len(m)-1)
165+
baseRes, err := base.getMatches(ctx, mapIndex)
166+
if err != nil {
167+
return nil, err
168+
}
169+
if baseRes != nil && len(baseRes) == 0 {
170+
return baseRes, nil
171+
}
172+
nextRes, err := next.getMatches(ctx, mapIndex)
173+
if err != nil {
174+
return nil, err
175+
}
176+
nextNextRes, err := next.getMatches(ctx, mapIndex+1)
177+
if err != nil {
178+
return nil, err
179+
}
180+
return matchResults(mapIndex, offset, baseRes, nextRes, nextNextRes), nil
181+
}
182+
183+
func (m matchSequence) clearUntil(mapIndex uint32) {
184+
for _, matcher := range m {
185+
matcher.clearUntil(mapIndex)
186+
}
187+
}
188+
189+
func matchResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
190+
if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
191+
return baseRes
192+
}
193+
if len(nextRes) > 0 {
194+
start := 0
195+
for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<logValuesPerMap+offset {
196+
start++
197+
}
198+
nextRes = nextRes[start:]
199+
}
200+
if len(nextNextRes) > 0 {
201+
stop := 0
202+
for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<logValuesPerMap+offset {
203+
stop++
204+
}
205+
nextNextRes = nextNextRes[:stop]
206+
}
207+
maxLen := len(nextRes) + len(nextNextRes)
208+
if maxLen == 0 {
209+
return nextRes
210+
}
211+
if len(baseRes) < maxLen {
212+
maxLen = len(baseRes)
213+
}
214+
matchedRes := make(potentialMatches, 0, maxLen)
215+
for _, res := range []potentialMatches{nextRes, nextNextRes} {
216+
if baseRes != nil {
217+
for len(res) > 0 && len(baseRes) > 0 {
218+
if res[0] > baseRes[0]+offset {
219+
baseRes = baseRes[1:]
220+
} else if res[0] < baseRes[0]+offset {
221+
res = res[1:]
222+
} else {
223+
matchedRes = append(matchedRes, baseRes[0])
224+
res = res[1:]
225+
baseRes = baseRes[1:]
226+
}
227+
}
228+
} else {
229+
for len(res) > 0 {
230+
matchedRes = append(matchedRes, res[0]-offset)
231+
res = res[1:]
232+
}
233+
}
234+
}
235+
return matchedRes
236+
}
237+
238+
type cachedMatcher struct {
239+
lock sync.Mutex
240+
matcher matcher
241+
cache map[uint32]cachedMatches
242+
clearedBefore uint32
243+
}
244+
245+
func newCachedMatcher(m matcher, start uint32) *cachedMatcher {
246+
return &cachedMatcher{
247+
matcher: m,
248+
cache: make(map[uint32]cachedMatches),
249+
clearedBefore: start,
250+
}
251+
}
252+
253+
type cachedMatches struct {
254+
matches potentialMatches
255+
reqCh chan struct{}
256+
}
257+
258+
func (c *cachedMatcher) getMatches(ctx context.Context, mapIndex uint32) (potentialMatches, error) {
259+
c.lock.Lock()
260+
if mapIndex < c.clearedBefore {
261+
panic("invalid cachedMatcher access")
262+
}
263+
cm, existed := c.cache[mapIndex]
264+
if !existed {
265+
cm = cachedMatches{reqCh: make(chan struct{})}
266+
c.cache[mapIndex] = cm
267+
}
268+
c.lock.Unlock()
269+
if existed {
270+
if cm.matches != nil {
271+
return cm.matches, nil
272+
}
273+
select {
274+
case <-cm.reqCh:
275+
case <-ctx.Done():
276+
return nil, ctx.Err()
277+
}
278+
c.lock.Lock()
279+
cm = c.cache[mapIndex]
280+
c.lock.Unlock()
281+
if cm.matches == nil {
282+
panic("cached matches missing")
283+
}
284+
return cm.matches, nil
285+
}
286+
matches, err := c.matcher.getMatches(ctx, mapIndex)
287+
if err != nil {
288+
return nil, err
289+
}
290+
c.lock.Lock()
291+
c.cache[mapIndex] = cachedMatches{matches: matches}
292+
c.lock.Unlock()
293+
close(cm.reqCh)
294+
return matches, nil
295+
}
296+
297+
func (c *cachedMatcher) clearUntil(mapIndex uint32) {
298+
c.lock.Lock()
299+
for c.clearedBefore <= mapIndex {
300+
delete(c.cache, c.clearedBefore)
301+
c.clearedBefore++
302+
}
303+
c.lock.Unlock()
304+
}

core/rawdb/accessors_indexes.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package rawdb
1818

1919
import (
2020
"bytes"
21+
"encoding/binary"
22+
"errors"
2123
"math/big"
2224

2325
"github.com/ethereum/go-ethereum/common"
@@ -179,3 +181,79 @@ func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
179181
log.Crit("Failed to delete bloom bits", "err", it.Error())
180182
}
181183
}
184+
185+
func ReadFilterMapRow(db ethdb.KeyValueReader, mapRowIndex uint64) ([]byte, error) {
186+
return db.Get(filterMapRowKey(mapRowIndex))
187+
}
188+
189+
func WriteFilterMapRow(db ethdb.KeyValueWriter, mapRowIndex uint64, rowEnc []byte) {
190+
if err := db.Put(filterMapRowKey(mapRowIndex), rowEnc); err != nil {
191+
log.Crit("Failed to store filter map row", "err", err)
192+
}
193+
}
194+
195+
func DeleteFilterMapRow(db ethdb.KeyValueWriter, mapRowIndex uint64) {
196+
if err := db.Delete(filterMapRowKey(mapRowIndex)); err != nil {
197+
log.Crit("Failed to delete filter map row", "err", err)
198+
}
199+
}
200+
201+
func ReadFilterMapBlockPtr(db ethdb.KeyValueReader, mapIndex uint32) (uint64, error) {
202+
encPtr, err := db.Get(filterMapBlockPtrKey(mapIndex))
203+
if err != nil {
204+
return 0, err
205+
}
206+
if len(encPtr) != 8 {
207+
return 0, errors.New("Invalid block number encoding")
208+
}
209+
return binary.BigEndian.Uint64(encPtr), nil
210+
}
211+
212+
func WriteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32, blockNumber uint64) {
213+
var encPtr [8]byte
214+
binary.BigEndian.PutUint64(encPtr[:], blockNumber)
215+
if err := db.Put(filterMapBlockPtrKey(mapIndex), encPtr[:]); err != nil {
216+
log.Crit("Failed to store filter map block pointer", "err", err)
217+
}
218+
}
219+
220+
func DeleteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32) {
221+
if err := db.Delete(filterMapBlockPtrKey(mapIndex)); err != nil {
222+
log.Crit("Failed to delete filter map block pointer", "err", err)
223+
}
224+
}
225+
226+
func ReadBlockLvPointer(db ethdb.KeyValueReader, blockNumber uint64) (uint64, error) {
227+
encPtr, err := db.Get(blockLVKey(blockNumber))
228+
if err != nil {
229+
return 0, err
230+
}
231+
if len(encPtr) != 8 {
232+
return 0, errors.New("Invalid log value pointer encoding")
233+
}
234+
return binary.BigEndian.Uint64(encPtr), nil
235+
}
236+
237+
func WriteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber, lvPointer uint64) {
238+
var encPtr [8]byte
239+
binary.BigEndian.PutUint64(encPtr[:], lvPointer)
240+
if err := db.Put(blockLVKey(blockNumber), encPtr[:]); err != nil {
241+
log.Crit("Failed to store block log value pointer", "err", err)
242+
}
243+
}
244+
245+
func DeleteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber uint64) {
246+
if err := db.Delete(blockLVKey(blockNumber)); err != nil {
247+
log.Crit("Failed to delete block log value pointer", "err", err)
248+
}
249+
}
250+
251+
func ReadFilterMapsRange(db ethdb.KeyValueReader) ([]byte, error) {
252+
return db.Get(filterMapsRangeKey)
253+
}
254+
255+
func WriteFilterMapsRange(db ethdb.KeyValueWriter, encRange []byte) {
256+
if err := db.Put(filterMapsRangeKey, encRange); err != nil {
257+
log.Crit("Failed to store filter maps range", "err", err)
258+
}
259+
}

0 commit comments

Comments
 (0)