Skip to content

Commit a7dddec

Browse files
C0rWinGerrit Code Review
authored andcommitted
Merge "FAB-13665 consensus migration: kafka2raft green path #2"
2 parents 2c2f333 + 9002e75 commit a7dddec

File tree

3 files changed

+1085
-0
lines changed

3 files changed

+1085
-0
lines changed
Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
// Copyright IBM Corp. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package migration
5+
6+
import (
7+
"fmt"
8+
"sync"
9+
10+
"github.com/hyperledger/fabric/common/flogging"
11+
"github.com/hyperledger/fabric/protos/orderer"
12+
)
13+
14+
// Status provides access to the consensus-type migration status of the underlying chain.
15+
// The implementation of this interface knows whether the chain is a system or standard channel.
16+
type Status interface {
17+
fmt.Stringer
18+
19+
// StateContext returns the consensus-type migration state and context of the underlying chain.
20+
StateContext() (state orderer.ConsensusType_MigrationState, context uint64)
21+
22+
// SetStateContext sets the consensus-type migration state and context of the underlying chain.
23+
SetStateContext(state orderer.ConsensusType_MigrationState, context uint64)
24+
25+
// IsPending returns true if consensus-type migration is pending on the underlying chain.
26+
// The definition of "pending" differs between the system and standard channels.
27+
// Returns true when: START on system channel, START or CONTEXT on standard channel.
28+
IsPending() bool
29+
30+
// IsCommitted returns true if consensus-type migration is committed on the underlying chain.
31+
// The definition of "committed" differs between the system and standard channels.
32+
// Returns true when: COMMIT on system channel; always false on standard channel.
33+
IsCommitted() bool
34+
}
35+
36+
// Stepper allows the underlying chain to execute the migration state machine.
37+
type Stepper interface {
38+
// Step evaluates the migration state machine of a particular chain. It returns whether the block should be
39+
// committed to the ledger or dropped (commitBlock), and whether the bootstrap file (a.k.a. genesis block)
40+
// should be replaced (commitMigration).
41+
Step(
42+
chainID string,
43+
nextConsensusType string,
44+
nextMigState orderer.ConsensusType_MigrationState,
45+
nextMigContext uint64,
46+
lastCutBlockNumber uint64,
47+
migrationController Controller,
48+
) (commitBlock bool, commitMigration bool)
49+
}
50+
51+
// StatusStepper is a composition of the Status and Stepper interfaces.
52+
type StatusStepper interface {
53+
Status
54+
Stepper
55+
}
56+
57+
//go:generate counterfeiter -o mocks/consensus_migration_controller.go . Controller
58+
59+
// Controller defines methods for controlling and coordinating the process of consensus-type migration.
60+
// It is implemented by the Registrar and is used by the system and standard chains.
61+
type Controller interface {
62+
// ConsensusMigrationPending checks whether consensus-type migration had started,
63+
// by inspecting the status of the system channel.
64+
ConsensusMigrationPending() bool
65+
66+
// ConsensusMigrationStart marks every standard channel as "START" with the given context.
67+
// It should first check that consensus-type migration is not pending on any of the standard channels.
68+
// This call is always triggered by a MigrationState="START" config update on the system channel.
69+
// The context is the height of the system channel config block that carries said config update.
70+
ConsensusMigrationStart(context uint64) (err error)
71+
72+
// ConsensusMigrationCommit verifies that the conditions for committing the consensus-type migration
73+
// are met, and if so, marks the system channel as committed.
74+
// The conditions are:
75+
// 1. system channel mast be at START with context >0;
76+
// 2. all standard channels must be at START with the same context as the system channel.
77+
ConsensusMigrationCommit() (err error)
78+
79+
// ConsensusMigrationAbort verifies that the conditions for aborting the consensus-type migration
80+
// are met, and if so, marks the system channel as aborted.
81+
// The conditions are:
82+
// 1. system channel mast be at START
83+
// 2. all standard channels must be at START or CONTEXT
84+
ConsensusMigrationAbort() (err error)
85+
}
86+
87+
// StatusImpl is an implementation of the StatusStepper interface,
88+
// which provides access to the consensus-type migration status of the underlying chain.
89+
// The methods that accept objects of this type are thread-safe.
90+
type StatusImpl struct {
91+
// mutex protects state and context.
92+
mutex sync.Mutex
93+
// state must be accessed with mutex locked.
94+
state orderer.ConsensusType_MigrationState
95+
// context must be accessed with mutex locked.
96+
context uint64
97+
98+
// systemChannel does not need to be protected by mutex since it is immutable after creation.
99+
systemChannel bool
100+
101+
logger *flogging.FabricLogger
102+
}
103+
104+
// NewStatusStepper generates a new StatusStepper implementation.
105+
func NewStatusStepper(sysChan bool, chainID string) StatusStepper {
106+
return &StatusImpl{
107+
systemChannel: sysChan,
108+
logger: flogging.MustGetLogger("orderer.consensus.migration").With("channel", chainID),
109+
}
110+
}
111+
112+
// StateContext returns the consensus-type migration state and context.
113+
func (ms *StatusImpl) StateContext() (state orderer.ConsensusType_MigrationState, context uint64) {
114+
ms.mutex.Lock()
115+
defer ms.mutex.Unlock()
116+
return ms.state, ms.context
117+
}
118+
119+
// SetStateContext sets the consensus-type migration state and context.
120+
func (ms *StatusImpl) SetStateContext(state orderer.ConsensusType_MigrationState, context uint64) {
121+
ms.mutex.Lock()
122+
defer ms.mutex.Unlock()
123+
ms.state = state
124+
ms.context = context
125+
}
126+
127+
// IsPending returns true if migration is pending.
128+
func (ms *StatusImpl) IsPending() bool {
129+
ms.mutex.Lock()
130+
defer ms.mutex.Unlock()
131+
132+
if ms.systemChannel {
133+
return ms.state == orderer.ConsensusType_MIG_STATE_START
134+
}
135+
136+
return ms.state == orderer.ConsensusType_MIG_STATE_START || ms.state == orderer.ConsensusType_MIG_STATE_CONTEXT
137+
}
138+
139+
// IsCommitted returns true if migration is committed.
140+
func (ms *StatusImpl) IsCommitted() bool {
141+
ms.mutex.Lock()
142+
defer ms.mutex.Unlock()
143+
144+
if ms.systemChannel {
145+
return ms.state == orderer.ConsensusType_MIG_STATE_COMMIT
146+
}
147+
148+
return false
149+
}
150+
151+
// String returns a text representation.
152+
func (ms *StatusImpl) String() string {
153+
ms.mutex.Lock()
154+
defer ms.mutex.Unlock()
155+
156+
return fmt.Sprintf("State=%s, Context=%d, Sys=%t", ms.state, ms.context, ms.systemChannel)
157+
}
158+
159+
// Step evaluates the migration state machine of a particular chain. It returns whether
160+
// the block should be committed to the ledger or dropped (commitBlock), and whether the bootstrap file
161+
// (a.k.a. genesis block) should be replaced (commitMigration).
162+
//
163+
// When we get a message, we check whether it is a permitted transition of the state machine, and whether the
164+
// parameters are correct. If it is a valid transition, we return commitBlock=true, which will cause the caller to
165+
// commit the block to the ledger.
166+
//
167+
// When we get a message that is a COMMIT, which is the final step of migration (this can only happen on the system
168+
// channel), we also return commitMigration=true, which will cause the caller to replace the bootstrap file
169+
// (genesis block), as well as commit the block to the ledger.
170+
//
171+
// Note: the method may call the multichannel.Registrar (migrationController). The Registrar takes a mutex, and then
172+
// calls individual migration.Status objects (.i.e. the lock of the migration.Status mutex is nested within the lock of
173+
// Registrar mutex). In order to avoid deadlocks, here we only call the Registrar (migrationController) when the
174+
// internal mutex in NOT taken.
175+
func (ms *StatusImpl) Step(
176+
chainID string,
177+
nextConsensusType string,
178+
nextMigState orderer.ConsensusType_MigrationState,
179+
nextMigContext uint64,
180+
lastCutBlockNumber uint64,
181+
migrationController Controller,
182+
) (commitBlock bool, commitMigration bool) {
183+
184+
ms.logger.Debugf("Consensus-type migration: Config tx; Current status: %s; Input TX: Type=%s, State=%s, Ctx=%d; lastBlock=%d",
185+
ms, nextConsensusType, nextMigState, nextMigContext, lastCutBlockNumber)
186+
187+
if ms.systemChannel {
188+
commitBlock, commitMigration = ms.stepSystem(
189+
nextConsensusType, nextMigState, nextMigContext, lastCutBlockNumber, migrationController)
190+
} else {
191+
commitBlock = ms.stepStandard(
192+
nextConsensusType, nextMigState, nextMigContext, migrationController)
193+
}
194+
195+
return commitBlock, commitMigration
196+
}
197+
198+
func (ms *StatusImpl) stepSystem(
199+
nextConsensusType string,
200+
nextMigState orderer.ConsensusType_MigrationState,
201+
nextMigContext uint64,
202+
lastCutBlockNumber uint64,
203+
migrationController Controller,
204+
) (commitBlock bool, commitMigration bool) {
205+
206+
unexpectedTransitionResponse := func(from, to orderer.ConsensusType_MigrationState) {
207+
ms.logger.Debugf("Consensus-type migration: Dropping config tx because: unexpected consensus-type migration state transition: %s to %s", from, to)
208+
commitBlock = false
209+
commitMigration = false
210+
}
211+
212+
currState, currContext := ms.StateContext()
213+
214+
switch currState {
215+
case orderer.ConsensusType_MIG_STATE_START:
216+
//=== Migration is pending, expect COMMIT or ABORT ===
217+
switch nextMigState {
218+
case orderer.ConsensusType_MIG_STATE_COMMIT:
219+
if currContext == nextMigContext {
220+
err := migrationController.ConsensusMigrationCommit()
221+
if err != nil {
222+
ms.logger.Warningf("Consensus-type migration: Reject Config tx on system channel, migrationCommit failed; error=%s", err)
223+
} else {
224+
commitBlock = true
225+
commitMigration = true
226+
}
227+
} else {
228+
ms.logger.Warningf("Consensus-type migration: Reject Config tx on system channel; %s to %s, because of bad context:(tx=%d/exp=%d)",
229+
currState, nextMigState, nextMigContext, currContext)
230+
}
231+
case orderer.ConsensusType_MIG_STATE_ABORT:
232+
ms.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState)
233+
//TODO implement abort path
234+
default:
235+
unexpectedTransitionResponse(currState, nextMigState)
236+
}
237+
238+
case orderer.ConsensusType_MIG_STATE_COMMIT:
239+
//=== Migration committed, nothing left to do ===
240+
ms.logger.Debug("Consensus-type migration: Config tx on system channel, migration already committed, nothing left to do, dropping;")
241+
242+
case orderer.ConsensusType_MIG_STATE_ABORT, orderer.ConsensusType_MIG_STATE_NONE:
243+
//=== Migration is NOT pending, expect NONE or START ===
244+
switch nextMigState {
245+
case orderer.ConsensusType_MIG_STATE_START:
246+
err := migrationController.ConsensusMigrationStart(lastCutBlockNumber + 1)
247+
if err != nil {
248+
ms.logger.Warningf("Consensus-type migration: Reject Config tx on system channel, migrationStart failed; error=%s", err)
249+
} else {
250+
ms.logger.Infof("Consensus-type migration: started; Status: %s", ms)
251+
commitBlock = true
252+
}
253+
case orderer.ConsensusType_MIG_STATE_NONE:
254+
commitBlock = true
255+
default:
256+
unexpectedTransitionResponse(currState, nextMigState)
257+
}
258+
259+
default:
260+
ms.logger.Panicf("Consensus-type migration: Unexpected status, probably a bug; Current: %s; Input TX: State=%s, Context=%d, nextConsensusType=%s",
261+
ms, nextMigState, nextMigContext, nextConsensusType)
262+
}
263+
264+
return commitBlock, commitMigration
265+
}
266+
267+
func (ms *StatusImpl) stepStandard(
268+
nextConsensusType string,
269+
nextMigState orderer.ConsensusType_MigrationState,
270+
nextMigContext uint64,
271+
migrationController Controller,
272+
) (commitBlock bool) {
273+
274+
unexpectedTransitionResponse := func(from, to orderer.ConsensusType_MigrationState) {
275+
ms.logger.Debugf("Consensus-type migration: Dropping config tx because: unexpected consensus-type migration state transition: %s to %s", from, to)
276+
commitBlock = false
277+
}
278+
279+
currState, currContext := ms.StateContext()
280+
281+
switch currState {
282+
case orderer.ConsensusType_MIG_STATE_START:
283+
//=== Migration is pending (START is set by system channel, via migrationController, not message), expect CONTEXT or ABORT ===
284+
switch nextMigState {
285+
case orderer.ConsensusType_MIG_STATE_CONTEXT:
286+
if migrationController.ConsensusMigrationPending() && //On the system channel
287+
(nextMigContext == currContext) {
288+
ms.SetStateContext(nextMigState, nextMigContext)
289+
ms.logger.Infof("Consensus-type migration: context accepted; Status: %s", ms)
290+
commitBlock = true
291+
} else {
292+
ms.logger.Warningf("Consensus-type migration: context rejected; migrationPending=%v, context:(tx=%d/exp=%d)",
293+
migrationController.ConsensusMigrationPending(), nextMigContext, currContext)
294+
}
295+
case orderer.ConsensusType_MIG_STATE_ABORT:
296+
ms.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState)
297+
//TODO implement abort path
298+
default:
299+
unexpectedTransitionResponse(currState, nextMigState)
300+
}
301+
302+
case orderer.ConsensusType_MIG_STATE_NONE, orderer.ConsensusType_MIG_STATE_ABORT:
303+
//=== Migration not started or aborted, expect NONE (START is set by system channel, not message)
304+
switch nextMigState {
305+
case orderer.ConsensusType_MIG_STATE_NONE:
306+
commitBlock = true
307+
default:
308+
unexpectedTransitionResponse(currState, nextMigState)
309+
}
310+
311+
case orderer.ConsensusType_MIG_STATE_CONTEXT:
312+
//=== Migration pending, expect ABORT, or nothing else to do (restart to Raft)
313+
switch nextMigState {
314+
case orderer.ConsensusType_MIG_STATE_ABORT:
315+
ms.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState)
316+
//TODO implement abort path
317+
default:
318+
unexpectedTransitionResponse(currState, nextMigState)
319+
}
320+
321+
default:
322+
ms.logger.Panicf("Consensus-type migration: Unexpected status, probably a bug; Current: %s; Input TX: State=%s, Context=%d, nextConsensusType=%s",
323+
ms, nextMigState, nextMigContext, nextConsensusType)
324+
}
325+
326+
return commitBlock
327+
}

0 commit comments

Comments
 (0)