coolEx

Today will be better

springboot和mybatis结合

springboot和mybatis结合

依赖和数据源配置

springboot依赖了spring4,需要依赖mybatis-spring,最新版本是1.2.2。
数据源相关的依赖:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<!-- datasource -->
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP-java6</artifactId>
    <version>${HikariCP.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>${mybatis.version}</version>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis-spring</artifactId>
    <version>${mybatis-spring.version}</version>
</dependency>

前两个是数据源的依赖,包括HikariCP和mysql驱动。后面两个是mybatis依赖,包括mybatis本身和mybatis-spring模块。

有了这些依赖之后,就可以通过spring4的配置类,对mybatis数据源等进行配置。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
@PropertySource("classpath:datasource.properties")
@MapperScan(basePackages="xxx.repository", sqlSessionFactoryRef = "sqlSessionFactory")
public class DatasourceConfig {
    @Autowired
    private Environment env;

    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setDriverClassName("com.mysql.jdbc.Driver");
        config.setAutoCommit(false);
        config.setJdbcUrl(env.getProperty("xxx.db.url"));
        config.setUsername(env.getProperty("xxx.db.username"));
        config.setPassword(env.getProperty("xxx.db.password"));

        return new HikariDataSource(config);
    }

    @Bean
    public DataSourceTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        sessionFactory.setTypeAliasesPackage("xxx.mybatis");
        return sessionFactory.getObject();
    }

}

首先引入配置文件,并且注入到env对象中。env类似System的properties对象,封装了配置文件中的key value。
然后通过MapperScan注解定义mapper接口包路径。这里同时定义了sqlSessionFactoryRef,是因为需要用到多数据源,
防止spring无法注入,后面会提到。

之后代码就可以开始定义输出的bean。一个是datasource,直接初始化一个Hikari的数据源,springboot提供了builder类,
但是查看源码和api之后,DataSourceBuilder无法配置autocommit属性。

再下面是事务管理,需要通过构造函数注入dataSource。最后一个是mybatis的sqlSessionFactory,主要也是注入一个数据源。

mapper(DAO)实现

dao实现和原先的ibatis差不多,但是mybatis可以通过注解的形式直接生成动态sql。既然springboot用了代码来取代xml,mybatis
中也同样去掉了xml。

插入

插入操作需要注意两个地方,一个是如何返回插入之后的主键(mysql),一个是如何使用数据类型的handler。
首先看代码:


1
2
3
4
5
6
7
@Insert("INSERT INTO aegis_cron_timer " +
            "(id, gmt_create, gmt_modified, name, expression, event_class_name, description, last_trigger_time, status, parameter) " +
            "VALUES (NULL, now(), now(), #{name:VARCHAR}, #{expression:VARCHAR}, " +
            "#{eventClassName:VARCHAR}, #{description:VARCHAR}, now(), #{status:VARCHAR}, " +
            "#{parameter,typeHandler=com.alibaba.aegis.seawater.cron.service.dao.mybatis.MapToJsonTypeHandler})")
@SelectKey(before = false, statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", resultType = java.lang.Long.class)
public Long insertCronTimer(CronTimer cronTimer);

针对mysql,可以通过SelectKey这个注解,设置插入后主键的返回。由于mysql是自增主键,所以设置为插入后执行,定义返回的类型为long
(数据库中定义了bigint)。

另外,这里有个字段需要从map序列化成json字符串,作为varchar类型存放到数据库中。在插入的sql中,可以直接在变量后面定义typeHandler,
值是对应handler的完整类名。

更新

更新操作比较简单,直接使用Update注解即可。和插入类似,如果需要指定type handler,直接在字段后面增加参数即可。更新函数可以返回一个int值,
表示本次更新的行数。

查询

查询通过Select注解完成,mybatis可以直接通过字段名字和查询结果的java bean之间做自动关联。如果名字不匹配,有两种方式,一种是通过sql中
增加AS关键字转成java bean中的字段名,一种是通过@Result注解指定二者的映射关系。


1
2
3
4
5
6
7
@Select("SELECT name, expression, event_class_name AS eventClassName, description, status, parameter " +
            "FROM aegis_cron_timer " +
            "WHERE status = 'ENABLE'")
@Results({
        @Result(column = "parameter", jdbcType = JdbcType.VARCHAR, property = "parameter", typeHandler = MapToJsonTypeHandler.class)
})
public List<CronTimer> listAllAvailableCronTimer();

这里通过Result注解配置了type handler,特别注意Result注解必须在Results注解中,不然不会生效。

自定义type handler

前文已经提到了如何在插入、更新、查询语句中使用type handler,type handler实现也比较简单。mybatis自带的type handler都是通过extends
BaseTypeHandler来实现的,但例子中直接实现了TypeHandler接口:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@MappedTypes(Map.class)
@MappedJdbcTypes(JdbcType.VARCHAR)
public class MapToJsonTypeHandler implements TypeHandler<Map<String, Object>> {

    @Override
    public void setParameter(PreparedStatement ps, int i, Map<String, Object> parameter, JdbcType jdbcType) throws SQLException {
        ps.setString(i, JSON.toJSONString(parameter));
    }

    @Override
    public Map<String, Object> getResult(ResultSet rs, String columnName) throws SQLException {
        String value = rs.getString(columnName);
        return jsonToMap(value);
    }

    @Override
    public Map<String, Object> getResult(ResultSet rs, int columnIndex) throws SQLException {
        String value = rs.getString(columnIndex);
        return jsonToMap(value);
    }

    @Override
    public Map<String, Object> getResult(CallableStatement cs, int columnIndex) throws SQLException {
        String value = cs.getString(columnIndex);
        return jsonToMap(value);
    }

    private Map<String,Object> jsonToMap(String value) {
        if (StringUtils.isBlank(value)) {
            return Collections.emptyMap();
        } else {
            return JSON.parseObject(value, new TypeReference<Map<String, Object>>() {
            });
        }
    }
}

实现比较简单,序列化的时候直接通过fastjson将map对象转成json string,放到PreparedStatement中。反序列化的时候返回来转成Map即可。

多数据源实现

由于项目需要从老的数据库迁移到新的数据库,所以需要两个数据源,在设置多数据源的时候也踩了很多坑。

另一个数据源配置类:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
@PropertySource("classpath:amon-datasource.properties")
@MapperScan(basePackages="com.alibaba.aegis.seawater.cron.migrate.repository",
        sqlSessionFactoryRef = "amonSqlSessionFactory", sqlSessionTemplateRef = "amonSqlSessionTemplate")
public class AmonDataSourceConfig {
    @Autowired
    private Environment env;

    @Bean(name = "amonDataSource")
    public DataSource amonDataSource() {
        HikariConfig config = new HikariConfig();
        config.setDriverClassName("com.mysql.jdbc.Driver");
        config.setAutoCommit(true);
        config.setJdbcUrl(env.getProperty("amon.db.url"));
        config.setUsername(env.getProperty("amon.db.username"));
        config.setPassword(env.getProperty("amon.db.password"));

        return new HikariDataSource(config);
    }

    @Bean(name = "amonTransactionManager")
    public DataSourceTransactionManager amonTransactionManager(@Qualifier("amonDataSource")DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "amonSqlSessionFactory")
    public SqlSessionFactory amonSqlSessionFactory(@Qualifier("amonDataSource")DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        return sessionFactory.getObject();
    }

    @Bean(name = "amonSqlSessionTemplate")
    public SqlSessionTemplate amonSqlSessionTemplate(@Qualifier("amonSqlSessionFactory")SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

这里也定义了一个配置文件,需要注意的是不要和前面一个的key相同,不然会覆盖的。定义bean的时候需要设置下name,或者函数名字改了也行。
需要定义的bean和之前的一样,特别注意MapperScan注解需要修改sqlSessionFactoryRef或者sqlSessionTemplateRef。这里两个都改了但是启动的时候会
提示:

Cannot use both: sqlSessionTemplate and sqlSessionFactory together. sqlSessionFactory is ignored.

这边定义了bean之后,直接使用就没有问题了。唯一需要特别注意的是@Transactional注解,由于定义了两个transactionManager,
无法通过类型来注入事务管理器了,需要注解中特别指定。比如使用前面定义的数据源的事物管理器,需要改成:


1
@Transactional("transactionManager")

这样spring可以通过名字注入bean。

DAO测试

为了方便测试,对应测试类中,重新覆盖了dataSource,采用h2这种内存数据库,解决单元测试数据干扰。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
@MapperScan(basePackages="com.alibaba.aegis.seawater.cron.repository")
public class TestDatasourceConfig extends DatasourceConfig {
    @Autowired
    private Environment env;

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .setName("cron")
                .addScript("h2.sql")
                .build();
    }

}

这里直接通过springboot提供的EmbeddedDatabaseBuilder来创建一个h2的数据库,并添加初始化数据库schema的sql文件。
这里需要注意的是,如果这个sql文件直接叫schema.sql,之前mysql数据源在执行的时候也会去执行,因此这里没有使用默认的名字。

其他坑

在springboot注入properties文件中配置的时候,还遇到一个恶心的问题,除了PropertySource注解指定的properties文件之外,
spring还会默认带上jvm变量、系统环境变量。刚开始直接把数据库用户名字段的key写成了username,结果由于测试服务器上使用了sudo
命令,sudo在切换用户的同时设置了USERNAME这个环境变量标识原始执行用户,导致springboot一直在注入这个值,调试了很久。

svn co时排除目录

某些原因想在svn co的时候排除某些目录,可以绕个圈子,分三步来完成:

co外层目录:


1
svn checkout --depth empty $URL [$LOCATION]

完成之后,会有一个只包含空目录的根目录

设置忽略目录:


1
2
cd $LOCATION
svn up --set-depth exclude <$DIR_TO_EXCLUDE>

这样svn会提示被忽略的目录标记为D,然后文件系统上也看不见了

更新剩余文件


1
svn up --set-depth infinity *

这样其他目录会被重新递归的更新,被忽略的目录不会更新,即使后面有人执行svn up,也无法更新被忽略的文件。

nanomsg实验——survey

nanomsg实验——survey

survey模式是由server发出询问,client针对请求回复响应的一种模式。这种模式在分布式系统中非常有用,
可以用来做服务发现、分布式事物等分布式询问。

客户端

客户端实现比较方便,除了基础调用(创建socket、连接url)之外,就是先接收服务端询问
(例子中比较简单,服务端询问是固定的,所以没有对内容进行检查)针对询问发送响应
(例子中是发送服务端当前时间)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

int main(int argc, const char **argv) {
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket(AF_SP, NN_RESPONDENT);
    if(sock < 0){
        fprintf(stderr, "nn_socket fail: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_connect(sock, url) < 0) {
        fprintf(stderr, "nn_connect fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1){
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);

        if(bytes > 0) {
            printf ("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf);
            nn_freemsg (buf);

            char sendBuffer[128];
            time_t rawtime;
            struct tm * timeinfo;

            time (&rawtime);
            timeinfo = localtime (&rawtime);
            char *timeText = asctime (timeinfo);
            int textLen = strlen(timeText);
            timeText[textLen - 1] = '\0';
            sprintf(sendBuffer, "[ %s ] %s", name, timeText);
            int sendSize = strlen(sendBuffer) + 1;
            int actualSendSize = nn_send(sock, sendBuffer, sendSize, 0);

            if(actualSendSize != sendSize) {
                fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
                continue;
            }
        }
    }

    nn_shutdown(sock, 0);

    return 0;
}

这里收到消息后,就简单的打印,然后将响应数据写会给服务端。

服务端

服务端有个问题,之前搜索了几个例子都不太正常。经过尝试和简单查看代码之后发现,通过nanomsg基础api,
无法获取当前有多少客户端。但是,如果当前所有连接的客户端的响应都已经收到,再次调用nn_recv之后,
会直接返回-1,表示读取失败,同时errno(通过errno函数获取)被设置为EFSM,表示当前状态机状态不正确。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

const char *SURVEY_TYPE = "DATE";

int main(int argc, char** argv)
{

    if ( argc != 2 ) {
        fprintf(stderr, "usage: %s URLn", argv[0]);
        exit(-1);
    }
    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_SURVEYOR);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        int sendSize = strlen(SURVEY_TYPE) + 1;
        int actualSendSize;
        printf ("SERVER: SENDING DATE SURVEY REQUESTn");
        if ((actualSendSize = nn_send(sock, SURVEY_TYPE, sendSize, 0)) != sendSize) {
            fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
            continue;
        }

        int count = 0;
        while(1) {
            char *buf = NULL;
            int bytes = nn_recv (sock, &buf, NN_MSG, 0);
            if (bytes < 0 && nn_errno() == ETIMEDOUT) break;
            if (bytes >= 0) {
                printf ("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf);
                ++count;
                nn_freemsg (buf);
            } else {
                fprintf(stderr, "nn_recv fail: %sn", nn_strerror(errno));
                break;
            }
        }
        printf("SERVER: current receive %d survey response.n", count);
        sleep(1);
    }

    nn_shutdown(sock, 0);

    return 0;

}

这里用了两个死循环,外层循环不停尝试向客户端发起询问。完成询问后,通过另外一个死循环读取所有的客户端响应,
当读取失败时退出循环。

之前找到的源码是直接判断错误是否ETIMEDOUT,经过打印会发现每次都没有超时,而是状态机错误:


1
2
3
/*  If no survey is going on return EFSM error. */
if (nn_slow (!nn_surveyor_inprogress (surveyor)))
    return -EFSM;

测试

测试和前文差不多,先启动一个server,然后再一个个启动client:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
SERVER=$BASE/surveyserver
CLIENT=$BASE/surveyclient

URL="tcp://127.0.0.1:1234"

echo "start surveyserver to bind tcp: $URL"
$SERVER tcp://127.0.0.1:1234 &

echo "start to start surveyclient"
for((i = 0; i < 10; i++))
do
    echo "start client$i"
    $CLIENT client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

输出为:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
start surveyserver to bind tcp: tcp://127.0.0.1:1234
start to start surveyclient
start client0
SERVER: SENDING DATE SURVEY REQUEST
start client1
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 0 survey response.
start client2
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 2 survey response.
start client3
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
...
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client3): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client4): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client5): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client6): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client7): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client9): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client8): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client2 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client3 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client4 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client5 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client6 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client7 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client9 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client8 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 10 survey response.

从输出可以看见,每次最后一个接收完成之后,都会有一个“Operation cannot be performed in this state”
错误,也就是EFSM错误。

nanomsg实验——pubsub

nanomsg实验——pubsub

发布订阅模式是很多消息中间件提供的常见功能。通过消息机制,能够将消息发布者和消息接收(消费)者
进行解耦。pubsub模式也是nanomsg直接支持的一直消息模型之一,因此通过pubsub模式实验,
同时也大致了解了下nanomsg的基础用法。

服务端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

void usage(const char *name)
{
    fprintf(stderr, "%s [ bind url]n", name);
}

int main(int argc, char **argv)
{
    if(argc != 2) {
        usage(argv[0]);
        exit(-1);
    }

    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_PUB);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        time_t rawtime;
        struct tm * timeinfo;

        time (&rawtime);
        timeinfo = localtime (&rawtime);
        char *text = asctime (timeinfo);
        int textLen = strlen(text);
        text[textLen - 1] = '\0';

        printf ("SERVER: PUBLISHING DATE %sn", text);
        nn_send(sock, text, textLen, 0);
        sleep(1);
    }

    return 0;
}

nanomsg使用非常简单,只要直接include nanomsg/nn.h,即可使用基本API。使用内置的通信模式,
需要引入对应的头文件,如pubsub模式,引入nonomsg/pubsub.h即可。

pubsub server,需要首先通过nn_socket调用创建socket,这里模仿了POSIX接口,
函数返回一个文件描述符。因此直接通过判断返回值是否大于0,判断是否创建成功。注意第二个参数为协议,
在协议相关头文件中会定义对应的宏。然后所有操作都将基于这个文件描述符。
和berkeley sockets一样,server需要bind一个端口,nanomsg需要bind一个url。目前nanomsg支持的格式有:
* 进程内通信(inproc):url格式为inproc://test
* 进程间同in想(ipc):url格式为ipc:///tmp/test.ipc
* tcp通信:url格式为tcp://*:5555

github上源码貌似已经支持websocket了。

nanomsg的错误和UNIX相同,失败之后会设置errno,可以通过nn_strerror获取对应的错误文本。

bind完了之后,就可以通过nn_send函数向socket发送消息了。这个函数参数和berkeley sockets api接口类似。
这里直接获取当前时间,然后发出给所有订阅者。

客户端


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <stdio.h>
#include <stdlib.h>

#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

int main(int argc, char **argv)
{
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket (AF_SP, NN_SUB);
    if(sock < 0) {
        fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno));
        exit(-1);
    }

    if (nn_connect(sock, url) < 0) {
        fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno));
        exit(-1);
    }


    while ( 1 ) {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        printf ("CLIENT (%s): RECEIVED %sn", name, buf);
        nn_freemsg (buf);
    }

    nn_shutdown(sock, 0);

    return 0;
}

客户端初始化和服务端差不多,在连接服务端之前,需要通过nn_setsockopt将当前socket设置成消息订阅者。
然后通过nn_connect连接发布者,参数和服务端bind的差不多,也是一个socket、一个url。
这里的url要和服务端bind的url相同。之后就是一个死循环不停的接收发布者的消息。

测试

首先是编译,和普通c程序相同,只是增加链接nanomsg。


1
2
gcc -o pubserver pubserver.c -lnanomsg
gcc -o pubclient pubclient.c -lnanomsg

为了方便测试,写了一个简单的shell脚本:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
PUB=$BASE/pubserver
SUB=$BASE/pubclient

URL="tcp://127.0.0.1:1234"

echo "start pubserver to bind tcp: $URL"

$PUB tcp://127.0.0.1:1234 &

echo "start to start pubclient"
for((i = 0; i < 10; i++))
    do
    echo "start client$i"
    $SUB client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

脚本很简单,首先启动一个消息发布者,然后每秒启动一个消息接受者。等待20s之后,kill掉所有子进程。

脚本的输出:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
start pubserver to bind tcp: tcp://127.0.0.1:1234
start to start pubclient
start client0
SERVER: PUBLISHING DATE Tue Feb 17 15:12:11 2015
start client1
SERVER: PUBLISHING DATE Tue Feb 17 15:12:12 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:12 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:12 2015
start client2
SERVER: PUBLISHING DATE Tue Feb 17 15:12:13 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:13 2015
start client3
SERVER: PUBLISHING DATE Tue Feb 17 15:12:14 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:14 2015
...
SERVER: PUBLISHING DATE Tue Feb 17 15:12:41 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client3): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client4): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client5): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client6): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client7): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client8): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client9): RECEIVED Tue Feb 17 15:12:41 2015
kill all process and exit

可以看见每次启动一个新的订阅者,每个订阅者都能够收到发布者发布的当前时间。

graphviz dot初探

graphviz dot初探

简介

现在文档都用markdown保存到github、gitlab这种代码仓库。markdown遇到最大的问题就是对图片的引用,
直接用工具绘制的图片可以引用,但是这样没法像md文件那样在git仓库中进行版本管理,而且既然文档用了描述语言,
引用图片源文件能用描述语言就更好了。

dot是graphviz的一种描述语言,可以通过graphviz提供的命令行工具生成图片文件。

安装

用gentoo(prefix)安装graphviz直接emerge即可,除了默认的选项,增加了svg格式的支持:


1
USE="svg" emerge media-gfx/graphviz

绘图

dot的文档可以参考官方pdf

节点和箭头

dot的节点创建和连接非常方便,直接a->b就可以创建节点a和b,并且用默认的箭头连接两个节点。


1
2
3
digraph e1 {
    a->b;
}

使用命令生成png图片:


1
dot -Tpng -o e1.png e1.dot

生成的效果图片为: e1

节点属性

节点也可以先“声明”再使用(连接),节点可以用方括号添加属性,比如节点标签、形状、填充、颜色等。
稍稍修改a、b的形状:


1
2
3
4
5
digraph e2 {
    a[label="label a" shape="box3d" color="red"];
    b[label="label b" shape="box" style="filled" fillcolor="yellow"];
    a->b;
}

生成效果图: e2

集群(cluster)

dot中可以设置子图,默认子图是不会显示边框和名称。除非子图的名字是cluster开头:


1
2
3
4
5
6
7
8
9
10
11
12
digraph e3 {
    subgraph cluster0 {
        a->{b c}
        label="cluster 0"
    }
    subgraph cluster1 {
        x->{y z}
        label="cluster 1"
    }
    edge[constraint=false];
    a->x;
}

效果图: e3

这里通过设置edge的constraint为false,不让右边的子图“掉下去”。

docker服务发现——最终测试

最终测试

首先启动confd,按照前面的命令,去掉onetime参数,放到后台最为守护进程长期运行,确保etcd注册目录修改之后,能准实时生成haproxy的配置文件。

然后在两台slave,一台启动两个nginx容器,一台启动一台,模拟上面的a.abc.com和b.abc.com两个域名。


1
docker run -P -v `pwd`/html:/var/www/html -d dockerfile/nginx

这里暴露所有的端口(80和443),然后挂载当前的html目录给该容器,再html目录中创建一个1.html文件,
包含容器id、内部ip、外部ip作为测试。同样启动两个之后,通过master上的etcdctl配置这几个启动的容器:


1
2
3
4
5
6
etcdctl set /services/web/a.abc.com/server1/ip 10.211.55.12
etcdctl set /services/web/a.abc.com/server1/port 49154
etcdctl set /services/web/a.abc.com/server2/ip 10.211.55.12
etcdctl set /services/web/a.abc.com/server2/port 49156
etcdctl set /services/web/b.abc.com/server1/ip 10.211.55.13
etcdctl set /services/web/b.abc.com/server1/port 49154

confd会间歇性检查目录修改状态:

INFO /home/babydragon/haproxy/haproxy.cfg has md5sum c8fb4ae9c10086b9f94cd11d0edecec1 should be 048c844d73c062014c0fd77d9548c47d

2015-02-09T11:42:00+08:00 master confd[3781]: INFO Target config /home/babydragon/haproxy/haproxy.cfg out of sync

2015-02-09T11:42:00+08:00 master confd[3781]: INFO Target config /home/babydragon/haproxy/haproxy.cfg has been updated

然后haproxy被更新:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
acl is_a.abc.com hdr(host) -i a.abc.com

acl is_b.abc.com hdr(host) -i b.abc.com



use_backend a.abc.com_cluster if is_a.abc.com

use_backend b.abc.com_cluster if is_b.abc.com



backend a.abc.com_cluster
cookie SERVERID insert indirect nocache

server server1 10.211.55.12:49154 cookie server1 check

server server2 10.211.55.12:49156 cookie server2 check


backend b.abc.com_cluster
cookie SERVERID insert indirect nocache

server server1 10.211.55.13:49154 cookie server1 check

重新启动haproxy的容器(没有配置直接加载haproxy.cfg),查看status页面,两个backend都已经生效。通过curl模拟下:


1
curl -H "Host: a.abc.com" http://10.211.55.11:49154/1.html

I am a80b37f78259 on 172.17.0.4 (Host: 10.211.55.12)


1
curl -H "Host: a.abc.com" http://10.211.55.11:49154/1.html

I am 209b20bab7ce on 172.17.0.3 (Host: 10.211.55.12)

由于配置了负载均衡为轮询方式,两次请求被落到了不同的容器上,haproxy正确的将请求分发到了两个容器中。

docker服务发现——confd

confd

confd通过读取配置(支持etcd,consul,环境变量),通过go的模板,生成最终的配置文件。

安装

安装和etcd一样,非常方便,已经提供了64位的可执行程序,下载下来之后直接放到PATH中(/usr/local/bin)即可(别忘了+x)。

haproxy配置生成

confd配置文件默认在/etc/confd中,可以通过参数-confdir指定。目录中包含两个子目录,分别是:conf.d templates。
confd会先读取conf.d目录中的配置文件(toml格式),然后根据文件指定的模板路径去渲染模板。

基于之前配置的etcd集群,读取/services/web中的目录和key。结构为:


1
2
/services/web/$DOMAIN/$HOST/ip
                          |-port

其中DOMAIN表示注册应用的域名,HOST表示注册机器的主机名。

首先创建confd配置文件:


1
2
3
4
5
6
7
[template]  
src = "haproxy.cfg.tmpl"  
dest = "/home/babydragon/haproxy/haproxy.cfg"  
keys = [  
"/services/web",  
]  
#reload_cmd = "/etc/init.d/haproxy reload"

现在只测试模板生成,所以不关注检查、重启等命令,配置模板名称、目标路径和获取的key即可。

着重看下模板变动部分(前面就是写死的一些haproxy的标准配置,如日志等)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
frontend webin
bind :80

{{$domains := lsdir "/services/web"}}
{{range $domain := $domains}}
acl is_{{$domain}} hdr(host) -i {{$domain}}
{{end}}

{{range $domain := $domains}}
use_backend {{$domain}}_cluster if is_{{$domain}}
{{end}}

{{range $domain := $domains}}
backend {{$domain}}_cluster
cookie SERVERID insert indirect nocache
{{$domain_dir := printf "/services/web/%s" $domain}}{{range $host := lsdir $domain_dir}}
server {{base $host}} {{$ip_key := printf "/services/web/%s/%s/ip" $domain $host}}{{getv $ip_key}}:{{$port_key := printf "/services/web/%s/%s/port" $domain $host}}{{getv $port_key}} cookie {{base $host}} check
{{end}}
{{end}}

这里主要有两个循环,第一个循环所有的域名,第一个循环每一个域名下的所有主机。
haproxy需要通过设置acl的方式来进行按照域名做负载均衡。因此首先循环域名,为每个域名创建一个acl规则和一个规则的使用。
下面再通过一个循环,创建没个域名对应的后段。

confd模板详细文档可以参考github上的文档
大致的意思是通过lsdir获取当前目录下的所有子目录。第一层子目录为域名,根据域名即可生成acl规则、规则使用、后端名称等数据。
再重新通过瓶装域名目录,对域名目录执行lsdir,读取目录下的每个主机名,创建后端的server条目(一个域名下的负载均衡后段服务器),
同时获取挂在这个目录下的属性键值对(这里只有ip和port)。

创建完模板和配置之后,先构造一些测试数据:


1
2
3
4
5
6
7
8
9
10
11
12
etcdctl mkdir /services/web
etcdctl mkdir /services/web/a.abc.com
etcdctl mkdir /services/web/b.abc.com
etcdctl mkdir /services/web/a.abc.com/server1
etcdctl mkdir /services/web/a.abc.com/server2
etcdctl mkdir /services/web/b.abc.com/server1
etcdctl set /services/web/a.abc.com/server1/ip 10.0.0.1
etcdctl set /services/web/a.abc.com/server1/port 10000
etcdctl set /services/web/a.abc.com/server2/port 10001
etcdctl set /services/web/a.abc.com/server2/ip 10.0.0.1
etcdctl set /services/web/b.abc.com/server1/ip 10.0.0.2
etcdctl set /services/web/b.abc.com/server1/port 12345

这里模拟三个容器,其中两个作为域名a.abc.com运行容器,一个作为b.abc.com容器。

然后执行confd,检查生成的配置文件:


1
confd -confdir ./confd -onetime -backend etcd -node 127.0.0.1:4001

刚才那段模板渲染后为:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
frontend webin
    bind :80



    acl is_a.abc.com hdr(host) -i a.abc.com

    acl is_b.abc.com hdr(host) -i b.abc.com



    use_backend a.abc.com_cluster if is_a.abc.com

    use_backend b.abc.com_cluster if is_b.abc.com



backend a.abc.com_cluster
    cookie SERVERID insert indirect nocache

    server server1 10.0.0.1:10000 cookie server1 check

    server server2 10.0.0.1:10001 cookie server2 check


backend b.abc.com_cluster
    cookie SERVERID insert indirect nocache

    server server1 10.0.0.2:12345 cookie server1 check

docker服务发现——etcd集群

etcd

etcd用于服务发现的基础注册和通知,功能类似于zk,通过注册和监听,实现基础的服务发现。

安装

etcd安装非常简单,可以用go自己编译,etcd也提供了可以直接使用的二进制包(64位)。
具体的安装提示页面在github上,
直接按照上面的描述下载即可。为了方便,把里面的etcd相关的二进制文件(etcd, etcdctl等)
复制到了/usr/local/bin中,方便后续使用。

运行

首先尝试单机版启动,参照手册先直接启动,etcd默认监听的是localhost,既只监听了lo设备,
这样会导致启动后集群中的其他机器无法访问,因此在启动的时候将默认的localhost改成0.0.0.0,
确保etcd监听了所有网卡。


1
etcd -listen-client-urls "http://0.0.0.0:4001" -listen-peer-urls="http://0.0.0.0:7001"

启动之后可通过rest接口或者etcdctl执行命令:


1
curl -L http://127.0.0.1:4001/version

输出结果为:

{“releaseVersion”:”2.0.0″,”internalVersion”:”2″}

简单写入和读取:


1
curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world"

{“action”:”set”,”node”:{“key”:”/message”,”value”:”Hello world”,”modifiedIndex”:3,”createdIndex”:3}}


1
curl -L http://127.0.0.1:4001/v2/keys/message

{“action”:”get”,”node”:{“key”:”/message”,”value”:”Hello world”,”modifiedIndex”:3,”createdIndex”:3}}

集群启动

之前的启动方式为单机启动,集群创建官方给了很多种方式,这里尝试动态添加机器的方式。


1
2
3
4
etcd -name "node1" -initial-cluster "node1=http://10.211.55.11:2380"
    -initial-advertise-peer-urls "http://10.211.55.11:2380"
    -listen-client-urls "http://0.0.0.0:4001"
    -listen-peer-urls="http://0.0.0.0:2380"

启动之后,通过member api,可以查看到当前集群:


1
curl -L http://10.211.55.11:4001/v2/members

{“members”:[{“id”:”f0b31008acf03099″,”name”:”node1″,”peerURLs”:[“http://10.211.55.11:2380″,”http://10.211.55.11:7001″],”clientURLs”:[“http://localhost:2379″,”http://localhost:4001″]}]}

向集群中添加一台机器:


1
etcdctl member add node2 http://10.211.55.12:2380

Added member named node2 with ID 6d345c68496f80fc to cluster

ETCD_NAME=”node2″
ETCD_INITIAL_CLUSTER=”node2=http://10.211.55.12:2380,node1=http://10.211.55.11:2380,node1=http://10.211.55.11:7001″
ETCD_INITIAL_CLUSTER_STATE=”existing”

这里为了方便,没有只用rest接口,直接使用了etcdctl,该命令会提示在启动新的节点时所需要配置的环境变量。

启动新的节点:


1
2
3
4
5
6
7
export ETCD_NAME="node2"
export ETCD_INITIAL_CLUSTER="node1=http://10.211.55.11:2380,node2=http://10.211.55.12:2380"
export ETCD_INITIAL_CLUSTER_STATE="existing"
etcd -listen-client-urls http://0.0.0.0:4001
    -advertise-client-urls http://0.0.0.0:4001
    -listen-peer-urls http://10.211.55.12:2380
    -initial-advertise-peer-urls http://10.211.55.12:2380

启动之后,通过member接口查看当前集群机器:


1
etcdctl member list

293ea5ba1d70f5f1: name=node2 peerURLs=http://10.211.55.12:2380 clientURLs=http://0.0.0.0:4001

bd93686a68a54c2d: name=node1 peerURLs=http://10.211.55.11:2380 clientURLs=http://localhost:2379,http://localhost:4001

同样的方式再添加一台,成为基础的3台机器集群:


1
2
3
4
5
6
7
export ETCD_NAME="node3"
export ETCD_INITIAL_CLUSTER="node2=http://10.211.55.12:2380,node3=http://10.211.55.13:2380,node1=http://10.211.55.11:2380"
export ETCD_INITIAL_CLUSTER_STATE="existing"
etcd -listen-client-urls http://0.0.0.0:4001
    -advertise-client-urls http://0.0.0.0:4001
    -listen-peer-urls http://10.211.55.13:2380
    -initial-advertise-peer-urls http://10.211.55.13:2380

最终集群为:

293ea5ba1d70f5f1: name=node2 peerURLs=http://10.211.55.12:2380 clientURLs=http://0.0.0.0:4001

76610041ace6c4f8: name=node3 peerURLs=http://10.211.55.13:2380 clientURLs=http://0.0.0.0:4001

bd93686a68a54c2d: name=node1 peerURLs=http://10.211.55.11:2380 clientURLs=http://localhost:2379,http://localhost:4001

在node1节点机器上运行:


1
etcdctl set /message hello

之后,在三台机器中执行:


1
etcdctl get /message

都能够正确的获取这个key的值:hello。

使用shc加密shell脚本

使用shc加密shell脚本

使用

之前为了防止svn认证问题,都是直接把svn的用户名、密码写在脚本里面的。由于明文密码非常不安全,所以必须采取密文保存。
为了不影响原有脚本流程,采用了抽取svn命令为独立脚本,在这个脚本里面写上用户名和密码,并加密保存。

加密采用了shc这个工具进行。该工具采用arc4的方式加密数据,而且比较简单,运行之后会根据输入的脚本(svn.sh)生成两个文件:
svn.sh.x这个是加密后的可执行程序,svn.sh.x.c这个是那个可执行程序的C源码。最终运行只需要前面那个.x结尾的可执行程序即可。

简单说下我的使用,刚开始什么参数都不加,最终生成的.x文件为动态链接,虽然没有什么依赖,但是需要动态链接glibc。由于我本地的
glibc版本远大于服务器上的,所以导致这个文件无法运行,提示找不到GLIBC 2.6找不到。查看了帮助文档之后,可以通过设置CFLAGS
环境变量,修改编译参数,再加上其他参数,最终的命令为:


1
CFLAGS="-static" shc -v -T -r -f svn.sh

其中通过环境变量,让编译输出的.x文件变成静态链接,这样就不需要依赖主机的任何动态链接库;-v参数输出命令的详细输出;
-T参数增加traceable,让输出的可执行程序可追踪;-r参数生成可分发的二进制程序。

这样原先脚本执行svn,只要直接改成svn.sh.x,去掉原先的用户名、密码,这样密码就被加密保存了。

原理

shc采用arc4加密,在命令中,大致通过
– 处理输入参数
– 读取脚本
– 解析使用的shell
– 生成C源码文件
– 编译C源码文件
这几步完成。在生成源码文件的时候,会对内容进行arc4加密,C源码中重新进行arc4进行解密,最终通过execvp调用运行最终的脚本。

几个参数的大致实现:

-T

如果不设置这个变量,生成的可执行文件将不能被trace


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void untraceable(char * argv0)
{
    char proc[80];
    int pid, mine;

    switch(pid = fork()) {
        case  0:
            pid = getppid();
            /* For problematic SunOS ptrace */
            #if defined(__FreeBSD__)
            sprintf(proc, "/proc/%d/mem", (int)pid);
            #else
            sprintf(proc, "/proc/%d/as",  (int)pid);
            #endif
            close(0);
            mine = !open(proc, O_RDWR|O_EXCL);
            if (!mine && errno != EBUSY)
                mine = !ptrace(PTRACE_ATTACH, pid, 0, 0);
                if (mine) {
                    kill(pid, SIGCONT);
                } else {
                    perror(argv0);
                    kill(pid, SIGKILL);
                }
                _exit(mine);
        case -1:
            break;
        default:
            if (pid == waitpid(pid, 0, 0))
                return;
    }
    perror(argv0);
    _exit(1);
}

首先fock一个进程,试图去通过ptrace调用挂载到新fock出来的进程上,如果成功了,说明没有被其他进程trace,则发送SIGCONT信号继续执行,
否则直接发送SIGKILL信号终止进程继续执行。但是在实际使用过程中,不知道为什么,偶尔会让服务器上多出T状态的进程,看上是因为ptrace调用
向进程发送了SIGSTOP之后,再次发送SIGCONT没有成功。因此后面都加上了-T参数,跳过这个不能追踪设置。当前这样会大大降低安全性。

-e

这个参数可以指定可执行程序失效时间,超过这个时间加密后的可执行程序将无法运行。实现也很简单直接把加密后的过期时间放在了生成的C代码前,
执行shell之前,会有


1
2
if (date[0] && (atoll(date)<time(NULL)))
    return msg1;

这样的判断,直接阻止程序的运行。

[redis设计与实现][14]sentinel——jedis客户端

sentinel客户端支持——jedis

简单的代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class App
{  
    public static void main( String[] args )  
    {  
        Set<String> sentinels = new HashSet<String>();
        // 添加sentinel
        sentinels.add("172.18.18.207:26379");
        // 初始化基于sentinel的连接池,需要一个master的名字(用于查询这个master)
        JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels);  

        // 后面使用和标准的jedis连接池相同
        Jedis jedis = pool.getResource();  

        jedis.set("jedis", "jedis");  

        pool.returnResource(jedis);  

    }
}

JedisSentinelPool的源码在github上托管。
大致的流程:
1. 根据给定的sentinel列表和master名字,获得master信息(IP,端口)(通过sentinel master命令)
2. 启动一个线程,根据给定的sentinel列表,订阅+switch-master消息
3. 一旦发现有master切换,调用initPool方法,修改连接池使用的master