coolEx

Today will be better

netty4 UDP 通信

netty4 UDP 通信

netty4 UDP 通信的例子很少,官方代码里面只有https://github.com/netty/netty/tree/4.0/example/src/main/java/io/netty/example/qotm这一个例子。但是,因为需求是从原先TCP 方式修改成UDP,需要能够最大方式的利用原先的解码器、处理器,但是官方的例子里面, 客户端只有一个广播消息,服务端回复一个消息。这当然是无法满足真正的业务需求的。特别是这次的需求是将原有的TCP长连接方式发送心跳, 改成UDP方式。

复用客户端编解码器和handler

原先的解码器是为TCP 包写的,直接通过继承ReplayingDecoder,对bytebuf中的字节按照自定义的协议,转换成内部对象。 按照官方例子,基于UDP的解码器,都是直接将从DatagramPacket对象封装的UDP包中进行解码。 因此,最简单的编码复用方式,是在自己的ByteToMessage的解码器之前,放置一个MessageToMessage解码器,将DatagramPacket中的bytebuf提取出来。

public class UdpMessageToByteDecode extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket msgList<Object> out) throws Exception {
        out.add(msg.content());
        msg.retain();
    }
}

这里特别注意两个问题:

  1. 在out中添加了msg的bytebuf之后,注意调用msg的retain方法,防止msg中的bytebuf提前释放抛出异常
  2. 这样写了之后,丢失了DatagramPacket中的sender,这样在后续的处理器中,无法直接向来源发送消息,所以这种方式基本只试用于客户端

除了解码器,编码器也需要有对应的改造。由于UDP包没有连接,没有办法在连接的时候指定一个目标地址。 所以解码器利用了channel的attribute map来让客户端能够随时切换服务端地址。

public class UdpByteToMessageEncode extends MessageToMessageEncoder<ByteBuf> {
    private static Log log = LogFactory.getLog(UdpByteToMessageEncode.class);
 
    public static final AttributeKey<InetSocketAddress> TARGET_ADDRESS = AttributeKey.valueOf(TARGET_ADDRESS);
 
    @Override
    protected void encode(ChannelHandlerContext ctxByteBuf msgList<Object> out) throws Exception {
        InetSocketAddress ip = ctx.channel().attr(TARGET_ADDRESS).get();
        if (ip == null{
            log.error(no server ip);
            return;
        }
 
        DatagramPacket packet = new DatagramPacket(msg, ip);
        msg.retain();
        out.add(packet);
    }
}

首先从原始编码器获得bytebuf,从channel的attribute中获取目标服务器地址,然后组装成最终发送的DatagramPacket。 这里要注意的是:

  1. 目标地址为了能在整个连接上下文中共享,需要保存在channel中,而不是context中
  2. 发送的bytebuf仍然需要调用retain来防止提前释放抛出异常

服务端数据应答

之前在客户端的编码解码器之前(后)加上简单封装,就可以正常的首发UDP包。因为客户端的业务逻辑比较简单,UDP包只包含了客户端心跳, 无需对服务端的应答包进行处理。而服务器端,则需要对应答包做出业务逻辑处理(这里大致处理是将心跳信息写入Redis中),并发送响应包。 因此,会发现一个严重的问题,原先处理TCP的流程,缺少了一个重要的信息,就是客户端地址。

为了解决这个问题,只能新建一个新的编码、解码器,并且继承原先的业务对象(请求、应答包)增加来源(目标)地址。但是由于UDP包没有TCP 包 粘连的问题,因此只需要继承MessageToMessageDecoder即可,不需要使用类似ReplayingDecoder这样复杂的解码器上层实现。

public class UdpRequestDecode extends MessageToMessageDecoder<DatagramPacket> {
    private static final Log log = LogFactory.getLog(UdpRequestDecode.class);
 
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket packetList<Object> out) throws Exception {
        try {
            UdpRequestData requestData = new UdpRequestData();
            ByteBuf content = packet.content();
            requestData.setEncode(content.readByte());
 
            ...
 
            requestData.setSender(packet.sender());
 
            out.add(requestData);
        } catch (Exception e) {
            log.error(String.format(error decode udp packet from [ %s ], ignore!, packet.sender().toString()));
        }
    }
}

编码器也是类似,需要在业务处理的handler中判断是否为UdpRequestData,确定是否需要在write的时候使用对应的UdpResponseData。

服务器端连接管理

服务器连接管理,主要用于记录服务器端链接数量和负载的记录,这些数据会被写入到Redis中,客户端将依据这些数据来挑选连接数量最少、 压力最小的服务器进行连接。之前使用的TCP协议,因此连接管理非常方便,服务器端维护一个ChannelGroup,在active和inactive的时候从ChannelGroup 中添加或者删除。

由于UDP是无连接的,无法通过handler的几个回调来判断连接状态,所以将逻辑改成了服务器端维护一个hashmap,客户端IP作为key, 最后一次udp包接收时间作为value。在处理业务逻辑的handler中,每当收到一个UDP包,就将这对数据放入到hashmap中。服务器端定时器在定时写入Redis 的时候,通过判断value是否过期,来移除这个键值对。通过这样的逻辑,来记录服务器端的连接数基本上是靠谱的。

同时,为了兼容原先的TCP方式心跳,需要将TCP方式的连接对应的时间戳,改成Long.MAX_VALUE,确保TCP连接只在inactive的时候才被移除。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestData request) throws Exception {
    RequestContext requestContext = new RequestContext(request, ctx.channel());
    ResponseData response = DO_SOMETHING(requestContext);
 
    if (request instanceof UdpRequestData{
        UdpRequestData udpRequest = (UdpRequestData) request;
        // log current sender ip 
        Server.addClient(udpRequest.getSender().getAddress().getHostAddress(), System.currentTimeMillis());
 
        UdpResponseData udpResponseData = UdpResponseData.fromResponseData(response);
        udpResponseData.setRecipient(udpRequest.getSender());
        ctx.writeAndFlush(udpResponseData);
    } else {
        ctx.writeAndFlush(response);
    }
}
 
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        // tcp never timeout 
        Server.addClient(inetSocketAddress.getAddress().getHostAddress(), Long.MAX_VALUE);
    }
    super.channelActive(ctx);
}
 
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        Server.removeClient(inetSocketAddress.getAddress().getHostAddress());
    }
    super.channelInactive(ctx);
}

定时器处理逻辑大致为:

public ServerClusterInfo getServerClusterInfo() {
    ServerClusterInfo serverClusterInfo = new ServerClusterInfo();
    serverClusterInfo.setPort(this.serverClusterInfo.getPort());
    serverClusterInfo.setHost(this.serverClusterInfo.getHost());
 
    Set<String> ips = new HashSet<String>();
    long currentTs = System.currentTimeMillis();
    Iterator<Map.Entry<StringLong>> iter = ipMap.entrySet().iterator();
    while (iter.hasNext(){
        Map.Entry<StringLong> ipEntry = iter.next();
        if (currentTs - ipEntry.getValue() < TIMEOUT{
            ips.add(ipEntry.getKey());
        } else {
            iter.remove();
        }
    }
 
    serverClusterInfo.setCurChannel(ips.size());
    serverClusterInfo.setConnectIps(ips);
    serverClusterInfo.setCurConnect(ips.size());
    return serverClusterInfo;
}

客户端的超时重连

之前TCP版本的重连非常方便,当发现客户端连接服务器的channel isActive方法返回false的时候,直接重新获取服务器列表, 按照连接数排序,重新去连接一个连接数最低的服务器即可。

同样是因为UDP是无连接的,同时UDP也是不安全的,一个包发出去很有可能肉包子打狗——有去无回了。(当然,在内网环境下,网络环境还是应该乐观点的~) 处理类似的最简单办法,就是超时重连了。

之前的客户端,所有消息会被放到一个阻塞队列(因为心跳什么的都是由定时器线程产生的),由一个线程来完成从阻塞队列读取并发送的过程。 这次的修改,也只能从这里下手了。首先在客户端维护一个最后一个数据包返回的时间戳(为了线程安全神马的,直接用了AtomicLong类型)。

首先在初始化的时候,将这个值设置为0。确保第一次发送消息的时候,客户端会主动去“连接”服务端。为什么这里的“连接”是加上引号的呢? 原因很简答,还是那句话,UDP是无连接的。按照之前的设计,这个所谓的“连接”,只是简单的在channel的attribute map中设置了TARGET_ADDRESS这个变量而已。

Thread sendThread = new Thread() {
    public void run() {
        while (true{
            RequestData requestData;
            try {
                requestData = queue.take();
 
                long currentTimeMillis = System.currentTimeMillis();
 
                if (currentTimeMillis - lastResponse.get() > TIMEOUT{
                    boolean connect = BootStrapConnectUtil.connect(channelFuture, serverClusterInfos);
                    if(!connect) {
                        log.error(fail to connect to watch dog! ignore current package);
                        continue;
                    }
                }
 
                channelFuture.channel().writeAndFlush(requestData);
            } catch (Throwable e) {
                log.error(, e);
            }
 
        }
    }
};
 
public static boolean connect(ChannelFuture channelFuture, List<ServerClusterInfo> serverClusterInfos) {
    InetSocketAddress remoteAddress = channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).get();
    serverClusterInfos = sortServer(serverClusterInfos, remoteAddress);
    boolean result = false;
    for (ServerClusterInfo server : serverClusterInfos) {
        try {
            channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).set(server.getAddress());
            result = true;
            Client.getClient().setLastResponse(System.currentTimeMillis());
        } catch (Throwable ignored) {
        }
    }
    return result;
}

这里好多命名和流程,一看就不像是UDP协议的,的确,这些代码之前都走的是TCP协议~~

除此之外,当然是handler里面,要在每次收到服务器端响应包的时候,回写这个lastResponse了。

@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseData response) throws Exception {
    ResultEnum responseCode = ResultEnum.fromCode(response.getResult());
    if (responseCode != null{
        switch (responseCode) {
            case SUCCESS:// 处理成功 
                Client.getClient().setLastResponse(System.currentTimeMillis());
                break;
            default:
                // do nothing 
        }
    }
}
 
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    // 强行过期 
    AgentClient.getClient().setLastResponse(0l);
}

这里还加了一个,如果发生了异常,强行“关闭”连接。

这样,原先TCP方式的心跳,被改成了UDP方式。为了避免UDP协议的无连接、不可靠,整个流程中加了两个超时,确保了客户端、服务器端都能够和 以前一样正常的首发消息、断线重连等。

[redis设计与实现][7]基本数据结构——对象

Redis对基础数据类型进行了封装,构建出上层的对象系统,这个系统包含:字符串对象、列表对象、哈希对象、集合对象和有序集合对象。

Redis对象结构:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisObject {
//类型
unsigned type:4;
//编码
unsigned encoding:4;
//LRU时间
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
//引用计数
int refcount;
//底层实现数据结构的指针
void *ptr;
} robj;

相关宏定义:

1
2
3
#define REDIS_LRU_BITS 24
#define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */
#define REDIS_LRU_CLOCK_RESOLUTION 1 /* LRU clock resolution in seconds */

Redis对象类型:(使用type命令查看)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

//对象编码类型:(使用object encoding命令)
#define REDIS_ENCODING_RAW 0     /* Raw representation */
#define REDIS_ENCODING_INT 1     /* Encoded as integer */
#define REDIS_ENCODING_HT 2      /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7  /* Encoded as skiplist */

不同类型和编码的对象:

类型 编码 对象
REDIS_STRING REDIS_ENCODING_INT 使用整数值实现的字符串对象
REDIS_STRING REDIS_ENCODING_EMBSTR 这货redis3.0引入的,不管
REDIS_STRING REDIS_ENCODING_RAW sds实现的字符串对象
REDIS_LIST REDIS_ENCODING_ZIPLIST 使用压缩列表实现的列表对象
REDIS_LIST REDIS_ENCODING_LINKEDLIST 使用双向链表实现的列表对象
REDIS_HASH REDIS_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象
REDIS_HASH REDIS_ENCODING_HT 使用字典实现的哈希对象
REDIS_SET REDIS_ENCODING_INTSET 使用整数集合实现的集合对象
REDIS_SET REDIS_ENCODING_HT 使用字典实现的集合对象
REDIS_ZSET REDIS_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象
REDIS_ZSET REDIS_ENCODING_SKIPLIST 使用跳跃表想和字典实现的有序集合对象

字符串对象:
整数(long)被保存为int类型
浮点保存为字符串,浮点运算(INCRYBYFLOAT)redis先转成浮点,进行运算后再转回字符串

列表对象:
列表对象同时满足一下两个条件时,列表对象使用ziplist编码
列表对象保存的所有字符串元素的长度都小于64个字节(list-max-ziplist-value)
列表对象保存的元素数量小于512个(list-max-ziplist-entries)
不能满足条件的都需要使用linkedlist编码。

哈希对象:
使用压缩列表,键和值分别作为节点按顺序放入列表
使用ziplist的条件:
哈希对象保存的所有键值对的键和值的字符串长度小于64个字节(hash-max-ziplist-value)
哈希对象保存的键值对数量小于512个(hash-max-ziplist-entries)

集合对象:
hashtable编码,字典的每个键都是一个字符串对象,字典的值全部设置为NULL
使用intset编码条件:
集合对象保存的所有元素是整数值
集合对象保存的元素数量不超过512个(set-max-intset-entries)
不满足条件的集合对象需要使用hashtable编码

有序集合对象:
ziplist编码的有序集合,每个集合元素使用两个紧挨在一起的压缩节点列表来保存,第一个节点保存元素的成员,第二个节点保存元素的分值。
skiplist编码的有序集合,采用一个skiplist和一个hashtable实现。
使用ziplist编码条件:
有序集合保存的元素数量小于128个(zset-max-ziplist-entries)
有序集合保存的所有元素成员的长度都小于64字节(zset-max-ziplist-value)

对象共享:
Redis在初始化时,会创建1w个字符串对象(REDIS_SHARED_INTEGERS),包含整数0~9999,当需要使用这些对象的时候,会使用这些对象的引用(引用计数)而不新创建。

1
2
3
4
if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
}

[redis设计与实现][5]基本数据结构——整数集合

整数集合(intset)用于集合键。当一个集合只包含整数值元素,并且数量不多的时候,会使用整数集合作为集合键的底层实现。相对于直接保存字符串,整数集合能够很好地节约内存,但是由于是数组保存,需要特别关注数组长度。

定义:(intset.h)

1
2
3
4
5
6
7
8
typedef struct intset {
//编码方式
    uint32_t encoding;
//集合包含的元素数量
    uint32_t length;
//保存元素的数组
    int8_t contents[];
} intset;

encoding:

  • INTSETENCINT16:content数组每一项都是一个int16_t类型的数值(有符号)
  • INTSETENCINT32:content数组每一项都是一个int32_t类型的数值
  • INTSETENCINT64:content数组每一项都是一个int64_t类型的数值

整数集合支持长度升级,但是不支持长度降级。当插入的最大长度超过当前编码方式容纳的最大值的时候,会对编码类型进行升级。但是如果删除了一个大数字,整体整数集合不会再进行降级。

intset *intsetNew(void);(创建一个新的整数集合)

1
2
3
4
5
6
7
8
intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
//对于x86架构采用小端序,#define intrev32ifbe(v) (v)
// 初始化的时候编码为INTSET_ENC_INT16
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

intset *intsetAdd(intset *is, int64t value, uint8t *success);(添加到整数集合)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
//判断当前值需要的编码类型

    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */

//如果当前值的编码类型大于当前整数集合的编码,需要进行升级
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */

//查找当前数值是否已经存在
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
//移动插入点以后的元素,空出插入位置
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}
//升级整数集合
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);
    int prepend = value < 0 ? 1 : 0;

    /* First set new encoding and resize */
//设置新的编码
    is->encoding = intrev32ifbe(newenc);
//重新分配整数集合大小
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    /* Upgrade back-to-front so we don't overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset. */

//倒着重新设置值,防止内存覆盖
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    /* Set the value at the beginning or the end. */
    if (prepend)
//插入的值小于0,放到最前面
        _intsetSet(is,0,value);
    else
//插入的值大于0,放到最后面
        _intsetSet(is,intrev32ifbe(is->length),value);
//修改整数集合长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
//重新分配整数集合大小
static intset *intsetResize(intset *is, uint32_t len) {
    uint32_t size = len*intrev32ifbe(is->encoding);
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}
//获取指定位置的值
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
        memrev64ifbe(&v64);
        return v64;
    } else if (enc == INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}
//插入到指定位置
static void _intsetSet(intset *is, int pos, int64_t value) {
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}
//查找数值是否存在(二分查找)
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
//为空直接退出
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */

//如果插入数值比最大的大或者比最小的小,直接退出,设置pos
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

//折半查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}
//从指定位置开始移动到最尾
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    uint32_t bytes = intrev32ifbe(is->length)-from;
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;
        dst = (int64_t*)is->contents+to;
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    memmove(dst,src,bytes);
}
intset *intsetRemove(intset *is, int64_t value, int *success);(移除元素)

intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;

//匹配当前编码并查找元素
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        uint32_t len = intrev32ifbe(is->length);

        /* We know we can delete */
        if (success) *success = 1;

        /* Overwrite value with tail and update length */
//找到后,向前移动数组
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
//收缩数组
        is = intsetResize(is,len-1);
        is->length = intrev32ifbe(len-1);
    }
    return is;
}

[redis设计与实现][4]基本数据结构——跳跃表

Redis使用跳跃表和字典共同来实现有序集合键(sorted set)。

定义:
跳跃表节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zskiplistNode {
//成员对象
    robj *obj;
//分值
    double score;
//后退指针
    struct zskiplistNode *backward;
//层结构
    struct zskiplistLevel {
     //前进指针
        struct zskiplistNode *forward;
     //跨度
        unsigned int span;
    } level[];
} zskiplistNode;

跳跃表定义:

1
2
3
4
5
6
7
8
typedef struct zskiplist {
//跳跃表头、尾指针
    struct zskiplistNode *header, *tail;
//跳跃表长度
    unsigned long length;
//跳跃表层数
    int level;
} zskiplist;

zskiplist *zslCreate(void);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
//创建头节点,头节点永远有最高层
//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//初始化头节点的每一层
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
//遍历查找当前score对应的插入位置
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
//累加跨度
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
//当前层要更新的节点
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */

//随机一个当前节点的层数
    level = zslRandomLevel();
//如果是最高层,需要修正
//在以前最高层以上的,跨度都修复为跳跃表原长度,前置节点都改为头结点
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
//跳跃表的总层数改成最新的层数
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
//每一层的链表构建
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
//当前节点每层跨度计算
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//当前节点每层插入节点跨度修改
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

//修改当前节点的前置节点等
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
int zslRandomLevel(void) {
    int level = 1;
//#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

snoopy初探

snoopy是什么?刚了解这货的时候,是公司服务器上有snoopy的so无法加载的错误,然后是系统日志里面一堆日志,导致机器空间不足。官方说明是:
Snoopy is designed to aid a sysadmin by providing a log of commands executed.也就是说这货会监控服务器上的命令执行,并记录到syslog。

首先来看下这个功能是怎么被完成的。首先会发现,服务器上的/etc/ld.so.preload这个文件被修改,强制可执行程序加载之前加载snoopy的so:

1
2
#cat /etc/ld.so.preload
/usr/local/snoopy/lib/snoopy.so

关于ld.so.preload和LD_PRELOAD,可以参考man ld.so:

LD_PRELOAD

A whitespace-separated list of additional, user-specified, ELF shared libraries to be loaded before all others. This can be used to selectively override functions in other shared libraries. For set-user-ID/set-group-ID ELF binaries, only libraries in the standard search directories that are also set-user-ID will be loaded.

/etc/ld.so.preload

File containing a whitespace separated list of ELF shared libraries to be loaded before the program.

也就是说,snoopy.so会在所有ELF文件加载之前预加载,确保将一些系统调用被这货劫持。

如果在机器上执行

1
ls /notfound

会在系统日志中记录类似:

snoopy[25505]: [uid:0 sid:9701 tty:/dev/pts/15 cwd:/root filename:/bin/ls]: ls /notfound

这样的内容。通过ldd,可以看下一个命令加载的动态链接库:

1
2
3
4
5
6
7
8
9
10
11
12
#ldd /bin/ls
linux-vdso.so.1 =>  (0x00007fff99fff000)
/usr/local/snoopy/lib/snoopy.so (0x00007fc407938000)
librt.so.1 => /lib64/librt.so.1 (0x0000003e4f600000)
libacl.so.1 => /lib64/libacl.so.1 (0x0000003e50200000)
libselinux.so.1 => /lib64/libselinux.so.1 (0x0000003e4fa00000)
libc.so.6 => /lib64/libc.so.6 (0x0000003e4e200000)
libdl.so.2 => /lib64/libdl.so.2 (0x0000003e4e600000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x0000003e4ea00000)
/lib64/ld-linux-x86-64.so.2 (0x0000003e4de00000)
libattr.so.1 => /lib64/libattr.so.1 (0x0000003e4f200000)
libsepol.so.1 => /lib64/libsepol.so.1 (0x0000003e4fe00000)

果然snoopy被最早加载了。

那么如果想不要snoopy加载,又有什么办法呢?

搜索了半天,没有看见Linux中有什么办法能够将设置在ld.so.preload中预先加载的so给卸载掉,但是有一个值得注意的是:LD_PRELOAD加载的优先级高于ld.so.preload。
我们能不能通过更早的加载被劫持的系统调用,来避免被劫持呢?答案是肯定的。

首先,我们可以猜测出来这货是不会劫持太多的系统调用,通过nm命令看下snoopy的符号表:

1
2
3
4
5
#nm /usr/local/snoopy/lib/snoopy.so
...
0000000000000890 T execv
0000000000000b00 T execve
...

这两个系统调用应该是非常熟悉的,函数定义在unistd.h中,主要用于创建新的进程,并加载另一个可执行程序。只要截获了这几个系统调用,就能知道要执行什么命令了。
我们还可以再用bash来验证下:

1
2
3
4
5
6
#strace bash -c "ls /abc"
...
connect(3, {sa_family=AF_FILE, path="/dev/log"...}, 110) = 0
sendto(3, "<86>Sep 23 04:34:56 snoopy[28052"..., 104, MSG_NOSIGNAL, NULL, 0) = 104
execve("/bin/ls", ["ls", "/abc"], [/* 29 vars */]) = 0
...

从这里可以看出两个地方,首先bash在执行ls的时候,的确是通过execve这个系统调用的;其次,这个系统调用已经被snoopy劫持了(向日志设备发送了数据)。

最后,通过代码也验证了这个猜想:代码在这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int execv (const char *filename, char *const argv[]) {
    static int (*func)(const char *, char **);

    FN(func,int,"execv",(const char *, char **const));
    snoopy_log_syscall_execv(filename, argv);

    return (*func) (filename, (char **) argv);
}
int execve (const char *filename, char *const argv[], char *const envp[])
{
    static int (*func)(const char *, char **, char **);

    FN(func,int,"execve",(const char *, char **const, char **const));
    snoopy_log_syscall_execve(filename, argv, envp);

    return (*func) (filename, (char**) argv, (char **) envp);
}

知道了原理,想到绕开,就很简单了。首先,找到execve真实的提供者:从之前ls命令的动态链接库就可以看见大部分系统的系统调用,都在/lib64/libc.so.6中,用nm命令验证下:

1
2
3
4
5
6
7
#nm /lib64/libc.so.6 | grep execv
0000003e4e29acf0 t __GI_execvp
0000003e4e29a880 t __execve
0000003e4e29a980 T execv
0000003e4e29a880 W execve
0000003e4e29acf0 T execvp
0000003e4e29a8b0 T fexecve

因此,然后尝试执行:

1
LD_PRELOAD="/lib64/libc.so.6" bash -c "ls /abc"

再去查看系统日志,会发现只记录了bash -c “ls /abc”这个命令,真正执行的ls /abc这个命令没有记录。
通过strace也可以看见,在执行ls之前,没有了和系统日志交互的连接了。

stat(“/bin/ls”, {stmode=SIFREG|0755, st_size=91272, …}) = 0

access(“/bin/ls”, X_OK) = 0

access(“/bin/ls”, R_OK) = 0

rtsigaction(SIGINT, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {SIGDFL, [], SA_RESTORER, 0x3e4e2302d0}, 8) = 0

rtsigaction(SIGQUIT, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {0x1, [], SARESTORER, 0x3e4e2302d0}, 8) = 0

rtsigaction(SIGCHLD, {SIGDFL, [], SARESTORER, 0x3e4e2302d0}, {0x436360, [], SARESTORER, 0x3e4e2302d0}, 8) = 0

execve(“/bin/ls”, ["ls", "/abc"], [/* 30 vars */]) = 0

也就是说,如果你登陆服务器之后,执行了:

1
LD_PRELOAD="/lib64/libc.so.6" bash

用当前被snoopy劫持的bash再创建一个没有被snoopy劫持的bash,之后执行的所有命令,都不会再被记录。

gentoo vlc qt5环境下编译失败

之前在虚拟机里面安装了个gentoo,用来尝试安装kde5。在升级系统的时候,发现vlc一直编译失败(好像是phonon引入的)。查了下发现是vlc在编译的时候,发现了qt5,但是按照qt4的方式编译了,导致自身图形界面相关的类编译失败。
编译失败的信息:

1
2
3
4
5
6
7
make[6]: Entering directory '/var/tmp/portage/media-video/vlc-2.1.4/work/vlc-2.1.4/modules/gui/qt4'
../../../doltlibtool  --tag=CXX   --mode=compile x86_64-pc-linux-gnu-g++ -DHAVE_CONFIG_H -I. -I../../..  -DMODULE_NAME=$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}") -DMODULE_NAME_IS_$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}") -DMODULE_STRING=\"$(p="libqt4_plugin_la-qt4.lo"; p="${p##*/}"; p="${p#lib}"; echo "${p%_plugin*}")\" -D__PLUGIN__  -I../../../include -I../../../include  -I/usr/include/samba-4.0  -DQT_SHARED -I/usr/include/qt4 -I/usr/include/qt4/QtGui -I/usr/include/qt4 -I/usr/include/qt4/QtCore   -O2 -mprefer-avx128 -fomit-frame-pointer -pipe -march=bdver2 -mmmx -mno-3dnow -msse -msse2 -msse3 -mssse3 -msse4a -mcx16 -msahf -mno-movbe -maes -mno-sha -mpclmul -mpopcnt -mabm -mlwp -mfma -mfma4 -mxop -mbmi -mno-bmi2 -mtbm -mavx -mno-avx2 -msse4.2 -msse4.1 -mlzcnt -mno-rtm -mno-hle -mno-rdrnd -mf16c -mno-fsgsbase -mno-rdseed -mprfchw -mno-adx -mfxsr -mxsave -mno-xsaveopt -mno-avx512f -mno-avx512er -mno-avx512cd -mno-avx512pf -mno-prefetchwt1 --param l1-cache-size=16 --param l1-cache-line-size=64 --param l2-cache-size=2048 -mtune=bdver2 -fstack-protector-strong -Wall -Wextra -Wsign-compare -Wundef -Wpointer-arith -Wvolatile-register-var -fvisibility=hidden -c -o libqt4_plugin_la-qt4.lo `test -f 'qt4.cpp' || echo './'`qt4.cpp
In file included from dialogs/open.hpp:35:0,
                 from dialogs_provider.hpp:36,
                 from qt4.cpp:36:
./ui/open.h:14:29: fatal error: QtWidgets/QAction: No such file or directory
 #include <QtWidgets/QAction>

参考vlcgentoo的bug,发现已经有人提交了一个补丁。具体补丁文件在:这里

[redis设计与实现][3]基本数据结构——字典

Redis字典采用哈希表实现。
哈希表:

1
2
3
4
5
6
7
8
9
10
typedef struct dictht {
//哈希表数组
    dictEntry **table;
//哈希表大小
    unsigned long size;
//哈希表掩码,用于计算索引值,总是等于size - 1
    unsigned long size mask;
//已有的节点数量
    unsigned long used;
} dictht;

哈希表节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct dictEntry {
//键
    void *key;
//值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
//下一个哈希表节点
    struct dictEntry *next;
} dictEntry;

字典:

1
2
3
4
5
6
7
8
9
10
typedef struct dict {
//类型特定的函数
    dictType *type;
//私有数据
    void *privdata;
//哈希表
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;

* type属性是一个指向dictType结构的指针,每个dictType结构保存了一簇用于操作特定类型键值对的函数
* privdata保存了需要传给类型特定函数的可选参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct dictType {
//计算哈希值的函数
    unsigned int (*hashFunction)(const void *key);
//复制键的函数
    void *(*keyDup)(void *privdata, const void *key);
//复制值的函数
    void *(*valDup)(void *privdata, const void *obj);
//对比键的函数
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
//销毁键的函数
    void (*keyDestructor)(void *privdata, void *key);
//销毁值的函数
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

ht属性包含两个项的数组。一般情况下只使用ht[0]哈希表,ht[1]哈希表只在对ht[0]进行rehash的时候才会使用。

哈希算法:
计算:
hash = dict->type->hashFunction(key)
index = hash & dict->ht[x].sizemask
当字典被用作数据库的底层实现,或者哈希键的底层实现时,Redis使用MurmurHash2(https://code.google.com/p/smhasher/wiki/MurmurHash2)算法计算键的哈希值。

int dictAdd(dict *d, void *key, void *val);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}
dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;
//#define dictIsRehashing(d) ((d)->rehashidx != -1)
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */

    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
static int _dictKeyIndex(dict *d, const void *key)
{
    unsigned int h, idx, table;
    dictEntry *he;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    /* Compute the key hash value */
//#define dictHashKey(d, key) (d)->type->hashFunction(key)
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
//比较key是否已经存在,已经存在返回-1
            if (dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
//#define DICT_HT_INITIAL_SIZE     4
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */

//static unsigned int dict_force_resize_ratio = 5;
/*
dict_can_resize设置:
void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}
当有同步硬盘进程的时候改成不能扩充
*/

    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
int dictExpand(dict *d, unsigned long size)
{
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */

    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */

    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}
int dictReplace(dict *d, void *key, void *val);

/* Add an element, discarding the old if the key already exists.
 * Return 1 if the key was added from scratch, 0 if there was already an
 * element with such key and dictReplace() just performed a value update
 * operation. */

int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, auxentry;

    /* Try to add the element. If the key
     * does not exists dictAdd will suceed. */

    if (dictAdd(d, key, val) == DICT_OK)
        return 1;
    /* It already exists, get the entry */
    entry = dictFind(d, key);
    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse. */

    auxentry = *entry;
    dictSetVal(d, entry, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

int dictRehash(dict *d, int n);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int dictRehash(dict *d, int n) {
    if (!dictIsRehashing(d)) return 0;

    while(n--) {
        dictEntry *de, *nextde;

        /* Check if we already rehashed the whole table... */
//已经完成hash,释放ht[0]。将ht[0]指向ht[1]
        if (d->ht[0].used == 0) {
            zfree(d->ht[0].table);
            d->ht[0] = d->ht[1];
            _dictReset(&d->ht[1]);
            d->rehashidx = -1;
            return 0;
        }

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */

        assert(d->ht[0].size > (unsigned long)d->rehashed);
//如果rehash索引为空,跳过
        while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
//移动一个桶里面的所有key到新的哈希表
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    return 1;
}

//为了防止占用太多的CPU时间,限制一次rehash的CPU时间
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

调用者(redis.c):每次尝试渐进式rehash执行1ms

1
2
3
4
5
6
7
8
9
10
11
12
13
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}

[redis设计与实现][2]基本数据结构——链表

链表在Redis中的应用:

* 列表键
* 发布与订阅
* 慢查询
* 监视器等

链表节点:

1
2
3
4
5
typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

链表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct list {
//头结点
    listNode *head;
//尾节点
    listNode *tail;
//节点复制函数
    void *(*dup)(void *ptr);
//节点释放函数
    void (*free)(void *ptr);
//节点对比函数
    int (*match)(void *ptr, void *key);
//链表长度
    unsigned long len;
} list;

Redis链表特性:

* 双向链表:prev,next指针
* 无环:终点节点指向NULL
* 带表头指针和表尾指针
* 带链表长度计数器
* 多态:数据采用void*指针,通过三个函数指针,来操作不同类型的值

[redis设计与实现][1]基本数据结构——sds

SDS(Simple Dynamic String):对C字符串的封装,可修改、可自动伸缩的字符串实现。Redis默认的字符串实现。

SDS定义:(sds.h)

1
2
3
4
5
struct sdshdr {
unsigned int len;
unsigned int free;
char buf[];
};

与C字符串的区别:

* 常数复杂度获取字符串长度(字符串长度已经记录在结构体中)
* 杜绝缓冲区溢出(每次操作前都会检查空间是否充足,自动扩张和收缩)
* 减少修改字符串带来的内存重分配次数:
*
* 空间预分配(提前预留空间)
* 惰性空间释放(释放的空间暂时保留,防止扩张)
* 二进制安全(不采用\0表示结束)
* 兼容部分C字符串函数(buf数组多保存了一个\0,用于兼容部分C字符串函数)

API:
typedef char *sds;
创建一个字符串:sds sdsnew(const char *init);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sds sdsnew(const char *init) {
size_t initlen = (init == NULL) ? 0 : strlen(init);
return sdsnewlen(init, initlen);
}
sds sdsnewlen(const void *init, size_t initlen) {
struct sdshdr *sh;
//根据sdshdr结构分配内存,多一个用来放\0
if (init) {
sh = zmalloc(sizeof(struct sdshdr)+initlen+1);
} else {
sh = zcalloc(sizeof(struct sdshdr)+initlen+1);
}
if (sh == NULL) return NULL;
sh->len = initlen;
sh->free = 0;
//初始字符串不为NULL,复制过去,然后最后补上\0
if (initlen && init)
memcpy(sh->buf, init, initlen);
sh->buf[initlen] = '\0';
return (char*)sh->buf;
}

拼接字符串:sds sdscat(sds s, const char *t);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sds sdscat(sds s, const char *t) {
return sdscatlen(s, t, strlen(t));
}
sds sdscatlen(sds s, const void *t, size_t len) {
struct sdshdr *sh;
size_t curlen = sdslen(s);

s = sdsMakeRoomFor(s,len);
if (s == NULL) return NULL;
sh = (void*) (s-(sizeof(struct sdshdr)));
memcpy(s+curlen, t, len);
sh->len = curlen+len;
sh->free = sh->free-len;
s[curlen+len] = '\0';
return s;
}
sds sdsMakeRoomFor(sds s, size_t addlen) {
struct sdshdr *sh, *newsh;
size_t free = sdsavail(s);
size_t len, newlen;

//仍然有空闲,直接返回
if (free >= addlen) return s;
len = sdslen(s);
sh = (void*) (s-(sizeof(struct sdshdr)));
newlen = (len+addlen);
//新的空间比最大分配空间小,扩容两倍
//#define SDS_MAX_PREALLOC (1024*1024)
if (newlen < SDS_MAX_PREALLOC) newlen *= 2; else newlen += SDS_MAX_PREALLOC; //重新分配空间:sdshdr+字符串长度+1(\0) newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1); if (newsh == NULL) return NULL; newsh->free = newlen - len;
return newsh->buf;
}
static inline size_t sdsavail(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->free;
}

mac gentoo-prefix安装git svn

之前参照yegal的文章在mac上安装了gentoo-prefix。但是在emerge git的时候,会发现如果增加了subversion这个USE,就会编译失败。

从编译失败的错误上,可以看出,编译失败的来源是svn相关的代码,然后错误是链接的时候提示一些符号找不到:

1
2
3
4
5
6
7
8
Undefined symbols for architecture x86_64:
  "_libintl_ngettext", referenced from:
      _show_date_relative in libgit.a(date.o)
  "_libintl_gettext", referenced from:
      _show_date_relative in libgit.a(date.o)
      _warn_on_inaccessible in libgit.a(wrapper.o)
      _xgetpwuid_self in libgit.a(wrapper.o)
ld: symbol(s) not found for architecture x86_64

大致可以看出,是intl相关的库没有链接。在gentoo的bugzilla上也查到了类似的bug。按照附件提供的补丁,需要判断当前系统为mac的时候,增加-lintl,以链接intl这个库。

除了这个库之外,还有一个iconv相关的符号找不到。bug里面没有描述。在另一台gentoo的机器上用e-file查询了之后,发现原生linux的iconv是由glibc提供的。但是prefix是不能自由安装glibc的库的。但是系统里面已经安装了dev-libs/libiconv这个包,提供了iconv相关的库。因此和前面一样,需要手工在链接的时候增加-liconv。

最后修改完的ebuild文件大致为:

1
2
3
4
5
6
7
8
9
10
--- git-1.9.2.ebuild.old    2014-04-20 15:10:34.000000000 +0800
+++ git-1.9.2.ebuild    2014-04-20 15:09:54.000000000 +0800
@@ -324,6 +324,7 @@
 
    if use subversion ; then
        cd "${S}"/contrib/svn-fe
+       [[ ${CHOST} = *-darwin* ]] && EXTLIBS="${EXTLIBS} -lintl -liconv"
        git_emake EXTLIBS="${EXTLIBS}" || die "emake svn-fe failed"
        if use doc ; then
            git_emake svn-fe.{1,html} || die "emake svn-fe.1 svn-fe.html failed"