Skip to content

Commit 620fd99

Browse files
author
Ping Xie
committed
Slot migration improvement
Signed-off-by: Ping Xie <pingxie@google.com>
1 parent ba0c93c commit 620fd99

17 files changed

Lines changed: 907 additions & 148 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ Makefile.dep
4141
compile_commands.json
4242
redis.code-workspace
4343
.cache
44+
.cscope.*

src/cluster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node);
103103
const char *clusterNodePreferredEndpoint(clusterNode *n);
104104
long long clusterNodeReplOffset(clusterNode *node);
105105
clusterNode *clusterLookupNode(const char *name, int length);
106+
void clusterReplicateOpenSlots(void);
106107

107108
/* functions with shared implementations */
108109
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);

src/cluster_legacy.c

Lines changed: 496 additions & 117 deletions
Large diffs are not rendered by default.

src/cluster_legacy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ typedef struct clusterLink {
5353
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */
5454
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
5555

56+
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
5657
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
5758
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
5859
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))

src/commands/cluster-setslot.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
"command_flags": [
1111
"NO_ASYNC_LOADING",
1212
"ADMIN",
13-
"STALE"
13+
"STALE",
14+
"MAY_REPLICATE"
1415
],
1516
"arguments": [
1617
{

src/debug.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -873,8 +873,7 @@ NULL
873873
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
874874
addReply(c,shared.ok);
875875
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
876-
replicationFeedSlaves(server.slaves, -1,
877-
c->argv + 2, c->argc - 2);
876+
replicationFeedSlaves(-1, c->argv + 2, c->argc - 2);
878877
addReply(c,shared.ok);
879878
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
880879
sds errstr = sdsnewlen("-",1);

src/rdb.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
33103310
robj *argv[2];
33113311
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
33123312
argv[1] = &keyobj;
3313-
replicationFeedSlaves(server.slaves,dbid,argv,2);
3313+
replicationFeedSlaves(dbid,argv,2);
33143314
}
33153315
sdsfree(key);
33163316
decrRefCount(val);

src/replication.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) {
434434
* received by our clients in order to create the replication stream.
435435
* Instead if the instance is a replica and has sub-replicas attached, we use
436436
* replicationFeedStreamFromMasterStream() */
437-
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
437+
void replicationFeedSlaves(int dictid, robj **argv, int argc) {
438438
int j, len;
439439
char llstr[LONG_STR_SIZE];
440440

@@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
451451

452452
/* If there aren't slaves, and there is no backlog buffer to populate,
453453
* we can return ASAP. */
454-
if (server.repl_backlog == NULL && listLength(slaves) == 0) {
454+
if (server.repl_backlog == NULL && listLength(server.slaves) == 0) {
455455
/* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
456456
* even when there's no replication active. This code will not be reached if AOF
457457
* is also disabled. */
@@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) {
13131313
NULL);
13141314
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
13151315
replicationGetSlaveName(slave));
1316+
1317+
/* Replicate slot being migrated/imported to the new replica */
1318+
clusterReplicateOpenSlots();
13161319
return 1;
13171320
}
13181321

@@ -3788,8 +3791,7 @@ void replicationCron(void) {
37883791

37893792
if (!manual_failover_in_progress) {
37903793
ping_argv[0] = shared.ping;
3791-
replicationFeedSlaves(server.slaves, -1,
3792-
ping_argv, 1);
3794+
replicationFeedSlaves(-1, ping_argv, 1);
37933795
}
37943796
}
37953797

src/server.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) {
16121612
argv[0] = shared.replconf;
16131613
argv[1] = shared.getack;
16141614
argv[2] = shared.special_asterick; /* Not used argument. */
1615-
replicationFeedSlaves(server.slaves, -1, argv, 3);
1615+
replicationFeedSlaves(-1, argv, 3);
16161616
}
16171617

16181618
extern int ProcessingEventsWhileBlocked;
@@ -3296,7 +3296,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
32963296
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
32973297
feedAppendOnlyFile(dbid,argv,argc);
32983298
if (target & PROPAGATE_REPL)
3299-
replicationFeedSlaves(server.slaves,dbid,argv,argc);
3299+
replicationFeedSlaves(dbid,argv,argc);
33003300
}
33013301

33023302
/* Used inside commands to schedule the propagation of additional commands

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2816,7 +2816,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
28162816
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
28172817

28182818
/* Replication */
2819-
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
2819+
void replicationFeedSlaves(int dictid, robj **argv, int argc);
28202820
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
28212821
void resetReplicationBuffer(void);
28222822
void feedReplicationBuffer(char *buf, size_t len);

0 commit comments

Comments
 (0)