Skip to content

consensus/istanbul: add block lock #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package istanbul

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -54,4 +55,13 @@ type Backend interface {
// CheckSignature verifies the signature by checking if it's signed by
// the given validator
CheckSignature(data []byte, addr common.Address, sig []byte) error

// HasBlock checks if the combination of the given hash and height matches any existing blocks
HasBlock(hash common.Hash, number *big.Int) bool

// GetProposer returns the proposer of the given block height
GetProposer(number uint64) common.Address

// ParentValidators returns the validator set of the given proposal's parent block
ParentValidators(proposal Proposal) ValidatorSet
}
37 changes: 32 additions & 5 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package backend

import (
"crypto/ecdsa"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -92,11 +93,7 @@ func (sb *backend) Address() common.Address {

// Validators implements istanbul.Backend.Validators
func (sb *backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, proposal.Number().Uint64(), proposal.Hash(), nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
return sb.getValidators(proposal.Number().Uint64(), proposal.Hash())
}

// Broadcast implements istanbul.Backend.Send
Expand Down Expand Up @@ -220,3 +217,33 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt
}
return nil
}

// HasBlock implements istanbul.Backend.HashBlock
func (sb *backend) HasBlock(hash common.Hash, number *big.Int) bool {
return sb.chain.GetHeader(hash, number.Uint64()) != nil
}

// GetProposer implements istanbul.Backend.GetProposer
func (sb *backend) GetProposer(number uint64) common.Address {
if h := sb.chain.GetHeaderByNumber(number); h != nil {
a, _ := sb.Author(h)
return a
}
return common.Address{}
}

// ParentValidators implements istanbul.Backend.GetParentValidators
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
if block, ok := proposal.(*types.Block); ok {
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
}
return validator.NewSet(nil, sb.config.ProposerPolicy)
}

func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, number, hash, nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
}
38 changes: 38 additions & 0 deletions consensus/istanbul/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,44 @@ func TestCommit(t *testing.T) {
}
}

func TestHasBlock(t *testing.T) {
chain, engine := newBlockChain(1)
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
finalBlock, _ := engine.Seal(chain, block, nil)
chain.InsertChain(types.Blocks{finalBlock})
if engine.HasBlock(block.Hash(), finalBlock.Number()) {
t.Errorf("error mismatch: have true, want false")
}
if !engine.HasBlock(finalBlock.Hash(), finalBlock.Number()) {
t.Errorf("error mismatch: have false, want true")
}
}

func TestGetProposer(t *testing.T) {
chain, engine := newBlockChain(1)
block := makeBlock(chain, engine, chain.Genesis())
chain.InsertChain(types.Blocks{block})
expected := engine.GetProposer(1)
actual := engine.Address()
if actual != expected {
t.Errorf("proposer mismatch: have %v, want %v", actual.Hex(), expected.Hex())
}
}

func TestParentValidators(t *testing.T) {
chain, engine := newBlockChain(1)
block := makeBlock(chain, engine, chain.Genesis())
chain.InsertChain(types.Blocks{block})
expected := engine.Validators(block).List()
//Block without seal will make empty validator set
block = makeBlockWithoutSeal(chain, engine, block)
chain.InsertChain(types.Blocks{block})
actual := engine.ParentValidators(block).List()
if len(expected) != len(actual) || expected[0] != actual[0] {
t.Errorf("validator set mismatch: have %v, want %v", actual, expected)
}
}

/**
* SimpleBackend
* Private key: bb047e5940b6d83354d9432db7c449ac8fca2248008aaa7271369880f9f11cc1
Expand Down
6 changes: 3 additions & 3 deletions consensus/istanbul/core/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}

// invalid view format
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestProcessFutureBacklog(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
Expand Down Expand Up @@ -294,7 +294,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()
Expand Down
17 changes: 16 additions & 1 deletion consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ package core
import (
"reflect"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)

func (c *core) sendCommit() {
sub := c.current.Subject()
c.broadcastCommit(sub)
}

func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) {
sub := &istanbul.Subject{
View: view,
Digest: digest,
}
c.broadcastCommit(sub)
}

func (c *core) broadcastCommit(sub *istanbul.Subject) {
logger := c.logger.New("state", c.state)

sub := c.current.Subject()
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
Expand Down Expand Up @@ -60,6 +73,8 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error {
// If we already have a proposal, we may have chance to speed up the consensus process
// by committing the proposal without prepare messages.
if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 {
// Still need to call LockBlock here since state can skip Prepared state and jump directly to Committed state.
c.current.LockHash()
c.commit()
}

Expand Down
10 changes: 9 additions & 1 deletion consensus/istanbul/core/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ OUTER:
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue OUTER
}
}
Expand All @@ -191,7 +194,9 @@ OUTER:
if r0.current.Commits.Size() > 2*r0.valSet.F() {
t.Errorf("the size of commit messages should be less than %v", 2*r0.valSet.F()+1)
}

if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue
}

Expand All @@ -214,6 +219,9 @@ OUTER:
if signedCount <= 2*r0.valSet.F() {
t.Errorf("the expected signed count should be larger than %v, but got %v", 2*r0.valSet.F(), signedCount)
}
if !r0.current.IsHashLocked() {
t.Errorf("block should be locked")
}
}
}

Expand Down
27 changes: 24 additions & 3 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (c *core) commit() {
}

if err := c.backend.Commit(proposal, committedSeals); err != nil {
c.current.UnlockHash() //Unlock block when insertion fails
c.sendNextRoundChange()
return
}
Expand All @@ -187,13 +188,21 @@ func (c *core) startNewRound(newView *istanbul.View, roundChange bool) {
// Clear invalid round change messages
c.roundChangeSet = newRoundChangeSet(c.valSet)
// New snapshot for new round
c.current = newRoundState(newView, c.valSet)
c.updateRoundState(newView, c.valSet, roundChange)
// Calculate new proposer
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() {
c.backend.NextRound()
// If it is locked, propose the old proposal
if c.current != nil && c.current.IsHashLocked() {
r := &istanbul.Request{
Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
}
c.sendPreprepare(r)
} else {
c.backend.NextRound()
}
}
c.newRoundChangeTimer()

Expand All @@ -207,13 +216,25 @@ func (c *core) catchUpRound(view *istanbul.View) {
c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
}
c.waitingForRoundChange = true
c.current = newRoundState(view, c.valSet)

//Needs to keep block lock for round catching up
c.updateRoundState(view, c.valSet, true)
c.roundChangeSet.Clear(view.Round)
c.newRoundChangeTimer()

logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}

// updateRoundState updates round state by checking if locking block is necessary
func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
// Lock only if both roundChange is true and it is locked
if roundChange && c.current != nil && c.current.IsHashLocked() {
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare)
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil)
}
}

func (c *core) setState(state State) {
if c.state != state {
c.state = state
Expand Down
7 changes: 5 additions & 2 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
return err
}

// If it is locked, it can only process on the locked block.
// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepare step.
if err := c.verifyPrepare(prepare, src); err != nil {
return err
}

c.acceptPrepare(msg, src)

// Change to StatePrepared if we've received enough prepare messages
// Change to StatePrepared if we've received enough prepare messages or it is locked
// and we are in earlier state before StatePrepared
if c.current.Prepares.Size() > 2*c.valSet.F() && c.state.Cmp(StatePrepared) < 0 {
if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()
}
Expand Down
9 changes: 9 additions & 0 deletions consensus/istanbul/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ OUTER:
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue OUTER
}
}
Expand All @@ -214,6 +217,9 @@ OUTER:
if r0.current.Prepares.Size() > 2*r0.valSet.F() {
t.Errorf("the size of prepare messages should be less than %v", 2*r0.valSet.F()+1)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}

continue
}
Expand Down Expand Up @@ -246,6 +252,9 @@ OUTER:
if !reflect.DeepEqual(m, expectedSubject) {
t.Errorf("subject mismatch: have %v, want %v", m, expectedSubject)
}
if !r0.current.IsHashLocked() {
t.Errorf("block should be locked")
}
}
}

Expand Down
37 changes: 34 additions & 3 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
}

// Ensure we have the same view with the preprepare message
// If it is old message, see if we need to broadcast COMMIT
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
if err == errOldMessage {
// Get validator set for the given proposal
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
// Broadcast COMMIT if it is an existing block
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
// 2. The given block must exist
if valSet.IsProposer(src.Address()) && c.backend.HasBlock(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
return nil
}
}
return err
}

Expand Down Expand Up @@ -84,10 +98,27 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
return err
}

// Here is about to accept the preprepare
if c.state == StateAcceptRequest {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
// If it is locked, it can only process on the locked block
// Otherwise, broadcast PREPARE and enter Prepared state
if c.current.IsHashLocked() {
// Broadcast COMMIT directly if the proposal matches the locked block
// Otherwise, send ROUND CHANGE
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
// Broadcast COMMIT and enters Prepared state directly
c.acceptPreprepare(preprepare)
c.setState(StatePrepared)
c.sendCommit()
} else {
// Send round change
c.sendNextRoundChange()
}
} else {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
}
}

return nil
Expand Down
Loading