本文共 6402 字,大约阅读时间需要 21 分钟。
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关 联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandler- Context。ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在 同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。
当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:
通过 ChannelHandlerContext 获取到 Channel 的引用。调用 Channel 上的 write()方法将会导致写入事件从尾端到头部地流经 ChannelPipeline。
//从ChannelHandlerContext访问ChannelChannelHandlerContext ctx = ...;Channel channel = ctx.channel();channel.write(Unpooled.copieBuffer("Netty in Action", CharsetUtil.UTF_8))
//通过ChannelHandlerContext访问ChannelPipelineChannelHandlerContext ctx = ...;ChannelPipeline pipeline = ctx.pipeline();pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
虽然被调用的 Channel 或 ChannelPipeline 上的 write()方法将一直传播事件通 过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler 到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的。
为什么会想要从 ChannelPipeline 中的某个特定点开始传播事件呢?
要想调用从某个特定的 ChannelHandler 开始的处理过程,必须获取到在(ChannelPipeline)该 ChannelHandler 之前的 ChannelHandler 所关联的 ChannelHandlerContext。这个 ChannelHandlerContext 将调用和它所关联的 ChannelHandler 之后的 ChannelHandler。
//调用ChannelHandlerContext的write()方法ChannelHandlerContext ctx = ...;ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));//write()方法将把缓冲区数据发送到下一个ChannelHandler
消息将从下一个 ChannelHandler 开始流经 ChannelPipeline,绕过了 所有前面的 ChannelHandler。
ChannelHandler和ChannelHandlerContext的高级用法:
可以通过调用 ChannelHandlerContext 上的 pipeline()方法来获得被封闭的 ChannelPipeline 的引用。这使得运行时得以操作 ChannelPipeline 的 ChannelHandler,我们可以利用这一点来实现一些复杂的设计。例如, 你可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换。
另一种高级的用法是缓存到 ChannelHandlerContext 的引用以供稍后使用,这可能会发 生在任何的 ChannelHandler 方法之外,甚至来自于不同的线程。
//缓存到ChannelHandlerContext的引用public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Over public void handlerAdded(ChannelHandlerContext ctx){ this.ctx = ctx; } public void send(String msg){ //使用之前的ctx来发送消息 ctx.writeAndFlush(msg); }}因为一个 ChannelHandler 可以从属于多个 ChannelPipeline,所以它也可以绑定到多 个 ChannelHandlerContext 实例。对于这种用法指在多个 ChannelPipeline 中共享同一 个 ChannelHandler,`对应的 ChannelHandler 必须要使用@Sharable 注解标注;否则, 试图将它添加到多个 ChannelPipeline 时将会触发异常`。显而易见,为了安全地被用于多个 并发的 Channel(即连接),这样的 ChannelHandler 必须是线程安全的。```java//可共享的ChannelHandler@ChannelHandler.Sharable public class SharableHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Channel read message: " + msg); //记录方法调用,并转发给下一个ChannelHandler ctx.fireChannelRead(msg); } }
前面的 ChannelHandler 实现符合所有的将其加入到多个 ChannelPipeline 的需求, 即它使用了注解@Sharable 标注,并且也不持有任何的状态。相反,下面中的实现将 会导致问题。
@ChannelHandler.Sharable public class UnsharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { count++; System.out.println("channelRead(...) called the " + count + "tieme"); ctx.fireChannelRead(msg); } }
这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类 的一个实例添加到ChannelPipeline将极有可能在它被多个并发的Channel访问时导致问 题。(当然,这个简单的问题可以通过使channelRead()方法变为同步方法来修正。)
总之,只应该在确定了你的 ChannelHandler 是线程安全的时才使用@Sharable 注解。为何要共享同一个ChannelHandler 在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。
处理入站异常:
如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler 里被触发的那一点开始流经 ChannelPipeline。要想处理这种类型的入站异常,你需要在你 的 ChannelInboundHandler 实现中重写下面的方法:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
//基本的入站异常处理public class InboundException extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻 辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站 异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。
exceptionCaught()
方法。然后你需要决定是否需要将该异常传播出去。处理出站异常:
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制:
ChannelPromise setSuccess();ChannelPromise setFailure(Throwable cause);
添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener (ChannelFutureListener)
方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是, 调用出站操作(如 write()方法)所返回的 ChannelFuture 上的 addListener()方法。
//添加ChannelFutureListener到ChannelFutureChannelFuture future = channel.write(someMessage);future.addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture f){ if(!f.isSuccess()){ f.cause().printStackTrace(); f.channel().close(); } }});
第二种方式是将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法的 ChannelPromise。
//添加ChannelFutureListener到ChannelPromisepublic class OutboundExceptionHandler extens ChannelOutboundHandlerAdapter { @Over public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){ promise.addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture f){ if(!f.isSuccess()){ f.cause().printStackTrance(); f.channel().close(); } } }); }}
为何选择一种方式而不是另一种呢?
对于细致的异常处理,你可能会发现,在调用出站操 作时添加 ChannelFutureListener 更合适,而对于一般的异常处 理,你可能会发现,自定义的 ChannelOutboundHandler 实现的方式 更加的简单。
如果你的 ChannelOutboundHandler 本身抛出了异常会发生什么呢?在这种情况下, Netty 本身会通知任何已经注册到对应 ChannelPromise 的监听器。
转载地址:http://olpqb.baihongyu.com/