nanomsg实验——survey

nanomsg实验——survey

survey模式是由server发出询问,client针对请求回复响应的一种模式。这种模式在分布式系统中非常有用,
可以用来做服务发现、分布式事物等分布式询问。

客户端

客户端实现比较方便,除了基础调用(创建socket、连接url)之外,就是先接收服务端询问
(例子中比较简单,服务端询问是固定的,所以没有对内容进行检查)针对询问发送响应
(例子中是发送服务端当前时间)

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

int main(int argc, const char **argv) {
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket(AF_SP, NN_RESPONDENT);
    if(sock < 0){
        fprintf(stderr, "nn_socket fail: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_connect(sock, url) < 0) {
        fprintf(stderr, "nn_connect fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1){
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);

        if(bytes > 0) {
            printf ("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf);
            nn_freemsg (buf);

            char sendBuffer[128];
            time_t rawtime;
            struct tm * timeinfo;

            time (&rawtime);
            timeinfo = localtime (&rawtime);
            char *timeText = asctime (timeinfo);
            int textLen = strlen(timeText);
            timeText[textLen - 1] = '';
            sprintf(sendBuffer, "[ %s ] %s", name, timeText);
            int sendSize = strlen(sendBuffer) + 1;
            int actualSendSize = nn_send(sock, sendBuffer, sendSize, 0);

            if(actualSendSize != sendSize) {
                fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
                continue;
            }
        }
    }

    nn_shutdown(sock, 0);

    return 0;
}

这里收到消息后,就简单的打印,然后将响应数据写会给服务端。

服务端

服务端有个问题,之前搜索了几个例子都不太正常。经过尝试和简单查看代码之后发现,通过nanomsg基础api,
无法获取当前有多少客户端。但是,如果当前所有连接的客户端的响应都已经收到,再次调用nn_recv之后,
会直接返回-1,表示读取失败,同时errno(通过errno函数获取)被设置为EFSM,表示当前状态机状态不正确。

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

const char *SURVEY_TYPE = "DATE";

int main(int argc, char** argv)
{

    if ( argc != 2 ) {
        fprintf(stderr, "usage: %s URLn", argv[0]);
        exit(-1);
    }
    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_SURVEYOR);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        int sendSize = strlen(SURVEY_TYPE) + 1;
        int actualSendSize;
        printf ("SERVER: SENDING DATE SURVEY REQUESTn");
        if ((actualSendSize = nn_send(sock, SURVEY_TYPE, sendSize, 0)) != sendSize) {
            fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
            continue;
        }

        int count = 0;
        while(1) {
            char *buf = NULL;
            int bytes = nn_recv (sock, &buf, NN_MSG, 0);
            if (bytes < 0 && nn_errno() == ETIMEDOUT) break;
            if (bytes >= 0) {
                printf ("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf);
                ++count;
                nn_freemsg (buf);
            } else {
                fprintf(stderr, "nn_recv fail: %sn", nn_strerror(errno));
                break;
            }
        }
        printf("SERVER: current receive %d survey response.n", count);
        sleep(1);
    }

    nn_shutdown(sock, 0);

    return 0;

}

这里用了两个死循环,外层循环不停尝试向客户端发起询问。完成询问后,通过另外一个死循环读取所有的客户端响应,
当读取失败时退出循环。

之前找到的源码是直接判断错误是否ETIMEDOUT,经过打印会发现每次都没有超时,而是状态机错误:

/*  If no survey is going on return EFSM error. */
if (nn_slow (!nn_surveyor_inprogress (surveyor)))
    return -EFSM;

测试

测试和前文差不多,先启动一个server,然后再一个个启动client:

#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
SERVER=$BASE/surveyserver
CLIENT=$BASE/surveyclient

URL="tcp://127.0.0.1:1234"

echo "start surveyserver to bind tcp: $URL"
$SERVER tcp://127.0.0.1:1234 &

echo "start to start surveyclient"
for((i = 0; i < 10; i++))
do
    echo "start client$i"
    $CLIENT client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

输出为:

start surveyserver to bind tcp: tcp://127.0.0.1:1234
start to start surveyclient
start client0
SERVER: SENDING DATE SURVEY REQUEST
start client1
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 0 survey response.
start client2
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 2 survey response.
start client3
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
...
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client3): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client4): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client5): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client6): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client7): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client9): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client8): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client2 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client3 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client4 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client5 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client6 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client7 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client9 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client8 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 10 survey response.

从输出可以看见,每次最后一个接收完成之后,都会有一个“Operation cannot be performed in this state”
错误,也就是EFSM错误。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据