diff --git a/core/blockchain.go b/core/blockchain.go index 42e35b3fa622..25f2cac437f0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -18,6 +18,7 @@ package core import ( + "encoding/json" "errors" "fmt" "io" @@ -204,6 +205,7 @@ type BlockChain struct { chainHeadFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed + tracesFeed event.Feed scope event.SubscriptionScope genesisBlock *types.Block @@ -2537,3 +2539,7 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { func (bc *BlockChain) GetTrieFlushInterval() time.Duration { return time.Duration(bc.flushInterval.Load()) } + +func (bc *BlockChain) TracersEventsSent(data []json.RawMessage) { + bc.tracesFeed.Send(data) +} \ No newline at end of file diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index fd65cb2db32f..8551356ef87f 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -17,6 +17,7 @@ package core import ( + "encoding/json" "math/big" "github.com/ethereum/go-ethereum/common" @@ -409,3 +410,8 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// SubscribeTracesEvent registers a subscription of TracesEvent. +func (bc *BlockChain) SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription { + return bc.scope.Track(bc.tracesFeed.Subscribe(ch)) +} diff --git a/eth/api_backend.go b/eth/api_backend.go index 18aea2d039ae..a6c0223ae1de 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -18,6 +18,7 @@ package eth import ( "context" + "encoding/json" "errors" "math/big" "time" @@ -277,6 +278,10 @@ func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event. return b.eth.miner.SubscribePendingLogs(ch) } +func (b *EthAPIBackend) SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription { + return b.eth.BlockChain().SubscribeTracesEvent(ch) +} + func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.eth.BlockChain().SubscribeChainEvent(ch) } diff --git a/eth/filters/api.go b/eth/filters/api.go index cc08b442e850..90220f6f604f 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -284,6 +284,37 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc return rpcSub, nil } +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *FilterAPI) Traces(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + traces := make(chan []json.RawMessage) + tracesSub := api.events.SubscribeTraces(traces) + + for { + select { + case s := <-traces: + notifier.Notify(rpcSub.ID, s) + case <-rpcSub.Err(): + tracesSub.Unsubscribe() + return + case <-notifier.Closed(): + tracesSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + + // FilterCriteria represents a request to create a new filter. // Same as ethereum.FilterQuery but with UnmarshalJSON() method. type FilterCriteria ethereum.FilterQuery diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 35e396c23e75..3a6577b51a47 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,6 +20,7 @@ package filters import ( "context" + "encoding/json" "errors" "fmt" "sync" @@ -72,6 +73,7 @@ type Backend interface { SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) @@ -162,6 +164,8 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // TracesSubscription keeps track of the normal chain processing operations + TracesSubscription // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -176,6 +180,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // tracesChanSize is the size of channel listening to TracesEvent. + tracesChanSize = 100 ) type subscription struct { @@ -185,6 +191,7 @@ type subscription struct { logsCrit ethereum.FilterQuery logs chan []*types.Log txs chan []*types.Transaction + traces chan []json.RawMessage headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -204,6 +211,7 @@ type EventSystem struct { rmLogsSub event.Subscription // Subscription for removed log event pendingLogsSub event.Subscription // Subscription for pending log event chainSub event.Subscription // Subscription for new chain event + tracesSub event.Subscription // Subscription for traces event // Channels install chan *subscription // install filter for event notification @@ -213,6 +221,7 @@ type EventSystem struct { pendingLogsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event + tracesCh chan []json.RawMessage // Channel to receive new traces json.RawMessage } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -233,6 +242,7 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), pendingLogsCh: make(chan []*types.Log, logsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + tracesCh: make(chan []json.RawMessage, tracesChanSize), } // Subscribe events @@ -241,6 +251,7 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) + m.tracesSub = m.backend.SubscribeTracesEvent(m.tracesCh) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { @@ -418,6 +429,24 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc return es.subscribe(sub) } +// SubscribePendingTxs creates a subscription that writes transactions for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribeTraces(traces chan []json.RawMessage) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: TracesSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + txs: make(chan []*types.Transaction), + traces: traces, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + + type filterIndex map[Type]map[rpc.ID]*subscription func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { @@ -471,6 +500,16 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) } } +func (es *EventSystem) handleTraces(filters filterIndex, ev []json.RawMessage) { + if len(ev) == 0 { + return + } + for _, f := range filters[TracesSubscription] { + f.traces <- ev + } +} + + func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { oldh := es.lastHead es.lastHead = newHeader @@ -550,6 +589,7 @@ func (es *EventSystem) eventLoop() { es.rmLogsSub.Unsubscribe() es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() + es.tracesSub.Unsubscribe() }() index := make(filterIndex) @@ -569,7 +609,9 @@ func (es *EventSystem) eventLoop() { es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) - + case ev := <-es.tracesCh: + es.handleTraces(index, ev) + case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 8f06dff1c594..40a45f157390 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -18,6 +18,7 @@ package filters import ( "context" + "encoding/json" "errors" "fmt" "math/big" @@ -50,6 +51,7 @@ type testBackend struct { rmLogsFeed event.Feed pendingLogsFeed event.Feed chainFeed event.Feed + tracesFeed event.Feed pendingBlock *types.Block pendingReceipts types.Receipts } @@ -149,6 +151,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } +func (b *testBackend) SubscribeTracesEvent(ch chan<- []json.RawMessage) event.Subscription { + return b.tracesFeed.Subscribe(ch) +} + func (b *testBackend) BloomStatus() (uint64, uint64) { return params.BloomBitsBlocks, b.sections } @@ -245,6 +251,48 @@ func TestBlockSubscription(t *testing.T) { <-sub1.Err() } +func TestTracesSubscription(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) + tracesEvents = [][]json.RawMessage{} + ) + + for i := 0; i < 10; i++ { + tracesEvents = append(tracesEvents, []json.RawMessage{json.RawMessage(fmt.Sprintf(`{"id":%d}`, i))}) + } + + chan0 := make(chan []json.RawMessage) + sub0 := api.events.SubscribeTraces(chan0) + + go func() { // simulate client + i1 := 0 + for i1 != len(tracesEvents) { + select { + case receivedEvents := <-chan0: + if !reflect.DeepEqual(receivedEvents, tracesEvents[i1]) { + t.Errorf("sub0 received invalid event on index %d, want %s, got %s", i1, string(tracesEvents[i1][0]), string(receivedEvents[0])) + } + i1++ + } + } + + sub0.Unsubscribe() + }() + + time.Sleep(1 * time.Second) + for _, e := range tracesEvents { + backend.tracesFeed.Send(e) + } + + <-sub0.Err() +} + + + // TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux. func TestPendingTxFilter(t *testing.T) { t.Parallel() diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 8eea0a267892..6e8f2ce01d7a 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -180,7 +180,7 @@ func TestFilters(t *testing.T) { // Hack: GenerateChainWithGenesis creates a new db. // Commit the genesis manually and use GenerateChain. - _, err = gspec.Commit(db, trie.NewDatabase(db)) + _, err = gspec.Commit(db, trie.NewDatabase(db), nil) if err != nil { t.Fatal(err) } diff --git a/eth/tracers/printer.go b/eth/tracers/printer.go index 98c741783124..fad44bfeeddc 100644 --- a/eth/tracers/printer.go +++ b/eth/tracers/printer.go @@ -6,35 +6,130 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" ) -type Printer struct{} +type Printer struct { + eventsChan chan json.RawMessage + triggerEvent chan json.RawMessage + feed bool + eventsBuffer []json.RawMessage // added field to buffer events +} func NewPrinter() *Printer { return &Printer{} } +func NewPrinterWithFeed(bc *core.BlockChain) *Printer { + p := &Printer{ + eventsChan: make(chan json.RawMessage, 100), + triggerEvent: make(chan json.RawMessage, 100), + feed: true, + } + //TODO: Collecting streaming logs through the event loop and distributing them + //triggered by specific events, such as onBlockEnd. + go p.EventLoop(bc) + return p +} + // CaptureStart implements the EVMLogger interface to initialize the tracing operation. func (p *Printer) CaptureStart(from common.Address, to common.Address, create bool, input []byte, gas uint64, value *big.Int) { - fmt.Printf("CaptureStart: from=%v, to=%v, create=%v, input=%s, gas=%v, value=%v\n", from, to, create, hexutil.Bytes(input), gas, value) + fmt.Printf("CaptureStart: from=%v, to=%v, create=%v, input=%v, gas=%v, value=%v\n", from, to, create, hexutil.Bytes(input), gas, value) + if p.feed { + message := map[string]interface{}{ + "event": "CaptureStart", + "from": from.Hex(), + "to": to.Hex(), + "create": create, + "input": input, + "gas": gas, + "value": value.String(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } // CaptureEnd is called after the call finishes to finalize the tracing. func (p *Printer) CaptureEnd(output []byte, gasUsed uint64, err error) { - fmt.Printf("CaptureEnd: output=%s, gasUsed=%v, err=%v\n", hexutil.Bytes(output), gasUsed, err) + fmt.Printf("CaptureEnd: output=%v, gasUsed=%v, err=%v\n", hexutil.Bytes(output), gasUsed, err) + + if p.feed { + message := map[string]interface{}{ + "event": "CaptureEnd", + "output": output, + "gasUsed": gasUsed, + "error": err.Error(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } // CaptureState implements the EVMLogger interface to trace a single step of VM execution. func (p *Printer) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, rData []byte, depth int, err error) { //fmt.Printf("CaptureState: pc=%v, op=%v, gas=%v, cost=%v, scope=%v, rData=%v, depth=%v, err=%v\n", pc, op, gas, cost, scope, rData, depth, err) +/* if p.feed { + message := map[string]interface{}{ + "event": "CaptureState", + "pc": pc, + "op": op, + "gas": gas, + "cost": cost, + "scope": scope, + "rData": rData, + "depth": depth, + "error": err, + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("CaptureState: failed to marshal JSON: %v\n", err) + return + } + + p.eventsChan <- data + } */ } // CaptureFault implements the EVMLogger interface to trace an execution fault. func (p *Printer) CaptureFault(pc uint64, op vm.OpCode, gas, cost uint64, _ *vm.ScopeContext, depth int, err error) { fmt.Printf("CaptureFault: pc=%v, op=%v, gas=%v, cost=%v, depth=%v, err=%v\n", pc, op, gas, cost, depth, err) + + if p.feed { + message := map[string]interface{}{ + "event": "CaptureFault", + "pc": pc, + "op": op.String(), + "gas": gas, + "cost": cost, + "depth": depth, + "error": err.Error(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } // CaptureKeccakPreimage is called during the KECCAK256 opcode. @@ -42,13 +137,50 @@ func (p *Printer) CaptureKeccakPreimage(hash common.Hash, data []byte) {} // CaptureEnter is called when EVM enters a new scope (via call, create or selfdestruct). func (p *Printer) CaptureEnter(typ vm.OpCode, from common.Address, to common.Address, input []byte, gas uint64, value *big.Int) { - fmt.Printf("CaptureEnter: typ=%v, from=%v, to=%v, input=%s, gas=%v, value=%v\n", typ, from, to, hexutil.Bytes(input), gas, value) + fmt.Printf("CaptureEnter: typ=%v, from=%v, to=%v, input=%v, gas=%v, value=%v\n", typ, from, to, hexutil.Bytes(input), gas, value) + + if p.feed { + message := map[string]interface{}{ + "event": "CaptureEnter", + "typ": typ.String(), + "from": from.Hex(), + "to": to.Hex(), + "input": input, + "gas": gas, + "value": value.String(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } // CaptureExit is called when EVM exits a scope, even if the scope didn't // execute any code. func (p *Printer) CaptureExit(output []byte, gasUsed uint64, err error) { - fmt.Printf("CaptureExit: output=%s, gasUsed=%v, err=%v\n", hexutil.Bytes(output), gasUsed, err) + fmt.Printf("CaptureExit: output=%v, gasUsed=%v, err=%v\n", hexutil.Bytes(output), gasUsed, err) + + if p.feed { + message := map[string]interface{}{ + "event": "CaptureExit", + "output": output, + "gasUsed": gasUsed, + "error": err.Error(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) CaptureTxStart(env *vm.EVM, tx *types.Transaction) { @@ -59,6 +191,20 @@ func (p *Printer) CaptureTxStart(env *vm.EVM, tx *types.Transaction) { } fmt.Printf("CaptureTxStart: tx=%s\n", buf) + if p.feed { + message := map[string]interface{}{ + "event": "CaptureTxStart", + "tx": tx.Hash().Hex(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.triggerEvent <- json.RawMessage(data) + } } func (p *Printer) CaptureTxEnd(receipt *types.Receipt) { @@ -68,34 +214,166 @@ func (p *Printer) CaptureTxEnd(receipt *types.Receipt) { return } fmt.Printf("CaptureTxEnd: receipt=%s\n", buf) + + if p.feed { + message := map[string]interface{}{ + "event": "CaptureTxEnd", + "receipt": receipt.TxHash.Hex(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.triggerEvent <- json.RawMessage(data) + } } func (p *Printer) OnBlockStart(b *types.Block) { fmt.Printf("OnBlockStart: b=%v\n", b.NumberU64()) + + if p.feed { + message := map[string]interface{}{ + "event": "OnBlockStart", + "blockNumber": b.NumberU64(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.triggerEvent <- json.RawMessage(data) + } } func (p *Printer) OnBlockEnd(td *big.Int, err error) { fmt.Printf("OnBlockEnd: td=%v, err=%v\n", td, err) + + if p.feed { + message := map[string]interface{}{ + "event": "OnBlockEnd", + "totalDifficulty": td.String(), + "error": err, + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.triggerEvent <- json.RawMessage(data) + } } func (p *Printer) OnGenesisBlock(b *types.Block) { fmt.Printf("OnGenesisBlock: b=%v\n", b.NumberU64()) + + if p.feed { + message := map[string]interface{}{ + "event": "OnGenesisBlock", + "blockNumber": b.NumberU64(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnBalanceChange(a common.Address, prev, new *big.Int) { fmt.Printf("OnBalanceChange: a=%v, prev=%v, new=%v\n", a, prev, new) + + if p.feed { + message := map[string]interface{}{ + "event": "OnBalanceChange", + "address": a.Hex(), + "prevBalance": prev.String(), + "newBalance": new.String(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnNonceChange(a common.Address, prev, new uint64) { fmt.Printf("OnNonceChange: a=%v, prev=%v, new=%v\n", a, prev, new) + + if p.feed { + message := map[string]interface{}{ + "event": "OnNonceChange", + "address": a.Hex(), + "prevNonce": prev, + "newNonce": new, + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnCodeChange(a common.Address, prevCodeHash common.Hash, prev []byte, codeHash common.Hash, code []byte) { - fmt.Printf("OnCodeChange: a=%v, prevCodeHash=%v, prev=%s, codeHash=%v, code=%s\n", a, prevCodeHash, hexutil.Bytes(prev), codeHash, hexutil.Bytes(code)) + fmt.Printf("OnCodeChange: a=%v, prevCodeHash=%v, prev=%v, codeHash=%v, code=%v\n", a, prevCodeHash, hexutil.Bytes(prev), codeHash, code) + + if p.feed { + message := map[string]interface{}{ + "event": "OnCodeChange", + "address": a.Hex(), + "prevCodeHash": prevCodeHash.Hex(), + "prevCode": string(prev), + "codeHash": codeHash.Hex(), + "code": string(code), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnStorageChange(a common.Address, k, prev, new common.Hash) { fmt.Printf("OnStorageChange: a=%v, k=%v, prev=%v, new=%v\n", a, k, prev, new) + + if p.feed { + message := map[string]interface{}{ + "event": "OnStorageChange", + "address": a.Hex(), + "key": k.Hex(), + "prevHash": prev.Hex(), + "newHash": new.Hex(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnLog(l *types.Log) { @@ -105,12 +383,81 @@ func (p *Printer) OnLog(l *types.Log) { return } fmt.Printf("OnLog: l=%s\n", buf) + + if p.feed { + message := map[string]interface{}{ + "event": "OnLog", + "log": l, + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnNewAccount(a common.Address) { fmt.Printf("OnNewAccount: a=%v\n", a) + + if p.feed { + message := map[string]interface{}{ + "event": "OnNewAccount", + "address": a.Hex(), + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } } func (p *Printer) OnGasConsumed(gas, amount uint64) { fmt.Printf("OnGasConsumed: gas=%v, amount=%v\n", gas, amount) + + if p.feed { + message := map[string]interface{}{ + "event": "OnGasConsumed", + "gas": gas, + "amount": amount, + } + + data, err := json.Marshal(message) + if err != nil { + fmt.Printf("Failed to marshal json: %v\n", err) + return + } + + p.eventsChan <- json.RawMessage(data) + } +} + +// EventLoop receives data from channels, adds them to Trace, +// and sends Trace when the OnBlockEnd event occurs. This function operates +// in a loop and should typically be run in a separate goroutine. + +//TODO: Collecting streaming logs through the event loop and distributing them +//triggered by specific events, such as onBlockEnd. +func (p *Printer) EventLoop(bc *core.BlockChain) { + for { + select { + case data := <-p.triggerEvent: + // Append the triggerEvent to the buffer + p.eventsBuffer = append(p.eventsBuffer, data) + // Send all buffered events + bc.TracersEventsSent(p.eventsBuffer) + // Clear the events buffer + p.eventsBuffer = []json.RawMessage{} + case data := <-p.eventsChan: + // Buffer the events from eventsChan + p.eventsBuffer = append(p.eventsBuffer, data) + } + } }