通信
初始化完成之后,sentinel会主动和master、slave进行通信,获取他们的信息。
获取主服务器信息
首先,sentinel会和master建立两个连接,分别是命令连接和订阅连接(分别保存在sentinelRedisInstance的cc和pc字段中)。
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
sentinelReconnectInstance(ri);
...
}
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
命令连接用于后续获取master的信息(包括slave的信息),订阅(sentinel:hello频道)用于获取master的掉线状态等消息的推送。
连接完成之后,sentinel会定时发送消息:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
if (ri->flags & SRI_DISCONNECTED) return;
/* For INFO, PING, PUBLISH that are not critical commands to send we
* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
* want to use a lot of memory just because a link is not working
* properly (note that anyway there is a redundant protection about this,
* that is, the link will be disconnected and reconnected if a long
* timeout condition is detected. */
if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
/* If this is a slave of a master in O_DOWN condition we start sending
* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
* period. In this state we want to closely monitor slaves in case they
* are turned into masters by another Sentinel, or by the sysadmin. */
// INFO命令默认发送间隔为 10s #define SENTINEL_INFO_PERIOD 10000
// 如果为已经宕机的master的slave,改1s
if ((ri->flags & SRI_SLAVE) &&
(ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}
/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
// ping命令默认间隔1s #define SENTINEL_PING_PERIOD 1000
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
/* Send INFO to masters and slaves, not sentinels. */
retval = redisAsyncCommand(ri->cc,
sentinelInfoReplyCallback, NULL, "INFO");
if (retval == REDIS_OK) ri->pending_commands++;
} else if ((now - ri->last_pong_time) > ping_period) {
/* Send PING to all the three kinds of instances. */
sentinelSendPing(ri);
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
/* PUBLISH hello messages to all the three kinds of instances. */
sentinelSendHello(ri);
}
}
处理INFO消息的返回:
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = c->data;
redisReply *r;
REDIS_NOTUSED(privdata);
if (ri) ri->pending_commands--;
if (!reply || !ri) return;
r = reply;
if (r->type == REDIS_REPLY_STRING) {
sentinelRefreshInstanceInfo(ri,r->str);
}
}
这里针对向master发送的INFO,sentinel会:
1. 获取master run_id记录,检查master的role,更新自己维护的master列表
2. 通过复制字段,获取master对应的slave列表,更新自己维护的slave列表
获取slave信息
同样,sentinel也会以同样的频率向slave发送INFO命令,并且提取以下参数:
* run_id
* role
* master的host和port
* 主从服务器的连接状态(master_link_status)
* slave优先级(slave_priority)
* slave复制偏移量(slave_repl_offset)
并更新自己维护的sentinelRedisInstance结构。
发送和接受订阅信息
sentinel每秒通过命令连接向所有master和slave发送信息。
int sentinelSendHello(sentinelRedisInstance *ri) {
char ip[REDIS_IP_STR_LEN];
char payload[REDIS_IP_STR_LEN+1024];
int retval;
char *announce_ip;
int announce_port;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR;
/* Use the specified announce address if specified, otherwise try to
* obtain our own IP address. */
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return REDIS_ERR;
announce_ip = ip;
}
announce_port = sentinel.announce_port ?
sentinel.announce_port : server.port;
/* Format and send the Hello message. */
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, server.runid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
retval = redisAsyncCommand(ri->cc,
sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
if (retval != REDIS_OK) return REDIS_ERR;
ri->pending_commands++;
return REDIS_OK;
}
发送内容包括:
* sentinel ip(announce_ip)
* sentinel端口(announce_port)
* sentinel运行id(server.runid)
* sentinel配置纪元(sentinel.current_epoch)
* master名称(master->name)
* master IP(master_addr->ip)
* master端口(master_addr->port)
* master纪元(master->config_epoch)
同时,所有连接到这个master上的sentinel都会收到这个消息,然后做出回应:
* 更新sentinels字典
* 创建连接其他sentinel的命令连接