netty4 UDP 通信
netty4 UDP 通信的例子很少,官方代码里面只有https://github.com/netty/netty/tree/4.0/example/src/main/java/io/netty/example/qotm这一个例子。但是,因为需求是从原先TCP 方式修改成UDP,需要能够最大方式的利用原先的解码器、处理器,但是官方的例子里面, 客户端只有一个广播消息,服务端回复一个消息。这当然是无法满足真正的业务需求的。特别是这次的需求是将原有的TCP长连接方式发送心跳, 改成UDP方式。
复用客户端编解码器和handler
原先的解码器是为TCP 包写的,直接通过继承ReplayingDecoder,对bytebuf中的字节按照自定义的协议,转换成内部对象。 按照官方例子,基于UDP的解码器,都是直接将从DatagramPacket对象封装的UDP包中进行解码。 因此,最简单的编码复用方式,是在自己的ByteToMessage的解码器之前,放置一个MessageToMessage解码器,将DatagramPacket中的bytebuf提取出来。
这里特别注意两个问题:
- 在out中添加了msg的bytebuf之后,注意调用msg的retain方法,防止msg中的bytebuf提前释放抛出异常
- 这样写了之后,丢失了DatagramPacket中的sender,这样在后续的处理器中,无法直接向来源发送消息,所以这种方式基本只试用于客户端
除了解码器,编码器也需要有对应的改造。由于UDP包没有连接,没有办法在连接的时候指定一个目标地址。 所以解码器利用了channel的attribute map来让客户端能够随时切换服务端地址。
首先从原始编码器获得bytebuf,从channel的attribute中获取目标服务器地址,然后组装成最终发送的DatagramPacket。 这里要注意的是:
- 目标地址为了能在整个连接上下文中共享,需要保存在channel中,而不是context中
- 发送的bytebuf仍然需要调用retain来防止提前释放抛出异常
服务端数据应答
之前在客户端的编码解码器之前(后)加上简单封装,就可以正常的首发UDP包。因为客户端的业务逻辑比较简单,UDP包只包含了客户端心跳, 无需对服务端的应答包进行处理。而服务器端,则需要对应答包做出业务逻辑处理(这里大致处理是将心跳信息写入Redis中),并发送响应包。 因此,会发现一个严重的问题,原先处理TCP的流程,缺少了一个重要的信息,就是客户端地址。
为了解决这个问题,只能新建一个新的编码、解码器,并且继承原先的业务对象(请求、应答包)增加来源(目标)地址。但是由于UDP包没有TCP 包 粘连的问题,因此只需要继承MessageToMessageDecoder即可,不需要使用类似ReplayingDecoder这样复杂的解码器上层实现。
编码器也是类似,需要在业务处理的handler中判断是否为UdpRequestData,确定是否需要在write的时候使用对应的UdpResponseData。
服务器端连接管理
服务器连接管理,主要用于记录服务器端链接数量和负载的记录,这些数据会被写入到Redis中,客户端将依据这些数据来挑选连接数量最少、 压力最小的服务器进行连接。之前使用的TCP协议,因此连接管理非常方便,服务器端维护一个ChannelGroup,在active和inactive的时候从ChannelGroup 中添加或者删除。
由于UDP是无连接的,无法通过handler的几个回调来判断连接状态,所以将逻辑改成了服务器端维护一个hashmap,客户端IP作为key, 最后一次udp包接收时间作为value。在处理业务逻辑的handler中,每当收到一个UDP包,就将这对数据放入到hashmap中。服务器端定时器在定时写入Redis 的时候,通过判断value是否过期,来移除这个键值对。通过这样的逻辑,来记录服务器端的连接数基本上是靠谱的。
同时,为了兼容原先的TCP方式心跳,需要将TCP方式的连接对应的时间戳,改成Long.MAX_VALUE,确保TCP连接只在inactive的时候才被移除。
定时器处理逻辑大致为:
客户端的超时重连
之前TCP版本的重连非常方便,当发现客户端连接服务器的channel isActive方法返回false的时候,直接重新获取服务器列表, 按照连接数排序,重新去连接一个连接数最低的服务器即可。
同样是因为UDP是无连接的,同时UDP也是不安全的,一个包发出去很有可能肉包子打狗——有去无回了。(当然,在内网环境下,网络环境还是应该乐观点的~) 处理类似的最简单办法,就是超时重连了。
之前的客户端,所有消息会被放到一个阻塞队列(因为心跳什么的都是由定时器线程产生的),由一个线程来完成从阻塞队列读取并发送的过程。 这次的修改,也只能从这里下手了。首先在客户端维护一个最后一个数据包返回的时间戳(为了线程安全神马的,直接用了AtomicLong类型)。
首先在初始化的时候,将这个值设置为0。确保第一次发送消息的时候,客户端会主动去“连接”服务端。为什么这里的“连接”是加上引号的呢? 原因很简答,还是那句话,UDP是无连接的。按照之前的设计,这个所谓的“连接”,只是简单的在channel的attribute map中设置了TARGET_ADDRESS这个变量而已。
这里好多命名和流程,一看就不像是UDP协议的,的确,这些代码之前都走的是TCP协议~~
除此之外,当然是handler里面,要在每次收到服务器端响应包的时候,回写这个lastResponse了。
这里还加了一个,如果发生了异常,强行“关闭”连接。
这样,原先TCP方式的心跳,被改成了UDP方式。为了避免UDP协议的无连接、不可靠,整个流程中加了两个超时,确保了客户端、服务器端都能够和 以前一样正常的首发消息、断线重连等。
UDP用netty的好处不明显,因为单发udp包,其实不太容易遇到阻塞,只是写入了系统的发送缓冲区而已。
另外你得在UDP上自己重建TCP的稳定有状态,各家实现方式不同,实在也没啥example可以给,你看大部分异步框架(libevent/libev/libuv)对UDP都是一个很简陋的支持,就是这个原因。
其实就是因为这个重要性很低,才改成用UDP的。其他超时什么的,主要为了确认服务端收到了和能够重连。后者是因为没找到UDP层的负载均衡,只能自己来~
我在使用过程中发现udp方式,不会调用decoder,可以指点下吗?
需要添加在后面的,
大神,能共享源码么 ?499165711@qq.com
不好意思,是公司内部项目,代码没法公开~
大神我的邮箱是hobbesgo@163.com 希望可以与您取得联系,我现在手头也是一个TCP改写UDP的项目,希望可以得到您的指点
楼主不知道是否方便回复下消息,目前也遇到同样的项目TCP改为UDP通讯的需求,希望可以和您联系上请教一下您。希望您可以联系我留下的邮箱或者回复我,谢谢。