nanomsg experiments: survey

The survey pattern has the server broadcast a question and each client answer it with a response. The pattern is very useful in distributed systems, for example for service discovery, distributed transactions, and other kinds of distributed queries.

Client

The client side is easy to implement. Besides the basic calls (creating the socket, connecting to the URL), it simply receives the server's question (the example keeps things simple: the question is fixed, so its content is not checked) and then sends back a response to it (here, the client's current time).


#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <cerrno>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

int main(int argc, const char **argv) {
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket(AF_SP, NN_RESPONDENT);
    if(sock < 0){
        fprintf(stderr, "nn_socket fail: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_connect(sock, url) < 0) {
        fprintf(stderr, "nn_connect fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1){
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);

        if(bytes > 0) {
            printf ("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf);
            nn_freemsg (buf);

            char sendBuffer[128];
            time_t rawtime;
            struct tm * timeinfo;

            time (&rawtime);
            timeinfo = localtime (&rawtime);
            char *timeText = asctime (timeinfo);
            int textLen = strlen(timeText);
            timeText[textLen - 1] = '\0';
            sprintf(sendBuffer, "[ %s ] %s", name, timeText);
            int sendSize = strlen(sendBuffer) + 1;
            int actualSendSize = nn_send(sock, sendBuffer, sendSize, 0);

            if(actualSendSize != sendSize) {
                fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
                continue;
            }
        }
    }

    nn_shutdown(sock, 0);

    return 0;
}

Upon receiving a message, the client simply prints it and then writes the response data back to the server.

Server

The server side has a wrinkle: several examples found earlier did not behave correctly. After some experimenting and a quick read of the source, it turns out the basic nanomsg API gives no way to learn how many clients are currently connected. However, once the responses from all currently connected clients have been received, the next nn_recv call returns -1, indicating the read failed, with errno (retrieved via the nn_errno function) set to EFSM, meaning the socket's state machine is not in the right state.
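
Incidentally, the deadline that governs ETIMEDOUT on the surveyor side is an ordinary socket option. A minimal sketch, not part of the original programs, of setting it on the NN_SURVEYOR socket right after nn_socket; the 500 ms value is an arbitrary assumption:

/* shorten the survey deadline from the default (1000 ms) to 500 ms */
int deadline = 500;
if (nn_setsockopt(sock, NN_SURVEYOR, NN_SURVEYOR_DEADLINE,
                  &deadline, sizeof(deadline)) < 0) {
    fprintf(stderr, "nn_setsockopt fail: %s\n", nn_strerror(nn_errno()));
}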


#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

const char *SURVEY_TYPE = "DATE";

int main(int argc, char** argv)
{

    if ( argc != 2 ) {
        fprintf(stderr, "usage: %s URLn", argv[0]);
        exit(-1);
    }
    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_SURVEYOR);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        int sendSize = strlen(SURVEY_TYPE) + 1;
        int actualSendSize;
        printf ("SERVER: SENDING DATE SURVEY REQUESTn");
        if ((actualSendSize = nn_send(sock, SURVEY_TYPE, sendSize, 0)) != sendSize) {
            fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
            continue;
        }

        int count = 0;
        while(1) {
            char *buf = NULL;
            int bytes = nn_recv (sock, &buf, NN_MSG, 0);
            if (bytes < 0 && nn_errno() == ETIMEDOUT) break;
            if (bytes >= 0) {
                printf ("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf);
                ++count;
                nn_freemsg (buf);
            } else {
                fprintf(stderr, "nn_recv fail: %sn", nn_strerror(errno));
                break;
            }
        }
        printf("SERVER: current receive %d survey response.n", count);
        sleep(1);
    }

    nn_shutdown(sock, 0);

    return 0;

}

Two infinite loops are used here. The outer loop keeps issuing surveys to the clients; after each survey is sent, an inner infinite loop reads all the client responses and exits once a read fails.

The sources found earlier only tested whether the error was ETIMEDOUT; printing the error shows that it is never a timeout, but a state-machine error:


/*  If no survey is going on return EFSM error. */
if (nn_slow (!nn_surveyor_inprogress (surveyor)))
    return -EFSM;

Testing
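
The survey post doesn't list build commands; a plausible pair, mirroring the gcc invocations in the pubsub post further down (the source file names surveyserver.cpp and surveyclient.cpp are assumptions):

g++ -o surveyserver surveyserver.cpp -lnanomsg
g++ -o surveyclient surveyclient.cpp -lnanomsg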

The test itself works much like before: start a server first, then start the clients one by one:


#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
SERVER=$BASE/surveyserver
CLIENT=$BASE/surveyclient

URL="tcp://127.0.0.1:1234"

echo "start surveyserver to bind tcp: $URL"
$SERVER tcp://127.0.0.1:1234 &

echo "start to start surveyclient"
for((i = 0; i < 10; i++))
do
    echo "start client$i"
    $CLIENT client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

The output:


start surveyserver to bind tcp: tcp://127.0.0.1:1234
start to start surveyclient
start client0
SERVER: SENDING DATE SURVEY REQUEST
start client1
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 0 survey response.
start client2
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 2 survey response.
start client3
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
...
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client3): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client4): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client5): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client6): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client7): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client9): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client8): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client2 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client3 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client4 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client5 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client6 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client7 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client9 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client8 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 10 survey response.

As the output shows, after the last response of each round has been received there is always an "Operation cannot be performed in this state" error, i.e. the EFSM error.
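
Given this behavior, it seems more robust to treat EFSM as the normal end-of-round signal alongside ETIMEDOUT. A sketch of a reworked inner-loop body (a variant of the server code above, not the original):

char *buf = NULL;
int bytes = nn_recv(sock, &buf, NN_MSG, 0);
if (bytes < 0) {
    int err = nn_errno();
    /* ETIMEDOUT: the survey deadline expired.
     * EFSM: every response of the current survey was already consumed.
     * Either way, this round of the survey is over. */
    if (err == ETIMEDOUT || err == EFSM) break;
    fprintf(stderr, "nn_recv fail: %s\n", nn_strerror(err));
    break;
}
printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
++count;
nn_freemsg(buf);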

nanomsg experiments: pubsub

Publish/subscribe is a common feature of many messaging middlewares: the message mechanism decouples message publishers from message receivers (consumers). pubsub is also one of the messaging models nanomsg supports out of the box, so the pubsub experiment doubles as a rough tour of nanomsg's basic usage.

Server


#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

void usage(const char *name)
{
    fprintf(stderr, "%s [ bind url]n", name);
}

int main(int argc, char **argv)
{
    if(argc != 2) {
        usage(argv[0]);
        exit(-1);
    }

    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_PUB);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        time_t rawtime;
        struct tm * timeinfo;

        time (&rawtime);
        timeinfo = localtime (&rawtime);
        char *text = asctime (timeinfo);
        int textLen = strlen(text);
        text[textLen - 1] = '\0';

        printf ("SERVER: PUBLISHING DATE %sn", text);
        nn_send(sock, text, textLen, 0);
        sleep(1);
    }

    return 0;
}

nanomsg is very easy to use: just include nanomsg/nn.h and the basic API is available. To use a built-in communication pattern, include its header as well; for the pubsub pattern that is nanomsg/pubsub.h.

A pubsub server first creates a socket with an nn_socket call. The call mimics the POSIX interface and returns a file descriptor, so success is checked simply by testing the return value for a non-negative number. Note that the second argument is the protocol; the matching macro is defined in each protocol's header. All further operations go through this file descriptor.
Just as a Berkeley-sockets server binds a port, a nanomsg server binds a URL. The formats nanomsg currently supports:
* in-process communication (inproc): URLs like inproc://test
* inter-process communication (ipc): URLs like ipc:///tmp/test.ipc
* TCP communication: URLs like tcp://*:5555

The source on GitHub appears to support WebSocket as well by now.

nanomsg reports errors the UNIX way: on failure errno is set, and nn_strerror returns the matching error text.

After binding, messages are sent on the socket with the nn_send function, whose parameters resemble the Berkeley sockets API. Here the server simply takes the current time and sends it out to all subscribers.

Client


#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

int main(int argc, char **argv)
{
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket (AF_SP, NN_SUB);
    if(sock < 0) {
        fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno));
        exit(-1);
    }

    if (nn_connect(sock, url) < 0) {
        fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno));
        exit(-1);
    }


    while ( 1 ) {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        printf ("CLIENT (%s): RECEIVED %sn", name, buf);
        nn_freemsg (buf);
    }

    nn_shutdown(sock, 0);

    return 0;
}

Client initialization resembles the server's. Before connecting to the server, the socket must be turned into a message subscriber via nn_setsockopt; the empty string used here subscribes it to everything (see the sketch below for prefix filtering). It then connects to the publisher with nn_connect, whose arguments mirror the server's bind: a socket and a URL, where the URL must match the one the server bound. After that, an infinite loop keeps receiving the publisher's messages.
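
nanomsg matches subscriptions against the beginning of each message, so a publisher can prefix its messages with a topic and subscribers can filter by it. A hypothetical variant of the option call above; the "DATE" prefix is an assumption and the example publisher does not actually send one:

/* subscribe only to messages that begin with "DATE" (hypothetical topic) */
if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "DATE", 4) < 0) {
    fprintf(stderr, "fail to set socket opts: %s\n", nn_strerror(errno));
    exit(-1);
}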

Testing

First, compilation. This is the same as for any ordinary C program, plus linking against nanomsg:


gcc -o pubserver pubserver.c -lnanomsg
gcc -o pubclient pubclient.c -lnanomsg

To make testing easier, a simple shell script:


#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
PUB=$BASE/pubserver
SUB=$BASE/pubclient

URL="tcp://127.0.0.1:1234"

echo "start pubserver to bind tcp: $URL"

$PUB tcp://127.0.0.1:1234 &

echo "start to start pubclient"
for((i = 0; i < 10; i++))
    do
    echo "start client$i"
    $SUB client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

The script is straightforward: it starts one message publisher, then starts one subscriber per second. After waiting 20 seconds, it kills all the child processes.

The script's output:


start pubserver to bind tcp: tcp://127.0.0.1:1234
start to start pubclient
start client0
SERVER: PUBLISHING DATE Tue Feb 17 15:12:11 2015
start client1
SERVER: PUBLISHING DATE Tue Feb 17 15:12:12 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:12 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:12 2015
start client2
SERVER: PUBLISHING DATE Tue Feb 17 15:12:13 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:13 2015
start client3
SERVER: PUBLISHING DATE Tue Feb 17 15:12:14 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:14 2015
...
SERVER: PUBLISHING DATE Tue Feb 17 15:12:41 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client3): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client4): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client5): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client6): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client7): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client8): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client9): RECEIVED Tue Feb 17 15:12:41 2015
kill all process and exit

As you can see, a new subscriber starts each second, and every subscriber receives the current time published by the publisher.

A first look at graphviz dot

Introduction

Documentation nowadays is kept as markdown in code repositories like GitHub and GitLab. The biggest problem with markdown is referencing images: images drawn in some tool can certainly be referenced, but unlike the md files they cannot be version-controlled in the git repository, and since the documentation already uses a description language, it would be even better if the referenced image sources were description-language files too.

dot is one of graphviz's description languages; the graphviz command-line tools render it into image files.

Installation

On gentoo (prefix), installing graphviz is a plain emerge; on top of the default USE flags, svg output support was added:


USE="svg" emerge media-gfx/graphviz

Drawing

For dot's documentation, see the official PDF.

Nodes and arrows

Creating and connecting nodes in dot is very convenient: a->b alone creates nodes a and b and joins them with the default arrow.


digraph e1 {
    a->b;
}

Generate a png image with:


dot -Tpng -o e1.png e1.dot

The resulting image: e1

Node attributes

Nodes can also be "declared" first and used (connected) later, and square brackets attach attributes to a node, such as its label, shape, fill and color. Slightly restyling a and b:


digraph e2 {
    a[label="label a" shape="box3d" color="red"];
    b[label="label b" shape="box" style="filled" fillcolor="yellow"];
    a->b;
}

The result: e2

Clusters

dot supports subgraphs. By default a subgraph is drawn without a border or caption, unless its name starts with cluster:


digraph e3 {
    subgraph cluster0 {
        a->{b c}
        label="cluster 0"
    }
    subgraph cluster1 {
        x->{y z}
        label="cluster 1"
    }
    edge[constraint=false];
    a->x;
}

The result: e3

Setting the edge attribute constraint to false here keeps the right-hand subgraph from being "pulled down" by the a->x edge.

Docker service discovery: the final test

Final test

First start confd: use the earlier command minus the onetime flag, and leave it running in the background as a long-lived daemon, so that whenever the etcd registration directory changes, the haproxy configuration file is regenerated in near real time.
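
A plausible daemon-mode invocation, adapted from the onetime command shown in the confd post further down; the -interval value (polling period in seconds) is an assumption:

confd -confdir ./confd -backend etcd -node 127.0.0.1:4001 -interval 10 &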

Then on the two slaves start nginx containers, two on one slave and one on the other, simulating the two domains a.abc.com and b.abc.com above.


docker run -P -v `pwd`/html:/var/www/html -d dockerfile/nginx

This exposes all of the image's ports (80 and 443) and mounts the local html directory into the container; a 1.html file created in the html directory, containing the container id, internal IP and external IP, serves as the test page. After starting two containers this way, configure them through etcdctl on the master:


etcdctl set /services/web/a.abc.com/server1/ip 10.211.55.12
etcdctl set /services/web/a.abc.com/server1/port 49154
etcdctl set /services/web/a.abc.com/server2/ip 10.211.55.12
etcdctl set /services/web/a.abc.com/server2/port 49156
etcdctl set /services/web/b.abc.com/server1/ip 10.211.55.13
etcdctl set /services/web/b.abc.com/server1/port 49154

confd checks the directory for modifications periodically:

INFO /home/babydragon/haproxy/haproxy.cfg has md5sum c8fb4ae9c10086b9f94cd11d0edecec1 should be 048c844d73c062014c0fd77d9548c47d

2015-02-09T11:42:00+08:00 master confd[3781]: INFO Target config /home/babydragon/haproxy/haproxy.cfg out of sync

2015-02-09T11:42:00+08:00 master confd[3781]: INFO Target config /home/babydragon/haproxy/haproxy.cfg has been updated

Then haproxy's configuration is updated:


acl is_a.abc.com hdr(host) -i a.abc.com

acl is_b.abc.com hdr(host) -i b.abc.com



use_backend a.abc.com_cluster if is_a.abc.com

use_backend b.abc.com_cluster if is_b.abc.com



backend a.abc.com_cluster
cookie SERVERID insert indirect nocache

server server1 10.211.55.12:49154 cookie server1 check

server server2 10.211.55.12:49156 cookie server2 check


backend b.abc.com_cluster
cookie SERVERID insert indirect nocache

server server1 10.211.55.13:49154 cookie server1 check

Restart the haproxy container (it is not configured to reload haproxy.cfg on the fly) and check the status page: both backends are live. Simulating requests with curl:


curl -H "Host: a.abc.com" http://10.211.55.11:49154/1.html

I am a80b37f78259 on 172.17.0.4 (Host: 10.211.55.12)


curl -H "Host: a.abc.com" http://10.211.55.11:49154/1.html

I am 209b20bab7ce on 172.17.0.3 (Host: 10.211.55.12)

Since load balancing is configured as round-robin, the two requests landed on different containers: haproxy correctly distributed the requests across the two containers.

Docker service discovery: confd

confd

confd reads configuration data (etcd, consul and environment variables are supported) and renders the final configuration file through Go templates.

Installation

Installation is as easy as etcd's: a 64-bit executable is provided; download it and drop it somewhere on the PATH (/usr/local/bin), and don't forget chmod +x.

Generating the haproxy configuration

confd's configuration lives in /etc/confd by default; the -confdir parameter points it elsewhere. The directory contains two subdirectories, conf.d and templates. confd first reads the configuration files (toml format) in conf.d, then renders the template each file points to.

Building on the etcd cluster configured earlier, confd reads the directories and keys under /services/web, with this structure:


/services/web/$DOMAIN/$HOST/ip
                            |-port

where DOMAIN is the domain name an application registers under and HOST is the hostname of the machine registering it.

First, create the confd configuration file:


[template]  
src = "haproxy.cfg.tmpl"  
dest = "/home/babydragon/haproxy/haproxy.cfg"  
keys = [  
"/services/web",  
]  
#reload_cmd = "/etc/init.d/haproxy reload"

Only template generation is being tested for now, so the check and reload commands are not of interest yet; the template name, destination path and keys to fetch are all that is configured.

The interesting part is the variable section of the template (what precedes it is fixed, standard haproxy configuration such as logging):


frontend webin
bind :80

{{$domains := lsdir "/services/web"}}
{{range $domain := $domains}}
acl is_{{$domain}} hdr(host) -i {{$domain}}
{{end}}

{{range $domain := $domains}}
use_backend {{$domain}}_cluster if is_{{$domain}}
{{end}}

{{range $domain := $domains}}
backend {{$domain}}_cluster
cookie SERVERID insert indirect nocache
{{$domain_dir := printf "/services/web/%s" $domain}}{{range $host := lsdir $domain_dir}}
server {{base $host}} {{$ip_key := printf "/services/web/%s/%s/ip" $domain $host}}{{getv $ip_key}}:{{$port_key := printf "/services/web/%s/%s/port" $domain $host}}{{getv $port_key}} cookie {{base $host}} check
{{end}}
{{end}}

There are two kinds of loops here: the first iterates over all the domains, the second over every host under one domain. haproxy does per-domain load balancing through acls, so the first pass over the domains creates, for each domain, an acl rule plus a use_backend line applying that rule. A further loop then creates the backend corresponding to each domain.

The confd template functions are documented in detail on GitHub. Roughly: lsdir lists the subdirectories of the given directory. The first level of subdirectories are the domains, and from a domain the acl rule, its use_backend line and the backend name can all be generated. The domain's directory path is then reassembled and lsdir is applied to it, reading each hostname under the domain to create the backend's server entries (the load-balanced backend servers for that domain), while the key-value pairs hanging under that directory (here just ip and port) are fetched with getv.

With the template and configuration in place, seed some test data:


etcdctl mkdir /services/web
etcdctl mkdir /services/web/a.abc.com
etcdctl mkdir /services/web/b.abc.com
etcdctl mkdir /services/web/a.abc.com/server1
etcdctl mkdir /services/web/a.abc.com/server2
etcdctl mkdir /services/web/b.abc.com/server1
etcdctl set /services/web/a.abc.com/server1/ip 10.0.0.1
etcdctl set /services/web/a.abc.com/server1/port 10000
etcdctl set /services/web/a.abc.com/server2/port 10001
etcdctl set /services/web/a.abc.com/server2/ip 10.0.0.1
etcdctl set /services/web/b.abc.com/server1/ip 10.0.0.2
etcdctl set /services/web/b.abc.com/server1/port 12345

This simulates three containers: two running as the domain a.abc.com and one as b.abc.com.

Then run confd and check the generated configuration file:


confd -confdir ./confd -onetime -backend etcd -node 127.0.0.1:4001

The template section above renders to:


frontend webin
    bind :80



    acl is_a.abc.com hdr(host) -i a.abc.com

    acl is_b.abc.com hdr(host) -i b.abc.com



    use_backend a.abc.com_cluster if is_a.abc.com

    use_backend b.abc.com_cluster if is_b.abc.com



backend a.abc.com_cluster
    cookie SERVERID insert indirect nocache

    server server1 10.0.0.1:10000 cookie server1 check

    server server2 10.0.0.1:10001 cookie server2 check


backend b.abc.com_cluster
    cookie SERVERID insert indirect nocache

    server server1 10.0.0.2:12345 cookie server1 check

Docker service discovery: an etcd cluster

etcd

etcd provides the registration and notification layer underlying service discovery; functionally it resembles zk: registration plus watches give basic service discovery.

Installation

Installing etcd is very simple. It can be built from source with go, and etcd also provides ready-to-use binary packages (64-bit). The installation instructions are on GitHub; just download as described there. For convenience, the etcd binaries (etcd, etcdctl, etc.) were copied into /usr/local/bin for later use.

Running

First, try a single-node launch following the manual. By default etcd listens on localhost, i.e. only on the lo device, which would leave it unreachable from the other machines in the cluster; so at startup the default localhost is replaced with 0.0.0.0, making sure etcd listens on every interface.


etcd -listen-client-urls "http://0.0.0.0:4001" -listen-peer-urls="http://0.0.0.0:7001"

Once it is up, commands can be issued through the REST interface or etcdctl:


curl -L http://127.0.0.1:4001/version

which outputs:

{"releaseVersion":"2.0.0","internalVersion":"2"}

A simple write and read:


curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world"

{"action":"set","node":{"key":"/message","value":"Hello world","modifiedIndex":3,"createdIndex":3}}


curl -L http://127.0.0.1:4001/v2/keys/message

{"action":"get","node":{"key":"/message","value":"Hello world","modifiedIndex":3,"createdIndex":3}}

Starting a cluster

The above was a single-node launch. The official docs offer several ways to create a cluster; here, machines are added dynamically.


etcd -name "node1" -initial-cluster "node1=http://10.211.55.11:2380" \
    -initial-advertise-peer-urls "http://10.211.55.11:2380" \
    -listen-client-urls "http://0.0.0.0:4001" \
    -listen-peer-urls="http://0.0.0.0:2380"

Once it is started, the member API shows the current cluster:


curl -L http://10.211.55.11:4001/v2/members

{"members":[{"id":"f0b31008acf03099","name":"node1","peerURLs":["http://10.211.55.11:2380","http://10.211.55.11:7001"],"clientURLs":["http://localhost:2379","http://localhost:4001"]}]}

Add a machine to the cluster:


etcdctl member add node2 http://10.211.55.12:2380

Added member named node2 with ID 6d345c68496f80fc to cluster

ETCD_NAME="node2"
ETCD_INITIAL_CLUSTER="node2=http://10.211.55.12:2380,node1=http://10.211.55.11:2380,node1=http://10.211.55.11:7001"
ETCD_INITIAL_CLUSTER_STATE="existing"

For convenience, etcdctl was used directly rather than the REST interface; the command prints the environment variables that the new node must be started with.

Start the new node:


export ETCD_NAME="node2"
export ETCD_INITIAL_CLUSTER="node1=http://10.211.55.11:2380,node2=http://10.211.55.12:2380"
export ETCD_INITIAL_CLUSTER_STATE="existing"
etcd -listen-client-urls http://0.0.0.0:4001 \
    -advertise-client-urls http://0.0.0.0:4001 \
    -listen-peer-urls http://10.211.55.12:2380 \
    -initial-advertise-peer-urls http://10.211.55.12:2380

After it starts, check the machines currently in the cluster through the member interface:


etcdctl member list

293ea5ba1d70f5f1: name=node2 peerURLs=http://10.211.55.12:2380 clientURLs=http://0.0.0.0:4001

bd93686a68a54c2d: name=node1 peerURLs=http://10.211.55.11:2380 clientURLs=http://localhost:2379,http://localhost:4001

Add one more machine the same way, giving a basic three-machine cluster:


export ETCD_NAME="node3"
export ETCD_INITIAL_CLUSTER="node2=http://10.211.55.12:2380,node3=http://10.211.55.13:2380,node1=http://10.211.55.11:2380"
export ETCD_INITIAL_CLUSTER_STATE="existing"
etcd -listen-client-urls http://0.0.0.0:4001 \
    -advertise-client-urls http://0.0.0.0:4001 \
    -listen-peer-urls http://10.211.55.13:2380 \
    -initial-advertise-peer-urls http://10.211.55.13:2380

The final cluster:

293ea5ba1d70f5f1: name=node2 peerURLs=http://10.211.55.12:2380 clientURLs=http://0.0.0.0:4001

76610041ace6c4f8: name=node3 peerURLs=http://10.211.55.13:2380 clientURLs=http://0.0.0.0:4001

bd93686a68a54c2d: name=node1 peerURLs=http://10.211.55.11:2380 clientURLs=http://localhost:2379,http://localhost:4001

Running, on the node1 machine:


etcdctl set /message hello

and then executing, on all three machines:


etcdctl get /message

shows that each of them correctly retrieves the key's value: hello.

Encrypting shell scripts with shc

Usage

Previously, to avoid svn authentication problems, the svn username and password were written straight into the scripts. A cleartext password is very unsafe, so it had to be stored encrypted. To keep the original script flow intact, the svn invocation was extracted into a standalone script that carries the username and password and is stored encrypted.

The encryption is done with the tool shc. It encrypts the data with arc4 and is fairly simple: running it on an input script (svn.sh) produces two files: svn.sh.x, the encrypted executable, and svn.sh.x.c, that executable's C source. At run time only the .x executable is needed.

A quick note on my usage. At first, with no flags at all, the generated .x file was dynamically linked; it has essentially no dependencies, but it does link glibc dynamically. Since my local glibc is far newer than the server's, the file refused to run, complaining that GLIBC 2.6 could not be found. Per the help text, the CFLAGS environment variable can change the compile flags; with that plus a few other options, the final command became:


CFLAGS="-static" shc -v -T -r -f svn.sh

The environment variable makes the compiled .x statically linked, so it no longer depends on any of the host's shared libraries; -v prints the command's verbose output; -T makes the resulting executable traceable; -r produces a redistributable binary.

Then wherever the original script ran svn, it now runs svn.sh.x instead, with the username and password removed; the password is thus stored encrypted.

How it works

shc encrypts with arc4. The command roughly works through these steps:
– process the input arguments
– read the script
– determine the shell it uses
– generate a C source file
– compile the C source file
While generating the source file the content is arc4-encrypted; the C source arc4-decrypts it again and finally runs the script through an execvp call. A minimal sketch of the arc4 cipher itself follows.
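
For reference, this is the shape of the arc4 (RC4) primitive shc relies on; a self-contained sketch, not shc's actual code:

#include <stddef.h>

/* RC4 state: a 256-byte permutation plus two indices. */
typedef struct { unsigned char S[256]; int i, j; } rc4_ctx;

/* Key-scheduling algorithm (KSA): mix the key into the permutation. */
static void rc4_init(rc4_ctx *ctx, const unsigned char *key, size_t keylen)
{
    int i, j = 0;
    unsigned char tmp;
    for (i = 0; i < 256; i++) ctx->S[i] = (unsigned char)i;
    for (i = 0; i < 256; i++) {
        j = (j + ctx->S[i] + key[i % keylen]) & 0xff;
        tmp = ctx->S[i]; ctx->S[i] = ctx->S[j]; ctx->S[j] = tmp;
    }
    ctx->i = ctx->j = 0;
}

/* PRGA: XOR the keystream over buf in place. Encryption and decryption
 * are the same operation, which is why the generated C code can decrypt
 * with the very routine that encrypted. */
static void rc4_crypt(rc4_ctx *ctx, unsigned char *buf, size_t len)
{
    size_t n;
    unsigned char tmp;
    for (n = 0; n < len; n++) {
        ctx->i = (ctx->i + 1) & 0xff;
        ctx->j = (ctx->j + ctx->S[ctx->i]) & 0xff;
        tmp = ctx->S[ctx->i];
        ctx->S[ctx->i] = ctx->S[ctx->j];
        ctx->S[ctx->j] = tmp;
        buf[n] ^= ctx->S[(ctx->S[ctx->i] + ctx->S[ctx->j]) & 0xff];
    }
}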

Roughly how a couple of the flags are implemented:

-T

Without this flag, the generated executable cannot be traced:


void untraceable(char * argv0)
{
    char proc[80];
    int pid, mine;

    switch(pid = fork()) {
        case  0:
            pid = getppid();
            /* For problematic SunOS ptrace */
            #if defined(__FreeBSD__)
            sprintf(proc, "/proc/%d/mem", (int)pid);
            #else
            sprintf(proc, "/proc/%d/as",  (int)pid);
            #endif
            close(0);
            mine = !open(proc, O_RDWR|O_EXCL);
            if (!mine && errno != EBUSY)
                mine = !ptrace(PTRACE_ATTACH, pid, 0, 0);
            if (mine) {
                kill(pid, SIGCONT);
            } else {
                perror(argv0);
                kill(pid, SIGKILL);
            }
            _exit(mine);
        case -1:
            break;
        default:
            if (pid == waitpid(pid, 0, 0))
                return;
    }
    perror(argv0);
    _exit(1);
}

It first forks, and the child tries to attach to the parent through a ptrace call. If that succeeds, nothing else is tracing the process, so SIGCONT is sent to let execution continue; otherwise SIGKILL is sent to stop the process from going any further. In practice, though, for reasons unknown this occasionally leaves processes in the T state on the server; it looks as if, after the ptrace call delivered SIGSTOP to the process, the follow-up SIGCONT never took effect. So -T is now always passed to skip this anti-tracing setup, even though it does lower security considerably.

-e

This flag sets an expiry time for the executable; past it, the encrypted executable refuses to run. The implementation is equally simple: the encrypted expiry date is placed at the front of the generated C code, and before the shell is executed there is a check like


if (date[0] && (atoll(date)<time(NULL)))
    return msg1;

which simply stops the program from running.

[Redis Design and Implementation][14] sentinel: the jedis client

sentinel client support: jedis

Some simple code:


public class App
{  
    public static void main( String[] args )  
    {  
        Set<String> sentinels = new HashSet<String>();
        // add a sentinel
        sentinels.add("172.18.18.207:26379");
        // initialize the sentinel-based pool; it takes a master name (used to look that master up)
        JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels);  

        // from here on, usage matches the standard jedis pool
        Jedis jedis = pool.getResource();  

        jedis.set("jedis", "jedis");  

        pool.returnResource(jedis);  

    }
}

The JedisSentinelPool source is hosted on GitHub. The rough flow:
1. From the given sentinel list and master name, obtain the master's details (IP, port) (via sentinel's master query command)
2. Start a thread that, against the given sentinel list, subscribes to +switch-master messages
3. As soon as a master switch is seen, call the initPool method to change the master the pool uses

[Redis Design and Implementation][13] sentinel: failover

Electing the leader sentinel

When sentinelStartFailoverIfNeeded decides that failover is needed, it calls the sentinelStartFailover function to enter the failover state. At that point the master's failover_state is marked SENTINEL_FAILOVER_STATE_WAIT_START.

int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* We can't failover if the master is not in O_DOWN state. */
    if (!(master->flags & SRI_O_DOWN)) return 0;

    /* Failover already in progress? */
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    /* Last failover attempt started too little time ago? */
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */
            master->failover_delay_logged = master->failover_start_time;
            redisLog(REDIS_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}
void sentinelStartFailover(sentinelRedisInstance *master) {
    redisAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(REDIS_WARNING,"+try-failover",master,"%@");
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

Following the is-master-down-by-addr logic described earlier, a sentinel that sends the is-master-down-by-addr question at this point includes its own run id, signalling that it wants a local leader sentinel elected. The rules and mechanics, roughly:
* every online sentinel is eligible to be elected leader
* after each leader election, whether or not it succeeds, every sentinel's configuration epoch is incremented
* within one configuration epoch, every sentinel has exactly one chance to set some sentinel as its local leader, and once set, the choice cannot be changed for that epoch
* every sentinel that sees the master enter objective down asks the other sentinels to set it as their local leader
* when one sentinel (the source) sends another (the target) the is-master-down-by-addr question with a run id, it is asking the target to set the source as its local leader
* local leadership is first come, first served: only the first sentinel to ask is set as the local leader, and all later requests are refused
* the target sentinel's reply to is-master-down-by-addr carries the local leader's run id and the configuration epoch
* when the source sentinel receives the reply, it checks whether the leader_epoch in it equals its own configuration epoch, and if so, whether the run id equals its own; if both match, the target has set the source as its local leader
* if some sentinel is set as local leader by more than half of the sentinels, it becomes the leader sentinel
* because becoming leader requires the support of more than half of the sentinels, and each sentinel votes only once per configuration epoch, at most one leader can emerge in one configuration epoch
* if no leader is elected within the given time limit, the election is retried after a while.


void sentinelCommand(redisClient *c) {
    ...
    else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>*/
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
                                                              != REDIS_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        /* Vote for the master (or fetch the previous vote) if the request
         * includes a runid, otherwise the sender is not seeking for a vote. */
         // the request carries the asking sentinel's run_id, so a local leader must be answered
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    }
    ...
}

/* Vote for the sentinel with 'req_runid' or return the old vote if already
 * voted for the specified 'req_epoch' or one greater.
 *
 * If a vote is not available returns NULL, otherwise return the Sentinel
 * runid and populate the leader_epoch with the epoch of the vote. */
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    // the source sentinel's epoch is newer than ours: update our current epoch
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }
    // no local leader set for this epoch yet: make the source sentinel the local leader
    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);
        // record the source sentinel as the local leader
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
        sentinelFlushConfig();
        sentinelEvent(REDIS_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);
        /* If we did not voted for ourselves, set the master failover start
         * time to now, in order to force a delay before we can start a
         * failover for the same master. */
        if (strcasecmp(master->leader,server.runid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }

    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}

At this point the is-master-down-by-addr questions have produced local leader votes. Back in sentinelHandleRedisInstance, the sentinelFailoverStateMachine function carries the election on:


void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;

    /* Check if we are the leader for the failover epoch. */
    // while waiting for the failover to start, check whether the leader has been elected
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    isleader = leader && strcasecmp(leader,server.runid) == 0;
    sdsfree(leader);

    /* If I'm not the leader, and it is not a forced failover via
     * SENTINEL FAILOVER, then I can't continue with the failover. */
    // not elected leader: give up entering failover and leave it to the leader sentinel
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
         * and the configured failover timeout. */
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        /* Abort the failover if I'm not the leader after some time. */
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(REDIS_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }
    sentinelEvent(REDIS_WARNING,"+elected-leader",ri,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
}
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
    uint64_t leader_epoch;
    uint64_t max_votes = 0;

    redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
    // tallies each sentinel's votes: key is a sentinel run_id, value is its vote count
    counters = dictCreate(&leaderVotesDictType,NULL);

    voters = dictSize(master->sentinels)+1; /* All the other sentinels and me. */

    /* Count other sentinels votes */
    di = dictGetIterator(master->sentinels);
    // iterate all sentinels, counting the local leader each one recorded
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // only votes for the current epoch count
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
    }
    dictReleaseIterator(di);

    /* Check what's the winner. For the winner to win, it needs two conditions:
     * 1) Absolute majority between voters (50% + 1).
     * 2) And anyway at least master->quorum votes. */
    // extract the run_id with the most votes, and that maximum vote count
    di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);

        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);

    /* Count this Sentinel vote:
     * if this Sentinel did not voted yet, either vote for the most
     * common voted sentinel, or for itself if no vote exists at all. */
    // cast this sentinel's own vote, which decides whether someone can become leader
    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
        myvote = sentinelVoteLeader(master,epoch,server.runid,&leader_epoch);

    if (myvote && leader_epoch == epoch) {
        uint64_t votes = sentinelLeaderIncr(counters,myvote);

        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
        }
    }

    voters_quorum = voters/2+1;
    // the winner needs an absolute majority, and at least the configured quorum
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;

    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}

With that the leader sentinel has been chosen, and the failover state has become SENTINEL_FAILOVER_STATE_SELECT_SLAVE; the current sentinel, as leader, will actually carry out the master's failover.

Failover

The first step of failover is picking the new master. The filtering, roughly:
1. remove from the list every slave that is down or disconnected
2. remove every slave that has not answered the leader sentinel's INFO command within the last five seconds
3. remove every slave whose link to the downed master has been broken for more than down-after-milliseconds * 10 ms (ensuring the slave did not part from the master too early, so its copy of the data is reasonably fresh)
4. select by slave priority
5. on equal priority, select the slave with the largest replication offset
6. if still tied, order by run_id and select the slave with the smallest


void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    // pick the slave to promote
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    /* We don't handle the timeout in this state as the function aborts
     * the failover or go forward in the next state. */
    if (slave == NULL) {
        sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        // advance the state to SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
        sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;

    // compute the maximum tolerated master-link downtime
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;

        // slaves already down or disconnected are skipped outright
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
        // slaves silent for more than 5 ping periods are skipped too
        if (mstime() - slave->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        /* If the master is in SDOWN state we get INFO for slaves every second.
         * Otherwise we get it with the usual period so we need to account for
         * a larger delay. */
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        // INFO data older than its validity window: skip
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        // disconnected from the master for too long: skip
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    if (instances) {
        // sort the candidates, best first
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}
int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
                          **sb = (sentinelRedisInstance **)b;
    char *sa_runid, *sb_runid;

    // order by slave priority first
    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return (*sa)->slave_priority - (*sb)->slave_priority;

    /* If priority is the same, select the slave with greater replication
     * offset (processed more data from the master). */
    // same priority: compare replication offsets
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
        return 1; /* b > a */
    }

    /* If the replication offset is the same select the slave with that has
     * the lexicographically smaller runid. Note that we try to handle runid
     * == NULL as there are old Redis versions that don't publish runid in
     * INFO. A NULL runid is considered bigger than any other runid. */
    // at this point any choice works; fall back to runid ordering
    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1;  /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
}

With that the new master has been picked, and the failover state becomes SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE.

After the new master is selected, it must be sent a command that changes its role from slave to master:


void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    /* We can't send the command to the promoted slave if it is now
     * disconnected. Retry again and again with this state until the timeout
     * is reached, then abort the failover. */
    if (ri->promoted_slave->flags & SRI_DISCONNECTED) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* Send SLAVEOF NO ONE command to turn the slave into a master.
     * We actually register a generic callback for this command as we don't
     * really care about the reply. We check if it worked indirectly observing
     * if INFO returns a different role (master instead of slave). */
    // send the slaveof no one command, telling the slave to become a master
    // success is observed later through the info command, so the slaveof reply is not inspected here
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != REDIS_OK) return;
    sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    // the state becomes SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

That completes the slave-to-master transformation, and the failover state becomes SENTINEL_FAILOVER_STATE_WAIT_PROMOTION.

In the SENTINEL_FAILOVER_STATE_WAIT_PROMOTION state the handler does not actually go and check the slave; the promoted slave is dealt with through the periodic INFO command instead.


void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    /* Just handle the timeout. Switching to the next state is handled
     * by the function parsing the INFO command of the promoted slave. */
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(REDIS_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    ...
    // a slave now reports itself as a master (slaveof no one was sent earlier)
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        // confirm this really is the switch this sentinel ordered
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            // change the master's config epoch to the new epoch
            ri->master->config_epoch = ri->master->failover_epoch;
            // the failover state becomes SENTINEL_FAILOVER_STATE_RECONF_SLAVES
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
            sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            // some instance claims to be a master: force it back into being a slave
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;

            if (!(ri->flags & SRI_PROMOTED) &&
                 sentinelMasterLooksSane(ri->master) &&
                 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
                 mstime() - ri->role_reported_time > wait_time)
            {
                int retval = sentinelSendSlaveOf(ri,
                        ri->master->addr->ip,
                        ri->master->addr->port);
                if (retval == REDIS_OK)
                    sentinelEvent(REDIS_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }
    ...
}

So the INFO command callback handles the slave-to-master switch, after which the failover state becomes SENTINEL_FAILOVER_STATE_RECONF_SLAVES.

In this state, the state machine handler sends the slaveof command to all the slaves, switching the master the slaves point at and re-establishing the replication relationships.


void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    di = dictGetIterator(master->slaves);
    // first count how many slaves are in the middle of re-establishing replication
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    di = dictGetIterator(master->slaves);
    // to bound network traffic etc., keep the slaves syncing in parallel below the configured threshold
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        /* Skip the promoted slave, and already configured slaves. */
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        /* If too much time elapsed without the slave moving forward to
         * the next state, consider it reconfigured even if it is not.
         * Sentinels will detect the slave as misconfigured and fix its
         * configuration later. */
        // the sync timed out; let it go for now
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }

        /* Nothing to do for instances that are disconnected or already
         * in RECONF_SENT state. */
        // already syncing
        if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
            continue;

        /* Send SLAVEOF <new master>. */
        // not yet syncing: send the slaveof command to start master-slave replication
        retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
        if (retval == REDIS_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);

    /* Check if all the slaves are reconfigured and handle timeout. */
    sentinelFailoverDetectEnd(master);
}

Once all the slaves have been reconfigured, the failover enters the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state.

The SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state resets the master, swapping all master-related attributes from the original master over to the promoted one (sentinelResetMasterAndChangeAddress). With that the whole failover flow is finished, and the redis cluster has re-established a fresh master-slave relationship.

[Redis Design and Implementation][12] sentinel: failure detection

Checking for subjective down

Every PING command sentinel sends is used to detect whether the monitored masters and slaves are down.


int sentinelSendPing(sentinelRedisInstance *ri) {
    int retval = redisAsyncCommand(ri->cc,
        sentinelPingReplyCallback, NULL, "PING");
    if (retval == REDIS_OK) {
        ri->pending_commands++;
        /* We update the ping time only if we received the pong for
         * the previous ping, otherwise we are technically waiting
         * since the first ping that did not received a reply. */
         // this field only goes back to 0 once the PONG response has been received
        if (ri->last_ping_time == 0) ri->last_ping_time = mstime();
        return 1;
    } else {
        return 0;
    }
}
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    ...
    if (strncmp(r->str,"PONG",4) == 0 ||
        strncmp(r->str,"LOADING",7) == 0 ||
        strncmp(r->str,"MASTERDOWN",10) == 0)
    {
        ri->last_avail_time = mstime();
        ri->last_ping_time = 0; /* Flag the pong as received. */
    }
    ...
}

Only a PONG, LOADING or MASTERDOWN response makes sentinel consider the instance healthy and record the last response time.

In addition, each call of the sentinelHandleRedisInstance function checks whether any monitored master or slave has already subjectively timed out:


void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    if (ri->last_ping_time)
        elapsed = mstime() - ri->last_ping_time;

    /* Check if we are in need for a reconnection of one of the
     * links, because we are detecting low activity.
     *
     * 1) Check if the command link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
     *    pending ping for more than half the timeout. */
     // check whether the command link has timed out
    if (ri->cc &&
        (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->last_ping_time != 0 && /* There is a pending ping... */
        /* The pending ping is delayed, and we did not received
         * error replies as well. */
        (mstime() - ri->last_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
    {
        sentinelKillLink(ri,ri->cc);
    }

    /* 2) Check if the pubsub link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
     *    activity in the Pub/Sub channel for more than
     *    SENTINEL_PUBLISH_PERIOD * 3.
     */
     // check whether the subscription link has timed out
    if (ri->pc &&
        (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        sentinelKillLink(ri,ri->pc);
    }

    /* Update the SDOWN flag. We believe the instance is SDOWN if:
     *
     * 1) It is not replying.
     * 2) We believe it is a master, it reports to be a slave for enough time
     *    to meet the down_after_period, plus enough time to get two times
     *    INFO report from the instance. */
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

Note: all the slaves of one master share the same subjective-down timeout (the down-after-milliseconds set in the configuration file); different masters can be configured independently.
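
For reference, a minimal sentinel.conf fragment showing these knobs; the master name, address and values are illustrative, not from the original setup:

sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1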

Checking for objective down


void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    ...
    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

At the end of sentinelHandleRedisInstance, these additional operations run for masters only:
* check for objective down
* ask the other sentinels for their verdict
* perform the failover

Let's look first at asking the other sentinels for their verdict:


#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    // iterate all known sentinels stored in the sentinels dict
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* If the master state from other sentinel is too old, we clear it. */
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask if master is down to other sentinels if:
         *
         * 1) We believe it is down, or there is a failover in progress.
         * 2) Sentinel is connected.
         * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
         // only send the question once this sentinel itself has seen the master subjectively down
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        // make sure this sentinel is still connected
        if (ri->flags & SRI_DISCONNECTED) continue;
        // not forced, and no reply received within the ask period (1000ms): don't send for now
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* Ask */
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->cc,
                    sentinelReceiveIsMasterDownReply, NULL,
                    "SENTINEL is-master-down-by-addr %s %s %llu %s",
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    server.runid : "*");
        if (retval == REDIS_OK) ri->pending_commands++;
    }
    dictReleaseIterator(di);
}

The message sent contains:
* the IP of the master judged subjectively down (master->addr->ip)
* the port of the master judged subjectively down (port)
* the sentinel's current epoch
* a run id, where * means the request is only checking whether the master is objectively down, and the asking sentinel's own run id means it is campaigning for leader
An illustrative exchange is sketched after this list.
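
A hypothetical redis-cli transcript of the question and its reply (addresses and epoch are made up); the three-element reply shape matches the handler code below:

127.0.0.1:26379> SENTINEL is-master-down-by-addr 10.0.0.1 6379 5 *
1) (integer) 1
2) "*"
3) (integer) 0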

Receiving the reply:


void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;
    REDIS_NOTUSED(privdata);

    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;

    /* Ignore every error or unexpected reply.
     * Note that if the command returns an error for any reason we'll
     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
    // reply format: the first element says whether it agrees the master is down, 1 yes, 0 no
    // the second element is * when the request was only a status query
    // the third element is only meaningful when the second is not *, and is 0 otherwise
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        if (strcmp(r->element[1]->str,"*")) {
            /* If the runid in the reply is not "*" the Sentinel actually
             * replied with a vote. */
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                redisLog(REDIS_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            ri->leader = sdsnew(r->element[1]->str);
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}

Checking whether the master is objectively down:


void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        // walk the sentinels dict, counting the sentinels that marked the master subjectively down
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

With the is-master-down-by-addr questions and the flags they set, the number of sentinels that marked the master subjectively down is known; if it reaches the quorum configured in the configuration file, the master is marked objectively down. For example, with quorum set to 3, the local sentinel's own subjective-down verdict plus two agreeing sentinels is enough to raise +odown.