coolEx

Today will be better

[redis设计与实现][6]基本数据结构——压缩列表

压缩列表(ziplist)是列表键和哈希键的底层实现之一。
压缩列表结构:

属性 类型 长度 用途
zlbytes uint32_t 4字节 记录整个压缩列表占用的内存字节数:在对压缩列表进行内存重分配,或者计算zlend的位置时使用
zltail uint32_t 4字节 记录压缩列表尾节点记录压缩列表的起始地址便宜:用于快速定位尾节点
zllen uint16_t 2字节 记录压缩列表包含的节点数量:当这个值小于UINT64_MAX(65535)时,这个属性的值就是压缩列表包含的节点数量;当这个值等于UINT64_MAX时,节点的真实数量需要遍历整个压缩列表才能计算得出
entryX 列表节点 不定 压缩列表包含的各个节点,节点长度由节点保存的内容决定
zlend uint8_t 1字节 特殊值0xFF,用于标记压缩列表的末端

压缩列表节点结构:

  • previous_entry_length:以字节为单位,记录压缩列表中前一个节点的长度。previous_entry_length属性的长度可以是1字节或者5字节:
    • 如果前一节点的长度小于254字节,那么previous_entry_length属性的长度为1字节
    • 日过前一节点的长度大于等于254字节,那么previous_entry_length属性的长度为5字节:其中第一字解会被设置为0xFE(254),而之后的四个字节用于保存前一节点的长度
  • encoding:节点的encoding属性记录了节点content属性薄脆数据的类型以及长度:
    • 一字节、两字节或者五字节长,值的最高位为00、01或者10的是字节数组编码:这种编码便是节点的content属性保存着字节数组,数组的长度由编码除去最高两位之后的其他位记录;
    • 一字节长,值的最高位以11开头的是整数编码:这种编码表示节点的content属性保存着整数值,整数值的类型和长度由编码去除最高两位之后的其他位记录;
  • content

字节数组编码:

编码 编码长度 content属性保存的值
00bbbbbb 1字节 长度小于等于63字节的字节数组
01bbbbbb xxxxxxxx 2字节 长度小于等于16383字节的字节数组
10______ aaaaaaaa bbbbbbbb cccccccc dddddddd 5字节 长度小于等于4294967295的字节数组

整数编码:

编码 编码长度 content属性保存的值
11000000 1字节 int16_t类型的整数
11010000 1字节 int32_t类型的整数
11100000 1字节 int64_t类型的整数
11110000 1字节 24位有符号整数
11111110 1字节 8位有符号整数
1111xxxx 1字节 无contennt属性,xxxx保存了0到12之前的值

连锁更新:
多个连续、长度结余250到253字节之间的节点,如果前面有插入或删除节点,导致previous_entry_length属性需要扩张成5字节,可能导致这些连续的节点都需要更新。最坏情况可能会对整个压缩列表重新执行N次空间重新分配。

unsigned char ziplistNew(void);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
unsigned char *ziplistNew(void) {
//多一个表示结尾的字节
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);
//#define ZIPLIST_BYTES(zl)       (</em>((uint32_t<em>)(zl)))
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
//#define ZIPLIST_TAIL_OFFSET(zl) (</em>((uint32_t<em>)((zl)+sizeof(uint32_t))))
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
//#define ZIPLIST_LENGTH(zl)      (</em>((uint16_t<em>)((zl)+sizeof(uint32_t)</em>2)))
    ZIPLIST_LENGTH(zl) = 0;
//#define ZIP_END 255
    zl[bytes-1] = ZIP_END;
    return zl;
}

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
//#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
//#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return __ziplistInsert(zl,p,s,slen);
}
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */

    zlentry tail;

<pre><code>/* Find out prevlen for the entry that is inserted. */
if (p[0] != ZIP_END) {
</code></pre>

//获取当前节点的previous_entry_length
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

<pre><code>/* See if the entry can be encoded */
</code></pre>

//尝试是否可以编码为整数,同时返回整数编码
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding <em>/
//是整数编码,返回content字节数
        reqlen = zipIntSize(encoding);
    } else {
        /</em> 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. <em>/
//字符数组编码,长度直接是数组长度
        reqlen = slen;
    }
    /</em> We need space for both the length of the previous entry and
     * the length of the payload. */

//加上前置节点长度编码占用字节数
    reqlen += zipPrevEncodeLength(NULL,prevlen);
//加上编码占用字节数
    reqlen += zipEncodeLength(NULL,encoding,slen);

<pre><code>/* When the insert position is not equal to the tail, we need to
 * make sure that the next entry can hold this entry's length in
 * its prevlen field. */

</code></pre>

//判断插入位置的节点,实际前置节点部分字节差
//如果有差距后面需要从1字节变成5字节
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

<pre><code>/* Store offset because a realloc may change the address of zl. */
offset = p-zl;
</code></pre>

//重新分配压缩列表内存,重新计算当前插入节点位置
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

<pre><code>/* Apply memory move when necessary and update tail offset. */
</code></pre>

//从头插入,后面的节点内存位置向后移动
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
//向后移动,目标位置当前节点以后,起始位置当前位置减去前置节点便宜,长度为所有原节点长度减去尾部标示符
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

<pre><code>    /* Encode this entry's raw length in the next entry. */
</code></pre>

//设置后面节点的前置节点长度
        zipPrevEncodeLength(p+reqlen,reqlen);

<pre><code>    /* Update offset for tail */
</code></pre>

//更新尾部节点偏移量
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

<pre><code>    /* When the tail contains more than one entry, we need to take
     * &quot;nextdiff&quot; in account as well. Otherwise, a change in the
     * size of prevlen doesn't have an effect on the *tail* offset. */

    tail = zipEntry(p+reqlen);
</code></pre>

//尾部标示符不对,说明后一个节点的前置节点便宜有扩大(nextdiff > 0),以新的为准
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
//尾部插入,直接更新尾部标记
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

<pre><code>/* When nextdiff != 0, the raw length of the next entry has changed, so
 * we need to cascade the update throughout the ziplist */

if (nextdiff != 0) {
    offset = p-zl;
</code></pre>

//传说中的连锁更新
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

<pre><code>/* Write the entry */
</code></pre>

//写入当前节点
//前置节点偏移
    p += zipPrevEncodeLength(p,prevlen);
//编码
    p += zipEncodeLength(p,encoding,slen);
//#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
    if (ZIP_IS_STR(encoding)) {
//是字符串编码,直接复制过去
        memcpy(p,s,slen);
    } else {
//是整数编码,写入整数
        zipSaveInteger(p,value,encoding);
    }
//增加压缩列表节点数
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

static int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    long long value;

<pre><code>if (entrylen &gt;= 32 || entrylen == 0) return 0;
</code></pre>

//尝试将字符串转换成整数
    if (string2ll((char<em>)entry,entrylen,&value)) {
        /</em> Great, the string can be encoded. Check what's the smallest
         * of our encoding types that can hold this value. */
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        *v = value;
        return 1;
    }
    return 0;
}
//联锁(级联)更新
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;

<pre><code>while (p[0] != ZIP_END) {
    cur = zipEntry(p);
    rawlen = cur.headersize + cur.len;
    rawlensize = zipPrevEncodeLength(NULL,rawlen);

    /* Abort if there is no next entry. */
</code></pre>

//下一个节点是列表尾,退出
        if (p[rawlen] == ZIP_END) break;
        next = zipEntry(p+rawlen);

<pre><code>    /* Abort when &quot;prevlen&quot; has not changed. */
</code></pre>

//下一个节点前置长度节点没有变化,不需要级联更新
        if (next.prevrawlen == rawlen) break;

//下一个节点需要扩张前置节点偏移
        if (next.prevrawlensize < rawlensize) {
            /* The "prevlen" field of "next" needs more bytes to hold
             * the raw length of "cur". */
            offset = p-zl;
            extra = rawlensize-next.prevrawlensize;
//压缩列表重新分配空间,原空间加上扩充空间
            zl = ziplistResize(zl,curlen+extra);
            p = zl+offset;

<pre><code>        /* Current pointer and offset for next element. */
        np = p+rawlen;
        noffset = np-zl;

        /* Update tail offset when next element is not the tail element. */
</code></pre>

//更新尾节点偏移
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }

<pre><code>        /* Move the tail to the back. */
</code></pre>

//向后移动内存
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
//重新设置上一节点长度
            zipPrevEncodeLength(np,rawlen);

<pre><code>        /* Advance the cursor */
</code></pre>

//向后遍历是否需要继续调整
            p += rawlen;
            curlen += extra;
        } else {
//下一个节点是要缩短的,强制不缩短,防止过多的内存重新分配
            if (next.prevrawlensize > rawlensize) {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "rawlen" in the available bytes. */
                zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
            } else {
                zipPrevEncodeLength(p+rawlen,rawlen);
            }

<pre><code>        /* Stop here, as the raw length of &quot;next&quot; has not changed. */
        break;
    }
}
return zl;
</code></pre>

}
//写入整数
static void zipSaveInteger(unsigned char <em>p, int64_t value, unsigned char encoding) {
    int16_t i16;
    int32_t i32;
    int64_t i64;
    if (encoding == ZIP_INT_8B) {
        ((int8_t</em>)p)[0] = (int8_t)value;
    } else if (encoding == ZIP_INT_16B) {
        i16 = value;
        memcpy(p,&i16,sizeof(i16));
        memrev16ifbe(p);
    } else if (encoding == ZIP_INT_24B) {
        i32 = value<<8;
        memrev32ifbe(&i32);
        memcpy(p,((uint8_t<em>)&i32)+1,sizeof(i32)-sizeof(uint8_t));
    } else if (encoding == ZIP_INT_32B) {
        i32 = value;
        memcpy(p,&i32,sizeof(i32));
        memrev32ifbe(p);
    } else if (encoding == ZIP_INT_64B) {
        i64 = value;
        memcpy(p,&i64,sizeof(i64));
        memrev64ifbe(p);
    } else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
        /</em> Nothing to do, the value is stored in the encoding itself. */
    } else {
        assert(NULL);
    }
}

[redis设计与实现][8]数据库

Redis数据库定义:

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

dict

dict数组保存所有的数据库,Redis初始化的时候,默认会创建16个数据库

#define REDIS_DEFAULT_DBNUM     16

默认情况下,Redis客户端的目标数据库是0 号数据库,可以通过select命令切换。 注意,由于Redis缺少获取当前操作的数据库命令,使用select切换需要特别注意

读写数据库中的键值对的时候,Redis除了对键空间执行指定操作外,还有一些额外的操作:

  • 读取键之后(读和写操作都会先读取),记录键空间命中或不命中次数
  • 读取键之后,更新键的LRU
  • 读取时发现已经过期,会先删除过期键
  • 如果有客户端使用watch命令监视了key,会在修改后标记为dirty
  • 修改之后,会对dirty键计数器加1,用于持久化和复制
  • 如果开启了数据库通知,修改之后会发送相应通知
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}
robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;
 
    //查询是否已经过期 
    expireIfNeeded(db,key);
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
 
        /* Update the access time for the ageing algorithm.
         * Don’t do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            //设置lru时间 
            val->lru = server.lruclock;
        return val;
    } else {
        return NULL;
    }
}

expires

通过exprire或者pexpire命令,可以设置键的TTL,如果键的TTL为0,会被自动删除。

expires字典保存了数据库中所有键的过期时间。

  • 过期字典的键是指向某个数据中的键对象
  • 过期字段的值是long long类型的整数,保存这个键的过期时间
    void expireCommand(redisClient *c) {
      expireGenericCommand(c,mstime(),UNIT_SECONDS);
    }
    void expireGenericCommand(redisClient *c, long long basetime, int unit) {
      robj *key = c->argv[1], *param = c->argv[2];
      long long when; /* unix time in milliseconds when the key will expire. */
     
      if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
          return;
     
      if (unit == UNIT_SECONDS) when *= 1000;
      when += basetime;
     
      /* No key, return zero. */
      if (lookupKeyRead(c->db,key) == NULL{
          addReply(c,shared.czero);
          return;
      }
     
      /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
       * should never be executed as a DEL when load the AOF or in the context
       * of a slave instance.
       *
       * Instead we take the other branch of the IF statement setting an expire
       * (possibly in the past) and wait for an explicit DEL from the master. */
      if (when <= mstime() && !server.loading && !server.masterhost{
          robj *aux;
     
          redisAssertWithInfo(c,key,dbDelete(c->db,key));
          server.dirty++;
     
          /* Replicate/AOF this as an explicit DEL. */
          aux = createStringObject(DEL,3);
          rewriteClientCommandVector(c,2,aux,key);
          decrRefCount(aux);
          signalModifiedKey(c->db,key);
          notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,del,key,c->db->id);
          addReply(c, shared.cone);
          return;
      } else {
          //放到expires字典中 
          setExpire(c->db,key,when);
          addReply(c,shared.cone);
          signalModifiedKey(c->db,key);
          notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,expire,key,c->db->id);
          server.dirty++;
          return;
      }
    }

过期键删除策略

  • 惰性删除:每次执行命令前,都会调用expireIfNeeded函数检查是否过期,如果已经过期,改函数会删除过期键
  • 定时删除:定时执行activeExpireCycleTryExpire函数

    expireIfNeeded

    int expireIfNeeded(redisDb *db, robj *key) {
      mstime_t when = getExpire(db,key);
      mstime_t now;
     
      if (when < 0return 0/* No expire for this key */
     
      /* Don’t expire anything while loading. It will be done later. */
      if (server.loadingreturn 0;
     
      /* If we are in the context of a Lua script, we claim that time is
       * blocked to when the Lua script started. This way a key can expire
       * only the first time it is accessed and not in the middle of the
       * script execution, making propagation to slaves / AOF consistent.
       * See issue #1525 on Github for more information. */
      now = server.lua_caller ? server.lua_time_start : mstime();
     
      /* If we are running in the context of a slave, return ASAP:
       * the slave key expiration is controlled by the master that will
       * send us synthesized DEL operations for expired keys.
       *
       * Still we try to return the right information to the caller,
       * that is, 0 if we think the key should be still valid, 1 if
       * we think the key is expired at this time. */
      if (server.masterhost != NULLreturn now > when;
     
      /* Return when this key has not expired */
      if (now <= when) return 0;
     
      /* Delete the key */
      server.stat_expiredkeys++;
      propagateExpire(db,key);
      notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
          expired,key,db->id);
      return dbDelete(db,key);
    }

    activeExpireCycleTryExpire

    while (num–) {
      dictEntry *de;
      long long ttl;
     
      if ((de = dictGetRandomKey(db->expires)) == NULLbreak;
      ttl = dictGetSignedIntegerVal(de)-now;
      if (activeExpireCycleTryExpire(db,de,now)) expired++;
      if (ttl < 0) ttl = 0;
      ttl_sum += ttl;
      ttl_samples++;
    }

    AOF、RDB和复制功能对过期键的处理

  • 生成RDB文件时,已过期的键不会被保存到新的RDB文件中
  • 载入RDB文件:
    • 主服务器载入时,会忽略过期键
    • 从服务器载入时,都会被载入(但是很快会因为同步被覆盖)
  • AOF写入,已过期未删除的键没有影响,被删除后,会追加一条del命令
  • AOF重写,会对键进行检查,过期键不会保存到重写后的AOF文件
  • 复制:
    • 主服务器删除一个过期键后,会显式向所有从服务器发送DEL命令
    • 从服务器执行读命令,及时过期也不会删除,只有接受到主服务器DEL命令才会删除

netty4 UDP 通信

netty4 UDP 通信

netty4 UDP 通信的例子很少,官方代码里面只有https://github.com/netty/netty/tree/4.0/example/src/main/java/io/netty/example/qotm这一个例子。但是,因为需求是从原先TCP 方式修改成UDP,需要能够最大方式的利用原先的解码器、处理器,但是官方的例子里面, 客户端只有一个广播消息,服务端回复一个消息。这当然是无法满足真正的业务需求的。特别是这次的需求是将原有的TCP长连接方式发送心跳, 改成UDP方式。

复用客户端编解码器和handler

原先的解码器是为TCP 包写的,直接通过继承ReplayingDecoder,对bytebuf中的字节按照自定义的协议,转换成内部对象。 按照官方例子,基于UDP的解码器,都是直接将从DatagramPacket对象封装的UDP包中进行解码。 因此,最简单的编码复用方式,是在自己的ByteToMessage的解码器之前,放置一个MessageToMessage解码器,将DatagramPacket中的bytebuf提取出来。

public class UdpMessageToByteDecode extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket msgList<Object> out) throws Exception {
        out.add(msg.content());
        msg.retain();
    }
}

这里特别注意两个问题:

  1. 在out中添加了msg的bytebuf之后,注意调用msg的retain方法,防止msg中的bytebuf提前释放抛出异常
  2. 这样写了之后,丢失了DatagramPacket中的sender,这样在后续的处理器中,无法直接向来源发送消息,所以这种方式基本只试用于客户端

除了解码器,编码器也需要有对应的改造。由于UDP包没有连接,没有办法在连接的时候指定一个目标地址。 所以解码器利用了channel的attribute map来让客户端能够随时切换服务端地址。

public class UdpByteToMessageEncode extends MessageToMessageEncoder<ByteBuf> {
    private static Log log = LogFactory.getLog(UdpByteToMessageEncode.class);
 
    public static final AttributeKey<InetSocketAddress> TARGET_ADDRESS = AttributeKey.valueOf(TARGET_ADDRESS);
 
    @Override
    protected void encode(ChannelHandlerContext ctxByteBuf msgList<Object> out) throws Exception {
        InetSocketAddress ip = ctx.channel().attr(TARGET_ADDRESS).get();
        if (ip == null{
            log.error(no server ip);
            return;
        }
 
        DatagramPacket packet = new DatagramPacket(msg, ip);
        msg.retain();
        out.add(packet);
    }
}

首先从原始编码器获得bytebuf,从channel的attribute中获取目标服务器地址,然后组装成最终发送的DatagramPacket。 这里要注意的是:

  1. 目标地址为了能在整个连接上下文中共享,需要保存在channel中,而不是context中
  2. 发送的bytebuf仍然需要调用retain来防止提前释放抛出异常

服务端数据应答

之前在客户端的编码解码器之前(后)加上简单封装,就可以正常的首发UDP包。因为客户端的业务逻辑比较简单,UDP包只包含了客户端心跳, 无需对服务端的应答包进行处理。而服务器端,则需要对应答包做出业务逻辑处理(这里大致处理是将心跳信息写入Redis中),并发送响应包。 因此,会发现一个严重的问题,原先处理TCP的流程,缺少了一个重要的信息,就是客户端地址。

为了解决这个问题,只能新建一个新的编码、解码器,并且继承原先的业务对象(请求、应答包)增加来源(目标)地址。但是由于UDP包没有TCP 包 粘连的问题,因此只需要继承MessageToMessageDecoder即可,不需要使用类似ReplayingDecoder这样复杂的解码器上层实现。

public class UdpRequestDecode extends MessageToMessageDecoder<DatagramPacket> {
    private static final Log log = LogFactory.getLog(UdpRequestDecode.class);
 
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket packetList<Object> out) throws Exception {
        try {
            UdpRequestData requestData = new UdpRequestData();
            ByteBuf content = packet.content();
            requestData.setEncode(content.readByte());
 
            ...
 
            requestData.setSender(packet.sender());
 
            out.add(requestData);
        } catch (Exception e) {
            log.error(String.format(error decode udp packet from [ %s ], ignore!, packet.sender().toString()));
        }
    }
}

编码器也是类似,需要在业务处理的handler中判断是否为UdpRequestData,确定是否需要在write的时候使用对应的UdpResponseData。

服务器端连接管理

服务器连接管理,主要用于记录服务器端链接数量和负载的记录,这些数据会被写入到Redis中,客户端将依据这些数据来挑选连接数量最少、 压力最小的服务器进行连接。之前使用的TCP协议,因此连接管理非常方便,服务器端维护一个ChannelGroup,在active和inactive的时候从ChannelGroup 中添加或者删除。

由于UDP是无连接的,无法通过handler的几个回调来判断连接状态,所以将逻辑改成了服务器端维护一个hashmap,客户端IP作为key, 最后一次udp包接收时间作为value。在处理业务逻辑的handler中,每当收到一个UDP包,就将这对数据放入到hashmap中。服务器端定时器在定时写入Redis 的时候,通过判断value是否过期,来移除这个键值对。通过这样的逻辑,来记录服务器端的连接数基本上是靠谱的。

同时,为了兼容原先的TCP方式心跳,需要将TCP方式的连接对应的时间戳,改成Long.MAX_VALUE,确保TCP连接只在inactive的时候才被移除。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestData request) throws Exception {
    RequestContext requestContext = new RequestContext(request, ctx.channel());
    ResponseData response = DO_SOMETHING(requestContext);
 
    if (request instanceof UdpRequestData{
        UdpRequestData udpRequest = (UdpRequestData) request;
        // log current sender ip 
        Server.addClient(udpRequest.getSender().getAddress().getHostAddress(), System.currentTimeMillis());
 
        UdpResponseData udpResponseData = UdpResponseData.fromResponseData(response);
        udpResponseData.setRecipient(udpRequest.getSender());
        ctx.writeAndFlush(udpResponseData);
    } else {
        ctx.writeAndFlush(response);
    }
}
 
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        // tcp never timeout 
        Server.addClient(inetSocketAddress.getAddress().getHostAddress(), Long.MAX_VALUE);
    }
    super.channelActive(ctx);
}
 
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        Server.removeClient(inetSocketAddress.getAddress().getHostAddress());
    }
    super.channelInactive(ctx);
}

定时器处理逻辑大致为:

public ServerClusterInfo getServerClusterInfo() {
    ServerClusterInfo serverClusterInfo = new ServerClusterInfo();
    serverClusterInfo.setPort(this.serverClusterInfo.getPort());
    serverClusterInfo.setHost(this.serverClusterInfo.getHost());
 
    Set<String> ips = new HashSet<String>();
    long currentTs = System.currentTimeMillis();
    Iterator<Map.Entry<StringLong>> iter = ipMap.entrySet().iterator();
    while (iter.hasNext(){
        Map.Entry<StringLong> ipEntry = iter.next();
        if (currentTs - ipEntry.getValue() < TIMEOUT{
            ips.add(ipEntry.getKey());
        } else {
            iter.remove();
        }
    }
 
    serverClusterInfo.setCurChannel(ips.size());
    serverClusterInfo.setConnectIps(ips);
    serverClusterInfo.setCurConnect(ips.size());
    return serverClusterInfo;
}

客户端的超时重连

之前TCP版本的重连非常方便,当发现客户端连接服务器的channel isActive方法返回false的时候,直接重新获取服务器列表, 按照连接数排序,重新去连接一个连接数最低的服务器即可。

同样是因为UDP是无连接的,同时UDP也是不安全的,一个包发出去很有可能肉包子打狗——有去无回了。(当然,在内网环境下,网络环境还是应该乐观点的~) 处理类似的最简单办法,就是超时重连了。

之前的客户端,所有消息会被放到一个阻塞队列(因为心跳什么的都是由定时器线程产生的),由一个线程来完成从阻塞队列读取并发送的过程。 这次的修改,也只能从这里下手了。首先在客户端维护一个最后一个数据包返回的时间戳(为了线程安全神马的,直接用了AtomicLong类型)。

首先在初始化的时候,将这个值设置为0。确保第一次发送消息的时候,客户端会主动去“连接”服务端。为什么这里的“连接”是加上引号的呢? 原因很简答,还是那句话,UDP是无连接的。按照之前的设计,这个所谓的“连接”,只是简单的在channel的attribute map中设置了TARGET_ADDRESS这个变量而已。

Thread sendThread = new Thread() {
    public void run() {
        while (true{
            RequestData requestData;
            try {
                requestData = queue.take();
 
                long currentTimeMillis = System.currentTimeMillis();
 
                if (currentTimeMillis - lastResponse.get() > TIMEOUT{
                    boolean connect = BootStrapConnectUtil.connect(channelFuture, serverClusterInfos);
                    if(!connect) {
                        log.error(fail to connect to watch dog! ignore current package);
                        continue;
                    }
                }
 
                channelFuture.channel().writeAndFlush(requestData);
            } catch (Throwable e) {
                log.error(, e);
            }
 
        }
    }
};
 
public static boolean connect(ChannelFuture channelFuture, List<ServerClusterInfo> serverClusterInfos) {
    InetSocketAddress remoteAddress = channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).get();
    serverClusterInfos = sortServer(serverClusterInfos, remoteAddress);
    boolean result = false;
    for (ServerClusterInfo server : serverClusterInfos) {
        try {
            channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).set(server.getAddress());
            result = true;
            Client.getClient().setLastResponse(System.currentTimeMillis());
        } catch (Throwable ignored) {
        }
    }
    return result;
}

这里好多命名和流程,一看就不像是UDP协议的,的确,这些代码之前都走的是TCP协议~~

除此之外,当然是handler里面,要在每次收到服务器端响应包的时候,回写这个lastResponse了。

@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseData response) throws Exception {
    ResultEnum responseCode = ResultEnum.fromCode(response.getResult());
    if (responseCode != null{
        switch (responseCode) {
            case SUCCESS:// 处理成功 
                Client.getClient().setLastResponse(System.currentTimeMillis());
                break;
            default:
                // do nothing 
        }
    }
}
 
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    // 强行过期 
    AgentClient.getClient().setLastResponse(0l);
}

这里还加了一个,如果发生了异常,强行“关闭”连接。

这样,原先TCP方式的心跳,被改成了UDP方式。为了避免UDP协议的无连接、不可靠,整个流程中加了两个超时,确保了客户端、服务器端都能够和 以前一样正常的首发消息、断线重连等。

[redis设计与实现][7]基本数据结构——对象

Redis对基础数据类型进行了封装,构建出上层的对象系统,这个系统包含:字符串对象、列表对象、哈希对象、集合对象和有序集合对象。

Redis对象结构:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisObject {
//类型
unsigned type:4;
//编码
unsigned encoding:4;
//LRU时间
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
//引用计数
int refcount;
//底层实现数据结构的指针
void *ptr;
} robj;

相关宏定义:

1
2
3
#define REDIS_LRU_BITS 24
#define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */
#define REDIS_LRU_CLOCK_RESOLUTION 1 /* LRU clock resolution in seconds */

Redis对象类型:(使用type命令查看)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

//对象编码类型:(使用object encoding命令)
#define REDIS_ENCODING_RAW 0     /* Raw representation */
#define REDIS_ENCODING_INT 1     /* Encoded as integer */
#define REDIS_ENCODING_HT 2      /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7  /* Encoded as skiplist */

不同类型和编码的对象:

类型 编码 对象
REDIS_STRING REDIS_ENCODING_INT 使用整数值实现的字符串对象
REDIS_STRING REDIS_ENCODING_EMBSTR 这货redis3.0引入的,不管
REDIS_STRING REDIS_ENCODING_RAW sds实现的字符串对象
REDIS_LIST REDIS_ENCODING_ZIPLIST 使用压缩列表实现的列表对象
REDIS_LIST REDIS_ENCODING_LINKEDLIST 使用双向链表实现的列表对象
REDIS_HASH REDIS_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象
REDIS_HASH REDIS_ENCODING_HT 使用字典实现的哈希对象
REDIS_SET REDIS_ENCODING_INTSET 使用整数集合实现的集合对象
REDIS_SET REDIS_ENCODING_HT 使用字典实现的集合对象
REDIS_ZSET REDIS_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象
REDIS_ZSET REDIS_ENCODING_SKIPLIST 使用跳跃表想和字典实现的有序集合对象

字符串对象:
整数(long)被保存为int类型
浮点保存为字符串,浮点运算(INCRYBYFLOAT)redis先转成浮点,进行运算后再转回字符串

列表对象:
列表对象同时满足一下两个条件时,列表对象使用ziplist编码
列表对象保存的所有字符串元素的长度都小于64个字节(list-max-ziplist-value)
列表对象保存的元素数量小于512个(list-max-ziplist-entries)
不能满足条件的都需要使用linkedlist编码。

哈希对象:
使用压缩列表,键和值分别作为节点按顺序放入列表
使用ziplist的条件:
哈希对象保存的所有键值对的键和值的字符串长度小于64个字节(hash-max-ziplist-value)
哈希对象保存的键值对数量小于512个(hash-max-ziplist-entries)

集合对象:
hashtable编码,字典的每个键都是一个字符串对象,字典的值全部设置为NULL
使用intset编码条件:
集合对象保存的所有元素是整数值
集合对象保存的元素数量不超过512个(set-max-intset-entries)
不满足条件的集合对象需要使用hashtable编码

有序集合对象:
ziplist编码的有序集合,每个集合元素使用两个紧挨在一起的压缩节点列表来保存,第一个节点保存元素的成员,第二个节点保存元素的分值。
skiplist编码的有序集合,采用一个skiplist和一个hashtable实现。
使用ziplist编码条件:
有序集合保存的元素数量小于128个(zset-max-ziplist-entries)
有序集合保存的所有元素成员的长度都小于64字节(zset-max-ziplist-value)

对象共享:
Redis在初始化时,会创建1w个字符串对象(REDIS_SHARED_INTEGERS),包含整数0~9999,当需要使用这些对象的时候,会使用这些对象的引用(引用计数)而不新创建。

1
2
3
4
if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
}

[redis设计与实现][5]基本数据结构——整数集合

整数集合(intset)用于集合键。当一个集合只包含整数值元素,并且数量不多的时候,会使用整数集合作为集合键的底层实现。相对于直接保存字符串,整数集合能够很好地节约内存,但是由于是数组保存,需要特别关注数组长度。

定义:(intset.h)

1
2
3
4
5
6
7
8
typedef struct intset {
//编码方式
    uint32_t encoding;
//集合包含的元素数量
    uint32_t length;
//保存元素的数组
    int8_t contents[];
} intset;

encoding:

  • INTSETENCINT16:content数组每一项都是一个int16_t类型的数值(有符号)
  • INTSETENCINT32:content数组每一项都是一个int32_t类型的数值
  • INTSETENCINT64:content数组每一项都是一个int64_t类型的数值

整数集合支持长度升级,但是不支持长度降级。当插入的最大长度超过当前编码方式容纳的最大值的时候,会对编码类型进行升级。但是如果删除了一个大数字,整体整数集合不会再进行降级。

intset *intsetNew(void);(创建一个新的整数集合)

1
2
3
4
5
6
7
8
intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
//对于x86架构采用小端序,#define intrev32ifbe(v) (v)
// 初始化的时候编码为INTSET_ENC_INT16
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

intset *intsetAdd(intset *is, int64t value, uint8t *success);(添加到整数集合)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
//判断当前值需要的编码类型

    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */

//如果当前值的编码类型大于当前整数集合的编码,需要进行升级
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */

//查找当前数值是否已经存在
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
//移动插入点以后的元素,空出插入位置
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}
//升级整数集合
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);
    int prepend = value < 0 ? 1 : 0;

    /* First set new encoding and resize */
//设置新的编码
    is->encoding = intrev32ifbe(newenc);
//重新分配整数集合大小
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    /* Upgrade back-to-front so we don't overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset. */

//倒着重新设置值,防止内存覆盖
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    /* Set the value at the beginning or the end. */
    if (prepend)
//插入的值小于0,放到最前面
        _intsetSet(is,0,value);
    else
//插入的值大于0,放到最后面
        _intsetSet(is,intrev32ifbe(is->length),value);
//修改整数集合长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
//重新分配整数集合大小
static intset *intsetResize(intset *is, uint32_t len) {
    uint32_t size = len*intrev32ifbe(is->encoding);
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}
//获取指定位置的值
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
        memrev64ifbe(&v64);
        return v64;
    } else if (enc == INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}
//插入到指定位置
static void _intsetSet(intset *is, int pos, int64_t value) {
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}
//查找数值是否存在(二分查找)
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
//为空直接退出
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */

//如果插入数值比最大的大或者比最小的小,直接退出,设置pos
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

//折半查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}
//从指定位置开始移动到最尾
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    uint32_t bytes = intrev32ifbe(is->length)-from;
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;
        dst = (int64_t*)is->contents+to;
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    memmove(dst,src,bytes);
}
intset *intsetRemove(intset *is, int64_t value, int *success);(移除元素)

intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;

//匹配当前编码并查找元素
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        uint32_t len = intrev32ifbe(is->length);

        /* We know we can delete */
        if (success) *success = 1;

        /* Overwrite value with tail and update length */
//找到后,向前移动数组
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
//收缩数组
        is = intsetResize(is,len-1);
        is->length = intrev32ifbe(len-1);
    }
    return is;
}

[redis设计与实现][4]基本数据结构——跳跃表

Redis使用跳跃表和字典共同来实现有序集合键(sorted set)。

定义:
跳跃表节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zskiplistNode {
//成员对象
    robj *obj;
//分值
    double score;
//后退指针
    struct zskiplistNode *backward;
//层结构
    struct zskiplistLevel {
     //前进指针
        struct zskiplistNode *forward;
     //跨度
        unsigned int span;
    } level[];
} zskiplistNode;

跳跃表定义:

1
2
3
4
5
6
7
8
typedef struct zskiplist {
//跳跃表头、尾指针
    struct zskiplistNode *header, *tail;
//跳跃表长度
    unsigned long length;
//跳跃表层数
    int level;
} zskiplist;

zskiplist *zslCreate(void);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
//创建头节点,头节点永远有最高层
//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//初始化头节点的每一层
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
//遍历查找当前score对应的插入位置
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
//累加跨度
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
//当前层要更新的节点
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */

//随机一个当前节点的层数
    level = zslRandomLevel();
//如果是最高层,需要修正
//在以前最高层以上的,跨度都修复为跳跃表原长度,前置节点都改为头结点
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
//跳跃表的总层数改成最新的层数
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
//每一层的链表构建
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
//当前节点每层跨度计算
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//当前节点每层插入节点跨度修改
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

//修改当前节点的前置节点等
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
int zslRandomLevel(void) {
    int level = 1;
//#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

snoopy初探

snoopy是什么?刚了解这货的时候,是公司服务器上有snoopy的so无法加载的错误,然后是系统日志里面一堆日志,导致机器空间不足。官方说明是:
Snoopy is designed to aid a sysadmin by providing a log of commands executed.也就是说这货会监控服务器上的命令执行,并记录到syslog。

首先来看下这个功能是怎么被完成的。首先会发现,服务器上的/etc/ld.so.preload这个文件被修改,强制可执行程序加载之前加载snoopy的so:

1
2
#cat /etc/ld.so.preload
/usr/local/snoopy/lib/snoopy.so

关于ld.so.preload和LD_PRELOAD,可以参考man ld.so:

LD_PRELOAD

A whitespace-separated list of additional, user-specified, ELF shared libraries to be loaded before all others. This can be used to selectively override functions in other shared libraries. For set-user-ID/set-group-ID ELF binaries, only libraries in the standard search directories that are also set-user-ID will be loaded.

/etc/ld.so.preload

File containing a whitespace separated list of ELF shared libraries to be loaded before the program.

也就是说,snoopy.so会在所有ELF文件加载之前预加载,确保将一些系统调用被这货劫持。

如果在机器上执行

1
ls /notfound

会在系统日志中记录类似:

snoopy[25505]: [uid:0 sid:9701 tty:/dev/pts/15 cwd:/root filename:/bin/ls]: ls /notfound

这样的内容。通过ldd,可以看下一个命令加载的动态链接库:

1
2
3
4
5
6
7
8
9
10
11
12
#ldd /bin/ls
linux-vdso.so.1 =>  (0x00007fff99fff000)
/usr/local/snoopy/lib/snoopy.so (0x00007fc407938000)
librt.so.1 => /lib64/librt.so.1 (0x0000003e4f600000)
libacl.so.1 => /lib64/libacl.so.1 (0x0000003e50200000)
libselinux.so.1 => /lib64/libselinux.so.1 (0x0000003e4fa00000)
libc.so.6 => /lib64/libc.so.6 (0x0000003e4e200000)
libdl.so.2 => /lib64/libdl.so.2 (0x0000003e4e600000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x0000003e4ea00000)
/lib64/ld-linux-x86-64.so.2 (0x0000003e4de00000)
libattr.so.1 => /lib64/libattr.so.1 (0x0000003e4f200000)
libsepol.so.1 => /lib64/libsepol.so.1 (0x0000003e4fe00000)

果然snoopy被最早加载了。

那么如果想不要snoopy加载,又有什么办法呢?

搜索了半天,没有看见Linux中有什么办法能够将设置在ld.so.preload中预先加载的so给卸载掉,但是有一个值得注意的是:LD_PRELOAD加载的优先级高于ld.so.preload。
我们能不能通过更早的加载被劫持的系统调用,来避免被劫持呢?答案是肯定的。

首先,我们可以猜测出来这货是不会劫持太多的系统调用,通过nm命令看下snoopy的符号表:

1
2
3
4
5
#nm /usr/local/snoopy/lib/snoopy.so
...
0000000000000890 T execv
0000000000000b00 T execve
...

这两个系统调用应该是非常熟悉的,函数定义在unistd.h中,主要用于创建新的进程,并加载另一个可执行程序。只要截获了这几个系统调用,就能知道要执行什么命令了。
我们还可以再用bash来验证下:

1
2
3
4
5
6
#strace bash -c "ls /abc"
...
connect(3, {sa_family=AF_FILE, path="/dev/log"...}, 110) = 0
sendto(3, "<86>Sep 23 04:34:56 snoopy[28052"..., 104, MSG_NOSIGNAL, NULL, 0) = 104
execve("/bin/ls", ["ls", "/abc"], [/* 29 vars */]) = 0
...

从这里可以看出两个地方,首先bash在执行ls的时候,的确是通过execve这个系统调用的;其次,这个系统调用已经被snoopy劫持了(向日志设备发送了数据)。

最后,通过代码也验证了这个猜想:代码在这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int execv (const char *filename, char *const argv[]) {
    static int (*func)(const char *, char **);

    FN(func,int,"execv",(const char *, char **const));
    snoopy_log_syscall_execv(filename, argv);

    return (*func) (filename, (char **) argv);
}
int execve (const char *filename, char *const argv[], char *const envp[])
{
    static int (*func)(const char *, char **, char **);

    FN(func,int,"execve",(const char *, char **const, char **const));
    snoopy_log_syscall_execve(filename, argv, envp);

    return (*func) (filename, (char**) argv, (char **) envp);
}

知道了原理,想到绕开,就很简单了。首先,找到execve真实的提供者:从之前ls命令的动态链接库就可以看见大部分系统的系统调用,都在/lib64/libc.so.6中,用nm命令验证下:

1
2
3
4
5
6
7
#nm /lib64/libc.so.6 | grep execv
0000003e4e29acf0 t __GI_execvp
0000003e4e29a880 t __execve
0000003e4e29a980 T execv
0000003e4e29a880 W execve
0000003e4e29acf0 T execvp
0000003e4e29a8b0 T fexecve

因此,然后尝试执行:

1
LD_PRELOAD="/lib64/libc.so.6" bash -c "ls /abc"

再去查看系统日志,会发现只记录了bash -c “ls /abc”这个命令,真正执行的ls /abc这个命令没有记录。
通过strace也可以看见,在执行ls之前,没有了和系统日志交互的连接了。

stat(“/bin/ls”, {stmode=SIFREG|0755, st_size=91272, …}) = 0

access(“/bin/ls”, X_OK) = 0

access(“/bin/ls”, R_OK) = 0

rtsigaction(SIGINT, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {SIGDFL, [], SA_RESTORER, 0x3e4e2302d0}, 8) = 0

rtsigaction(SIGQUIT, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {0x1, [], SARESTORER, 0x3e4e2302d0}, 8) = 0

rtsigaction(SIGCHLD, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {0x436360, [], SARESTORER, 0x3e4e2302d0}, 8) = 0

execve(“/bin/ls”, ["ls", "/abc"], [/* 30 vars */]) = 0

也就是说,如果你登陆服务器之后,执行了:

1
LD_PRELOAD="/lib64/libc.so.6" bash

用当前被snoopy劫持的bash再创建一个没有被snoopy劫持的bash,之后执行的所有命令,都不会再被记录。

gentoo vlc qt5环境下编译失败

之前在虚拟机里面安装了个gentoo,用来尝试安装kde5。在升级系统的时候,发现vlc一直编译失败(好像是phonon引入的)。查了下发现是vlc在编译的时候,发现了qt5,但是按照qt4的方式编译了,导致自身图形界面相关的类编译失败。
编译失败的信息:

1
2
3
4
5
6
7
make[6]: Entering directory '/var/tmp/portage/media-video/vlc-2.1.4/work/vlc-2.1.4/modules/gui/qt4'
../../../doltlibtool  --tag=CXX   --mode=compile x86_64-pc-linux-gnu-g++ -DHAVE_CONFIG_H -I. -I../../..  -DMODULE_NAME=$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}") -DMODULE_NAME_IS_$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}") -DMODULE_STRING=\"$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}")\" -D__PLUGIN__  -I../../../include -I../../../include  -I/usr/include/samba-4.0  -DQT_SHARED -I/usr/include/qt4 -I/usr/include/qt4/QtGui -I/usr/include/qt4 -I/usr/include/qt4/QtCore   -O2 -mprefer-avx128 -fomit-frame-pointer -pipe -march=bdver2 -mmmx -mno-3dnow -msse -msse2 -msse3 -mssse3 -msse4a -mcx16 -msahf -mno-movbe -maes -mno-sha -mpclmul -mpopcnt -mabm -mlwp -mfma -mfma4 -mxop -mbmi -mno-bmi2 -mtbm -mavx -mno-avx2 -msse4.2 -msse4.1 -mlzcnt -mno-rtm -mno-hle -mno-rdrnd -mf16c -mno-fsgsbase -mno-rdseed -mprfchw -mno-adx -mfxsr -mxsave -mno-xsaveopt -mno-avx512f -mno-avx512er -mno-avx512cd -mno-avx512pf -mno-prefetchwt1 --param l1-cache-size=16 --param l1-cache-line-size=64 --param l2-cache-size=2048 -mtune=bdver2 -fstack-protector-strong -Wall -Wextra -Wsign-compare -Wundef -Wpointer-arith -Wvolatile-register-var -fvisibility=hidden -c -o libqt4_plugin_la-qt4.lo `test -f 'qt4.cpp' || echo './'`qt4.cpp
In file included from dialogs/open.hpp:35:0,
                 from dialogs_provider.hpp:36,
                 from qt4.cpp:36:
./ui/open.h:14:29: fatal error: QtWidgets/QAction: No such file or directory
 #include <QtWidgets/QAction>

参考vlcgentoo的bug,发现已经有人提交了一个补丁。具体补丁文件在:这里

[redis设计与实现][3]基本数据结构——字典

Redis字典采用哈希表实现。
哈希表:

1
2
3
4
5
6
7
8
9
10
typedef struct dictht {
//哈希表数组
    dictEntry **table;
//哈希表大小
    unsigned long size;
//哈希表掩码,用于计算索引值,总是等于size - 1
    unsigned long size mask;
//已有的节点数量
    unsigned long used;
} dictht;

哈希表节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct dictEntry {
//键
    void *key;
//值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
//下一个哈希表节点
    struct dictEntry *next;
} dictEntry;

字典:

1
2
3
4
5
6
7
8
9
10
typedef struct dict {
//类型特定的函数
    dictType *type;
//私有数据
    void *privdata;
//哈希表
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;

* type属性是一个指向dictType结构的指针,每个dictType结构保存了一簇用于操作特定类型键值对的函数
* privdata保存了需要传给类型特定函数的可选参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct dictType {
//计算哈希值的函数
    unsigned int (*hashFunction)(const void *key);
//复制键的函数
    void *(*keyDup)(void *privdata, const void *key);
//复制值的函数
    void *(*valDup)(void *privdata, const void *obj);
//对比键的函数
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
//销毁键的函数
    void (*keyDestructor)(void *privdata, void *key);
//销毁值的函数
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

ht属性包含两个项的数组。一般情况下只使用ht[0]哈希表,ht[1]哈希表只在对ht[0]进行rehash的时候才会使用。

哈希算法:
计算:
hash = dict->type->hashFunction(key)
index = hash & dict->ht[x].sizemask
当字典被用作数据库的底层实现,或者哈希键的底层实现时,Redis使用MurmurHash2(https://code.google.com/p/smhasher/wiki/MurmurHash2)算法计算键的哈希值。

int dictAdd(dict *d, void *key, void *val);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}
dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;
//#define dictIsRehashing(d) ((d)->rehashidx != -1)
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */

    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
static int _dictKeyIndex(dict *d, const void *key)
{
    unsigned int h, idx, table;
    dictEntry *he;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    /* Compute the key hash value */
//#define dictHashKey(d, key) (d)->type->hashFunction(key)
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
//比较key是否已经存在,已经存在返回-1
            if (dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
//#define DICT_HT_INITIAL_SIZE     4
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */

//static unsigned int dict_force_resize_ratio = 5;
/*
dict_can_resize设置:
void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}
当有同步硬盘进程的时候改成不能扩充
*/

    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
int dictExpand(dict *d, unsigned long size)
{
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */

    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */

    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}
int dictReplace(dict *d, void *key, void *val);

/* Add an element, discarding the old if the key already exists.
 * Return 1 if the key was added from scratch, 0 if there was already an
 * element with such key and dictReplace() just performed a value update
 * operation. */

int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, auxentry;

    /* Try to add the element. If the key
     * does not exists dictAdd will suceed. */

    if (dictAdd(d, key, val) == DICT_OK)
        return 1;
    /* It already exists, get the entry */
    entry = dictFind(d, key);
    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse. */

    auxentry = *entry;
    dictSetVal(d, entry, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

int dictRehash(dict *d, int n);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int dictRehash(dict *d, int n) {
    if (!dictIsRehashing(d)) return 0;

    while(n--) {
        dictEntry *de, *nextde;

        /* Check if we already rehashed the whole table... */
//已经完成hash,释放ht[0]。将ht[0]指向ht[1]
        if (d->ht[0].used == 0) {
            zfree(d->ht[0].table);
            d->ht[0] = d->ht[1];
            _dictReset(&d->ht[1]);
            d->rehashidx = -1;
            return 0;
        }

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */

        assert(d->ht[0].size > (unsigned long)d->rehashed);
//如果rehash索引为空,跳过
        while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
//移动一个桶里面的所有key到新的哈希表
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    return 1;
}

//为了防止占用太多的CPU时间,限制一次rehash的CPU时间
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

调用者(redis.c):每次尝试渐进式rehash执行1ms

1
2
3
4
5
6
7
8
9
10
11
12
13
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}

[redis设计与实现][2]基本数据结构——链表

链表在Redis中的应用:

* 列表键
* 发布与订阅
* 慢查询
* 监视器等

链表节点:

1
2
3
4
5
typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

链表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct list {
//头结点
    listNode *head;
//尾节点
    listNode *tail;
//节点复制函数
    void *(*dup)(void *ptr);
//节点释放函数
    void (*free)(void *ptr);
//节点对比函数
    int (*match)(void *ptr, void *key);
//链表长度
    unsigned long len;
} list;

Redis链表特性:

* 双向链表:prev,next指针
* 无环:终点节点指向NULL
* 带表头指针和表尾指针
* 带链表长度计数器
* 多态:数据采用void*指针,通过三个函数指针,来操作不同类型的值