coolEx

Today will be better

[Redis Design and Implementation][14] Sentinel: the Jedis client

Sentinel client support: Jedis

A simple example:


import java.util.HashSet;
import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

public class App
{
    public static void main( String[] args )
    {
        Set<String> sentinels = new HashSet<String>();
        // Add the sentinel addresses
        sentinels.add("172.18.18.207:26379");
        // Initialize the sentinel-backed pool; it takes a master name
        // (used to look the master up through the sentinels)
        JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels);

        // From here on, usage matches the standard Jedis pool
        Jedis jedis = pool.getResource();

        jedis.set("jedis", "jedis");

        pool.returnResource(jedis);
    }
}

The JedisSentinelPool source is hosted on GitHub. The rough flow:
1. Using the given sentinel list and the master name, obtain the master's info (IP, port) from a sentinel (via the SENTINEL master command; a sample lookup is shown below)
2. Start a thread that, against the given sentinel list, subscribes to +switch-master messages
3. Whenever a master switch is observed, call the initPool method to change the master used by the connection pool
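
For context, step 1 amounts to asking any of the listed sentinels for the current master address. A hypothetical redis-cli session (the reply values are made up; Jedis performs the equivalent lookup internally, commonly via SENTINEL get-master-addr-by-name):

$ redis-cli -h 172.18.18.207 -p 26379
> SENTINEL get-master-addr-by-name mymaster
1) "172.18.18.205"
2) "6379"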

[Redis Design and Implementation][13] Sentinel: failover

Electing the leader sentinel

When sentinelStartFailoverIfNeeded decides that failover is required, it calls the sentinelStartFailover function to enter the failover state.
At that point the master's failover_state is marked as SENTINEL_FAILOVER_STATE_WAIT_START.


int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* We can't failover if the master is not in O_DOWN state. */
    if (!(master->flags & SRI_O_DOWN)) return 0;

    /* Failover already in progress? */
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    /* Last failover attempt started too little time ago? */
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */
            master->failover_delay_logged = master->failover_start_time;
            redisLog(REDIS_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}
void sentinelStartFailover(sentinelRedisInstance *master) {
    redisAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(REDIS_WARNING,"+try-failover",master,"%@");
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

Following the is-master-down-by-addr logic described earlier, the sentinel now includes its own run id when it sends the is-master-down-by-addr question, indicating that it wants a local leader sentinel to be elected. The election rules and method, roughly:
* Every online sentinel is eligible to be elected leader
* After each leader election, whether or not it succeeds, every sentinel's configuration epoch is incremented
* Within one configuration epoch, each sentinel gets exactly one chance to set some sentinel as its local leader, and once set, the choice cannot be changed within that epoch
* Every sentinel that sees the master enter objective down will ask the other sentinels to set itself as their local leader
* When a source sentinel sends an is-master-down-by-addr question to a target sentinel, it is asking the target to set the source as its local leader
* The local leader slot is first come, first served: the first sentinel to send the is-master-down-by-addr request becomes the local leader; all later requests are refused
* The target sentinel's reply to is-master-down-by-addr carries the run id of its local leader and the configuration epoch
* When the source sentinel receives the reply, it checks whether the leader_epoch in it equals its own configuration epoch; if so, it then checks whether the run id equals its own, and if both match, the target has set the source as its local leader
* If some sentinel is set as local leader by more than half of the sentinels, it becomes the leader sentinel
* Because becoming leader requires the support of more than half of the sentinels, and each sentinel votes only once per configuration epoch, at most one leader sentinel can emerge within a configuration epoch
* If no leader is elected within the given time limit, the election is retried after a while.


void sentinelCommand(redisClient *c) {
    ...
    else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>*/
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
                                                              != REDIS_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        /* Vote for the master (or fetch the previous vote) if the request
         * includes a runid, otherwise the sender is not seeking for a vote. */
        // The request carries a sentinel run_id, so we must reply with a local leader vote
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    }
    ...
}

/* Vote for the sentinel with 'req_runid' or return the old vote if already
 * voted for the specified 'req_epoch' or one greater.
 *
 * If a vote is not available returns NULL, otherwise return the Sentinel
 * runid and populate the leader_epoch with the epoch of the vote. */
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    // The requester's epoch is newer than ours: update our epoch
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }
    // We have not voted for a leader in this epoch yet: vote for the requesting sentinel
    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);
        // Record the requesting sentinel as our local leader
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
        sentinelFlushConfig();
        sentinelEvent(REDIS_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);
        /* If we did not vote for ourselves, set the master failover start
         * time to now, in order to force a delay before we can start a
         * failover for the same master. */
        if (strcasecmp(master->leader,server.runid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }

    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}

At this point, through the is-master-down-by-addr exchanges, votes for a local leader sentinel have been cast. Control then returns to sentinelHandleRedisInstance,
where sentinelFailoverStateMachine carries the election forward:


void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;

    /* Check if we are the leader for the failover epoch. */
    // While waiting for the failover to start, check whether the leader has been elected
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    isleader = leader && strcasecmp(leader,server.runid) == 0;
    sdsfree(leader);

    /* If I'm not the leader, and it is not a forced failover via
     * SENTINEL FAILOVER, then I can't continue with the failover. */
    // Not elected leader: abandon this failover and let the leader sentinel do it
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
         * and the configured failover timeout. */
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        /* Abort the failover if I'm not the leader after some time. */
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(REDIS_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }
    sentinelEvent(REDIS_WARNING,"+elected-leader",ri,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
}
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
    uint64_t leader_epoch;
    uint64_t max_votes = 0;

    redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
    // Tallies votes per sentinel: key is a sentinel run_id, value its vote count
    counters = dictCreate(&leaderVotesDictType,NULL);

    voters = dictSize(master->sentinels)+1; /* All the other sentinels and me. */

    /* Count other sentinels votes */
    di = dictGetIterator(master->sentinels);
    // Iterate over all sentinels, tallying each one's recorded local leader
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // Only count votes cast in the current epoch
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
    }
    dictReleaseIterator(di);

    /* Check what's the winner. For the winner to win, it needs two conditions:
     * 1) Absolute majority between voters (50% + 1).
     * 2) And anyway at least master->quorum votes. */
    // Find the run_id with the most votes so far, and that maximum count
    di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);

        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);

    /* Count this Sentinel vote:
     * if this Sentinel has not voted yet, either vote for the most
     * commonly voted sentinel, or for itself if no vote exists at all. */
    // Add this sentinel's own vote before deciding whether a leader emerged
    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
        myvote = sentinelVoteLeader(master,epoch,server.runid,&leader_epoch);

    if (myvote && leader_epoch == epoch) {
        uint64_t votes = sentinelLeaderIncr(counters,myvote);

        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
        }
    }

    voters_quorum = voters/2+1;
    // The winner needs a majority, and at least the configured quorum
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;

    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}

The leader sentinel has now been chosen and the failover state has become SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
the current sentinel, acting as leader, will carry out the actual failover of the master.

Performing the failover

The first step of failover is picking the new master. The rough filtering process:
1. Remove from the list every slave that is offline or disconnected
2. Remove every slave that has not replied to the leader sentinel's INFO command within the last five seconds
3. Remove every slave whose link with the failed master has been down for more than down-after-milliseconds * 10 ms (ensuring the slave did not part from the master too early, so its copy of the data is reasonably fresh)
4. Choose by slave priority
5. If priorities are equal, choose the slave with the largest replication offset
6. If those are equal too, order by run_id and choose the slave with the smallest run_id


void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    // Pick the slave to promote
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    /* We don't handle the timeout in this state as the function aborts
     * the failover or go forward in the next state. */
    if (slave == NULL) {
        sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        // Move the failover state to SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
        sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;

    // Compute the maximum tolerated master-link down time
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;

        // Skip slaves that are down or disconnected
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
        // Skip slaves not seen for more than 5 ping periods
        if (mstime() - slave->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        /* If the master is in SDOWN state we get INFO for slaves every second.
         * Otherwise we get it with the usual period so we need to account for
         * a larger delay. */
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        // Skip if the INFO data is older than its validity window
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        // Skip if the slave's link to the master was down for too long
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    if (instances) {
        // Sort the candidates by promotion preference
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}
int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
                          **sb = (sentinelRedisInstance **)b;
    char *sa_runid, *sb_runid;

    // Compare by slave priority first
    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return (*sa)->slave_priority - (*sb)->slave_priority;

    /* If priority is the same, select the slave with greater replication
     * offset (processed more data from the master). */
    // Same priority: compare replication offsets
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
        return 1; /* b > a */
    }

    /* If the replication offset is the same select the slave with that has
     * the lexicographically smaller runid. Note that we try to handle runid
     * == NULL as there are old Redis versions that don't publish runid in
     * INFO. A NULL runid is considered bigger than any other runid. */
    // At this point any candidate would do; fall back to run_id ordering
    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1;  /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
}

The new master has now been chosen, and the failover state has become SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE.

Next, the chosen instance must be told to change roles, from slave to master:


void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    /* We can't send the command to the promoted slave if it is now
     * disconnected. Retry again and again with this state until the timeout
     * is reached, then abort the failover. */
    if (ri->promoted_slave->flags & SRI_DISCONNECTED) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* Send SLAVEOF NO ONE command to turn the slave into a master.
     * We actually register a generic callback for this command as we don't
     * really care about the reply. We check if it worked indirectly observing
     * if INFO returns a different role (master instead of slave). */
    // Send the SLAVEOF NO ONE command, telling the slave to become a master.
    // Success is verified later through INFO, so the reply is not inspected here
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != REDIS_OK) return;
    sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    // State becomes SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

That completes the slave-to-master role change; the failover state is now SENTINEL_FAILOVER_STATE_WAIT_PROMOTION.

In the SENTINEL_FAILOVER_STATE_WAIT_PROMOTION state, the handler does not itself check the slave's status; it only enforces the timeout, while the periodic INFO command drives the update of the promoted slave.


void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    /* Just handle the timeout. Switching to the next state is handled
     * by the function parsing the INFO command of the promoted slave. */
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    ...
    // A slave now reports itself as a master (we sent SLAVEOF NO ONE earlier)
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        // Confirm that this role switch was indeed ordered by the sentinel
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            // Update the master's config epoch to the failover epoch
            ri->master->config_epoch = ri->master->failover_epoch;
            // Failover state moves to SENTINEL_FAILOVER_STATE_RECONF_SLAVES
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
            sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            // An instance claims to be a master on its own: force it back to slave
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;

            if (!(ri->flags & SRI_PROMOTED) &&
                 sentinelMasterLooksSane(ri->master) &&
                 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
                 mstime() - ri->role_reported_time > wait_time)
            {
                int retval = sentinelSendSlaveOf(ri,
                        ri->master->addr->ip,
                        ri->master->addr->port);
                if (retval == REDIS_OK)
                    sentinelEvent(REDIS_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }
    ...
}

The slave-to-master switch is thus handled in the INFO reply callback, and the failover state becomes SENTINEL_FAILOVER_STATE_RECONF_SLAVES.

In this state, the state-machine handler sends SLAVEOF to the remaining slaves, switching the master they point at and re-establishing replication.


void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    di = dictGetIterator(master->slaves);
    // First count how many slaves are currently being reconfigured
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    di = dictGetIterator(master->slaves);
    // Bound the load: keep the number of slaves syncing in parallel below parallel_syncs
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        /* Skip the promoted slave, and already configured slaves. */
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        /* If too much time elapsed without the slave moving forward to
         * the next state, consider it reconfigured even if it is not.
         * Sentinels will detect the slave as misconfigured and fix its
         * configuration later. */
        // Reconfiguration timed out: mark it done anyway and let later checks repair it
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }

        /* Nothing to do for instances that are disconnected or already
         * in RECONF_SENT state. */
        // Already syncing, or disconnected
        if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
            continue;

        /* Send SLAVEOF <new master>. */
        // Not reconfigured yet: send SLAVEOF to start replicating from the new master
        retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
        if (retval == REDIS_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);

    /* Check if all the slaves are reconfigured and handle timeout. */
    sentinelFailoverDetectEnd(master);
}

Once every slave has been reconfigured, the failover enters the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state.

SENTINEL_FAILOVER_STATE_UPDATE_CONFIG resets the master entry, switching all of the master-related attributes from the old master to the promoted one
(sentinelResetMasterAndChangeAddress).
That concludes the whole failover flow; the Redis deployment has a freshly established master-slave topology. The complete state machine is summarized below.
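
As a plain-text recap of the code walked through above (this summary is mine, not the original post's):

WAIT_START              leader election for the failover epoch
  -> SELECT_SLAVE       pick the best candidate slave
  -> SEND_SLAVEOF_NOONE promote it with SLAVEOF NO ONE
  -> WAIT_PROMOTION     wait for INFO to report the role change
  -> RECONF_SLAVES      point the remaining slaves at the new master
  -> UPDATE_CONFIG      reset the master entry to the promoted address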

[Redis Design and Implementation][12] Sentinel: failure detection

Checking for subjective down

The sentinel periodically sends PING commands to detect whether the monitored masters and slaves are down.


int sentinelSendPing(sentinelRedisInstance *ri) {
    int retval = redisAsyncCommand(ri->cc,
        sentinelPingReplyCallback, NULL, "PING");
    if (retval == REDIS_OK) {
        ri->pending_commands++;
        /* We update the ping time only if we received the pong for
         * the previous ping, otherwise we are technically waiting
         * since the first ping that did not received a reply. */
         // This field becomes 0 only once the pong reply has been received
        if (ri->last_ping_time == 0) ri->last_ping_time = mstime();
        return 1;
    } else {
        return 0;
    }
}
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    ...
    if (strncmp(r->str,"PONG",4) == 0 ||
        strncmp(r->str,"LOADING",7) == 0 ||
        strncmp(r->str,"MASTERDOWN",10) == 0)
    {
        ri->last_avail_time = mstime();
        ri->last_ping_time = 0; /* Flag the pong as received. */
    }
    ...
}

Only a PONG, LOADING, or MASTERDOWN reply makes the sentinel consider the master responsive and record the reply time.

In addition, every call to sentinelHandleRedisInstance checks whether any monitored master or slave has already timed out into subjective down:


void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    if (ri->last_ping_time)
        elapsed = mstime() - ri->last_ping_time;

    /* Check if we are in need for a reconnection of one of the
     * links, because we are detecting low activity.
     *
     * 1) Check if the command link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
     *    pending ping for more than half the timeout. */
     // Check whether the command link has gone stale
    if (ri->cc &&
        (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->last_ping_time != 0 && /* There is a pending ping... */
        /* The pending ping is delayed, and we did not received
         * error replies as well. */
        (mstime() - ri->last_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
    {
        sentinelKillLink(ri,ri->cc);
    }

    /* 2) Check if the pubsub link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
     *    activity in the Pub/Sub channel for more than
     *    SENTINEL_PUBLISH_PERIOD * 3.
     */
     // Check whether the pub/sub link has gone stale
    if (ri->pc &&
        (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        sentinelKillLink(ri,ri->pc);
    }

    /* Update the SDOWN flag. We believe the instance is SDOWN if:
     *
     * 1) It is not replying.
     * 2) We believe it is a master, it reports to be a slave for enough time
     *    to meet the down_after_period, plus enough time to get two times
     *    INFO report from the instance. */
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

Note: all the slaves of one master share that master's subjective-down timeout (the down-after-milliseconds value from the configuration file), while different masters can be configured independently, for example:
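
A minimal configuration sketch (master names and values are illustrative only):

sentinel down-after-milliseconds mymaster 30000
sentinel down-after-milliseconds othermaster 60000

Here mymaster and all of its slaves use a 30 s subjective-down timeout, while othermaster and its slaves use 60 s.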

Checking for objective down


void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    ...
    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

At the end of sentinelHandleRedisInstance, these extra steps run for masters only:
* check for objective down
* ask the other sentinels for their observations
* drive the failover

First, asking the other sentinels for their verdict:


#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    // Iterate over all the known sentinels stored in the sentinels dict
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* If the master state from other sentinel is too old, we clear it. */
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask if master is down to other sentinels if:
         *
         * 1) We believe it is down, or there is a failover in progress.
         * 2) Sentinel is connected.
         * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
         // Only ask other sentinels once we ourselves consider the master subjectively down
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        // Make sure this sentinel is still connected
        if (ri->flags & SRI_DISCONNECTED) continue;
        // If not forced, skip asking while a reply arrived within SENTINEL_ASK_PERIOD (1000 ms)
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* Ask */
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->cc,
                    sentinelReceiveIsMasterDownReply, NULL,
                    "SENTINEL is-master-down-by-addr %s %s %llu %s",
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    server.runid : "*");
        if (retval == REDIS_OK) ri->pending_commands++;
    }
    dictReleaseIterator(di);
}

The message sent contains:
* the IP of the master judged subjectively down (master->addr->ip)
* the port of the master judged subjectively down (port)
* the sentinel's current epoch
* a run id: * means the request merely checks whether the master is objectively down, while the sentinel's own run id means it is campaigning to be leader (see the sample exchange below)
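
Matching the format string in the code above with the reply layout handled below, a hypothetical exchange (all values invented) looks like:

SENTINEL is-master-down-by-addr 127.0.0.1 6379 5 *
1) 1        down state: the replier also considers the master down
2) "*"      no vote was requested, so no leader run id
3) 0        leader epoch (0 when no vote was cast)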

Receiving the reply:


void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;
    REDIS_NOTUSED(privdata);

    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;

    /* Ignore every error or unexpected reply.
     * Note that if the command returns an error for any reason we'll
     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
    // Reply format: element 1 says whether the replier considers the master down (1 yes, 0 no);
    // element 2 is * when only the down state was queried;
    // element 3 is meaningful only when element 2 is not *, and is 0 in all other cases
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        if (strcmp(r->element[1]->str,"*")) {
            /* If the runid in the reply is not "*" the Sentinel actually
             * replied with a vote. */
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                redisLog(REDIS_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            ri->leader = sdsnew(r->element[1]->str);
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}

Checking whether the master is objectively down:


void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        // Walk the sentinels dict and count those that marked the master as down
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

With the is-master-down-by-addr questioning and flagging described above, the sentinel knows how many sentinels have marked the master as subjectively down. If that number reaches the quorum set in the configuration file,
the master is marked as objectively down. For example, with quorum set to 2, the local sentinel's own verdict plus one agreeing sentinel is already enough.

[Redis Design and Implementation][11] Sentinel: communication

Communication

After initialization, the sentinel actively communicates with the masters and slaves to gather their information.

Obtaining master information

First, the sentinel establishes two connections to the master: a command connection and a pub/sub connection (stored in the cc and pc fields of sentinelRedisInstance respectively).


void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    sentinelReconnectInstance(ri);
    ...
}
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"

The command connection is used later to fetch the master's information (including its slaves); the subscription (to the __sentinel__:hello channel) receives pushed messages such as master down-state changes.

Once connected, the sentinel sends commands periodically:


void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->flags & SRI_DISCONNECTED) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin. */
     // INFO is sent every 10 s by default: #define SENTINEL_INFO_PERIOD 10000
     // For slaves of a master that is down or failing over, switch to 1 s
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
     // PING is sent every 1 s by default: #define SENTINEL_PING_PERIOD 1000
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval == REDIS_OK) ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > ping_period) {
        /* Send PING to all the three kinds of instances. */
        sentinelSendPing(ri);
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        /* PUBLISH hello messages to all the three kinds of instances. */
        sentinelSendHello(ri);
    }
}

Handling the INFO reply:


void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;
    REDIS_NOTUSED(privdata);

    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;

    if (r->type == REDIS_REPLY_STRING) {
        sentinelRefreshInstanceInfo(ri,r->str);
    }
}

For the INFO sent to a master, the sentinel will:
1. record the master's run_id, check the master's role, and update the master list it maintains
2. obtain the master's slave list from the replication fields and update the slave list it maintains (a sample of the parsed INFO lines follows)
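
For reference, the lines being parsed out of a master's INFO output look roughly like this (values invented; the exact slaveN format differs between Redis versions):

run_id:3ab1c7a8bd3c1cd6e5f0f22913ed860e250e8a09
role:master
slave0:ip=127.0.0.1,port=6380,state=online,offset=118274,lag=0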

Obtaining slave information

Likewise, the sentinel sends INFO to the slaves at the same cadence and extracts the following fields:
* run_id
* role
* the master's host and port
* the state of the master-slave link (master_link_status)
* the slave priority (slave_priority)
* the slave replication offset (slave_repl_offset)
It then updates the sentinelRedisInstance structures it maintains. A rough sample of the slave-side fields follows.
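
A slave's INFO replication section contains lines like these (values invented):

role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
slave_priority:100
slave_repl_offset:118274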

Sending and receiving hello messages

At fixed intervals (SENTINEL_PUBLISH_PERIOD, checked in sentinelSendPeriodicCommands above), the sentinel publishes a hello message over the command connection to every master and slave.


int sentinelSendHello(sentinelRedisInstance *ri) {
    char ip[REDIS_IP_STR_LEN];
    char payload[REDIS_IP_STR_LEN+1024];
    int retval;
    char *announce_ip;
    int announce_port;
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

    if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR;

    /* Use the specified announce address if specified, otherwise try to
     * obtain our own IP address. */
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
    } else {
        if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
            return REDIS_ERR;
        announce_ip = ip;
    }
    announce_port = sentinel.announce_port ?
                    sentinel.announce_port : server.port;

    /* Format and send the Hello message. */
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, server.runid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    retval = redisAsyncCommand(ri->cc,
        sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
            SENTINEL_HELLO_CHANNEL,payload);
    if (retval != REDIS_OK) return REDIS_ERR;
    ri->pending_commands++;
    return REDIS_OK;
}

The payload contains (a concrete example follows):
* the sentinel's IP (announce_ip)
* the sentinel's port (announce_port)
* the sentinel's run id (server.runid)
* the sentinel's configuration epoch (sentinel.current_epoch)
* the master's name (master->name)
* the master's IP (master_addr->ip)
* the master's port (master_addr->port)
* the master's configuration epoch (master->config_epoch)
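
Given the snprintf format above, one hello payload is a single comma-separated line; for example (all values invented):

172.18.18.206,26379,6f3ab1c7a8bd3c1cd6e5f0f22913ed860e250e8a,5,mymaster,172.18.18.205,6379,5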

Every sentinel connected to this master receives the message and reacts by:
* updating its sentinels dict
* creating command connections to the newly discovered sentinels

[Redis Design and Implementation][10] Sentinel: overview and startup

Sentinel(Redis 3.0.0-rc1)

Sentinel is the Redis HA solution. A Sentinel system, made up of one or more Sentinel instances, can monitor any number of master servers
and all of the slaves under them; when a monitored master goes offline, it automatically promotes one of that master's slaves to be the new master,
which then serves command requests in place of the failed one.

Core data structures


typedef struct sentinelRedisInstance {
    // Type and state of this instance (master, slave, or sentinel; down or not)
    int flags;
    // Name of the instance, ip:port
    char *name;
    // The instance's run id
    char *runid;
    // Configuration epoch
    uint64_t config_epoch;
    // Address of the instance
    sentinelAddr *addr;
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    int pending_commands;   /* Number of commands sent waiting for a reply. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t last_ping_time;  /* Last time a pending ping was sent in the
                                 context of the current command connection
                                 with the instance. 0 if still not sent or
                                 if pong already received. */
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    // After this many milliseconds without a valid reply, the instance is considered subjectively down
    mstime_t down_after_period;
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    // Number of supporting votes required to judge the master objectively down
    unsigned int quorum;
    // During failover, how many slaves may sync with the new master at the same time
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    // Maximum time allowed for refreshing the failover state
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
} sentinelRedisInstance;

struct sentinelState {
    // Current epoch
    uint64_t current_epoch;
    // Dict of monitored masters: key is the master name, value a sentinelRedisInstance
    dict *masters;
    // Whether we are in TILT mode
    int tilt;
    // Number of scripts currently being executed
    int running_scripts;
    // Time at which TILT mode was entered
    mstime_t tilt_start_time;
    // Last time the time handler was executed
    mstime_t previous_time;
    // Queue of user scripts awaiting execution
    list *scripts_queue;
    // Address advertised in hello messages; these two fields were omitted
    // from the original excerpt but are set by initSentinel below
    char *announce_ip;
    int announce_port;
} sentinel;

Sentinel initialization

Startup command:


redis-sentinel /path/to/sentinel.conf

or


redis-server /path/to/sentinel.conf --sentinel

Sentinel must be started with a configuration file; a minimal configuration looks like this:


sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 60000
sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1

This configuration has the sentinel monitor one Redis master named mymaster at IP 127.0.0.1, port 6379;
two sentinels must declare it down before a failover is performed, and mymaster is marked as failed after 60000 ms without a valid reply.

In the main function, the sentinel-specific setup is applied directly:


int main(int argc, char **argv) {
    ...
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    ...
}

First the normal Redis server port setting is overridden; a sentinel listens on port 26379 by default:


#define REDIS_SENTINEL_PORT 26379
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
}

Then the server's command table is replaced and the sentinel object is initialized:


void initSentinel(void) {
    unsigned int j;

    /* Remove usual Redis commands from the command table, then just add
     * the SENTINEL command. */
     // Empty the normal Redis command table and install sentinel's own commands
    dictEmpty(server.commands,NULL);
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        redisAssert(retval == DICT_OK);
    }

    /* Initialize various data structures. */
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
}

A sentinel supports only monitoring-related commands and cannot execute ordinary Redis commands; its command table is:


struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};

Also, because a sentinel accepts no ordinary Redis commands, it does not load the rdb file or any other pre-existing data at startup.

Scraping one column of an HTML table with Chrome

Every so often the need arises to copy a single column of a large table on a web page, say all of the machine IPs it lists (handier than exporting from a database).

First, use the Chrome developer tools to locate one cell of the target column and choose Copy XPath. That yields the element's XPath, for example:

//*[@id="machineGroupTable"]/tbody/tr[2]/td[2]

Chrome's console provides the $x() function for locating elements by XPath directly, so JS along these lines fetches the entire column containing the chosen cell:

$x('//*[@id="machineGroupTable"]/tbody//td[2]')

What comes back is Chrome's processed XPath result: a plain array of DOM nodes, so it can be iterated directly to pull out each cell's text.

var ip=[];
$x('//*[@id="machineGroupTable"]/tbody//td[2]').forEach(function(e){ip.push(e.innerText)})

That puts the whole column's contents into the ip array.

Finally, copy the ip array out:

copy(ip.join('\n'))

copy() is another built-in command of the Chrome console; it copies its argument to the clipboard. Note the explicit join here, otherwise the output would be a JSON-formatted string.

With that, the table column is on the clipboard, ready to be pasted into whatever file needs it.

[Redis Design and Implementation][9] Replication

Replication (Redis 2.8)

Setting the master address and port (the SLAVEOF command)

SLAVEOF host port

Setting up Redis master-slave replication is very convenient: just point the slave at the master's IP and port. To turn replication off, run SLAVEOF NO ONE. See the official documentation for the command's full description.
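
For example, run on the slave (addresses invented):

SLAVEOF 127.0.0.1 6379
SLAVEOF NO ONE

The first command starts replicating from 127.0.0.1:6379; the second detaches the slave, which keeps its current dataset and serves as a master again.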

void slaveofCommand(redisClient *c) {
    // Handle "no one" first: detach from the current master
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        // Record the new master; the command returns right after this
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}

void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    // If this server used to be a master, drop all of its old links
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}

As you can see, slaveof is an asynchronous command: it only records the new master and then immediately returns. The real work of connecting is done in a timer:

/* Replication cron function -- used to reconnect to master and
 * to detect transfer failures. */
run_with_period(1000) replicationCron();

Establishing the socket connection

The timer calls the replicationCron function once per second, which performs periodic work according to the replication state. When the state is REDIS_REPL_CONNECT, the logic is:

void replicationCron(void) {
    ...
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ...
}

int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

If the current replication state is REDIS_REPL_CONNECT (set when slaveof was executed), the slave connects to the master. Once the socket is established, the syncWithMaster callback is registered and the state is set to REDIS_REPL_CONNECTING.

Sending PING

PING needs no introduction; it is what the Jedis pool uses to test whether a connection is still valid. Executed by hand, PING makes Redis answer with PONG.

Here PING is sent mainly to check that the link to the master is healthy and that the master can process commands normally.

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }
    ...
}

When the state is REDIS_REPL_CONNECTING, PING is sent to the master and the slave waits for the PONG.

The PONG reply is handled in the same function:

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if (server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }
    ...
    error:
        close(fd);
        server.repl_transfer_s = -1;
        server.repl_state = REDIS_REPL_CONNECT;
        return;
}

* If reading the master's reply fails, jump straight to error: close the connection and set the state back to REDIS_REPL_CONNECT (the state right after SLAVEOF completed), waiting for the timer to reconnect.
* If the reply is read successfully and is PONG, the connectivity check passed; the slave then sends its listening port, which the master uses when synchronizing data.
* If authentication is needed and masterauth is set, send AUTH to request authorization from the master.
  * If authorization succeeds, the synchronization flow continues.
  * If it fails, enter the error flow: close the connection and wait for the next retry.

Sending the port

As the PONG flow above showed, after PONG is received (or authentication completes) the slave issues a REPLCONF command that reports its own port to the master. When the master receives it, it stores the port in the slave_listening_port field of the client object representing that slave.
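
For example, a slave listening on port 6380 (the value is invented) sends:

REPLCONF listening-port 6380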

void replconfCommand(redisClient *c) {
    ...
    if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
        long port;

        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != REDIS_OK))
            return;
        c->slave_listening_port = port;
    }
    ...
}

From then on, INFO on the master shows the slave's port:

INFO replication

Synchronization

Still inside syncWithMaster: after sending the port, the slave attempts a partial resynchronization:

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }
    ...

If partial resynchronization is not supported, the slave falls back to a full sync by sending the SYNC command. Partial resync was introduced in Redis 2.8, so the full sync path is the legacy one and is not covered in detail here: roughly, the master performs a BGSAVE, streams the resulting RDB file to the slave over the TCP connection, and the slave loads that RDB file.

The focus here is partial resynchronization:

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }

    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */

    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
  1. First set the master initial offset to -1, marking the cached run id and offset as not yet valid (with no cached master this amounts to requesting a full sync)
  2. Send the PSYNC command, telling the master which run id and offset the slave wants to resume from (a wire-level sketch of this exchange follows this list)
  3. If the master replies with a full resync (+FULLRESYNC), save the offset and run id it returned and discard the previously cached master. Since the first sync is always a full one, the rest of the flow matches the old full synchronization:
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     ...
     /* Prepare a suitable temp file for bulk transfer */
     while(maxtries--) {
         snprintf(tmpfile,256,
             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
         if (dfd != -1) break;
         sleep(1);
     }
     if (dfd == -1) {
         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
         goto error;
     }

     /* Setup the non blocking download of the bulk file. */
     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
             == AE_ERR)
     {
         redisLog(REDIS_WARNING,
             "Can't create readable event for SYNC: %s (fd=%d)",
             strerror(errno),fd);
         goto error;
     }

     server.repl_state = REDIS_REPL_TRANSFER;
     server.repl_transfer_size = -1;
     server.repl_transfer_read = 0;
     server.repl_transfer_last_fsync_off = 0;
     server.repl_transfer_fd = dfd;
     server.repl_transfer_lastio = server.unixtime;
     server.repl_transfer_tmpfile = zstrdup(tmpfile);
     return;
    }
  4. Create a temp file to hold the RDB file the master is about to send
  5. Register the readSyncBulkPayload callback to process the RDB data streamed back by the master
  6. Set the state to REDIS_REPL_TRANSFER and record the transfer file and other intermediate state
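
For reference, the PSYNC handshake on the wire looks roughly like this (the run id and the offsets are made-up values):

# first sync, or a forced full resync: no cached master
PSYNC ? -1
+FULLRESYNC 8de1787ba490483314a4d30f1c628bc5025eb761 1745

# reconnecting with a cached master: resume from reploff+1
PSYNC 8de1787ba490483314a4d30f1c628bc5025eb761 1746
+CONTINUE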

The readSyncBulkPayload function receives the RDB file streamed by the master and loads it into Redis. Roughly:

  1. Read the file length
  2. Read the file contents and save them into the local temporary RDB file
  3. When everything has been read, flush the Redis database
  4. Load the RDB file
  5. Set up the master -> slave channel, registering the current slave as a client of the master so it can keep executing the commands the master propagates
  6. Change the sync state to REDIS_REPL_CONNECTED and write back the replication offset and related fields
  7. Enable AOF if needed (server.aof_state != REDIS_AOF_OFF)

How the master handles the PSYNC command

void syncCommand(redisClient *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave asks for synchronization");

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
  1. First, PSYNC is refused when the server is itself a slave whose link with its own master is down
  2. Check whether a full resync is needed; if not (the partial resync succeeded), return right away
  3. For a full resync, produce an RDB file
    • If an RDB file is already being written, reuse the in-flight BGSAVE where possible
    • Otherwise, kick off a new BGSAVE

Deciding whether a full resync is needed:

int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* We need a full resync for some reason... notify the client. */
    psync_offset = server.master_repl_offset;
    /* Add 1 to psync_offset if the replication backlog does not exist,
     * as when it is created later we'll increment the offset by one. */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Again, we can't use the connection buffers (see above). */
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}

A full resync is needed in two main cases (see the sketch after this list):

  1. The run id the slave asked for does not match this server's run id
  2. The replication log the server keeps cannot satisfy the offset the slave asked for
    • the master has no backlog yet
    • the backlog does not reach back far enough
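
A standalone sketch of that backlog range test (illustrative numbers, not Redis code):

#include <stdio.h>

/* Returns 1 when the backlog still covers the offset the slave asks for,
 * mirroring the range check in masterTryPartialResynchronization. */
static int backlog_can_serve(long long backlog_off, long long backlog_histlen,
                             long long psync_offset) {
    return psync_offset >= backlog_off &&
           psync_offset <= backlog_off + backlog_histlen;
}

int main(void) {
    /* Backlog currently holds offsets [1000, 1512] -- made-up numbers. */
    printf("%d\n", backlog_can_serve(1000, 512, 1200)); /* 1: partial resync ok */
    printf("%d\n", backlog_can_serve(1000, 512, 600));  /* 0: too old, full resync */
    return 0;
}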

Additionally, every time an RDB save finishes, updateSlavesWaitingBgsave is called to hand the saved RDB file to the slaves that were waiting for it.

void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            // the RDB file has been fully written
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            // open the RDB file that was just written
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // start sending the RDB file
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

Command propagation

Once the flow above completes, the slave holds all the data the master had at BGSAVE time. From then on the master just keeps streaming its own commands to the slave.

void call(redisClient *c, int flags) {
    ...
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ...
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

Whenever a command is executed, it is forwarded to the slaves this way (except while loading the AOF, or for commands flagged REDIS_CMD_SKIP_MONITOR).

replicationFeedSlaves does two main things:

  1. Send the command to every online slave
  2. Write the command into the backlog so later partial resyncs can be served:
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     listNode *ln;
     listIter li;
     int j, len;
     char llstr[REDIS_LONGSTR_SIZE];

     /* If there aren't slaves, and there is no backlog buffer to populate,
      * we can return ASAP. */
     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

     /* We can't have slaves attached and no backlog. */
     redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

     /* Send SELECT command to every slave if needed. */
     if (server.slaveseldb != dictid) {
         robj *selectcmd;

         /* For a few DBs we have pre-computed SELECT command. */
         // a SELECT is emitted whenever the db changes, so slaves never
         // apply commands to the wrong db
         if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
             selectcmd = shared.select[dictid];
         } else {
             int dictid_len;

             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
             selectcmd = createObject(REDIS_STRING,
                 sdscatprintf(sdsempty(),
                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                 dictid_len, llstr));
         }

         /* Add the SELECT command into the backlog. */
         // write the SELECT command into the backlog
         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

         /* Send it to slaves. */
         // send the SELECT command to the slaves
         listRewind(slaves,&li);
         while((ln = listNext(&li))) {
             redisClient *slave = ln->value;
             addReply(slave,selectcmd);
         }

         if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
             decrRefCount(selectcmd);
     }
     server.slaveseldb = dictid;

     /* Write the command to the replication backlog if any. */
     // append the command to the backlog
     if (server.repl_backlog) {
         char aux[REDIS_LONGSTR_SIZE+3];

         /* Add the multi bulk reply length. */
         aux[0] = '*';
         len = ll2string(aux+1,sizeof(aux)-1,argc);
         aux[len+1] = '\r';
         aux[len+2] = '\n';
         feedReplicationBacklog(aux,len+3);

         for (j = 0; j < argc; j++) {
             long objlen = stringObjectLen(argv[j]);

             /* We need to feed the buffer with the object as a bulk reply
              * not just as a plain string, so create the $..CRLF payload len
              * and add the final CRLF */
             aux[0] = '$';
             len = ll2string(aux+1,sizeof(aux)-1,objlen);
             aux[len+1] = '\r';
             aux[len+2] = '\n';
             feedReplicationBacklog(aux,len+3);
             feedReplicationBacklogWithObject(argv[j]);
             feedReplicationBacklog(aux+len+1,2);
         }
     }

     /* Write the command to every slave. */
     // send the command to all the slaves
     listRewind(slaves,&li);
     while((ln = listNext(&li))) {
         redisClient *slave = ln->value;

         /* Don't feed slaves that are still waiting for BGSAVE to start */
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

         /* Feed slaves that are waiting for the initial SYNC (so these commands
          * are queued in the output buffer until the initial SYNC completes),
          * or are already in sync with the master. */

         /* Add the multi bulk length. */
         addReplyMultiBulkLen(slave,argc);

         /* Finally any additional argument that was not stored inside the
          * static buffer if any (from j to argc). */
         for (j = 0; j < argc; j++)
             addReplyBulk(slave,argv[j]);
     }
    }

    Note: the backlog size is configurable and defaults to 1MB; when it overflows, the oldest data is overwritten.

    #define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024)    /* 1mb */
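
The size can be raised in redis.conf with repl-backlog-size, so that slaves that stay disconnected longer can still resync partially; the value below is only an illustration:

# redis.conf: size the backlog to cover your longest expected disconnection
repl-backlog-size 64mb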

Heartbeats and lost-command compensation

During command propagation, the slave sends a REPLCONF ACK to the master once per second carrying its current offset, so the master can detect lost commands. This too is driven by the timer:

void replicationCron(void) {
    ...
    if (server.masterhost && server.master &&
        !(server.master->flags & REDIS_PRE_PSYNC))
        replicationSendAck();
    ...
}

void replicationSendAck(void) {
    redisClient *c = server.master;

    if (c != NULL) {
        c->flags |= REDIS_MASTER_FORCE_REPLY;
        addReplyMultiBulkLen(c,3);
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~REDIS_MASTER_FORCE_REPLY;
    }
}
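
On the wire this ACK is an ordinary multi-bulk command; for an offset of, say, 1746 the slave writes roughly the following (every line terminated by \r\n):

*3
$8
REPLCONF
$3
ACK
$4
1746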

When the master receives such an ACK, it records the slave's acknowledged offset and the ACK time:

void replconfCommand(redisClient *c) {
    ...
    else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
        /* REPLCONF ACK is used by slave to inform the master the amount
         * of replication stream that it processed so far. It is an
         * internal only command that normal clients should never use. */
        long long offset;

        if (!(c->flags & REDIS_SLAVE)) return;
        if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
            return;
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        c->repl_ack_time = server.unixtime;
        /* Note: this command does not reply anything! */
        return;
    }
    ...
}

Also in the timer, every run disconnects slaves that have timed out:

void replicationCron(void) {
    ...
    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate != REDIS_REPL_ONLINE) continue;
            if (slave->flags & REDIS_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                char ip[REDIS_IP_STR_LEN];
                int port;

                if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
                    redisLog(REDIS_WARNING,
                        "Disconnecting timedout slave: %s:%d",
                        ip, slave->slave_listening_port);
                }
                freeClient(slave);
            }
        }
    }
    ...
}

Here repl_ack_time is refreshed by every ACK packet the slave sends; server.repl_timeout defaults to 60 seconds:

#define REDIS_REPL_TIMEOUT 60

Partial resynchronization after a disconnect

After the connection between master and slave breaks, the slave must still be able to do a partial resync later, so freeClient treats the client that represents the master specially:

void freeClient(redisClient *c) {
    ...
    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    ...
}

void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    // first remove this client from the client list
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    // stash the master into cached_master
    server.cached_master = server.master;

    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    // tear down the connection and release its resources
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
void replicationHandleMasterDisconnection(void) {
    server.master = NULL;
    server.repl_state = REDIS_REPL_CONNECT;
    server.repl_down_since = server.unixtime;
    /* We lost connection with our master, force our slaves to resync
     * with us as well to load the new data set.
     *
     * If server.masterhost is NULL the user called SLAVEOF NO ONE so
     * slave resync is not needed. */
    if (server.masterhost != NULL) disconnectSlaves();
}

After all this, a slave that lost its connection ends up back in the REDIS_REPL_CONNECT replication state. Following the flow described earlier, the timer tries to reconnect to the master and sends PING; when it then sends PSYNC, cached_master is present, so the command carries the previous master's run id and offset. The matching slave-side and master-side handling has already appeared in the code above.
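
Putting the slave side together, here is a rough sketch of the replication states this flow moves through, assembled from the constants shown in the code above:

/* REDIS_REPL_CONNECT       SLAVEOF set, or connection lost: wait for the cron
 * REDIS_REPL_CONNECTING    TCP connect issued, PING sent
 * REDIS_REPL_RECEIVE_PONG  waiting for the +PONG reply
 * REDIS_REPL_TRANSFER      full resync: receiving the RDB payload
 * REDIS_REPL_CONNECTED     in sync: receiving propagated commands (also the
 *                          state a successful partial resync resumes into) */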

[Redis Design and Implementation][6] Basic data structures: the ziplist

The ziplist is one of the underlying encodings of list keys and hash keys.
Ziplist layout:

Field    Type      Length   Purpose
zlbytes  uint32_t  4 bytes  total number of bytes the ziplist occupies; used when reallocating the list or when locating zlend
zltail   uint32_t  4 bytes  offset of the tail entry from the start of the list; allows the tail to be located quickly
zllen    uint16_t  2 bytes  number of entries; below UINT16_MAX (65535) it is the exact count, while at UINT16_MAX the real count can only be found by walking the whole list
entryX   entry     varies   the entries themselves; each entry's length depends on what it stores
zlend    uint8_t   1 byte   the special value 0xFF, marking the end of the list

Ziplist entry layout:

  • previous_entry_length: the length, in bytes, of the previous entry. The field itself is either 1 byte or 5 bytes long:
    • if the previous entry is shorter than 254 bytes, previous_entry_length takes 1 byte
    • if the previous entry is 254 bytes or longer, previous_entry_length takes 5 bytes: the first byte is set to 0xFE (254) and the following four bytes store the previous entry's length
  • encoding: records the type and length of the data held in the entry's content field:
    • one, two or five bytes long with the two high bits being 00, 01 or 10: a byte-array encoding, meaning content holds a byte array whose length is stored in the remaining bits
    • one byte long with the high bits 11: an integer encoding, meaning content holds an integer whose type and length are stored in the remaining bits
  • content: the entry's data itself, a byte array or an integer

Byte-array encodings:

Encoding  Length  content holds
00bbbbbb  1 byte  a byte array of at most 63 bytes
01bbbbbb xxxxxxxx  2 bytes  a byte array of at most 16383 bytes
10______ aaaaaaaa bbbbbbbb cccccccc dddddddd  5 bytes  a byte array of at most 4294967295 bytes

Integer encodings:

Encoding  Length  content holds
11000000  1 byte  an int16_t integer
11010000  1 byte  an int32_t integer
11100000  1 byte  an int64_t integer
11110000  1 byte  a 24-bit signed integer
11111110  1 byte  an 8-bit signed integer
1111xxxx  1 byte  no content field; xxxx itself stores a value between 0 and 12

Cascading updates:
Given a run of consecutive entries whose lengths sit between 250 and 253 bytes, inserting or deleting an entry in front of them can force the next entry's previous_entry_length field to grow from 1 byte to 5 bytes; that growth can push the entry itself past 254 bytes, forcing the entry after it to grow too, and so on down the list. In the worst case the whole ziplist has to be reallocated N times. For example, if e1..eN are each 253 bytes long and a new entry of 254 bytes or more is inserted before e1, then e1's prevlen field grows by 4 bytes, making e1 itself at least 257 bytes, which in turn forces e2's prevlen to grow, and so on.

unsigned char *ziplistNew(void);

unsigned char *ziplistNew(void) {
// one extra byte for the end-of-list marker
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);
//#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
//#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
//#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
    ZIPLIST_LENGTH(zl) = 0;
//#define ZIP_END 255
    zl[bytes-1] = ZIP_END;
    return zl;
}
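
For reference, the 11 bytes returned by ziplistNew end up laid out like this (a sketch assuming little-endian byte order and ZIPLIST_HEADER_SIZE == sizeof(uint32_t)*2 + sizeof(uint16_t) == 10):

/*  zlbytes      zltail       zllen  zlend
 *  0b 00 00 00  0a 00 00 00  00 00  ff
 *
 * zlbytes = 11 (total size), zltail = 10 (just past the header, where the
 * first entry would go), zllen = 0 entries, 0xFF terminates the list. */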

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where);

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
//#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
//#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return __ziplistInsert(zl,p,s,slen);
}
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
// read the previous_entry_length of the entry currently at the insert position
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
// try to encode the value as an integer; on success the integer encoding
// is returned as well
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
// integer encoding: the content size follows from the encoding
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
// byte-array encoding: the content size is simply the array length
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
// add the bytes taken by the previous_entry_length field
    reqlen += zipPrevEncodeLength(NULL,prevlen);
// add the bytes taken by the encoding field
    reqlen += zipEncodeLength(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
// compute how many extra bytes the next entry's prevlen field needs
// (it may have to grow from 1 byte to 5)
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
// reallocate the ziplist and recompute the insert position
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
// inserting before an existing entry: shift the following entries back
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
// move everything after the insert position back by reqlen, excluding
// the trailing ZIP_END marker
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
// store this entry's length in the next entry's prevlen field
        zipPrevEncodeLength(p+reqlen,reqlen);

        /* Update offset for tail */
// update the tail offset
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        tail = zipEntry(p+reqlen);
// if the entry after the insert position is not the tail, the tail offset
// also moves by nextdiff (the next entry's prevlen field grew)
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
// inserting at the tail: simply point the tail offset at the new entry
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
// the famous cascading update
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
// write the new entry:
// previous entry length
    p += zipPrevEncodeLength(p,prevlen);
// encoding
    p += zipEncodeLength(p,encoding,slen);
//#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
    if (ZIP_IS_STR(encoding)) {
// byte-array encoding: copy the bytes straight in
        memcpy(p,s,slen);
    } else {
// integer encoding: store the integer
        zipSaveInteger(p,value,encoding);
    }
// bump the entry count
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

static int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    long long value;

    if (entrylen >= 32 || entrylen == 0) return 0;

// try to convert the string into an integer
    if (string2ll((char*)entry,entrylen,&value)) {
        /* Great, the string can be encoded. Check what's the smallest
         * of our encoding types that can hold this value. */
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        *v = value;
        return 1;
    }
    return 0;
}
// the cascading update
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;

    while (p[0] != ZIP_END) {
        cur = zipEntry(p);
        rawlen = cur.headersize + cur.len;
        rawlensize = zipPrevEncodeLength(NULL,rawlen);

        /* Abort if there is no next entry. */
// the next entry is the end of the list: stop
        if (p[rawlen] == ZIP_END) break;
        next = zipEntry(p+rawlen);

        /* Abort when "prevlen" has not changed. */
// the next entry's prevlen already matches: no cascading needed
        if (next.prevrawlen == rawlen) break;

// the next entry's prevlen field has to grow
        if (next.prevrawlensize < rawlensize) {
            /* The "prevlen" field of "next" needs more bytes to hold
             * the raw length of "cur". */
            offset = p-zl;
            extra = rawlensize-next.prevrawlensize;
// resize the ziplist: old size plus the extra prevlen bytes
            zl = ziplistResize(zl,curlen+extra);
            p = zl+offset;

            /* Current pointer and offset for next element. */
            np = p+rawlen;
            noffset = np-zl;

            /* Update tail offset when next element is not the tail element. */
// update the tail offset
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }

            /* Move the tail to the back. */
// shift the memory back
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
// re-encode the previous entry's length
            zipPrevEncodeLength(np,rawlen);

            /* Advance the cursor */
// move on and check whether the following entries need updating too
            p += rawlen;
            curlen += extra;
        } else {
// the next entry's prevlen field would shrink; force it to keep its
// current size to avoid yet another reallocation
            if (next.prevrawlensize > rawlensize) {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "rawlen" in the available bytes. */
                zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
            } else {
                zipPrevEncodeLength(p+rawlen,rawlen);
            }

            /* Stop here, as the raw length of "next" has not changed. */
            break;
        }
    }
    return zl;
}
// store an integer
static void zipSaveInteger(unsigned char *p, int64_t value, unsigned char encoding) {
    int16_t i16;
    int32_t i32;
    int64_t i64;
    if (encoding == ZIP_INT_8B) {
        ((int8_t*)p)[0] = (int8_t)value;
    } else if (encoding == ZIP_INT_16B) {
        i16 = value;
        memcpy(p,&i16,sizeof(i16));
        memrev16ifbe(p);
    } else if (encoding == ZIP_INT_24B) {
        i32 = value<<8;
        memrev32ifbe(&i32);
        memcpy(p,((uint8_t*)&i32)+1,sizeof(i32)-sizeof(uint8_t));
    } else if (encoding == ZIP_INT_32B) {
        i32 = value;
        memcpy(p,&i32,sizeof(i32));
        memrev32ifbe(p);
    } else if (encoding == ZIP_INT_64B) {
        i64 = value;
        memcpy(p,&i64,sizeof(i64));
        memrev64ifbe(p);
    } else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
        /* Nothing to do, the value is stored in the encoding itself. */
    } else {
        assert(NULL);
    }
}
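
As a standalone sketch (not Redis code) of the decision zipTryEncoding makes: the string "1234" converts to an integer that fits an int16_t, so it is stored as a 1-byte ZIP_INT_16B encoding plus 2 bytes of content instead of 4 raw characters.

#include <stdint.h>
#include <stdio.h>

int main(void) {
    long long v = 1234;           /* what string2ll("1234", 4, &v) would yield */
    if (v >= INT16_MIN && v <= INT16_MAX)
        printf("ZIP_INT_16B: 1 encoding byte + %zu content bytes\n",
               sizeof(int16_t)); /* 3 bytes total vs. 4 bytes of raw text */
    return 0;
}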

[Redis Design and Implementation][8] The database

The Redis database is defined as follows:

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

dict

The server keeps all its databases in an array; at initialization Redis creates 16 of them by default:

#define REDIS_DEFAULT_DBNUM     16

By default a Redis client targets database 0 and can switch with the SELECT command. Note that because Redis offers no command to query which database the current connection is on, extra care is needed when switching with SELECT.
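
For example, redis-cli reflects the selected database in its prompt, which helps avoid surprises:

127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> SET foo bar
OK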

When reading or writing key-value pairs, besides performing the requested operation on the key space Redis does some extra bookkeeping:

  • After reading a key (both reads and writes read first), it bumps the keyspace hit or miss counter
  • After reading a key, it updates the key's LRU clock
  • If the key turns out to be expired at read time, it is deleted first
  • If a client is watching the key with WATCH, the key is flagged dirty after a modification
  • Each modification increments the dirty counter, which drives persistence and replication
  • If database notifications are enabled, a notification is emitted after a modification
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}
robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;

    // first check whether the key has already expired
    expireIfNeeded(db,key);
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            // refresh the LRU clock
            val->lru = server.lruclock;
        return val;
    } else {
        return NULL;
    }
}

expires

With the EXPIRE or PEXPIRE command a key can be given a TTL; once the TTL drops to 0, the key is deleted automatically.
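
A quick example of the commands involved:

127.0.0.1:6379> SET session:1 yes
OK
127.0.0.1:6379> EXPIRE session:1 30
(integer) 1
127.0.0.1:6379> TTL session:1
(integer) 30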

The expires dict stores the expiration times of all keys in the database that have one.

  • a key in the expires dict points at a key object in the key space
  • the value is a long long integer holding that key's expiration time:
    void expireCommand(redisClient *c) {
      expireGenericCommand(c,mstime(),UNIT_SECONDS);
    }
    void expireGenericCommand(redisClient *c, long long basetime, int unit) {
      robj *key = c->argv[1], *param = c->argv[2];
      long long when; /* unix time in milliseconds when the key will expire. */

      if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
          return;

      if (unit == UNIT_SECONDS) when *= 1000;
      when += basetime;

      /* No key, return zero. */
      if (lookupKeyRead(c->db,key) == NULL) {
          addReply(c,shared.czero);
          return;
      }

      /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
       * should never be executed as a DEL when load the AOF or in the context
       * of a slave instance.
       *
       * Instead we take the other branch of the IF statement setting an expire
       * (possibly in the past) and wait for an explicit DEL from the master. */
      if (when <= mstime() && !server.loading && !server.masterhost) {
          robj *aux;

          redisAssertWithInfo(c,key,dbDelete(c->db,key));
          server.dirty++;

          /* Replicate/AOF this as an explicit DEL. */
          aux = createStringObject("DEL",3);
          rewriteClientCommandVector(c,2,aux,key);
          decrRefCount(aux);
          signalModifiedKey(c->db,key);
          notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
          addReply(c, shared.cone);
          return;
      } else {
          // store the key in the expires dict
          setExpire(c->db,key,when);
          addReply(c,shared.cone);
          signalModifiedKey(c->db,key);
          notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"expire",key,c->db->id);
          server.dirty++;
          return;
      }
    }

Expired-key deletion strategies

  • Lazy deletion: before executing any command, expireIfNeeded is called to check the key; if it has expired, that function deletes it
  • Periodic deletion: activeExpireCycleTryExpire is run from the timer

    expireIfNeeded

    int expireIfNeeded(redisDb *db, robj *key) {
      mstime_t when = getExpire(db,key);
      mstime_t now;

      if (when < 0) return 0; /* No expire for this key */

      /* Don't expire anything while loading. It will be done later. */
      if (server.loading) return 0;

      /* If we are in the context of a Lua script, we claim that time is
       * blocked to when the Lua script started. This way a key can expire
       * only the first time it is accessed and not in the middle of the
       * script execution, making propagation to slaves / AOF consistent.
       * See issue #1525 on Github for more information. */
      now = server.lua_caller ? server.lua_time_start : mstime();

      /* If we are running in the context of a slave, return ASAP:
       * the slave key expiration is controlled by the master that will
       * send us synthesized DEL operations for expired keys.
       *
       * Still we try to return the right information to the caller,
       * that is, 0 if we think the key should be still valid, 1 if
       * we think the key is expired at this time. */
      if (server.masterhost != NULL) return now > when;

      /* Return when this key has not expired */
      if (now <= when) return 0;

      /* Delete the key */
      server.stat_expiredkeys++;
      propagateExpire(db,key);
      notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
          "expired",key,db->id);
      return dbDelete(db,key);
    }

    activeExpireCycleTryExpire

    while (num--) {
      dictEntry *de;
      long long ttl;

      if ((de = dictGetRandomKey(db->expires)) == NULL) break;
      ttl = dictGetSignedIntegerVal(de)-now;
      if (activeExpireCycleTryExpire(db,de,now)) expired++;
      if (ttl < 0) ttl = 0;
      ttl_sum += ttl;
      ttl_samples++;
    }

    How AOF, RDB and replication treat expired keys

  • When an RDB file is generated, keys that have already expired are not saved into the new file
  • When an RDB file is loaded:
    • the master skips expired keys
    • a slave loads everything (it will soon be overwritten by the next sync anyway)
  • AOF writes: a key that has expired but not yet been deleted has no effect on the file; once it is deleted, a DEL command is appended
  • AOF rewrite: keys are checked, and expired ones are not written into the rewritten AOF file
  • Replication:
    • after deleting an expired key, the master explicitly sends a DEL to all its slaves
    • a slave serving reads does not delete a key even if it has expired; it deletes it only when the master's DEL arrives

netty4 UDP communication

netty4 UDP examples are scarce; the official repository has only https://github.com/netty/netty/tree/4.0/example/src/main/java/io/netty/example/qotm. The requirement here was to move from TCP to UDP while reusing the existing decoders and handlers as much as possible, yet in the official example the client sends a single broadcast and the server a single reply, which does not cover a real business need. Specifically, the task was to turn an existing heartbeat over a long-lived TCP connection into a UDP one.

Reusing the client-side codecs and handlers

The original decoder was written for TCP: it extends ReplayingDecoder and turns the bytes in a ByteBuf into internal objects following a custom protocol. In the official UDP examples, decoders work directly on the DatagramPacket wrapping each UDP datagram. The simplest way to reuse the existing code is therefore to put a MessageToMessage decoder in front of the existing ByteToMessage decoder and have it pull the ByteBuf out of the DatagramPacket.

public class UdpMessageToByteDecode extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
        out.add(msg.content());
        msg.retain();
    }
}

Two things deserve attention here:

  1. After adding the packet's ByteBuf to out, call retain() on msg so the ByteBuf is not released prematurely, which would throw later in the pipeline
  2. Written this way, the DatagramPacket's sender is lost, so downstream handlers cannot reply to the source address; this approach is really only suitable for the client side

The encoder needs a matching change. Since UDP is connectionless, there is no way to fix a destination address at connect time; instead the encoder reads it from the channel's attribute map, which also lets the client switch servers at any moment.

public class UdpByteToMessageEncode extends MessageToMessageEncoder<ByteBuf> {
    private static Log log = LogFactory.getLog(UdpByteToMessageEncode.class);

    public static final AttributeKey<InetSocketAddress> TARGET_ADDRESS = AttributeKey.valueOf("TARGET_ADDRESS");

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        InetSocketAddress ip = ctx.channel().attr(TARGET_ADDRESS).get();
        if (ip == null) {
            log.error("no server ip");
            return;
        }

        DatagramPacket packet = new DatagramPacket(msg, ip);
        msg.retain();
        out.add(packet);
    }
}

The encoder takes the ByteBuf produced by the original encoder, reads the target server address from the channel attribute, and wraps both into the DatagramPacket that actually gets sent. Note:

  1. The target address is stored on the channel rather than the context, so it is shared across the whole connection
  2. The outgoing ByteBuf again needs retain() to avoid a premature release and the resulting exception

Server-side replies

With these small wrappers placed around the client's codecs, UDP packets can be sent and received normally. The client side is simple: its packets only carry heartbeats, and it does not need to process the server's replies. The server, however, must run business logic on each packet (roughly, writing the heartbeat into Redis) and send a response, and here a serious problem shows up: the old TCP flow is missing one crucial piece of information, the client's address.

To solve it, a new encoder and decoder had to be written, extending the original business objects (request and response packets) with a source (target) address. Since UDP has no TCP-style packet coalescing, extending MessageToMessageDecoder is enough; nothing as involved as ReplayingDecoder is needed.

public class UdpRequestDecode extends MessageToMessageDecoder<DatagramPacket> {
    private static final Log log = LogFactory.getLog(UdpRequestDecode.class);

    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
        try {
            UdpRequestData requestData = new UdpRequestData();
            ByteBuf content = packet.content();
            requestData.setEncode(content.readByte());

            ...

            requestData.setSender(packet.sender());

            out.add(requestData);
        } catch (Exception e) {
            log.error(String.format("error decode udp packet from [ %s ], ignore!", packet.sender().toString()));
        }
    }
}

The encoder is similar; the business handler checks whether the request is a UdpRequestData to decide whether to respond with a corresponding UdpResponseData at write time.

Server-side connection management

Connection management on the server records the number of connections and the load per server; these numbers are written into Redis, and clients use them to pick the server with the fewest connections and the lowest load. This was easy with TCP: the server kept a ChannelGroup and added or removed channels on active and inactive events.

Since UDP is connectionless, those handler callbacks cannot signal connection state. Instead the server keeps a hash map with the client IP as key and the arrival time of the last UDP packet as value. The business handler puts an entry into the map for every packet it receives, and the timer that periodically writes to Redis removes entries whose value has expired. Counting the server's "connections" this way is basically reliable.

At the same time, to stay compatible with the old TCP heartbeat, TCP connections store Long.MAX_VALUE as their timestamp, ensuring they are only removed on channelInactive.

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestData request) throws Exception {
    RequestContext requestContext = new RequestContext(request, ctx.channel());
    ResponseData response = DO_SOMETHING(requestContext);

    if (request instanceof UdpRequestData) {
        UdpRequestData udpRequest = (UdpRequestData) request;
        // log current sender ip
        Server.addClient(udpRequest.getSender().getAddress().getHostAddress(), System.currentTimeMillis());

        UdpResponseData udpResponseData = UdpResponseData.fromResponseData(response);
        udpResponseData.setRecipient(udpRequest.getSender());
        ctx.writeAndFlush(udpResponseData);
    } else {
        ctx.writeAndFlush(response);
    }
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        // tcp never timeout
        Server.addClient(inetSocketAddress.getAddress().getHostAddress(), Long.MAX_VALUE);
    }
    super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        Server.removeClient(inetSocketAddress.getAddress().getHostAddress());
    }
    super.channelInactive(ctx);
}

The timer logic looks roughly like this:

public ServerClusterInfo getServerClusterInfo() {
    ServerClusterInfo serverClusterInfo = new ServerClusterInfo();
    serverClusterInfo.setPort(this.serverClusterInfo.getPort());
    serverClusterInfo.setHost(this.serverClusterInfo.getHost());

    Set<String> ips = new HashSet<String>();
    long currentTs = System.currentTimeMillis();
    Iterator<Map.Entry<String, Long>> iter = ipMap.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, Long> ipEntry = iter.next();
        if (currentTs - ipEntry.getValue() < TIMEOUT) {
            ips.add(ipEntry.getKey());
        } else {
            iter.remove();
        }
    }

    serverClusterInfo.setCurChannel(ips.size());
    serverClusterInfo.setConnectIps(ips);
    serverClusterInfo.setCurConnect(ips.size());
    return serverClusterInfo;
}

Client-side timeout and reconnection

The TCP version reconnected trivially: whenever the channel's isActive() returned false, the client fetched the server list again, sorted it by connection count, and connected to the server with the fewest connections.

Again because UDP is connectionless, and unreliable on top of that, a packet may well vanish without a trace (although on an internal network one can afford to be a bit more optimistic). The simplest remedy is a timeout-based reconnect.

The old client pushed all messages into a blocking queue (heartbeats and the like are produced by timer threads), and a single thread drained the queue and sent them, so that is where the change went. The client now keeps the timestamp of the last response packet (an AtomicLong, for thread safety).

It is initialized to 0, which guarantees that the first send makes the client actively "connect" to the server. Why the quotes? Same story: UDP has no connections. By this design, the so-called "connect" merely sets the TARGET_ADDRESS attribute in the channel's attribute map.

Thread sendThread = new Thread() {
    public void run() {
        while (true) {
            RequestData requestData;
            try {
                requestData = queue.take();

                long currentTimeMillis = System.currentTimeMillis();

                if (currentTimeMillis - lastResponse.get() > TIMEOUT) {
                    boolean connect = BootStrapConnectUtil.connect(channelFuture, serverClusterInfos);
                    if (!connect) {
                        log.error("fail to connect to watch dog! ignore current package");
                        continue;
                    }
                }

                channelFuture.channel().writeAndFlush(requestData);
            } catch (Throwable e) {
                log.error("", e);
            }

        }
    }
};

public static boolean connect(ChannelFuture channelFuture, List<ServerClusterInfo> serverClusterInfos) {
    InetSocketAddress remoteAddress = channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).get();
    serverClusterInfos = sortServer(serverClusterInfos, remoteAddress);
    boolean result = false;
    for (ServerClusterInfo server : serverClusterInfos) {
        try {
            channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).set(server.getAddress());
            result = true;
            Client.getClient().setLastResponse(System.currentTimeMillis());
        } catch (Throwable ignored) {
        }
    }
    return result;
}

A lot of the naming and flow here clearly does not look like UDP; indeed, all of this code used to run over TCP.

Beyond that, the handler must of course refresh lastResponse every time a response arrives from the server.

@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseData response) throws Exception {
    ResultEnum responseCode = ResultEnum.fromCode(response.getResult());
    if (responseCode != null) {
        switch (responseCode) {
            case SUCCESS: // handled successfully
                Client.getClient().setLastResponse(System.currentTimeMillis());
                break;
            default:
                // do nothing
        }
    }
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    // force the "connection" to expire
    AgentClient.getClient().setLastResponse(0L);
}

One more trick was added here: on any exception, the "connection" is forcibly "closed" by expiring the timestamp.

With that, the old TCP heartbeat now runs over UDP. To compensate for UDP being connectionless and unreliable, two timeouts were added to the flow, ensuring that both client and server can keep exchanging heartbeats and reconnecting after drops just as before.