Skip to content

Commit a475d1a

Browse files
author
Ping Xie
committed
Reintroduce server-initiated wait and remove the REPLICAONLY flag for
`CLUSTER SETSLOT NODE` Signed-off-by: Ping Xie <pingxie@google.com>
1 parent d9110f4 commit a475d1a

7 files changed

Lines changed: 158 additions & 205 deletions

File tree

src/blocked.c

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
186186
c->bstate.btype == BLOCKED_ZSET ||
187187
c->bstate.btype == BLOCKED_STREAM) {
188188
unblockClientWaitingData(c);
189-
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
189+
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
190+
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
190191
unblockClientWaitingReplicas(c);
191192
} else if (c->bstate.btype == BLOCKED_MODULE) {
192193
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
@@ -202,7 +203,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
202203

203204
/* Reset the client for a new query, unless the client has pending command to process
204205
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
205-
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
206+
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
207+
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
206208
freeClientOriginalArgv(c);
207209
/* Clients that are not blocked on keys are not reprocessed so we must
208210
* call reqresAppendResponse here (for clients blocked on key,
@@ -240,6 +242,8 @@ void replyToBlockedClientTimedOut(client *c) {
240242
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
241243
} else if (c->bstate.btype == BLOCKED_MODULE) {
242244
moduleBlockedClientTimedOut(c, 0);
245+
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
246+
addReplyErrorObject(c, shared.noreplicaserr);
243247
} else {
244248
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
245249
}
@@ -597,23 +601,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
597601
}
598602
}
599603

600-
/* block a client due to wait command */
601-
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
604+
/* block a client for replica acknowledgement */
605+
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int blockType, int numlocal) {
602606
c->bstate.timeout = timeout;
603607
c->bstate.reploffset = offset;
604608
c->bstate.numreplicas = numreplicas;
605-
listAddNodeHead(server.clients_waiting_acks,c);
606-
blockClient(c,BLOCKED_WAIT);
609+
c->bstate.numlocal = numlocal;
610+
listAddNodeHead(server.clients_waiting_acks, c);
611+
blockClient(c, blockType);
612+
}
613+
614+
/* block a client due to pre-replication */
615+
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
616+
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
617+
c->flags |= CLIENT_PENDING_COMMAND;
618+
}
619+
620+
/* block a client due to wait command */
621+
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
622+
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
607623
}
608624

609625
/* block a client due to waitaof command */
610626
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
611-
c->bstate.timeout = timeout;
612-
c->bstate.reploffset = offset;
613-
c->bstate.numreplicas = numreplicas;
614-
c->bstate.numlocal = numlocal;
615-
listAddNodeHead(server.clients_waiting_acks,c);
616-
blockClient(c,BLOCKED_WAITAOF);
627+
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
617628
}
618629

619630
/* Postpone client from executing a command. For example the server might be busy

src/cluster_legacy.c

Lines changed: 120 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -5229,6 +5229,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
52295229
if (server.cluster->slots[slot]) return C_ERR;
52305230
clusterNodeSetSlotBit(n,slot);
52315231
server.cluster->slots[slot] = n;
5232+
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
52325233
return C_OK;
52335234
}
52345235

@@ -6333,8 +6334,10 @@ int clusterCommandSpecial(client *c) {
63336334
/* SETSLOT 10 NODE <node ID> */
63346335
int slot;
63356336
clusterNode *n;
6337+
int replSetSlot = nodeIsMaster(myself);
6338+
int reply = 1;
63366339

6337-
/* Allow primaries to replicate "CLUSTER SETSLOT" */
6340+
/* Allow primaries to replicate "CLUSTER SETSLOT" */
63386341
if (!(c->flags & CLIENT_MASTER) && nodeIsSlave(myself)) {
63396342
addReplyError(c,"Please use SETSLOT only with masters.");
63406343
return 1;
@@ -6358,6 +6361,11 @@ int clusterCommandSpecial(client *c) {
63586361
addReplyError(c,"Target node is not a master");
63596362
return 1;
63606363
}
6364+
serverLog(LL_NOTICE,
6365+
"Migrating slot %d to node %.40s (%.s)",
6366+
slot,
6367+
n->name,
6368+
n->human_nodename);
63616369
server.cluster->migrating_slots_to[slot] = n;
63626370
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
63636371
if (server.cluster->slots[slot] == myself) {
@@ -6375,9 +6383,15 @@ int clusterCommandSpecial(client *c) {
63756383
addReplyError(c,"Target node is not a master");
63766384
return 1;
63776385
}
6386+
serverLog(LL_NOTICE,
6387+
"Importing slot %d from node %.40s (%s)",
6388+
slot,
6389+
n->name,
6390+
n->human_nodename);
63786391
server.cluster->importing_slots_from[slot] = n;
63796392
} else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
63806393
/* CLUSTER SETSLOT <SLOT> STABLE */
6394+
serverLog(LL_NOTICE, "Marking slot %d stable", slot);
63816395
server.cluster->importing_slots_from[slot] = NULL;
63826396
server.cluster->migrating_slots_to[slot] = NULL;
63836397
} else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
@@ -6403,154 +6417,127 @@ int clusterCommandSpecial(client *c) {
64036417
}
64046418
}
64056419

6406-
serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s",
6407-
slot,
6408-
n->name,
6409-
n->human_nodename,
6410-
n->shard_id);
6411-
6412-
/* If this slot is in migrating status but we have no keys
6413-
* for it assigning the slot to another node will clear
6414-
* the migrating status. */
6415-
if (countKeysInSlot(slot) == 0 &&
6416-
server.cluster->migrating_slots_to[slot])
6417-
server.cluster->migrating_slots_to[slot] = NULL;
6418-
6419-
int slot_was_mine = server.cluster->slots[slot] == myself;
6420-
clusterDelSlot(slot);
6421-
clusterAddSlot(n,slot);
6422-
6423-
/* If we are a master left without slots, we should turn into a
6424-
* replica of the new master. */
6425-
if (slot_was_mine &&
6426-
n != myself &&
6427-
myself->numslots == 0 &&
6428-
server.cluster_allow_replica_migration) {
6429-
serverLog(LL_NOTICE,
6430-
"Lost my last slot during slot migration. Reconfiguring myself "
6431-
"as a replica of %.40s (%s) in shard %.40s",
6432-
n->name,
6433-
n->human_nodename,
6434-
n->shard_id);
6435-
clusterSetMaster(n, 1);
6436-
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
6437-
CLUSTER_TODO_UPDATE_STATE |
6438-
CLUSTER_TODO_FSYNC_CONFIG);
6439-
}
6440-
6441-
/* If this node or this node's primary was importing this slot,
6442-
* assigning the slot to itself also clears the importing status. */
6443-
if ((n == myself || n == myself->slaveof) &&
6444-
server.cluster->importing_slots_from[slot])
6445-
{
6446-
server.cluster->importing_slots_from[slot] = NULL;
6447-
6448-
/* Only primary broadcasts the updates */
6449-
if (n == myself) {
6450-
/* This slot was manually migrated, set this node configEpoch
6451-
* to a new epoch so that the new version can be propagated
6452-
* by the cluster.
6453-
*
6454-
* Note that if this ever results in a collision with another
6455-
* node getting the same configEpoch, for example because a
6456-
* failover happens at the same time we close the slot, the
6457-
* configEpoch collision resolution will fix it assigning
6458-
* a different epoch to each node. */
6459-
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
6460-
serverLog(LL_NOTICE,
6461-
"ConfigEpoch updated after importing slot %d",
6462-
slot);
6463-
}
6464-
/* After importing this slot, let the other nodes know as
6465-
* soon as possible. */
6466-
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
6467-
}
6468-
}
6469-
} else if (!strcasecmp(c->argv[3]->ptr,"node") &&
6470-
c->argc == 6 &&
6471-
!strcasecmp(c->argv[5]->ptr,"replicaonly"))
6472-
{
6473-
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> REPLICAONLY */
6474-
64756420
/* When finalizing the slot, there is a possibility that the
6476-
* target node B sends a cluster PONG to the source node A
6421+
* destination node B sends cluster PONG to the source node A
64776422
* before SETSLOT has been replicated to B'. If B crashes here,
6478-
* B' will be in an importing state and the slot will have no
6479-
* owner. To help mitigate this issue, we added a new SETSLOT
6480-
* command variant that takes a special marker token called
6481-
* "REPLICAONLY". This command is a no-op on the primary. It
6482-
* simply replicates asynchronously the command without the
6483-
* "REPLICAONLY" marker to the replicas, if there exist any.
6484-
* The caller is expected to wait for this asynchronous
6485-
* replication to complete using the "WAIT" command.
6486-
*
6487-
* With the help of this command, we finalize the slots
6488-
* on the replicas before the primary in the following
6489-
* sequence, where A is the source primary and B is the target
6490-
* primary:
6423+
* B' will be in importing state and the slot will have no owner.
6424+
* To help mitigate this issue, we enforce the following order
6425+
* for slot migration finalization such that the replicas will
6426+
* finalize the slot ownership before this primary:
64916427
*
6492-
* 1. Client C issues SETSLOT n NODE B REPLICAONLY against
6493-
* node B
6494-
* 2. Node B replicates SETSLOT n NODE B to all of its replicas,
6495-
* such as B', B'', etc
6496-
* 3. Client C then issues WAIT <num_replicas> <timeout> for
6497-
* a number of B's replicas of C's choice to complete the
6498-
* finalization
6499-
* 4. On successful WAIT completion, Client C executes SETSLOT
6500-
* n NODE B against node B but without the "REPLICAONLY"
6501-
* marker this time, which completes the slot finalization
6502-
* on node B
6503-
*
6504-
* The following steps can happen in parallel:
6428+
* 1. Client C issues SETSLOT n NODE B against node B
6429+
* 2. Node B replicates SETSLOT n NODE B to all of its
6430+
* replicas, such as B', B'', etc
6431+
* 3. On replication completion, node B executes SETSLOT
6432+
* n NODE B and returns control back to client C
6433+
* 4. The following steps can happen in parallel
65056434
* a. Client C issues SETSLOT n NODE B against node A
6506-
* b. Node B gossips its new slot ownership to the cluster,
6507-
* including A, A', etc */
6435+
* b. node B gossips its new slot ownership to the cluster
6436+
* including A, A', etc
6437+
*
6438+
* Where A is the source primary and B is the destination primary. */
6439+
int replDone = (c->flags & CLIENT_INTERNAL_PREREPL_DONE) != 0;
6440+
int syncReplRequired = (server.cluster->importing_slots_from[slot] != NULL) &&
6441+
(nodeIsMaster(myself) != 0) &&
6442+
(myself->numslaves != 0);
6443+
6444+
6445+
/* Slot states must be updated on replicas first before being
6446+
* applied to the primary. This is controlled via the retry
6447+
* flag */
6448+
if (replDone || !syncReplRequired) {
6449+
serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s",
6450+
slot,
6451+
n->name,
6452+
n->human_nodename,
6453+
n->shard_id);
65086454

6509-
n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
6455+
/* If this slot is in migrating status but we have no keys
6456+
* for it assigning the slot to another node will clear
6457+
* the migrating status. */
6458+
if (countKeysInSlot(slot) == 0 &&
6459+
server.cluster->migrating_slots_to[slot])
6460+
server.cluster->migrating_slots_to[slot] = NULL;
6461+
6462+
int slot_was_mine = server.cluster->slots[slot] == myself;
6463+
clusterDelSlot(slot);
6464+
clusterAddSlot(n, slot);
6465+
6466+
/* If we are a master left without slots, we should turn into a
6467+
* replica of the new master. */
6468+
if (slot_was_mine &&
6469+
n != myself &&
6470+
myself->numslots == 0 &&
6471+
server.cluster_allow_replica_migration) {
6472+
serverLog(LL_NOTICE,
6473+
"Lost my last slot during slot migration. Reconfiguring myself "
6474+
"as a replica of %.40s (%s) in shard %.40s",
6475+
n->name,
6476+
n->human_nodename,
6477+
n->shard_id);
6478+
clusterSetMaster(n, 1);
6479+
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
6480+
CLUSTER_TODO_UPDATE_STATE |
6481+
CLUSTER_TODO_FSYNC_CONFIG);
6482+
}
65106483

6511-
if (!n) {
6512-
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr);
6513-
return 1;
6514-
}
6515-
if (nodeIsSlave(n)) {
6516-
addReplyError(c,"Target node is not a master");
6517-
return 1;
6518-
}
6519-
/* If this hash slot was served by 'myself' before to switch
6520-
* make sure there are no longer local keys for this hash slot. */
6521-
if (server.cluster->slots[slot] == myself && n != myself) {
6522-
if (countKeysInSlot(slot) != 0) {
6523-
addReplyErrorFormat(c,
6524-
"Can't assign hashslot %d to a different node "
6525-
"while I still hold keys for this hash slot.", slot);
6526-
return 1;
6484+
/* If this node or this node's primary was importing this slot,
6485+
* assigning the slot to itself also clears the importing status. */
6486+
if ((n == myself || n == myself->slaveof) &&
6487+
server.cluster->importing_slots_from[slot])
6488+
{
6489+
server.cluster->importing_slots_from[slot] = NULL;
6490+
6491+
/* Only primary broadcasts the updates */
6492+
if (n == myself) {
6493+
/* This slot was manually migrated, set this node configEpoch
6494+
* to a new epoch so that the new version can be propagated
6495+
* by the cluster.
6496+
*
6497+
* Note that if this ever results in a collision with another
6498+
* node getting the same configEpoch, for example because a
6499+
* failover happens at the same time we close the slot, the
6500+
* configEpoch collision resolution will fix it assigning
6501+
* a different epoch to each node. */
6502+
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
6503+
serverLog(LL_NOTICE,
6504+
"ConfigEpoch updated after importing slot %d",
6505+
slot);
6506+
}
6507+
/* After importing this slot, let the other nodes know as
6508+
* soon as possible. */
6509+
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
6510+
}
65276511
}
6528-
}
6529-
if (server.cluster->importing_slots_from[slot] == NULL) {
6530-
addReplyError(c,"Slot is not open for importing");
6531-
return 1;
6532-
}
6533-
if (myself->numslaves == 0) {
6534-
addReplyError(c,"Target node has no replicas");
6535-
return 1;
6536-
}
65376512

6538-
/* Remove the last "REPLICAONLY" token so the command
6539-
* can be applied as the real "SETSLOT" command on the
6540-
* replicas. */
6541-
serverAssert(c->argc == 6);
6542-
rewriteClientCommandVector(c, 5, c->argv[0], c->argv[1], c->argv[2], c->argv[3], c->argv[4]);
6513+
/* Don't replicate again if setslot has been replicated */
6514+
replSetSlot &= !replDone;
6515+
} else {
6516+
/* We are a primary and this is the first time we see this "setslot"
6517+
* command. Force-replicate the setslot command to all of our replicas
6518+
* first and only on success will we handle the command.
6519+
* Note that
6520+
* 1. All replicas are expected to ack the replication within 1000ms
6521+
* 2. The repl offset target is set to the master's current repl offset + 1.
6522+
* There is no concern of partial replication because replicas always
6523+
* ack the repl offset at the command boundary. */
6524+
blockForPreReplication(c, mstime()+1000, server.master_repl_offset+1, myself->numslaves);
6525+
replicationRequestAckFromSlaves();
6526+
6527+
/* Don't reply to the client yet */
6528+
reply = 0;
6529+
}
65436530
} else {
65446531
addReplyError(c,
65456532
"Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
65466533
return 1;
65476534
}
65486535

65496536
/* Force-replicate "CLUSTER SETSLOT" */
6550-
if (nodeIsMaster(myself)) forceCommandPropagation(c, PROPAGATE_REPL);
6537+
if (replSetSlot) forceCommandPropagation(c, PROPAGATE_REPL);
65516538

65526539
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
6553-
addReply(c,shared.ok);
6540+
if (reply) addReply(c,shared.ok);
65546541
} else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
65556542
/* CLUSTER BUMPEPOCH */
65566543
int retval = clusterBumpConfigEpochWithoutConsensus();
@@ -6623,6 +6610,7 @@ int clusterCommandSpecial(client *c) {
66236610

66246611
/* Set the master. */
66256612
clusterSetMaster(n, 1);
6613+
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
66266614
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
66276615
addReply(c,shared.ok);
66286616
} else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&

src/networking.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2082,7 +2082,7 @@ void resetClient(client *c) {
20822082
c->multibulklen = 0;
20832083
c->bulklen = -1;
20842084
c->slot = -1;
2085-
c->flags &= ~CLIENT_EXECUTING_COMMAND;
2085+
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_INTERNAL_PREREPL_DONE);
20862086

20872087
/* Make sure the duration has been recorded to some command. */
20882088
serverAssert(c->duration == 0);

0 commit comments

Comments
 (0)