Skip to content

Subscription-demo-raw_message #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: extended-tracer
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package core

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -204,6 +205,7 @@ type BlockChain struct {
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
tracesFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

Expand Down Expand Up @@ -2537,3 +2539,7 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
return time.Duration(bc.flushInterval.Load())
}

func (bc *BlockChain) TracersEventsSent(data []json.RawMessage) {
bc.tracesFeed.Send(data)
}
6 changes: 6 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"encoding/json"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -409,3 +410,8 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}

// SubscribeTracesEvent registers a subscription of TracesEvent.
func (bc *BlockChain) SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription {
return bc.scope.Track(bc.tracesFeed.Subscribe(ch))
}
5 changes: 5 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eth

import (
"context"
"encoding/json"
"errors"
"math/big"
"time"
Expand Down Expand Up @@ -277,6 +278,10 @@ func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.
return b.eth.miner.SubscribePendingLogs(ch)
}

func (b *EthAPIBackend) SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription {
return b.eth.BlockChain().SubscribeTracesEvent(ch)
}

func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainEvent(ch)
}
Expand Down
31 changes: 31 additions & 0 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,37 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
return rpcSub, nil
}

// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *FilterAPI) Traces(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
traces := make(chan []json.RawMessage)
tracesSub := api.events.SubscribeTraces(traces)

for {
select {
case s := <-traces:
notifier.Notify(rpcSub.ID, s)
case <-rpcSub.Err():
tracesSub.Unsubscribe()
return
case <-notifier.Closed():
tracesSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}


// FilterCriteria represents a request to create a new filter.
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria ethereum.FilterQuery
Expand Down
44 changes: 43 additions & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package filters

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription

BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
Expand Down Expand Up @@ -162,6 +164,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// TracesSubscription keeps track of the normal chain processing operations
TracesSubscription
// LastIndexSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -176,6 +180,8 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// tracesChanSize is the size of channel listening to TracesEvent.
tracesChanSize = 100
)

type subscription struct {
Expand All @@ -185,6 +191,7 @@ type subscription struct {
logsCrit ethereum.FilterQuery
logs chan []*types.Log
txs chan []*types.Transaction
traces chan []json.RawMessage
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
Expand All @@ -204,6 +211,7 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
tracesSub event.Subscription // Subscription for traces event

// Channels
install chan *subscription // install filter for event notification
Expand All @@ -213,6 +221,7 @@ type EventSystem struct {
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
tracesCh chan []json.RawMessage // Channel to receive new traces json.RawMessage
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -233,6 +242,7 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
tracesCh: make(chan []json.RawMessage, tracesChanSize),
}

// Subscribe events
Expand All @@ -241,6 +251,7 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
m.tracesSub = m.backend.SubscribeTracesEvent(m.tracesCh)

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
Expand Down Expand Up @@ -418,6 +429,24 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transactions for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribeTraces(traces chan []json.RawMessage) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: TracesSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
traces: traces,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}


type filterIndex map[Type]map[rpc.ID]*subscription

func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
Expand Down Expand Up @@ -471,6 +500,16 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
}
}

func (es *EventSystem) handleTraces(filters filterIndex, ev []json.RawMessage) {
if len(ev) == 0 {
return
}
for _, f := range filters[TracesSubscription] {
f.traces <- ev
}
}


func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
Expand Down Expand Up @@ -550,6 +589,7 @@ func (es *EventSystem) eventLoop() {
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
es.tracesSub.Unsubscribe()
}()

index := make(filterIndex)
Expand All @@ -569,7 +609,9 @@ func (es *EventSystem) eventLoop() {
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)

case ev := <-es.tracesCh:
es.handleTraces(index, ev)

case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand Down
48 changes: 48 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package filters

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -50,6 +51,7 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
tracesFeed event.Feed
pendingBlock *types.Block
pendingReceipts types.Receipts
}
Expand Down Expand Up @@ -149,6 +151,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription {
return b.tracesFeed.Subscribe(ch)
}

func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}
Expand Down Expand Up @@ -245,6 +251,48 @@ func TestBlockSubscription(t *testing.T) {
<-sub1.Err()
}

func TestTracesSubscription(t *testing.T) {
t.Parallel()

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
tracesEvents = [][]json.RawMessage{}
)

for i := 0; i < 10; i++ {
tracesEvents = append(tracesEvents, []json.RawMessage{json.RawMessage(fmt.Sprintf(`{"id":%d}`, i))})
}

chan0 := make(chan []json.RawMessage)
sub0 := api.events.SubscribeTraces(chan0)

go func() { // simulate client
i1 := 0
for i1 != len(tracesEvents) {
select {
case receivedEvents := <-chan0:
if !reflect.DeepEqual(receivedEvents, tracesEvents[i1]) {
t.Errorf("sub0 received invalid event on index %d, want %s, got %s", i1, string(tracesEvents[i1][0]), string(receivedEvents[0]))
}
i1++
}
}

sub0.Unsubscribe()
}()

time.Sleep(1 * time.Second)
for _, e := range tracesEvents {
backend.tracesFeed.Send(e)
}

<-sub0.Err()
}



// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilter(t *testing.T) {
t.Parallel()
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestFilters(t *testing.T) {

// Hack: GenerateChainWithGenesis creates a new db.
// Commit the genesis manually and use GenerateChain.
_, err = gspec.Commit(db, trie.NewDatabase(db))
_, err = gspec.Commit(db, trie.NewDatabase(db), nil)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading