我正在尝试使用简单的服务器 - 客户端应用程序进入Netty(代码见下文).
我正在努力解决两个问题:
ConfigServerHandler和.正确调用ConfigClientHandler.但是FeedbackServerHandler分别是 从不调用FeedbackClientHandler.为什么?根据文档,应该一个接一个地调用处理程序.
我想要几个处理程序.这些处理程序中的每一个仅对另一方发送的一些消息感兴趣(例如,由客户端发送,由服务器接收).
我应该在处理程序(channelRead)收到消息后过滤消息吗?我如何区分不同的字符串?对于不同的对象,通过解析它们应该非常容易.
是否可以为SocketChannel定义不同的ChannelPipelines?
更进一步?
谢谢你的帮助!
KJ
这是服务器的创建方式:
public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast( new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(null)), new ConfigServerHandler(), new FeedbackServerHandler()); } }); b.bind(mPort).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
其中一个Handler类(FeedbackServerHandler完全相同但解析为Integer):
public class ConfigServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("ConfigServerHandler::channelRead, " +(String)msg); ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
客户端看起来非常相似:
public Client(String host, int port) throws InterruptedException { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast( new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(null)), new ConfigClientHandler(), new FeedbackClientHandler()); } }); b.connect(host, port).sync().channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } }
这是一个客户端处理程序(另一个发送Integer消息并在'channelRead'方法中解析为Integer):
public class ConfigClientHandler extends ChannelInboundHandlerAdapter { private final String firstMessage = "blubber"; @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("ConfigClientHandler::channelActive"); ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("ConfigClientHandler::channelRead, " +(String)msg); ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); }
}
你正在使用ChannelInboundHandlerAdapter
,这对你的"中间"处理程序来说很好ConfigXxxxxHandler
.
但是你使用channelRead
方法然后在里面使用ctx.write(msg)
.ctx.write(msg)
将通过前一个处理程序first(ObjectDecoder
)将msg写回另一个服务器,而不是下一个处理程序(FeedbackClientHandler
在您的情况下).
如果要将消息发送到下一个处理程序,则应使用以下内容:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("ConfigClientHandler::channelRead, " +(String)msg); ctx.fireChannelRead(msg); }
,当然没有ctx.flush()
在channelReadComplete
(因为没有更多的写有).但是在你的决赛中FeedbackClientHandler
,当然要使用flush方法ctx.write(yourNewMessage)
或使用ctx.writeAndFlush(yourNewMessage)
.
所以要恢复:
ctx.write
将消息发送到线路,所以到前一个处理器下行到通道然后到网络,所以出站方式
ctx.fireChannelRead
将消息发送到下一个处理程序(相反的方式),所以入站方式
有关详细信息,请参见http://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h4-16.
您也许应该反转编码器/解码器,因为通常首先是解码器,然后是流水线中的编码器.
p.addLast( new ObjectDecoder(ClassResolvers.cacheDisabled(null)), new ObjectEncoder(), new ConfigClientHandler(), new FeedbackClientHandler());