coolEx

Today will be better

chrome爬页面上表格某列的数据

有的时候会有这样一个需求,页面上有个大表格,我需要复制里面的一列到本地。比如,我要从表格里面,复制列出来的所有机器IP(这样比数据库导出方便点~)

首先,先用chrome的开发者工具,找到要复制的列中的某一个单元格,然后选择复制xpath。这样会复制下来这个元素的xpath路径,比如:

//*[@id="machineGroupTable"]/tbody/tr[2]/td[2]

chrome的console,支持用$x()函数直接用xpath来定位元素,因此,可以通过类似这样的js,来获取刚选中单元格所在的所有列:

1
$x('//<em>[@id="machineGroupTable"]/tbody//td[2]')

这样返回的是chrome经过处理的xpath结果,直接就是dom的数组,因此可以直接遍历,获取单元格中的文本。

1
2
var ip=[];
$x('//</em>[@id="machineGroupTable"]/tbody//td[2]').forEach(function(e){ip.push(e.innerText)})

这样就把这列的所有内容,放到了ip这个数组中。

最后,把ip数组复制出来:

1
copy(ip.join('\n'))

copy也是chrome console内置的命令,可以把传进去的参数复制到剪切板。注意这里要自己join下,不然会直接输出json格式的字符串。

这样,表格的列已经被复制到了剪切板,直接粘贴到需要的文本中即可。

[redis设计与实现][9]复制

复制(Redis2.8)

设置主服务器的地址和端口(SLAVE OF命令)

SLAVEOF host port

Redis的主从复制设置非常方便,只需要在从服务器上设置主服务器的IP和端口即可。如果需要关闭主从同步,只需要执行SLAVEOF NO ONE即可。 该命令的具体描述见官方文档

void slaveofCommand(redisClient *c) {
    // 先处理no one,解除现有的主从同步 
    if (!strcasecmp(c->argv[1]->ptr,no) &&
        !strcasecmp(c->argv[2]->ptr,one)) {
        if (server.masterhost{
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,MASTER MODE enabled (user request));
        }
    } else {
        long port;
 
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;
 
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.);
            addReplySds(c,sdsnew(+OK Already connected to specified master\r\n));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
         // 设置新的主从同步,这里只是设置,然后直接返回 
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,SLAVE OF %s:%d enabled (user request),
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
 
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    //如果当前slave以前是master,断开所有原先的连接 
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    replicationDiscardCachedMaster(); /* Don’t try a PSYNC. */
    freeReplicationBacklog(); /* Don’t allow our chained slaves to PSYNC. */
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}

可以看到,slaveof命令是一个异步命令,执行的时候只是设置了新的主服务器,然后就立马返回结果了。真正执行连接等操作的, 是在定时器中执行的。

/* Replication cron function — used to reconnect to master and
 * to detect transfer failures. */
run_with_period(1000) replicationCron();

建立套接字连接

提醒哦那个每隔1秒钟,会调用replicationCron函数,该函数会根据状态执行定时操作。当状态为REDIS_REPL_CONNECT的时候 执行逻辑为:

void replicationCron(void) {
    …
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,Connecting to MASTER %s:%d,
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,MASTER <-> SLAVE sync started);
        }
    }
    …
}
 
int connectWithMaster(void) {
    int fd;
 
    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1{
        redisLog(REDIS_WARNING,Unable to connect to MASTER: %s,
            strerror(errno));
        return REDIS_ERR;
    }
 
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,Can’t create readable event for SYNC);
        return REDIS_ERR;
    }
 
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

如果发现当前主从状态是REDIS_REPL_CONNECT(刚执行slaveof的时候设置的),就会去连接主服务器。当socket连接建立之后, 会注册syncWithMaster这个回调,并且设置主从状态为REDIS_REPL_CONNECTING。

发送PING命令

PING命令都很熟悉了,jedis pool中用来检测当前连接是否有效,用的就是这个命令。手工执行PING命令,Redis会返回一个PONG作为响应。

这里发送PING命令,主要也是为了检测当前和master连接是否正常,master是否能够正常处理命令。

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    …
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,Non blocking connect for SYNC fired the event.);
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don’t check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,PING\r\n,6,100);
        return;
    }
    …
}

这里当状态是REDIS_REPL_CONNECTING的时候,向master发送了PING命令,然后就等待master返回PONG的响应。

PONG响应也是在这个函数中进行处理的:

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    …
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];
 
        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);
 
        /* Read the reply with explicit timeout. */
        buf[0] = \0;
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                I/O error reading PING reply from master: %s,
                strerror(errno));
            goto error;
        }
 
        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for “+”) or an authentication error.
         * Note that older versions of Redis replied with “operation not
         * permitted” instead of using a proper error code, so we test
         * both. */
        if (buf[0] != + &&
            strncmp(buf,-NOAUTH,7) != 0 &&
            strncmp(buf,-ERR operation not permitted,28) != 0)
        {
            redisLog(REDIS_WARNING,Error reply to PING from master: ‘%s,buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                Master replied to PING, replication can continue…);
        }
    }
 
    /* AUTH with the master if required. */
    if(server.masterauth{
        err = sendSynchronousCommand(fd,AUTH,server.masterauth,NULL);
        if (err[0] == -{
            redisLog(REDIS_WARNING,Unable to AUTH to MASTER: %s,err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }
 
    /* Set the slave port, so that Master’s INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,REPLCONF,listening-port,port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == -{
            redisLog(REDIS_NOTICE,(Non critical) Master does not understand REPLCONF listening-port: %s, err);
        }
        sdsfree(err);
    }
    …
    error:
        close(fd);
        server.repl_transfer_s = -1;
        server.repl_state = REDIS_REPL_CONNECT;
        return;
}
  • 如果读取master返回值失败,直接跳转到error,关闭连接,重新将连接状态设置为REDIS_REPL_CONNECT(也就是SLAVEOF执行完成之后的状态), 等待下次定时器重连;
  • 读取响应成功,判断响应值是否为PONG,如果为PONG则表示连接检测完成,将发送当前slave端口信息,用于master同步数据
  • 如果判断是需要认证,切设置了masterauth,则发送AUTH命令,向master发起授权。
    • 如果授权成功,将继续后续的同步流程
    • 如果授权失败,则进入error流程,关闭连接,并等待下次重试

发送端口信息

前面的PONG响应流程里面已经提到了,当正确接收到了PONG响应,或者是完成了认证之后,slave会发起一个REPLCONF命令,将自己的端口发送给master。 master接受到这个命令之后,将slave的端口信息记录到这个slave对应的client对象的slave_listening_port属性中。

void replconfCommand(redisClient *c) {
    …
    if (!strcasecmp(c->argv[j]->ptr,listening-port)) {
        long port;
 
        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != REDIS_OK))
            return;
        c->slave_listening_port = port;
    }
    …
}

这时,在master上通过INFO命令,就可以看见slave的端口信息:

INFO replication

同步

还是在syncWithMaster函数中。发送完端口信息之后,slave会尝试进行增量同步:

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    …
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.);
        return;
    }
 
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,Retrying with SYNC…);
        if (syncWrite(fd,SYNC\r\n,6,server.repl_syncio_timeout*1000) == -1{
            redisLog(REDIS_WARNING,I/O error writing to MASTER: %s,
                strerror(errno));
            goto error;
        }
    }
    …

如果不支持增量同步,会向master发送SYNC命令做全量同步。增量同步是在Redis2.8中支持的,所以全量同步就不管了。大致的操作流程就是 master做一次BGSAVE,然后将保存的rdb文件通过TCP连接发送给slave,slave加载这个rdb文件。

这里着重了解增量同步:

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;
 
    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we’ll be able to do
     * a FULL resync using the PSYNC command we’ll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;
 
    if (server.cached_master{
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),%lld, server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,Trying a partial resynchronization (request %s:%s)., psync_runid, psync_offset);
    } else {
        redisLog(REDIS_NOTICE,Partial resynchronization not possible (no cached master));
        psync_runid = ?;
        memcpy(psync_offset,-1,3);
    }
 
    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,PSYNC,psync_runid,psync_offset,NULL);
 
    if (!strncmp(reply,+FULLRESYNC,11)) {
        char *runid = NULL, *offset = NULL;
 
        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply, );
        if (runid) {
            runid++;
            offset = strchr(runid, );
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                Master replied with wrong +FULLRESYNC syntax.);
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = \0;
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,Full resync from master: %s:%lld,
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }
 
    if (!strncmp(reply,+CONTINUE,9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            Successful partial resynchronization with master.);
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }
 
    /* If we reach this point we receied either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
 
    if (strncmp(reply,-ERR,4)) {
        /* If it’s not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            Unexpected reply to PSYNC from master: %s, reply);
    } else {
        redisLog(REDIS_NOTICE,
            Master does not support PSYNC or is in 
            error state (reply: %s), reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
  1. 首先设置同步偏移量为-1,表示第一次增量更新(其实也就是个全量更新)
  2. 向master发送PSYNC命令,告知master自己的id和同步偏移量
  3. master返回全量更新(FULLRESYNC),保存master返回的偏移量和运行id,清除之前缓存的master信息 确认可以增量同步后,由于第一次是全量同步,因此操作和原全量同步相同:
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     …
     /* Prepare a suitable temp file for bulk transfer */
     while(maxtries–) {
         snprintf(tmpfile,256,
             temp-%d.%ld.rdb,(int)server.unixtime,(long int)getpid());
         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
         if (dfd != -1break;
         sleep(1);
     }
     if (dfd == -1{
         redisLog(REDIS_WARNING,Opening the temp file needed for MASTER <-> SLAVE synchronization: %s,strerror(errno));
         goto error;
     }
     
     /* Setup the non blocking download of the bulk file. */
     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
             == AE_ERR)
     {
         redisLog(REDIS_WARNING,
             Can’t create readable event for SYNC: %s (fd=%d),
             strerror(errno),fd);
         goto error;
     }
     
     server.repl_state = REDIS_REPL_TRANSFER;
     server.repl_transfer_size = -1;
     server.repl_transfer_read = 0;
     server.repl_transfer_last_fsync_off = 0;
     server.repl_transfer_fd = dfd;
     server.repl_transfer_lastio = server.unixtime;
     server.repl_transfer_tmpfile = zstrdup(tmpfile);
     return;
    }
  4. 创建一个临时文件,用于保存master传回的rdb文件
  5. 开始读取master传输回来的rdb文件,注册readSyncBulkPayload回调函数来处理
  6. 设置当前的状态为REDIS_REPL_TRANSFER,并保存传输文件等中间内容

readSyncBulkPayload函数用于接收master传输的rdb文件,并加载到Redis中,大致流程:

  1. 读取文件长度
  2. 读取文件内容,并保存到本地rdb临时文件中
  3. 读取完成之后,清空Redis数据库
  4. 加载rdb文件
  5. 创建一个master -> slave的通道,将当前slave作为master的client,以继续执行master同步过来的命令
  6. 将同步状态改成REDIS_REPL_CONNECTED,并回写同步偏移量等
  7. 开启aof如果需要(server.aof_state != REDIS_AOF_OFF)

master对PSYNC命令的处理

void syncCommand(redisClient *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;
 
    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok… */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,Can’t SYNC while not connected with my master);
        return;
    }
 
    /* SYNC can’t be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0 || c->bufpos != 0{
        addReplyError(c,SYNC and PSYNC are invalid with pending output);
        return;
    }
 
    redisLog(REDIS_NOTICE,Slave asks for synchronization);
 
    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,psync)) {
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return/* No full resync needed, return. */
        } else {
            char *master_runid = c->argv[1]->ptr;
 
            /* Increment stats for failed PSYNCs, but only if the
             * runid is not “?”, as this is used by slaves to force a full
             * resync on purpose when they are not albe to partially
             * resync. */
            if (master_runid[0] != ?) server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli –slave). Flag the client
         * so that we don’t expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC;
    }
 
    /* Full resynchronization. */
    server.stat_sync_full++;
 
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1{
        /* Ok a background save is in progress. Let’s check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;
 
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,Waiting for end of BGSAVE for SYNC);
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,Waiting for next BGSAVE for SYNC);
        }
    } else {
        /* Ok we don’t have a BGSAVE in progress, let’s start one */
        redisLog(REDIS_NOTICE,Starting BGSAVE for SYNC);
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,Replication failed, can’t BGSAVE);
            addReplyError(c,Unable to perform background save);
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }
 
    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1/* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
  1. 首先判断自己是slave的时候不能执行psync
  2. 判断是否需要全量同步,如果不需要,直接退出
  3. 如果需要全量同步,创建一个rdb文件
    • 如果已经在写rdb文件,尽量复用当前的文件
    • 如果没有,则发起一个bgsave

判断是否需要全量同步:

int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;
 
    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* Run id “?” is used by slaves that want to force a full resync. */
        if (master_runid[0] != ?{
            redisLog(REDIS_NOTICE,Partial resynchronization not accepted: 
                Runid mismatch (Client asked for ‘%s‘, I’m ‘%s‘),
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,Full resync requested by slave.);
        }
        goto need_full_resync;
    }
 
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            Unable to partial resync with the slave for lack of backlog (Slave request was: %lld)., psync_offset);
        if (psync_offset > server.master_repl_offset{
            redisLog(REDIS_WARNING,
                Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.);
        }
        goto need_full_resync;
    }
 
    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    /* We can’t use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * emtpy so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),+CONTINUE\r\n);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld., psync_len, psync_offset);
    /* Note that we don’t need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */
 
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */
 
need_full_resync:
    /* We need a full resync for some reason… notify the client. */
    psync_offset = server.master_repl_offset;
    /* Add 1 to psync_offset if it the replication backlog does not exists
     * as when it will be created later we’ll increment the offset by one. */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Again, we can’t use the connection buffers (see above). */
    buflen = snprintf(buf,sizeof(buf),+FULLRESYNC %s %lld\r\n,
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}

主要场景有两个:

  1. 当前请求的id和server的id不匹配
  2. 当前Redis保存的日志无法满足slave要求的偏移量
    • master还没有back log
    • master back log长度不够

同时,每次rdb文件保存完毕的时候,都会调用updateSlavesWaitingBgsave函数,处理保存的rdb文件。

void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;
 
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
 
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            // rdb文件写入完毕 
            struct redis_stat buf;
 
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,SYNC failed. BGSAVE child returned an error);
                continue;
            }
            // 打开刚写入的rdb文件 
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1{
                freeClient(slave);
                redisLog(REDIS_WARNING,SYNC failed. Can’t open/stat DB after BGSAVE: %s, strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // 开始发送rdb文件 
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don’t know
         * about previous scripts. */
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            listIter li;
 
            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,SYNC failed. BGSAVE failed);
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
 
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

命令传播

上面的流程结束,slave已经包含了master BGSAVE时所包含的所有数据。后续就需要master一直将自己的命令发送给slave。

void call(redisClient *c, int flags) {
    …
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
 
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    …
}
 
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

在调用任何命令的时候,都会将命令分发到slave上去(除了AOF加载或者命令加了REDIS_CMD_SKIP_MONITOR标签)。

replicationFeedSlaves函数主要作用有两个:

  1. 将命令发送给所有在线的slave
  2. 将命令写入到back log中,方便后续增量同步
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     listNode *ln;
     listIter li;
     int j, len;
     char llstr[REDIS_LONGSTR_SIZE];
     
     /* If there aren’t slaves, and there is no backlog buffer to populate,
      * we can return ASAP. */
     if (server.repl_backlog == NULL && listLength(slaves) == 0return;
     
     /* We can’t have slaves attached and no backlog. */
     redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
     
     /* Send SELECT command to every slave if needed. */
     if (server.slaveseldb != dictid) {
         robj *selectcmd;
     
         /* For a few DBs we have pre-computed SELECT command. */
         // 每次都增加一个SELECT命令,防止弄错db 
         if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
             selectcmd = shared.select[dictid];
         } else {
             int dictid_len;
     
             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
             selectcmd = createObject(REDIS_STRING,
                 sdscatprintf(sdsempty(),
                 *2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n,
                 dictid_len, llstr));
         }
     
         /* Add the SELECT command into the backlog. */
         // 将select命令写入到backlog中 
         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
     
         /* Send it to slaves. */
         // 将select命令发送给slave 
         listRewind(slaves,&li);
         while((ln = listNext(&li))) {
             redisClient *slave = ln->value;
             addReply(slave,selectcmd);
         }
     
         if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
             decrRefCount(selectcmd);
     }
     server.slaveseldb = dictid;
     
     /* Write the command to the replication backlog if any. */
     // 将命令写入到backlog中 
     if (server.repl_backlog{
         char aux[REDIS_LONGSTR_SIZE+3];
     
         /* Add the multi bulk reply length. */
         aux[0] = *;
         len = ll2string(aux+1,sizeof(aux)-1,argc);
         aux[len+1] = \r;
         aux[len+2] = \n;
         feedReplicationBacklog(aux,len+3);
     
         for (j = 0; j < argc; j++) {
             long objlen = stringObjectLen(argv[j]);
     
             /* We need to feed the buffer with the object as a bulk reply
              * not just as a plain string, so create the $..CRLF payload len
              * ad add the final CRLF */
             aux[0] = $;
             len = ll2string(aux+1,sizeof(aux)-1,objlen);
             aux[len+1] = \r;
             aux[len+2] = \n;
             feedReplicationBacklog(aux,len+3);
             feedReplicationBacklogWithObject(argv[j]);
             feedReplicationBacklog(aux+len+1,2);
         }
     }
     
     /* Write the command to every slave. */
     // 将命令发送到所有的slave 
     listRewind(slaves,&li);
     while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
     
         /* Don’t feed slaves that are still waiting for BGSAVE to start */
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
     
         /* Feed slaves that are waiting for the initial SYNC (so these commands
          * are queued in the output buffer until the initial SYNC completes),
          * or are already in sync with the master. */
     
         /* Add the multi bulk length. */
         addReplyMultiBulkLen(slave,argc);
     
         /* Finally any additional argument that was not stored inside the
          * static buffer if any (from j to argc). */
         for (j = 0; j < argc; j++)
             addReplyBulk(slave,argv[j]);
     }
    }

    注:backlog大小可以设置,默认的大小为1M,如果超过,覆盖最初的日志

    #define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024)    /* 1mb */

心跳检测和命令丢失补偿

在命令传播阶段,slave每秒一次向master发送REPLCONF命令,发送当前的offset,让master检测是否有命令丢失。 这个也是在定时器中发送的。

void replicationCron(void) {
    …
    if (server.masterhost && server.master &&
        !(server.master->flags & REDIS_PRE_PSYNC))
        replicationSendAck();
    …
}
 
void replicationSendAck(void) {
    redisClient *c = server.master;
 
    if (c != NULL{
        c->flags |= REDIS_MASTER_FORCE_REPLY;
        addReplyMultiBulkLen(c,3);
        addReplyBulkCString(c,REPLCONF);
        addReplyBulkCString(c,ACK);
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~REDIS_MASTER_FORCE_REPLY;
    }
}

同时,master在接收到这个ACK包的时候,会记录slave的ack offset和ack时间:

void replconfCommand(redisClient *c) {
    …
    else if (!strcasecmp(c->argv[j]->ptr,ack)) {
        /* REPLCONF ACK is used by slave to inform the master the amount
         * of replication stream that it processed so far. It is an
         * internal only command that normal clients should never use. */
        long long offset;
 
        if (!(c->flags & REDIS_SLAVE)) return;
        if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
            return;
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        c->repl_ack_time = server.unixtime;
        /* Note: this command does not reply anything! */
        return;
    }
    …
}

还是在定时器中,每次调用的时候都会清理已经超时的slave:

void replicationCron(void) {
    …
    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;
 
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
 
            if (slave->replstate != REDIS_REPL_ONLINE) continue;
            if (slave->flags & REDIS_PRE_PSYNC) continue;
            if ((server.unixtime – slave->repl_ack_time) > server.repl_timeout)
            {
                char ip[REDIS_IP_STR_LEN];
                int port;
 
                if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1{
                    redisLog(REDIS_WARNING,
                        Disconnecting timedout slave: %s:%d,
                        ip, slave->slave_listening_port);
                }
                freeClient(slave);
            }
        }
    }
    …
}

这里的repl_ack_time由slave每次发送的ack包写入,server.repl_timeout默认值是60s:

#define REDIS_REPL_TIMEOUT 60

增量同步

master断开了slave连接之后,slave为了能够进行增量同步,freeClient的实现,针对master的slave client,也有不同的处理:

void freeClient(redisClient *c) {
    …
    /* If it is our master that’s beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,Connection with master lost.);
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    …
}
 
void replicationCacheMaster(redisClient *c) {
    listNode *ln;
 
    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,Caching the disconnected master state.);
 
    /* Remove from the list of clients, we don’t want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
     // 首先将slave从client列表中删除 
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
 
    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
     // 把slave的master保存到cached_master中 
    server.cached_master = server.master;
 
    /* Remove the event handlers and close the socket. We’ll later reuse
     * the socket of the new connection with the master during PSYNC. */
     // 清理slave连接,释放资源 
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);
 
    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;
 
    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }
·
    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
void replicationHandleMasterDisconnection(void) {
    server.master = NULL;
    server.repl_state = REDIS_REPL_CONNECT;
    server.repl_down_since = server.unixtime;
    /* We lost connection with our master, force our slaves to resync
     * with us as well to load the new data set.
     *
     * If server.masterhost is NULL the user called SLAVEOF NO ONE so
     * slave resync is not needed. */
    if (server.masterhost != NULL) disconnectSlaves();
}

经过这些处理,一个断开连接的slave,复制状态变成了REDIS_REPL_CONNECT。按照之前的流程,定时器会去尝试连接master, 发送PING命令,然后再发送PSYNC命令的时候,由于已经有了cached_master,会在PSYNC命令中带上之前master的id和偏移量。 相关slave和master的处理逻辑,前面代码中已经有了。

[redis设计与实现][6]基本数据结构——压缩列表

压缩列表(ziplist)是列表键和哈希键的底层实现之一。
压缩列表结构:

属性 类型 长度 用途
zlbytes uint32_t 4字节 记录整个压缩列表占用的内存字节数:在对压缩列表进行内存重分配,或者计算zlend的位置时使用
zltail uint32_t 4字节 记录压缩列表尾节点记录压缩列表的起始地址便宜:用于快速定位尾节点
zllen uint16_t 2字节 记录压缩列表包含的节点数量:当这个值小于UINT64_MAX(65535)时,这个属性的值就是压缩列表包含的节点数量;当这个值等于UINT64_MAX时,节点的真实数量需要遍历整个压缩列表才能计算得出
entryX 列表节点 不定 压缩列表包含的各个节点,节点长度由节点保存的内容决定
zlend uint8_t 1字节 特殊值0xFF,用于标记压缩列表的末端

压缩列表节点结构:

  • previous_entry_length:以字节为单位,记录压缩列表中前一个节点的长度。previous_entry_length属性的长度可以是1字节或者5字节:
    • 如果前一节点的长度小于254字节,那么previous_entry_length属性的长度为1字节
    • 日过前一节点的长度大于等于254字节,那么previous_entry_length属性的长度为5字节:其中第一字解会被设置为0xFE(254),而之后的四个字节用于保存前一节点的长度
  • encoding:节点的encoding属性记录了节点content属性薄脆数据的类型以及长度:
    • 一字节、两字节或者五字节长,值的最高位为00、01或者10的是字节数组编码:这种编码便是节点的content属性保存着字节数组,数组的长度由编码除去最高两位之后的其他位记录;
    • 一字节长,值的最高位以11开头的是整数编码:这种编码表示节点的content属性保存着整数值,整数值的类型和长度由编码去除最高两位之后的其他位记录;
  • content

字节数组编码:

编码 编码长度 content属性保存的值
00bbbbbb 1字节 长度小于等于63字节的字节数组
01bbbbbb xxxxxxxx 2字节 长度小于等于16383字节的字节数组
10______ aaaaaaaa bbbbbbbb cccccccc dddddddd 5字节 长度小于等于4294967295的字节数组

整数编码:

编码 编码长度 content属性保存的值
11000000 1字节 int16_t类型的整数
11010000 1字节 int32_t类型的整数
11100000 1字节 int64_t类型的整数
11110000 1字节 24位有符号整数
11111110 1字节 8位有符号整数
1111xxxx 1字节 无contennt属性,xxxx保存了0到12之前的值

连锁更新:
多个连续、长度结余250到253字节之间的节点,如果前面有插入或删除节点,导致previous_entry_length属性需要扩张成5字节,可能导致这些连续的节点都需要更新。最坏情况可能会对整个压缩列表重新执行N次空间重新分配。

unsigned char ziplistNew(void);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
unsigned char *ziplistNew(void) {
//多一个表示结尾的字节
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);
//#define ZIPLIST_BYTES(zl)       (</em>((uint32_t<em>)(zl)))
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
//#define ZIPLIST_TAIL_OFFSET(zl) (</em>((uint32_t<em>)((zl)+sizeof(uint32_t))))
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
//#define ZIPLIST_LENGTH(zl)      (</em>((uint16_t<em>)((zl)+sizeof(uint32_t)</em>2)))
    ZIPLIST_LENGTH(zl) = 0;
//#define ZIP_END 255
    zl[bytes-1] = ZIP_END;
    return zl;
}

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
//#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
//#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return __ziplistInsert(zl,p,s,slen);
}
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */

    zlentry tail;

<pre><code>/* Find out prevlen for the entry that is inserted. */
if (p[0] != ZIP_END) {
</code></pre>

//获取当前节点的previous_entry_length
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

<pre><code>/* See if the entry can be encoded */
</code></pre>

//尝试是否可以编码为整数,同时返回整数编码
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding <em>/
//是整数编码,返回content字节数
        reqlen = zipIntSize(encoding);
    } else {
        /</em> 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. <em>/
//字符数组编码,长度直接是数组长度
        reqlen = slen;
    }
    /</em> We need space for both the length of the previous entry and
     * the length of the payload. */

//加上前置节点长度编码占用字节数
    reqlen += zipPrevEncodeLength(NULL,prevlen);
//加上编码占用字节数
    reqlen += zipEncodeLength(NULL,encoding,slen);

<pre><code>/* When the insert position is not equal to the tail, we need to
 * make sure that the next entry can hold this entry's length in
 * its prevlen field. */

</code></pre>

//判断插入位置的节点,实际前置节点部分字节差
//如果有差距后面需要从1字节变成5字节
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

<pre><code>/* Store offset because a realloc may change the address of zl. */
offset = p-zl;
</code></pre>

//重新分配压缩列表内存,重新计算当前插入节点位置
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

<pre><code>/* Apply memory move when necessary and update tail offset. */
</code></pre>

//从头插入,后面的节点内存位置向后移动
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
//向后移动,目标位置当前节点以后,起始位置当前位置减去前置节点便宜,长度为所有原节点长度减去尾部标示符
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

<pre><code>    /* Encode this entry's raw length in the next entry. */
</code></pre>

//设置后面节点的前置节点长度
        zipPrevEncodeLength(p+reqlen,reqlen);

<pre><code>    /* Update offset for tail */
</code></pre>

//更新尾部节点偏移量
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

<pre><code>    /* When the tail contains more than one entry, we need to take
     * &quot;nextdiff&quot; in account as well. Otherwise, a change in the
     * size of prevlen doesn't have an effect on the *tail* offset. */

    tail = zipEntry(p+reqlen);
</code></pre>

//尾部标示符不对,说明后一个节点的前置节点便宜有扩大(nextdiff > 0),以新的为准
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
//尾部插入,直接更新尾部标记
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

<pre><code>/* When nextdiff != 0, the raw length of the next entry has changed, so
 * we need to cascade the update throughout the ziplist */

if (nextdiff != 0) {
    offset = p-zl;
</code></pre>

//传说中的连锁更新
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

<pre><code>/* Write the entry */
</code></pre>

//写入当前节点
//前置节点偏移
    p += zipPrevEncodeLength(p,prevlen);
//编码
    p += zipEncodeLength(p,encoding,slen);
//#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
    if (ZIP_IS_STR(encoding)) {
//是字符串编码,直接复制过去
        memcpy(p,s,slen);
    } else {
//是整数编码,写入整数
        zipSaveInteger(p,value,encoding);
    }
//增加压缩列表节点数
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

static int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    long long value;

<pre><code>if (entrylen &gt;= 32 || entrylen == 0) return 0;
</code></pre>

//尝试将字符串转换成整数
    if (string2ll((char<em>)entry,entrylen,&value)) {
        /</em> Great, the string can be encoded. Check what's the smallest
         * of our encoding types that can hold this value. */
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        *v = value;
        return 1;
    }
    return 0;
}
//联锁(级联)更新
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;

<pre><code>while (p[0] != ZIP_END) {
    cur = zipEntry(p);
    rawlen = cur.headersize + cur.len;
    rawlensize = zipPrevEncodeLength(NULL,rawlen);

    /* Abort if there is no next entry. */
</code></pre>

//下一个节点是列表尾,退出
        if (p[rawlen] == ZIP_END) break;
        next = zipEntry(p+rawlen);

<pre><code>    /* Abort when &quot;prevlen&quot; has not changed. */
</code></pre>

//下一个节点前置长度节点没有变化,不需要级联更新
        if (next.prevrawlen == rawlen) break;

//下一个节点需要扩张前置节点偏移
        if (next.prevrawlensize < rawlensize) {
            /* The "prevlen" field of "next" needs more bytes to hold
             * the raw length of "cur". */
            offset = p-zl;
            extra = rawlensize-next.prevrawlensize;
//压缩列表重新分配空间,原空间加上扩充空间
            zl = ziplistResize(zl,curlen+extra);
            p = zl+offset;

<pre><code>        /* Current pointer and offset for next element. */
        np = p+rawlen;
        noffset = np-zl;

        /* Update tail offset when next element is not the tail element. */
</code></pre>

//更新尾节点偏移
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }

<pre><code>        /* Move the tail to the back. */
</code></pre>

//向后移动内存
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
//重新设置上一节点长度
            zipPrevEncodeLength(np,rawlen);

<pre><code>        /* Advance the cursor */
</code></pre>

//向后遍历是否需要继续调整
            p += rawlen;
            curlen += extra;
        } else {
//下一个节点是要缩短的,强制不缩短,防止过多的内存重新分配
            if (next.prevrawlensize > rawlensize) {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "rawlen" in the available bytes. */
                zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
            } else {
                zipPrevEncodeLength(p+rawlen,rawlen);
            }

<pre><code>        /* Stop here, as the raw length of &quot;next&quot; has not changed. */
        break;
    }
}
return zl;
</code></pre>

}
//写入整数
static void zipSaveInteger(unsigned char <em>p, int64_t value, unsigned char encoding) {
    int16_t i16;
    int32_t i32;
    int64_t i64;
    if (encoding == ZIP_INT_8B) {
        ((int8_t</em>)p)[0] = (int8_t)value;
    } else if (encoding == ZIP_INT_16B) {
        i16 = value;
        memcpy(p,&i16,sizeof(i16));
        memrev16ifbe(p);
    } else if (encoding == ZIP_INT_24B) {
        i32 = value<<8;
        memrev32ifbe(&i32);
        memcpy(p,((uint8_t<em>)&i32)+1,sizeof(i32)-sizeof(uint8_t));
    } else if (encoding == ZIP_INT_32B) {
        i32 = value;
        memcpy(p,&i32,sizeof(i32));
        memrev32ifbe(p);
    } else if (encoding == ZIP_INT_64B) {
        i64 = value;
        memcpy(p,&i64,sizeof(i64));
        memrev64ifbe(p);
    } else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
        /</em> Nothing to do, the value is stored in the encoding itself. */
    } else {
        assert(NULL);
    }
}

[redis设计与实现][8]数据库

Redis数据库定义:

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

dict

dict数组保存所有的数据库,Redis初始化的时候,默认会创建16个数据库

#define REDIS_DEFAULT_DBNUM     16

默认情况下,Redis客户端的目标数据库是0 号数据库,可以通过select命令切换。 注意,由于Redis缺少获取当前操作的数据库命令,使用select切换需要特别注意

读写数据库中的键值对的时候,Redis除了对键空间执行指定操作外,还有一些额外的操作:

  • 读取键之后(读和写操作都会先读取),记录键空间命中或不命中次数
  • 读取键之后,更新键的LRU
  • 读取时发现已经过期,会先删除过期键
  • 如果有客户端使用watch命令监视了key,会在修改后标记为dirty
  • 修改之后,会对dirty键计数器加1,用于持久化和复制
  • 如果开启了数据库通知,修改之后会发送相应通知
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}
robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;
 
    //查询是否已经过期 
    expireIfNeeded(db,key);
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
 
        /* Update the access time for the ageing algorithm.
         * Don’t do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            //设置lru时间 
            val->lru = server.lruclock;
        return val;
    } else {
        return NULL;
    }
}

expires

通过exprire或者pexpire命令,可以设置键的TTL,如果键的TTL为0,会被自动删除。

expires字典保存了数据库中所有键的过期时间。

  • 过期字典的键是指向某个数据中的键对象
  • 过期字段的值是long long类型的整数,保存这个键的过期时间
    void expireCommand(redisClient *c) {
      expireGenericCommand(c,mstime(),UNIT_SECONDS);
    }
    void expireGenericCommand(redisClient *c, long long basetime, int unit) {
      robj *key = c->argv[1], *param = c->argv[2];
      long long when; /* unix time in milliseconds when the key will expire. */
     
      if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
          return;
     
      if (unit == UNIT_SECONDS) when *= 1000;
      when += basetime;
     
      /* No key, return zero. */
      if (lookupKeyRead(c->db,key) == NULL{
          addReply(c,shared.czero);
          return;
      }
     
      /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
       * should never be executed as a DEL when load the AOF or in the context
       * of a slave instance.
       *
       * Instead we take the other branch of the IF statement setting an expire
       * (possibly in the past) and wait for an explicit DEL from the master. */
      if (when <= mstime() && !server.loading && !server.masterhost{
          robj *aux;
     
          redisAssertWithInfo(c,key,dbDelete(c->db,key));
          server.dirty++;
     
          /* Replicate/AOF this as an explicit DEL. */
          aux = createStringObject(DEL,3);
          rewriteClientCommandVector(c,2,aux,key);
          decrRefCount(aux);
          signalModifiedKey(c->db,key);
          notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,del,key,c->db->id);
          addReply(c, shared.cone);
          return;
      } else {
          //放到expires字典中 
          setExpire(c->db,key,when);
          addReply(c,shared.cone);
          signalModifiedKey(c->db,key);
          notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,expire,key,c->db->id);
          server.dirty++;
          return;
      }
    }

过期键删除策略

  • 惰性删除:每次执行命令前,都会调用expireIfNeeded函数检查是否过期,如果已经过期,改函数会删除过期键
  • 定时删除:定时执行activeExpireCycleTryExpire函数

    expireIfNeeded

    int expireIfNeeded(redisDb *db, robj *key) {
      mstime_t when = getExpire(db,key);
      mstime_t now;
     
      if (when < 0return 0/* No expire for this key */
     
      /* Don’t expire anything while loading. It will be done later. */
      if (server.loadingreturn 0;
     
      /* If we are in the context of a Lua script, we claim that time is
       * blocked to when the Lua script started. This way a key can expire
       * only the first time it is accessed and not in the middle of the
       * script execution, making propagation to slaves / AOF consistent.
       * See issue #1525 on Github for more information. */
      now = server.lua_caller ? server.lua_time_start : mstime();
     
      /* If we are running in the context of a slave, return ASAP:
       * the slave key expiration is controlled by the master that will
       * send us synthesized DEL operations for expired keys.
       *
       * Still we try to return the right information to the caller,
       * that is, 0 if we think the key should be still valid, 1 if
       * we think the key is expired at this time. */
      if (server.masterhost != NULLreturn now > when;
     
      /* Return when this key has not expired */
      if (now <= when) return 0;
     
      /* Delete the key */
      server.stat_expiredkeys++;
      propagateExpire(db,key);
      notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
          expired,key,db->id);
      return dbDelete(db,key);
    }

    activeExpireCycleTryExpire

    while (num–) {
      dictEntry *de;
      long long ttl;
     
      if ((de = dictGetRandomKey(db->expires)) == NULLbreak;
      ttl = dictGetSignedIntegerVal(de)-now;
      if (activeExpireCycleTryExpire(db,de,now)) expired++;
      if (ttl < 0) ttl = 0;
      ttl_sum += ttl;
      ttl_samples++;
    }

    AOF、RDB和复制功能对过期键的处理

  • 生成RDB文件时,已过期的键不会被保存到新的RDB文件中
  • 载入RDB文件:
    • 主服务器载入时,会忽略过期键
    • 从服务器载入时,都会被载入(但是很快会因为同步被覆盖)
  • AOF写入,已过期未删除的键没有影响,被删除后,会追加一条del命令
  • AOF重写,会对键进行检查,过期键不会保存到重写后的AOF文件
  • 复制:
    • 主服务器删除一个过期键后,会显式向所有从服务器发送DEL命令
    • 从服务器执行读命令,及时过期也不会删除,只有接受到主服务器DEL命令才会删除

netty4 UDP 通信

netty4 UDP 通信

netty4 UDP 通信的例子很少,官方代码里面只有https://github.com/netty/netty/tree/4.0/example/src/main/java/io/netty/example/qotm这一个例子。但是,因为需求是从原先TCP 方式修改成UDP,需要能够最大方式的利用原先的解码器、处理器,但是官方的例子里面, 客户端只有一个广播消息,服务端回复一个消息。这当然是无法满足真正的业务需求的。特别是这次的需求是将原有的TCP长连接方式发送心跳, 改成UDP方式。

复用客户端编解码器和handler

原先的解码器是为TCP 包写的,直接通过继承ReplayingDecoder,对bytebuf中的字节按照自定义的协议,转换成内部对象。 按照官方例子,基于UDP的解码器,都是直接将从DatagramPacket对象封装的UDP包中进行解码。 因此,最简单的编码复用方式,是在自己的ByteToMessage的解码器之前,放置一个MessageToMessage解码器,将DatagramPacket中的bytebuf提取出来。

public class UdpMessageToByteDecode extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket msgList<Object> out) throws Exception {
        out.add(msg.content());
        msg.retain();
    }
}

这里特别注意两个问题:

  1. 在out中添加了msg的bytebuf之后,注意调用msg的retain方法,防止msg中的bytebuf提前释放抛出异常
  2. 这样写了之后,丢失了DatagramPacket中的sender,这样在后续的处理器中,无法直接向来源发送消息,所以这种方式基本只试用于客户端

除了解码器,编码器也需要有对应的改造。由于UDP包没有连接,没有办法在连接的时候指定一个目标地址。 所以解码器利用了channel的attribute map来让客户端能够随时切换服务端地址。

public class UdpByteToMessageEncode extends MessageToMessageEncoder<ByteBuf> {
    private static Log log = LogFactory.getLog(UdpByteToMessageEncode.class);
 
    public static final AttributeKey<InetSocketAddress> TARGET_ADDRESS = AttributeKey.valueOf(TARGET_ADDRESS);
 
    @Override
    protected void encode(ChannelHandlerContext ctxByteBuf msgList<Object> out) throws Exception {
        InetSocketAddress ip = ctx.channel().attr(TARGET_ADDRESS).get();
        if (ip == null{
            log.error(no server ip);
            return;
        }
 
        DatagramPacket packet = new DatagramPacket(msg, ip);
        msg.retain();
        out.add(packet);
    }
}

首先从原始编码器获得bytebuf,从channel的attribute中获取目标服务器地址,然后组装成最终发送的DatagramPacket。 这里要注意的是:

  1. 目标地址为了能在整个连接上下文中共享,需要保存在channel中,而不是context中
  2. 发送的bytebuf仍然需要调用retain来防止提前释放抛出异常

服务端数据应答

之前在客户端的编码解码器之前(后)加上简单封装,就可以正常的首发UDP包。因为客户端的业务逻辑比较简单,UDP包只包含了客户端心跳, 无需对服务端的应答包进行处理。而服务器端,则需要对应答包做出业务逻辑处理(这里大致处理是将心跳信息写入Redis中),并发送响应包。 因此,会发现一个严重的问题,原先处理TCP的流程,缺少了一个重要的信息,就是客户端地址。

为了解决这个问题,只能新建一个新的编码、解码器,并且继承原先的业务对象(请求、应答包)增加来源(目标)地址。但是由于UDP包没有TCP 包 粘连的问题,因此只需要继承MessageToMessageDecoder即可,不需要使用类似ReplayingDecoder这样复杂的解码器上层实现。

public class UdpRequestDecode extends MessageToMessageDecoder<DatagramPacket> {
    private static final Log log = LogFactory.getLog(UdpRequestDecode.class);
 
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket packetList<Object> out) throws Exception {
        try {
            UdpRequestData requestData = new UdpRequestData();
            ByteBuf content = packet.content();
            requestData.setEncode(content.readByte());
 
            ...
 
            requestData.setSender(packet.sender());
 
            out.add(requestData);
        } catch (Exception e) {
            log.error(String.format(error decode udp packet from [ %s ], ignore!, packet.sender().toString()));
        }
    }
}

编码器也是类似,需要在业务处理的handler中判断是否为UdpRequestData,确定是否需要在write的时候使用对应的UdpResponseData。

服务器端连接管理

服务器连接管理,主要用于记录服务器端链接数量和负载的记录,这些数据会被写入到Redis中,客户端将依据这些数据来挑选连接数量最少、 压力最小的服务器进行连接。之前使用的TCP协议,因此连接管理非常方便,服务器端维护一个ChannelGroup,在active和inactive的时候从ChannelGroup 中添加或者删除。

由于UDP是无连接的,无法通过handler的几个回调来判断连接状态,所以将逻辑改成了服务器端维护一个hashmap,客户端IP作为key, 最后一次udp包接收时间作为value。在处理业务逻辑的handler中,每当收到一个UDP包,就将这对数据放入到hashmap中。服务器端定时器在定时写入Redis 的时候,通过判断value是否过期,来移除这个键值对。通过这样的逻辑,来记录服务器端的连接数基本上是靠谱的。

同时,为了兼容原先的TCP方式心跳,需要将TCP方式的连接对应的时间戳,改成Long.MAX_VALUE,确保TCP连接只在inactive的时候才被移除。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestData request) throws Exception {
    RequestContext requestContext = new RequestContext(request, ctx.channel());
    ResponseData response = DO_SOMETHING(requestContext);
 
    if (request instanceof UdpRequestData{
        UdpRequestData udpRequest = (UdpRequestData) request;
        // log current sender ip 
        Server.addClient(udpRequest.getSender().getAddress().getHostAddress(), System.currentTimeMillis());
 
        UdpResponseData udpResponseData = UdpResponseData.fromResponseData(response);
        udpResponseData.setRecipient(udpRequest.getSender());
        ctx.writeAndFlush(udpResponseData);
    } else {
        ctx.writeAndFlush(response);
    }
}
 
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        // tcp never timeout 
        Server.addClient(inetSocketAddress.getAddress().getHostAddress(), Long.MAX_VALUE);
    }
    super.channelActive(ctx);
}
 
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        Server.removeClient(inetSocketAddress.getAddress().getHostAddress());
    }
    super.channelInactive(ctx);
}

定时器处理逻辑大致为:

public ServerClusterInfo getServerClusterInfo() {
    ServerClusterInfo serverClusterInfo = new ServerClusterInfo();
    serverClusterInfo.setPort(this.serverClusterInfo.getPort());
    serverClusterInfo.setHost(this.serverClusterInfo.getHost());
 
    Set<String> ips = new HashSet<String>();
    long currentTs = System.currentTimeMillis();
    Iterator<Map.Entry<StringLong>> iter = ipMap.entrySet().iterator();
    while (iter.hasNext(){
        Map.Entry<StringLong> ipEntry = iter.next();
        if (currentTs - ipEntry.getValue() < TIMEOUT{
            ips.add(ipEntry.getKey());
        } else {
            iter.remove();
        }
    }
 
    serverClusterInfo.setCurChannel(ips.size());
    serverClusterInfo.setConnectIps(ips);
    serverClusterInfo.setCurConnect(ips.size());
    return serverClusterInfo;
}

客户端的超时重连

之前TCP版本的重连非常方便,当发现客户端连接服务器的channel isActive方法返回false的时候,直接重新获取服务器列表, 按照连接数排序,重新去连接一个连接数最低的服务器即可。

同样是因为UDP是无连接的,同时UDP也是不安全的,一个包发出去很有可能肉包子打狗——有去无回了。(当然,在内网环境下,网络环境还是应该乐观点的~) 处理类似的最简单办法,就是超时重连了。

之前的客户端,所有消息会被放到一个阻塞队列(因为心跳什么的都是由定时器线程产生的),由一个线程来完成从阻塞队列读取并发送的过程。 这次的修改,也只能从这里下手了。首先在客户端维护一个最后一个数据包返回的时间戳(为了线程安全神马的,直接用了AtomicLong类型)。

首先在初始化的时候,将这个值设置为0。确保第一次发送消息的时候,客户端会主动去“连接”服务端。为什么这里的“连接”是加上引号的呢? 原因很简答,还是那句话,UDP是无连接的。按照之前的设计,这个所谓的“连接”,只是简单的在channel的attribute map中设置了TARGET_ADDRESS这个变量而已。

Thread sendThread = new Thread() {
    public void run() {
        while (true{
            RequestData requestData;
            try {
                requestData = queue.take();
 
                long currentTimeMillis = System.currentTimeMillis();
 
                if (currentTimeMillis - lastResponse.get() > TIMEOUT{
                    boolean connect = BootStrapConnectUtil.connect(channelFuture, serverClusterInfos);
                    if(!connect) {
                        log.error(fail to connect to watch dog! ignore current package);
                        continue;
                    }
                }
 
                channelFuture.channel().writeAndFlush(requestData);
            } catch (Throwable e) {
                log.error(, e);
            }
 
        }
    }
};
 
public static boolean connect(ChannelFuture channelFuture, List<ServerClusterInfo> serverClusterInfos) {
    InetSocketAddress remoteAddress = channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).get();
    serverClusterInfos = sortServer(serverClusterInfos, remoteAddress);
    boolean result = false;
    for (ServerClusterInfo server : serverClusterInfos) {
        try {
            channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).set(server.getAddress());
            result = true;
            Client.getClient().setLastResponse(System.currentTimeMillis());
        } catch (Throwable ignored) {
        }
    }
    return result;
}

这里好多命名和流程,一看就不像是UDP协议的,的确,这些代码之前都走的是TCP协议~~

除此之外,当然是handler里面,要在每次收到服务器端响应包的时候,回写这个lastResponse了。

@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseData response) throws Exception {
    ResultEnum responseCode = ResultEnum.fromCode(response.getResult());
    if (responseCode != null{
        switch (responseCode) {
            case SUCCESS:// 处理成功 
                Client.getClient().setLastResponse(System.currentTimeMillis());
                break;
            default:
                // do nothing 
        }
    }
}
 
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    // 强行过期 
    AgentClient.getClient().setLastResponse(0l);
}

这里还加了一个,如果发生了异常,强行“关闭”连接。

这样,原先TCP方式的心跳,被改成了UDP方式。为了避免UDP协议的无连接、不可靠,整个流程中加了两个超时,确保了客户端、服务器端都能够和 以前一样正常的首发消息、断线重连等。

[redis设计与实现][7]基本数据结构——对象

Redis对基础数据类型进行了封装,构建出上层的对象系统,这个系统包含:字符串对象、列表对象、哈希对象、集合对象和有序集合对象。

Redis对象结构:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisObject {
//类型
unsigned type:4;
//编码
unsigned encoding:4;
//LRU时间
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
//引用计数
int refcount;
//底层实现数据结构的指针
void *ptr;
} robj;

相关宏定义:

1
2
3
#define REDIS_LRU_BITS 24
#define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */
#define REDIS_LRU_CLOCK_RESOLUTION 1 /* LRU clock resolution in seconds */

Redis对象类型:(使用type命令查看)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

//对象编码类型:(使用object encoding命令)
#define REDIS_ENCODING_RAW 0     /* Raw representation */
#define REDIS_ENCODING_INT 1     /* Encoded as integer */
#define REDIS_ENCODING_HT 2      /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7  /* Encoded as skiplist */

不同类型和编码的对象:

类型 编码 对象
REDIS_STRING REDIS_ENCODING_INT 使用整数值实现的字符串对象
REDIS_STRING REDIS_ENCODING_EMBSTR 这货redis3.0引入的,不管
REDIS_STRING REDIS_ENCODING_RAW sds实现的字符串对象
REDIS_LIST REDIS_ENCODING_ZIPLIST 使用压缩列表实现的列表对象
REDIS_LIST REDIS_ENCODING_LINKEDLIST 使用双向链表实现的列表对象
REDIS_HASH REDIS_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象
REDIS_HASH REDIS_ENCODING_HT 使用字典实现的哈希对象
REDIS_SET REDIS_ENCODING_INTSET 使用整数集合实现的集合对象
REDIS_SET REDIS_ENCODING_HT 使用字典实现的集合对象
REDIS_ZSET REDIS_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象
REDIS_ZSET REDIS_ENCODING_SKIPLIST 使用跳跃表想和字典实现的有序集合对象

字符串对象:
整数(long)被保存为int类型
浮点保存为字符串,浮点运算(INCRYBYFLOAT)redis先转成浮点,进行运算后再转回字符串

列表对象:
列表对象同时满足一下两个条件时,列表对象使用ziplist编码
列表对象保存的所有字符串元素的长度都小于64个字节(list-max-ziplist-value)
列表对象保存的元素数量小于512个(list-max-ziplist-entries)
不能满足条件的都需要使用linkedlist编码。

哈希对象:
使用压缩列表,键和值分别作为节点按顺序放入列表
使用ziplist的条件:
哈希对象保存的所有键值对的键和值的字符串长度小于64个字节(hash-max-ziplist-value)
哈希对象保存的键值对数量小于512个(hash-max-ziplist-entries)

集合对象:
hashtable编码,字典的每个键都是一个字符串对象,字典的值全部设置为NULL
使用intset编码条件:
集合对象保存的所有元素是整数值
集合对象保存的元素数量不超过512个(set-max-intset-entries)
不满足条件的集合对象需要使用hashtable编码

有序集合对象:
ziplist编码的有序集合,每个集合元素使用两个紧挨在一起的压缩节点列表来保存,第一个节点保存元素的成员,第二个节点保存元素的分值。
skiplist编码的有序集合,采用一个skiplist和一个hashtable实现。
使用ziplist编码条件:
有序集合保存的元素数量小于128个(zset-max-ziplist-entries)
有序集合保存的所有元素成员的长度都小于64字节(zset-max-ziplist-value)

对象共享:
Redis在初始化时,会创建1w个字符串对象(REDIS_SHARED_INTEGERS),包含整数0~9999,当需要使用这些对象的时候,会使用这些对象的引用(引用计数)而不新创建。

1
2
3
4
if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
}

[redis设计与实现][5]基本数据结构——整数集合

整数集合(intset)用于集合键。当一个集合只包含整数值元素,并且数量不多的时候,会使用整数集合作为集合键的底层实现。相对于直接保存字符串,整数集合能够很好地节约内存,但是由于是数组保存,需要特别关注数组长度。

定义:(intset.h)

1
2
3
4
5
6
7
8
typedef struct intset {
//编码方式
    uint32_t encoding;
//集合包含的元素数量
    uint32_t length;
//保存元素的数组
    int8_t contents[];
} intset;

encoding:

  • INTSETENCINT16:content数组每一项都是一个int16_t类型的数值(有符号)
  • INTSETENCINT32:content数组每一项都是一个int32_t类型的数值
  • INTSETENCINT64:content数组每一项都是一个int64_t类型的数值

整数集合支持长度升级,但是不支持长度降级。当插入的最大长度超过当前编码方式容纳的最大值的时候,会对编码类型进行升级。但是如果删除了一个大数字,整体整数集合不会再进行降级。

intset *intsetNew(void);(创建一个新的整数集合)

1
2
3
4
5
6
7
8
intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
//对于x86架构采用小端序,#define intrev32ifbe(v) (v)
// 初始化的时候编码为INTSET_ENC_INT16
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

intset *intsetAdd(intset *is, int64t value, uint8t *success);(添加到整数集合)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
//判断当前值需要的编码类型

    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */

//如果当前值的编码类型大于当前整数集合的编码,需要进行升级
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */

//查找当前数值是否已经存在
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
//移动插入点以后的元素,空出插入位置
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}
//升级整数集合
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);
    int prepend = value < 0 ? 1 : 0;

    /* First set new encoding and resize */
//设置新的编码
    is->encoding = intrev32ifbe(newenc);
//重新分配整数集合大小
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    /* Upgrade back-to-front so we don't overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset. */

//倒着重新设置值,防止内存覆盖
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    /* Set the value at the beginning or the end. */
    if (prepend)
//插入的值小于0,放到最前面
        _intsetSet(is,0,value);
    else
//插入的值大于0,放到最后面
        _intsetSet(is,intrev32ifbe(is->length),value);
//修改整数集合长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
//重新分配整数集合大小
static intset *intsetResize(intset *is, uint32_t len) {
    uint32_t size = len*intrev32ifbe(is->encoding);
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}
//获取指定位置的值
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
        memrev64ifbe(&v64);
        return v64;
    } else if (enc == INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}
//插入到指定位置
static void _intsetSet(intset *is, int pos, int64_t value) {
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}
//查找数值是否存在(二分查找)
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
//为空直接退出
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */

//如果插入数值比最大的大或者比最小的小,直接退出,设置pos
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

//折半查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}
//从指定位置开始移动到最尾
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    uint32_t bytes = intrev32ifbe(is->length)-from;
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;
        dst = (int64_t*)is->contents+to;
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    memmove(dst,src,bytes);
}
intset *intsetRemove(intset *is, int64_t value, int *success);(移除元素)

intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;

//匹配当前编码并查找元素
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        uint32_t len = intrev32ifbe(is->length);

        /* We know we can delete */
        if (success) *success = 1;

        /* Overwrite value with tail and update length */
//找到后,向前移动数组
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
//收缩数组
        is = intsetResize(is,len-1);
        is->length = intrev32ifbe(len-1);
    }
    return is;
}

[redis设计与实现][4]基本数据结构——跳跃表

Redis使用跳跃表和字典共同来实现有序集合键(sorted set)。

定义:
跳跃表节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zskiplistNode {
//成员对象
    robj *obj;
//分值
    double score;
//后退指针
    struct zskiplistNode *backward;
//层结构
    struct zskiplistLevel {
     //前进指针
        struct zskiplistNode *forward;
     //跨度
        unsigned int span;
    } level[];
} zskiplistNode;

跳跃表定义:

1
2
3
4
5
6
7
8
typedef struct zskiplist {
//跳跃表头、尾指针
    struct zskiplistNode *header, *tail;
//跳跃表长度
    unsigned long length;
//跳跃表层数
    int level;
} zskiplist;

zskiplist *zslCreate(void);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
//创建头节点,头节点永远有最高层
//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//初始化头节点的每一层
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
//遍历查找当前score对应的插入位置
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
//累加跨度
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
//当前层要更新的节点
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */

//随机一个当前节点的层数
    level = zslRandomLevel();
//如果是最高层,需要修正
//在以前最高层以上的,跨度都修复为跳跃表原长度,前置节点都改为头结点
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
//跳跃表的总层数改成最新的层数
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
//每一层的链表构建
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
//当前节点每层跨度计算
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//当前节点每层插入节点跨度修改
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

//修改当前节点的前置节点等
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
int zslRandomLevel(void) {
    int level = 1;
//#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

snoopy初探

snoopy是什么?刚了解这货的时候,是公司服务器上有snoopy的so无法加载的错误,然后是系统日志里面一堆日志,导致机器空间不足。官方说明是:
Snoopy is designed to aid a sysadmin by providing a log of commands executed.也就是说这货会监控服务器上的命令执行,并记录到syslog。

首先来看下这个功能是怎么被完成的。首先会发现,服务器上的/etc/ld.so.preload这个文件被修改,强制可执行程序加载之前加载snoopy的so:

1
2
#cat /etc/ld.so.preload
/usr/local/snoopy/lib/snoopy.so

关于ld.so.preload和LD_PRELOAD,可以参考man ld.so:

LD_PRELOAD

A whitespace-separated list of additional, user-specified, ELF shared libraries to be loaded before all others. This can be used to selectively override functions in other shared libraries. For set-user-ID/set-group-ID ELF binaries, only libraries in the standard search directories that are also set-user-ID will be loaded.

/etc/ld.so.preload

File containing a whitespace separated list of ELF shared libraries to be loaded before the program.

也就是说,snoopy.so会在所有ELF文件加载之前预加载,确保将一些系统调用被这货劫持。

如果在机器上执行

1
ls /notfound

会在系统日志中记录类似:

snoopy[25505]: [uid:0 sid:9701 tty:/dev/pts/15 cwd:/root filename:/bin/ls]: ls /notfound

这样的内容。通过ldd,可以看下一个命令加载的动态链接库:

1
2
3
4
5
6
7
8
9
10
11
12
#ldd /bin/ls
linux-vdso.so.1 =>  (0x00007fff99fff000)
/usr/local/snoopy/lib/snoopy.so (0x00007fc407938000)
librt.so.1 => /lib64/librt.so.1 (0x0000003e4f600000)
libacl.so.1 => /lib64/libacl.so.1 (0x0000003e50200000)
libselinux.so.1 => /lib64/libselinux.so.1 (0x0000003e4fa00000)
libc.so.6 => /lib64/libc.so.6 (0x0000003e4e200000)
libdl.so.2 => /lib64/libdl.so.2 (0x0000003e4e600000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x0000003e4ea00000)
/lib64/ld-linux-x86-64.so.2 (0x0000003e4de00000)
libattr.so.1 => /lib64/libattr.so.1 (0x0000003e4f200000)
libsepol.so.1 => /lib64/libsepol.so.1 (0x0000003e4fe00000)

果然snoopy被最早加载了。

那么如果想不要snoopy加载,又有什么办法呢?

搜索了半天,没有看见Linux中有什么办法能够将设置在ld.so.preload中预先加载的so给卸载掉,但是有一个值得注意的是:LD_PRELOAD加载的优先级高于ld.so.preload。
我们能不能通过更早的加载被劫持的系统调用,来避免被劫持呢?答案是肯定的。

首先,我们可以猜测出来这货是不会劫持太多的系统调用,通过nm命令看下snoopy的符号表:

1
2
3
4
5
#nm /usr/local/snoopy/lib/snoopy.so
...
0000000000000890 T execv
0000000000000b00 T execve
...

这两个系统调用应该是非常熟悉的,函数定义在unistd.h中,主要用于创建新的进程,并加载另一个可执行程序。只要截获了这几个系统调用,就能知道要执行什么命令了。
我们还可以再用bash来验证下:

1
2
3
4
5
6
#strace bash -c "ls /abc"
...
connect(3, {sa_family=AF_FILE, path="/dev/log"...}, 110) = 0
sendto(3, "<86>Sep 23 04:34:56 snoopy[28052"..., 104, MSG_NOSIGNAL, NULL, 0) = 104
execve("/bin/ls", ["ls", "/abc"], [/* 29 vars */]) = 0
...

从这里可以看出两个地方,首先bash在执行ls的时候,的确是通过execve这个系统调用的;其次,这个系统调用已经被snoopy劫持了(向日志设备发送了数据)。

最后,通过代码也验证了这个猜想:代码在这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int execv (const char *filename, char *const argv[]) {
    static int (*func)(const char *, char **);

    FN(func,int,"execv",(const char *, char **const));
    snoopy_log_syscall_execv(filename, argv);

    return (*func) (filename, (char **) argv);
}
int execve (const char *filename, char *const argv[], char *const envp[])
{
    static int (*func)(const char *, char **, char **);

    FN(func,int,"execve",(const char *, char **const, char **const));
    snoopy_log_syscall_execve(filename, argv, envp);

    return (*func) (filename, (char**) argv, (char **) envp);
}

知道了原理,想到绕开,就很简单了。首先,找到execve真实的提供者:从之前ls命令的动态链接库就可以看见大部分系统的系统调用,都在/lib64/libc.so.6中,用nm命令验证下:

1
2
3
4
5
6
7
#nm /lib64/libc.so.6 | grep execv
0000003e4e29acf0 t __GI_execvp
0000003e4e29a880 t __execve
0000003e4e29a980 T execv
0000003e4e29a880 W execve
0000003e4e29acf0 T execvp
0000003e4e29a8b0 T fexecve

因此,然后尝试执行:

1
LD_PRELOAD="/lib64/libc.so.6" bash -c "ls /abc"

再去查看系统日志,会发现只记录了bash -c “ls /abc”这个命令,真正执行的ls /abc这个命令没有记录。
通过strace也可以看见,在执行ls之前,没有了和系统日志交互的连接了。

stat(“/bin/ls”, {stmode=SIFREG|0755, st_size=91272, …}) = 0

access(“/bin/ls”, X_OK) = 0

access(“/bin/ls”, R_OK) = 0

rtsigaction(SIGINT, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {SIGDFL, [], SA_RESTORER, 0x3e4e2302d0}, 8) = 0

rtsigaction(SIGQUIT, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {0x1, [], SARESTORER, 0x3e4e2302d0}, 8) = 0

rtsigaction(SIGCHLD, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {0x436360, [], SARESTORER, 0x3e4e2302d0}, 8) = 0

execve(“/bin/ls”, ["ls", "/abc"], [/* 30 vars */]) = 0

也就是说,如果你登陆服务器之后,执行了:

1
LD_PRELOAD="/lib64/libc.so.6" bash

用当前被snoopy劫持的bash再创建一个没有被snoopy劫持的bash,之后执行的所有命令,都不会再被记录。

gentoo vlc qt5环境下编译失败

之前在虚拟机里面安装了个gentoo,用来尝试安装kde5。在升级系统的时候,发现vlc一直编译失败(好像是phonon引入的)。查了下发现是vlc在编译的时候,发现了qt5,但是按照qt4的方式编译了,导致自身图形界面相关的类编译失败。
编译失败的信息:

1
2
3
4
5
6
7
make[6]: Entering directory '/var/tmp/portage/media-video/vlc-2.1.4/work/vlc-2.1.4/modules/gui/qt4'
../../../doltlibtool  --tag=CXX   --mode=compile x86_64-pc-linux-gnu-g++ -DHAVE_CONFIG_H -I. -I../../..  -DMODULE_NAME=$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}") -DMODULE_NAME_IS_$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}") -DMODULE_STRING=\"$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}")\" -D__PLUGIN__  -I../../../include -I../../../include  -I/usr/include/samba-4.0  -DQT_SHARED -I/usr/include/qt4 -I/usr/include/qt4/QtGui -I/usr/include/qt4 -I/usr/include/qt4/QtCore   -O2 -mprefer-avx128 -fomit-frame-pointer -pipe -march=bdver2 -mmmx -mno-3dnow -msse -msse2 -msse3 -mssse3 -msse4a -mcx16 -msahf -mno-movbe -maes -mno-sha -mpclmul -mpopcnt -mabm -mlwp -mfma -mfma4 -mxop -mbmi -mno-bmi2 -mtbm -mavx -mno-avx2 -msse4.2 -msse4.1 -mlzcnt -mno-rtm -mno-hle -mno-rdrnd -mf16c -mno-fsgsbase -mno-rdseed -mprfchw -mno-adx -mfxsr -mxsave -mno-xsaveopt -mno-avx512f -mno-avx512er -mno-avx512cd -mno-avx512pf -mno-prefetchwt1 --param l1-cache-size=16 --param l1-cache-line-size=64 --param l2-cache-size=2048 -mtune=bdver2 -fstack-protector-strong -Wall -Wextra -Wsign-compare -Wundef -Wpointer-arith -Wvolatile-register-var -fvisibility=hidden -c -o libqt4_plugin_la-qt4.lo `test -f 'qt4.cpp' || echo './'`qt4.cpp
In file included from dialogs/open.hpp:35:0,
                 from dialogs_provider.hpp:36,
                 from qt4.cpp:36:
./ui/open.h:14:29: fatal error: QtWidgets/QAction: No such file or directory
 #include <QtWidgets/QAction>

参考vlcgentoo的bug,发现已经有人提交了一个补丁。具体补丁文件在:这里