netty4 UDP 通信

netty4 UDP 通信

netty4 UDP 通信的例子很少,官方代码里面只有https://github.com/netty/netty/tree/4.0/example/src/main/java/io/netty/example/qotm这一个例子。但是,因为需求是从原先TCP 方式修改成UDP,需要能够最大方式的利用原先的解码器、处理器,但是官方的例子里面, 客户端只有一个广播消息,服务端回复一个消息。这当然是无法满足真正的业务需求的。特别是这次的需求是将原有的TCP长连接方式发送心跳, 改成UDP方式。

复用客户端编解码器和handler

原先的解码器是为TCP 包写的,直接通过继承ReplayingDecoder,对bytebuf中的字节按照自定义的协议,转换成内部对象。 按照官方例子,基于UDP的解码器,都是直接将从DatagramPacket对象封装的UDP包中进行解码。 因此,最简单的编码复用方式,是在自己的ByteToMessage的解码器之前,放置一个MessageToMessage解码器,将DatagramPacket中的bytebuf提取出来。

public class UdpMessageToByteDecode extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket msgList<Object> out) throws Exception {
        out.add(msg.content());
        msg.retain();
    }
}

这里特别注意两个问题:

  1. 在out中添加了msg的bytebuf之后,注意调用msg的retain方法,防止msg中的bytebuf提前释放抛出异常
  2. 这样写了之后,丢失了DatagramPacket中的sender,这样在后续的处理器中,无法直接向来源发送消息,所以这种方式基本只试用于客户端

除了解码器,编码器也需要有对应的改造。由于UDP包没有连接,没有办法在连接的时候指定一个目标地址。 所以解码器利用了channel的attribute map来让客户端能够随时切换服务端地址。

public class UdpByteToMessageEncode extends MessageToMessageEncoder<ByteBuf> {
    private static Log log = LogFactory.getLog(UdpByteToMessageEncode.class);
 
    public static final AttributeKey<InetSocketAddress> TARGET_ADDRESS = AttributeKey.valueOf("TARGET_ADDRESS");
 
    @Override
    protected void encode(ChannelHandlerContext ctxByteBuf msgList<Object> out) throws Exception {
        InetSocketAddress ip = ctx.channel().attr(TARGET_ADDRESS).get();
        if (ip == null{
            log.error("no server ip");
            return;
        }
 
        DatagramPacket packet = new DatagramPacket(msg, ip);
        msg.retain();
        out.add(packet);
    }
}

首先从原始编码器获得bytebuf,从channel的attribute中获取目标服务器地址,然后组装成最终发送的DatagramPacket。 这里要注意的是:

  1. 目标地址为了能在整个连接上下文中共享,需要保存在channel中,而不是context中
  2. 发送的bytebuf仍然需要调用retain来防止提前释放抛出异常

服务端数据应答

之前在客户端的编码解码器之前(后)加上简单封装,就可以正常的首发UDP包。因为客户端的业务逻辑比较简单,UDP包只包含了客户端心跳, 无需对服务端的应答包进行处理。而服务器端,则需要对应答包做出业务逻辑处理(这里大致处理是将心跳信息写入Redis中),并发送响应包。 因此,会发现一个严重的问题,原先处理TCP的流程,缺少了一个重要的信息,就是客户端地址。

为了解决这个问题,只能新建一个新的编码、解码器,并且继承原先的业务对象(请求、应答包)增加来源(目标)地址。但是由于UDP包没有TCP 包 粘连的问题,因此只需要继承MessageToMessageDecoder即可,不需要使用类似ReplayingDecoder这样复杂的解码器上层实现。

public class UdpRequestDecode extends MessageToMessageDecoder<DatagramPacket> {
    private static final Log log = LogFactory.getLog(UdpRequestDecode.class);
 
    @Override
    protected void decode(ChannelHandlerContext ctxDatagramPacket packetList<Object> out) throws Exception {
        try {
            UdpRequestData requestData = new UdpRequestData();
            ByteBuf content = packet.content();
            requestData.setEncode(content.readByte());
 
            ...
 
            requestData.setSender(packet.sender());
 
            out.add(requestData);
        } catch (Exception e) {
            log.error(String.format("error decode udp packet from [ %s ], ignore!", packet.sender().toString()));
        }
    }
}

编码器也是类似,需要在业务处理的handler中判断是否为UdpRequestData,确定是否需要在write的时候使用对应的UdpResponseData。

服务器端连接管理

服务器连接管理,主要用于记录服务器端链接数量和负载的记录,这些数据会被写入到Redis中,客户端将依据这些数据来挑选连接数量最少、 压力最小的服务器进行连接。之前使用的TCP协议,因此连接管理非常方便,服务器端维护一个ChannelGroup,在active和inactive的时候从ChannelGroup 中添加或者删除。

由于UDP是无连接的,无法通过handler的几个回调来判断连接状态,所以将逻辑改成了服务器端维护一个hashmap,客户端IP作为key, 最后一次udp包接收时间作为value。在处理业务逻辑的handler中,每当收到一个UDP包,就将这对数据放入到hashmap中。服务器端定时器在定时写入Redis 的时候,通过判断value是否过期,来移除这个键值对。通过这样的逻辑,来记录服务器端的连接数基本上是靠谱的。

同时,为了兼容原先的TCP方式心跳,需要将TCP方式的连接对应的时间戳,改成Long.MAX_VALUE,确保TCP连接只在inactive的时候才被移除。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestData request) throws Exception {
    RequestContext requestContext = new RequestContext(request, ctx.channel());
    ResponseData response = DO_SOMETHING(requestContext);
 
    if (request instanceof UdpRequestData{
        UdpRequestData udpRequest = (UdpRequestData) request;
        // log current sender ip 
        Server.addClient(udpRequest.getSender().getAddress().getHostAddress(), System.currentTimeMillis());
 
        UdpResponseData udpResponseData = UdpResponseData.fromResponseData(response);
        udpResponseData.setRecipient(udpRequest.getSender());
        ctx.writeAndFlush(udpResponseData);
    } else {
        ctx.writeAndFlush(response);
    }
}
 
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        // tcp never timeout 
        Server.addClient(inetSocketAddress.getAddress().getHostAddress(), Long.MAX_VALUE);
    }
    super.channelActive(ctx);
}
 
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    SocketAddress remoteAddress = channel.remoteAddress();
    if (remoteAddress != null{
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        Server.removeClient(inetSocketAddress.getAddress().getHostAddress());
    }
    super.channelInactive(ctx);
}

定时器处理逻辑大致为:

public ServerClusterInfo getServerClusterInfo() {
    ServerClusterInfo serverClusterInfo = new ServerClusterInfo();
    serverClusterInfo.setPort(this.serverClusterInfo.getPort());
    serverClusterInfo.setHost(this.serverClusterInfo.getHost());
 
    Set<String> ips = new HashSet<String>();
    long currentTs = System.currentTimeMillis();
    Iterator<Map.Entry<StringLong>> iter = ipMap.entrySet().iterator();
    while (iter.hasNext(){
        Map.Entry<StringLong> ipEntry = iter.next();
        if (currentTs - ipEntry.getValue() < TIMEOUT{
            ips.add(ipEntry.getKey());
        } else {
            iter.remove();
        }
    }
 
    serverClusterInfo.setCurChannel(ips.size());
    serverClusterInfo.setConnectIps(ips);
    serverClusterInfo.setCurConnect(ips.size());
    return serverClusterInfo;
}

客户端的超时重连

之前TCP版本的重连非常方便,当发现客户端连接服务器的channel isActive方法返回false的时候,直接重新获取服务器列表, 按照连接数排序,重新去连接一个连接数最低的服务器即可。

同样是因为UDP是无连接的,同时UDP也是不安全的,一个包发出去很有可能肉包子打狗——有去无回了。(当然,在内网环境下,网络环境还是应该乐观点的~) 处理类似的最简单办法,就是超时重连了。

之前的客户端,所有消息会被放到一个阻塞队列(因为心跳什么的都是由定时器线程产生的),由一个线程来完成从阻塞队列读取并发送的过程。 这次的修改,也只能从这里下手了。首先在客户端维护一个最后一个数据包返回的时间戳(为了线程安全神马的,直接用了AtomicLong类型)。

首先在初始化的时候,将这个值设置为0。确保第一次发送消息的时候,客户端会主动去“连接”服务端。为什么这里的“连接”是加上引号的呢? 原因很简答,还是那句话,UDP是无连接的。按照之前的设计,这个所谓的“连接”,只是简单的在channel的attribute map中设置了TARGET_ADDRESS这个变量而已。

Thread sendThread = new Thread() {
    public void run() {
        while (true{
            RequestData requestData;
            try {
                requestData = queue.take();
 
                long currentTimeMillis = System.currentTimeMillis();
 
                if (currentTimeMillis - lastResponse.get() > TIMEOUT{
                    boolean connect = BootStrapConnectUtil.connect(channelFuture, serverClusterInfos);
                    if(!connect) {
                        log.error("fail to connect to watch dog! ignore current package");
                        continue;
                    }
                }
 
                channelFuture.channel().writeAndFlush(requestData);
            } catch (Throwable e) {
                log.error("", e);
            }
 
        }
    }
};
 
public static boolean connect(ChannelFuture channelFuture, List<ServerClusterInfo> serverClusterInfos) {
    InetSocketAddress remoteAddress = channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).get();
    serverClusterInfos = sortServer(serverClusterInfos, remoteAddress);
    boolean result = false;
    for (ServerClusterInfo server : serverClusterInfos) {
        try {
            channelFuture.channel().attr(UdpByteToMessageEncode.TARGET_ADDRESS).set(server.getAddress());
            result = true;
            Client.getClient().setLastResponse(System.currentTimeMillis());
        } catch (Throwable ignored) {
        }
    }
    return result;
}

这里好多命名和流程,一看就不像是UDP协议的,的确,这些代码之前都走的是TCP协议~~

除此之外,当然是handler里面,要在每次收到服务器端响应包的时候,回写这个lastResponse了。

@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseData response) throws Exception {
    ResultEnum responseCode = ResultEnum.fromCode(response.getResult());
    if (responseCode != null{
        switch (responseCode) {
            case SUCCESS:// 处理成功 
                Client.getClient().setLastResponse(System.currentTimeMillis());
                break;
            default:
                // do nothing 
        }
    }
}
 
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    // 强行过期 
    AgentClient.getClient().setLastResponse(0l);
}

这里还加了一个,如果发生了异常,强行“关闭”连接。

这样,原先TCP方式的心跳,被改成了UDP方式。为了避免UDP协议的无连接、不可靠,整个流程中加了两个超时,确保了客户端、服务器端都能够和 以前一样正常的首发消息、断线重连等。

8 Replies to “netty4 UDP 通信”

  1. UDP用netty的好处不明显,因为单发udp包,其实不太容易遇到阻塞,只是写入了系统的发送缓冲区而已。
    另外你得在UDP上自己重建TCP的稳定有状态,各家实现方式不同,实在也没啥example可以给,你看大部分异步框架(libevent/libev/libuv)对UDP都是一个很简陋的支持,就是这个原因。

    1. 其实就是因为这个重要性很低,才改成用UDP的。其他超时什么的,主要为了确认服务端收到了和能够重连。后者是因为没找到UDP层的负载均衡,只能自己来~

  2. 大神我的邮箱是hobbesgo@163.com 希望可以与您取得联系,我现在手头也是一个TCP改写UDP的项目,希望可以得到您的指点

  3. 楼主不知道是否方便回复下消息,目前也遇到同样的项目TCP改为UDP通讯的需求,希望可以和您联系上请教一下您。希望您可以联系我留下的邮箱或者回复我,谢谢。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据