Skip to content

Commit 59e47b1

Browse files
committed
add support for signals
1 parent 253f7ba commit 59e47b1

File tree

6 files changed

+79
-14
lines changed

6 files changed

+79
-14
lines changed

cmd/dummy-blockchain/app/app.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Flags struct {
2626
BlockRate int
2727
BlockSizeInBytes int
2828
ServerAddr string
29+
WithSignal bool
2930
Tracer string
3031
StopHeight uint64
3132

@@ -50,7 +51,6 @@ func Main(version string) {
5051
logrus.Fatal(err)
5152
}
5253

53-
5454
root.AddCommand(
5555
makeInitCommand(),
5656
makeResetCommand(),
@@ -77,6 +77,7 @@ func initFlags(root *cobra.Command) error {
7777
flags.Uint64Var(&cliOpts.StopHeight, "stop-height", 0, "Stop block production at this height")
7878
flags.StringVar(&cliOpts.ServerAddr, "server-addr", "0.0.0.0:8080", "Server address")
7979
flags.StringVar(&cliOpts.Tracer, "tracer", "", "The tracer to use, either <empty>, none or firehose")
80+
flags.BoolVar(&cliOpts.WithSignal, "with-signal", false, "whether we produce BlockCommitmentLevel signals on top of blocks")
8081

8182
return nil
8283
}
@@ -166,6 +167,7 @@ func makeStartComand() *cobra.Command {
166167
cliOpts.StopHeight,
167168
cliOpts.ServerAddr,
168169
blockTracer,
170+
cliOpts.WithSignal,
169171
)
170172

171173
if err := node.Initialize(); err != nil {

core/engine.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Engine struct {
1919
blockSizeInBytes int
2020
blockRate time.Duration
2121
blockChan chan *types.Block
22+
signalChan chan *types.Signal
2223
prevBlock *types.Block
2324
finalBlock *types.Block
2425
}
@@ -35,6 +36,7 @@ func NewEngine(genesisHash string, genesisHeight uint64, genesisTime time.Time,
3536
blockRate: blockRate,
3637
blockSizeInBytes: blockSizeInBytes,
3738
blockChan: make(chan *types.Block),
39+
signalChan: make(chan *types.Signal),
3840
}
3941
}
4042

@@ -49,7 +51,7 @@ func (e *Engine) Initialize(prevBlock *types.Block, finalBlock *types.Block) err
4951
return nil
5052
}
5153

52-
func (e *Engine) StartBlockProduction(ctx context.Context) {
54+
func (e *Engine) StartBlockProduction(ctx context.Context, withSignal bool) {
5355
ticker := time.NewTicker(e.blockRate)
5456

5557
logrus.WithField("rate", e.blockRate).Info("starting block producer")
@@ -60,18 +62,47 @@ func (e *Engine) StartBlockProduction(ctx context.Context) {
6062
}
6163
}
6264

65+
var lastBlock *types.Block
66+
var lastSignal *types.Signal
67+
68+
signalTicker := time.NewTicker(e.blockRate)
69+
if withSignal {
70+
<-time.After(e.blockRate / 2) // offset by half duration
71+
signalTicker.Reset(e.blockRate)
72+
} else {
73+
signalTicker.Stop()
74+
}
75+
6376
for {
6477
select {
6578
case <-ticker.C:
6679
for _, block := range e.createBlocks() {
6780
e.blockChan <- block
81+
lastBlock = block
6882

6983
if e.stopHeight > 0 && block.Header.Height >= e.stopHeight {
7084
logrus.Info("reached stop height")
7185
ticker.Stop()
7286
return
7387
}
7488
}
89+
case <-signalTicker.C:
90+
if !withSignal {
91+
continue // just ignore if a signal ticker comes in, but it actually should not be called because of the Stop(), unless there is a crazy race condtition
92+
}
93+
if lastBlock != nil {
94+
if lastSignal != nil && lastSignal.BlockID == lastBlock.Header.Hash {
95+
continue // don't send duplicate signal
96+
}
97+
sig := &types.Signal{
98+
BlockID: lastBlock.Header.Hash,
99+
BlockNumber: lastBlock.Header.Height,
100+
CommitmentLevel: 10,
101+
}
102+
e.signalChan <- sig
103+
lastSignal = sig
104+
}
105+
75106
case <-ctx.Done():
76107
logrus.Info("stopping block producer")
77108
close(e.blockChan)
@@ -80,10 +111,14 @@ func (e *Engine) StartBlockProduction(ctx context.Context) {
80111
}
81112
}
82113

83-
func (e *Engine) Subscription() <-chan *types.Block {
114+
func (e *Engine) SubscribeBlocks() <-chan *types.Block {
84115
return e.blockChan
85116
}
86117

118+
func (e *Engine) SubscribeSignals() <-chan *types.Signal {
119+
return e.signalChan
120+
}
121+
87122
func (e *Engine) createBlocks() (out []*types.Block) {
88123
if e.prevBlock == nil {
89124
genesisBlock := types.GenesisBlock(e.genesisHash, e.genesisHeight, e.genesisTime)

core/node.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
)
1313

1414
type Node struct {
15-
engine Engine
16-
server Server
17-
store *Store
18-
tracer tracer.Tracer
15+
engine Engine
16+
server Server
17+
store *Store
18+
tracer tracer.Tracer
19+
withSignal bool
1920
}
2021

2122
func NewNode(
@@ -29,14 +30,16 @@ func NewNode(
2930
stopHeight uint64,
3031
serverAddr string,
3132
tracer tracer.Tracer,
33+
withSignal bool,
3234
) *Node {
3335
store := NewStore(storeDir, genesisHash, genesisHeight, genesisTime)
3436

3537
return &Node{
36-
engine: NewEngine(genesisHash, genesisHeight, genesisTime, genesisBlockBurst, stopHeight, blockRate, blockSizeInBytes),
37-
store: store,
38-
server: NewServer(store, serverAddr),
39-
tracer: tracer,
38+
engine: NewEngine(genesisHash, genesisHeight, genesisTime, genesisBlockBurst, stopHeight, blockRate, blockSizeInBytes),
39+
store: store,
40+
server: NewServer(store, serverAddr),
41+
tracer: tracer,
42+
withSignal: withSignal,
4043
}
4144
}
4245

@@ -97,11 +100,11 @@ func (node *Node) Initialize() error {
97100

98101
func (node *Node) Start(ctx context.Context) error {
99102
go node.server.Start() // TODO: handle error here
100-
go node.engine.StartBlockProduction(ctx)
103+
go node.engine.StartBlockProduction(ctx, node.withSignal)
101104

102105
for {
103106
select {
104-
case block, ok := <-node.engine.Subscription():
107+
case block, ok := <-node.engine.SubscribeBlocks():
105108
if !ok {
106109
return nil
107110
}
@@ -127,6 +130,15 @@ func (node *Node) Start(ctx context.Context) error {
127130
tracer.OnBlockEnd(block, node.engine.finalBlock.Header)
128131
}
129132

133+
case sig, ok := <-node.engine.SubscribeSignals():
134+
if !ok {
135+
return nil
136+
}
137+
138+
if tracer := node.tracer; tracer != nil {
139+
tracer.OnCommitmentSignal(sig)
140+
}
141+
130142
case <-ctx.Done():
131143
return nil
132144
}

tracer/firehose_tracer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,20 @@ func (t *FirehoseTracer) OnBlockStart(header *types.BlockHeader) {
7575
},
7676
}
7777

78-
if header.PrevHash != nil {
78+
if header.PrevHash != nil && header.PrevNum != nil {
7979
t.activeBlock.Header.PreviousNum = header.PrevNum
8080
t.activeBlock.Header.PreviousHash = header.PrevHash
8181
}
8282
}
8383

84+
func (t *FirehoseTracer) OnCommitmentSignal(sig *types.Signal) {
85+
fmt.Printf("FIRE SIGNAL 1 %d %s %d\n",
86+
sig.BlockNumber,
87+
sig.BlockID,
88+
sig.CommitmentLevel,
89+
)
90+
}
91+
8492
// OnTrxStart implements Tracer.
8593
func (t *FirehoseTracer) OnTrxStart(trx *types.Transaction) {
8694
if t.activeTrx != nil {

tracer/tracer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ type Tracer interface {
99

1010
OnBlockStart(header *types.BlockHeader)
1111

12+
OnCommitmentSignal(sig *types.Signal)
13+
1214
OnTrxStart(trx *types.Transaction)
1315

1416
OnTrxEvent(trxHash string, event *types.Event)

types/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ type Block struct {
5252
Transactions []Transaction `json:"transactions"`
5353
}
5454

55+
type Signal struct {
56+
BlockID string
57+
BlockNumber uint64
58+
CommitmentLevel int32
59+
}
60+
5561
// ApproximatedSize computes an approximation of how big the block would be
5662
// once converted into a Protobuf Block model
5763
func (b *Block) ApproximatedSize() int {

0 commit comments

Comments
 (0)