前面写了好多关于源码的文章,我觉得还是要来点实战的,解决一些问题,最近想用Netty写个游戏服务器框架,已经写了一点了,我想让一个netty
进程一个端口可以响应多个协议,比如我希望一个端口就可以响应HTTP
,WebSocket
,TCP私有协议
。如果对这些还不了解的可以看下我写的源码文章,基本都有讲了,至于私有协议,我这里也会给出例子。
netty
进程,一个进程相应一个。前两个应该比较简单,我们来简单的实现下第三个吧。
主服务类没别的设置,就是基本模板,但是只有一个处理器,叫ProtocolSelectorHandler
,这个就是我们来解析不同协议做动态添加的关键。
public class MyTestServer {public static void main(String[] args) throws Exception{EventLoopGroup bossGroup &#61; new NioEventLoopGroup(1);EventLoopGroup workerGroup &#61; new NioEventLoopGroup();try {ServerBootstrap serverBootstrap &#61; new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childOption(ChannelOption.TCP_NODELAY,true);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {&#64;Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline &#61; ch.pipeline();pipeline.addLast(new ProtocolSelectorDecoder());}});//启动服务器ChannelFuture channelFuture &#61; serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
ProtocolSelectorHandler协议选择器
这个就是实现动态协议的关键&#xff0c;其实也只是简单的判断前几个字节是什么&#xff0c;也就是要一些约定&#xff0c;比如说约定好WebSocket
的话&#xff0c;URI
就是/ws
&#xff0c;如果看到GET /ws
前缀的话&#xff0c;就说明要进行WebSocket
握手了。我就添加WebSocket的处理器&#xff0c;重新来处理这个数据&#xff0c;然后把协议选择器删除&#xff0c;当然这里是简单的实现&#xff0c;可能期间出现问题&#xff0c;你把协议选择器删了就惨了&#xff0c;我们暂时不管哈哈。如果是自定义
协议的话&#xff0c;我们一般不会传空格
&#xff0c;所以我可以用空格
来区别自定义
协议和HTTP
协议&#xff0c;HTTP
请求行有空格
嘛。当然我也只是简单的实现&#xff0c;如果要较真&#xff0c;还是把判断换行&#xff0c;然后看是不是HTTP
协议比较好。
/*** 协议选择器&#xff0c;支持动态协议HTTP WEBSOCKET TCP私有协议* Author: wangwei*/
public class ProtocolSelectorHandler extends ByteToMessageDecoder {/*** websocket定义请求行前缀*/private static final String WEBSOCKET_LINE_PREFIX &#61; "GET /ws";/*** websocket的uri*/private static final String WEBSOCKET_PREFIX &#61; "/ws";/*** 检查10个字节&#xff0c;没有空格就是自定义协议*/private static final int SPACE_LENGTH &#61; 10;&#64;Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("before :" &#43; ctx.pipeline().toString());if (isWebSocketUrl(in)) {System.out.println("addWebSocketHandlers");addWebSocketHandlers(ctx.pipeline());} else if (isCustomProcotol(in)) {System.out.println("addTCPProtocolHandlers");addTCPProtocolHandlers(ctx.pipeline());} else {System.out.println("addHTTPHandlers");addHTTPHandlers(ctx.pipeline());}ctx.pipeline().remove(this);System.out.println("after :" &#43; ctx.pipeline().toString());}/*** 是否有websocket请求行前缀** &#64;param byteBuf* &#64;return*/private boolean isWebSocketUrl(ByteBuf byteBuf) {if (byteBuf.readableBytes() < WEBSOCKET_LINE_PREFIX.length()) {return false;}byteBuf.markReaderIndex();byte[] content &#61; new byte[WEBSOCKET_LINE_PREFIX.length()];byteBuf.readBytes(content);byteBuf.resetReaderIndex();String s &#61; new String(content, CharsetUtil.UTF_8);return s.equals(WEBSOCKET_LINE_PREFIX);}/*** 是否是自定义是有协议* &#64;param byteBuf* &#64;return*/private boolean isCustomProcotol(ByteBuf byteBuf) {byteBuf.markReaderIndex();byte[] content &#61; new byte[SPACE_LENGTH];byteBuf.readBytes(content);byteBuf.resetReaderIndex();String s &#61; new String(content, CharsetUtil.UTF_8);return s.indexOf(" ") &#61;&#61; -1;}/*** 动态添加WebSocket处理器* &#64;param pipeline*/private void addWebSocketHandlers(ChannelPipeline pipeline) {pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PREFIX));pipeline.addLast(new MyTextWebSocketFrameHandler());}/*** 动态添加TCP私有协议处理器* &#64;param pipeline*/private void addTCPProtocolHandlers(ChannelPipeline pipeline) {pipeline.addLast(new CustomDecoder(1024, 1, 4));//这里1代表长度属性是从索引1位置开始的&#xff0c;4代表有4个字节的长度pipeline.addLast(new CutsomServerHandler());}/*** 动态添加HTTP处理器* &#64;param pipeline*/private void addHTTPHandlers(ChannelPipeline pipeline) {pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new MyHttpServerHandler());}
}
MyTextWebSocketFrameHandler服务端处理websocket
这个很简单&#xff0c;就是收到了返回一下。
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {System.out.println(String.format("收到websocket客户端[%s]消息:", ctx.channel().remoteAddress()&#43;":"&#43;ctx.channel().id().asLongText()) &#43; msg.text());ctx.channel().writeAndFlush(new TextWebSocketFrame( msg.text()));}&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异常发生:" &#43; cause.getMessage());ctx.close();}
}
MyHttpServerHandler服务端处理http
也是简单的打印下收到的信息&#xff0c;没有回复。
public class MyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {if (msg instanceof FullHttpRequest) {FullHttpRequest httpRequest &#61; (FullHttpRequest) msg;ByteBuf content1 &#61; httpRequest.content();String cont &#61; content1.toString(CharsetUtil.UTF_8);System.out.println(String.format("收到HTTP客户端[%s]消息", ctx.channel().remoteAddress()&#43;":"&#43;ctx.channel().id().asLongText()) &#43; ":" &#43; cont);}}}
自定义协议
首先定义了要传递的协议格式&#xff0c;这个就是用TCP
字节流直接来传的。
&#64;Data
public class SimpleProtocol {/*** 协议类型*/private byte protocolType;/*** 消息体长度*/private int bodyLength;/*** 消息内容*/private byte[] body;
}
自定义协议当然需编解码啦&#xff0c;最简单的实现&#xff1a;
public class CustomEncoder extends MessageToByteEncoder<SimpleProtocol> {&#64;Overrideprotected void encode(ChannelHandlerContext ctx, SimpleProtocol simpleProtocol, ByteBuf out) throws Exception {byte protocolType &#61; simpleProtocol.getProtocolType();int length &#61; simpleProtocol.getBodyLength();byte[] body &#61; simpleProtocol.getBody();out.writeByte(protocolType);out.writeInt(length);if(length>0){out.writeBytes(body);}}
}
我偷懒了直接集成LengthFieldBasedFrameDecoder
&#xff0c;内部处理了粘包拆包问题&#xff0c;这里就注意要记得处理要完释放缓冲区。
public class CustomDecoder extends LengthFieldBasedFrameDecoder {public CustomDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {super(maxFrameLength, lengthFieldOffset, lengthFieldLength);}&#64;Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf byteBuf &#61; (ByteBuf) super.decode(ctx, in);if (byteBuf &#61;&#61; null) {return null;}SimpleProtocol simpleProtocol &#61; new SimpleProtocol();simpleProtocol.setProtocolType(byteBuf.readByte());int length &#61; byteBuf.readInt();simpleProtocol.setBodyLength(length);if (simpleProtocol.getBodyLength() > 0) {byte[] body &#61; new byte[length];byteBuf.readBytes(body);simpleProtocol.setBody(body);}in.release();//记得释放return simpleProtocol;}}
也是简单的打印下收到的信息。
public class CutsomServerHandler extends SimpleChannelInboundHandler<SimpleProtocol> {&#64;Overrideprotected void channelRead0(ChannelHandlerContext ctx, SimpleProtocol msg) throws Exception {System.out.println(String.format("收到TCP私有协议客户端[%s]消息:", ctx.channel().remoteAddress()&#43;":"&#43;ctx.channel().id().asLongText()) &#43;new String(msg.getBody(), CharsetUtil.UTF_8));}&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
验证结果
基本的都写了&#xff0c;我们看看执行的结果吧。先HTTP
。
然后上WebSocket
&#xff0c;看看两个一起怎么样。
再加自定义协议。
其实只要你理解了HTTP
&#xff0c;WebSocket
协议是怎么解析的&#xff0c;TCP
私有协议怎么解析&#xff0c;自然知道怎么去区分啦&#xff0c;因为他们都是基于TCP
的&#xff0c;你能拿到TCP
的字节数据&#xff0c;就有办法判断是哪种协议&#xff0c;所有的应用层的协议都可以处理&#xff0c;当然我这里是简单的实现了3
个&#xff0c;你可以扩展N
个。
源码&#xff1a;netty_action
更多参考&#xff1a;
https://cloud.tencent.com/developer/article/1366184
好了&#xff0c;今天就到这里了&#xff0c;希望对学习理解有帮助&#xff0c;大神看见勿喷&#xff0c;仅为自己的学习理解&#xff0c;能力有限&#xff0c;请多包涵。