复制(Redis2.8)
设置主服务器的地址和端口(SLAVE OF命令)
Redis的主从复制设置非常方便,只需要在从服务器上设置主服务器的IP和端口即可。如果需要关闭主从同步,只需要执行SLAVEOF NO ONE即可。 该命令的具体描述见官方文档
可以看到,slaveof命令是一个异步命令,执行的时候只是设置了新的主服务器,然后就立马返回结果了。真正执行连接等操作的, 是在定时器中执行的。
建立套接字连接
提醒哦那个每隔1秒钟,会调用replicationCron函数,该函数会根据状态执行定时操作。当状态为REDIS_REPL_CONNECT的时候 执行逻辑为:
如果发现当前主从状态是REDIS_REPL_CONNECT(刚执行slaveof的时候设置的),就会去连接主服务器。当socket连接建立之后, 会注册syncWithMaster这个回调,并且设置主从状态为REDIS_REPL_CONNECTING。
发送PING命令
PING命令都很熟悉了,jedis pool中用来检测当前连接是否有效,用的就是这个命令。手工执行PING命令,Redis会返回一个PONG作为响应。
这里发送PING命令,主要也是为了检测当前和master连接是否正常,master是否能够正常处理命令。
这里当状态是REDIS_REPL_CONNECTING的时候,向master发送了PING命令,然后就等待master返回PONG的响应。
PONG响应也是在这个函数中进行处理的:
- 如果读取master返回值失败,直接跳转到error,关闭连接,重新将连接状态设置为REDIS_REPL_CONNECT(也就是SLAVEOF执行完成之后的状态), 等待下次定时器重连;
- 读取响应成功,判断响应值是否为PONG,如果为PONG则表示连接检测完成,将发送当前slave端口信息,用于master同步数据
- 如果判断是需要认证,切设置了masterauth,则发送AUTH命令,向master发起授权。
- 如果授权成功,将继续后续的同步流程
- 如果授权失败,则进入error流程,关闭连接,并等待下次重试
发送端口信息
前面的PONG响应流程里面已经提到了,当正确接收到了PONG响应,或者是完成了认证之后,slave会发起一个REPLCONF命令,将自己的端口发送给master。 master接受到这个命令之后,将slave的端口信息记录到这个slave对应的client对象的slave_listening_port属性中。
这时,在master上通过INFO命令,就可以看见slave的端口信息:
同步
还是在syncWithMaster函数中。发送完端口信息之后,slave会尝试进行增量同步:
如果不支持增量同步,会向master发送SYNC命令做全量同步。增量同步是在Redis2.8中支持的,所以全量同步就不管了。大致的操作流程就是 master做一次BGSAVE,然后将保存的rdb文件通过TCP连接发送给slave,slave加载这个rdb文件。
这里着重了解增量同步:
- 首先设置同步偏移量为-1,表示第一次增量更新(其实也就是个全量更新)
- 向master发送PSYNC命令,告知master自己的id和同步偏移量
- master返回全量更新(FULLRESYNC),保存master返回的偏移量和运行id,清除之前缓存的master信息 确认可以增量同步后,由于第一次是全量同步,因此操作和原全量同步相同:
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {.../* Prepare a suitable temp file for bulk transfer */while(maxtries--) {snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);if (dfd != -1) break;sleep(1);}if (dfd == -1) {redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));goto error;}/* Setup the non blocking download of the bulk file. */if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)== AE_ERR){redisLog(REDIS_WARNING,"Can't create readable event for SYNC: %s (fd=%d)",strerror(errno),fd);goto error;}server.repl_state = REDIS_REPL_TRANSFER;server.repl_transfer_size = -1;server.repl_transfer_read = 0;server.repl_transfer_last_fsync_off = 0;server.repl_transfer_fd = dfd;server.repl_transfer_lastio = server.unixtime;server.repl_transfer_tmpfile = zstrdup(tmpfile);return;}
- 创建一个临时文件,用于保存master传回的rdb文件
- 开始读取master传输回来的rdb文件,注册readSyncBulkPayload回调函数来处理
- 设置当前的状态为REDIS_REPL_TRANSFER,并保存传输文件等中间内容
readSyncBulkPayload函数用于接收master传输的rdb文件,并加载到Redis中,大致流程:
- 读取文件长度
- 读取文件内容,并保存到本地rdb临时文件中
- 读取完成之后,清空Redis数据库
- 加载rdb文件
- 创建一个master -> slave的通道,将当前slave作为master的client,以继续执行master同步过来的命令
- 将同步状态改成REDIS_REPL_CONNECTED,并回写同步偏移量等
- 开启aof如果需要(server.aof_state != REDIS_AOF_OFF)
master对PSYNC命令的处理
- 首先判断自己是slave的时候不能执行psync
- 判断是否需要全量同步,如果不需要,直接退出
- 如果需要全量同步,创建一个rdb文件
- 如果已经在写rdb文件,尽量复用当前的文件
- 如果没有,则发起一个bgsave
判断是否需要全量同步:
主要场景有两个:
- 当前请求的id和server的id不匹配
- 当前Redis保存的日志无法满足slave要求的偏移量
- master还没有back log
- master back log长度不够
同时,每次rdb文件保存完毕的时候,都会调用updateSlavesWaitingBgsave函数,处理保存的rdb文件。
命令传播
上面的流程结束,slave已经包含了master BGSAVE时所包含的所有数据。后续就需要master一直将自己的命令发送给slave。
在调用任何命令的时候,都会将命令分发到slave上去(除了AOF加载或者命令加了REDIS_CMD_SKIP_MONITOR标签)。
replicationFeedSlaves函数主要作用有两个:
- 将命令发送给所有在线的slave
- 将命令写入到back log中,方便后续增量同步
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {listNode *ln;listIter li;int j, len;char llstr[REDIS_LONGSTR_SIZE];/* If there aren't slaves, and there is no backlog buffer to populate,* we can return ASAP. */if (server.repl_backlog == NULL && listLength(slaves) == 0) return;/* We can't have slaves attached and no backlog. */redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));/* Send SELECT command to every slave if needed. */if (server.slaveseldb != dictid) {robj *selectcmd;/* For a few DBs we have pre-computed SELECT command. */// 每次都增加一个SELECT命令,防止弄错dbif (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {selectcmd = shared.select[dictid];} else {int dictid_len;dictid_len = ll2string(llstr,sizeof(llstr),dictid);selectcmd = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",dictid_len, llstr));}/* Add the SELECT command into the backlog. */// 将select命令写入到backlog中if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);/* Send it to slaves. */// 将select命令发送给slavelistRewind(slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;addReply(slave,selectcmd);}if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)decrRefCount(selectcmd);}server.slaveseldb = dictid;/* Write the command to the replication backlog if any. */// 将命令写入到backlog中if (server.repl_backlog) {char aux[REDIS_LONGSTR_SIZE+3];/* Add the multi bulk reply length. */aux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);/* We need to feed the buffer with the object as a bulk reply* not just as a plain string, so create the $..CRLF payload len* ad add the final CRLF */aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);feedReplicationBacklogWithObject(argv[j]);feedReplicationBacklog(aux+len+1,2);}}/* Write the command to every slave. */// 将命令发送到所有的slavelistRewind(slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;/* Don't feed slaves that are still waiting for BGSAVE to start */if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;/* Feed slaves that are waiting for the initial SYNC (so these commands* are queued in the output buffer until the initial SYNC completes),* or are already in sync with the master. *//* Add the multi bulk length. */addReplyMultiBulkLen(slave,argc);/* Finally any additional argument that was not stored inside the* static buffer if any (from j to argc). */for (j = 0; j < argc; j++)addReplyBulk(slave,argv[j]);}}
注:backlog大小可以设置,默认的大小为1M,如果超过,覆盖最初的日志
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
心跳检测和命令丢失补偿
在命令传播阶段,slave每秒一次向master发送REPLCONF命令,发送当前的offset,让master检测是否有命令丢失。 这个也是在定时器中发送的。
同时,master在接收到这个ACK包的时候,会记录slave的ack offset和ack时间:
还是在定时器中,每次调用的时候都会清理已经超时的slave:
这里的repl_ack_time由slave每次发送的ack包写入,server.repl_timeout默认值是60s:
增量同步
master断开了slave连接之后,slave为了能够进行增量同步,freeClient的实现,针对master的slave client,也有不同的处理:
经过这些处理,一个断开连接的slave,复制状态变成了REDIS_REPL_CONNECT。按照之前的流程,定时器会去尝试连接master, 发送PING命令,然后再发送PSYNC命令的时候,由于已经有了cached_master,会在PSYNC命令中带上之前master的id和偏移量。 相关slave和master的处理逻辑,前面代码中已经有了。