一、Future
Future源自java.util.concurrent.Future,用于获取异步操作的结果,它通过get()方法获取异步操作结果,操作尚未完成,则阻塞。
Netty认为这是一个很不好的设计,操作结束时间难以确定,何不通过回调的方式获取结果呢。Netty的ChannelFuture通过监听的方式,当操作结束时调用注册在上面的方法获取操作结果。
Future的方法定义:
public interface Future<V> extends java.util.concurrent.Future<V> {
//执行是否成功
boolean isSuccess();
//是否可以取消
boolean isCancellable();
//操作失败时,返回失败原因
Throwable cause();
//添加Listener,当操作结束后回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//移除listener
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//阻塞至任务结束,如果操作失败,抛异常
Future<V> sync() throws InterruptedException;
//不响应中断的阻塞
Future<V> syncUninterruptibly();
//阻塞等待任务,操作失败不抛异常
//不要在ChannelHandler中调用ChannelFuture的await()方法,这样会导致死锁
//原因是发起IO操作后,由IO线程负责异步通知发起IO线程的用户线程,如果IO线程和用户线程是统一线程,就会导致IO线程等待自己通知操作完成,这就导致了死锁
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
//获取执行结果,不阻塞
V getNow();
//取消任务执行
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
知识兔二、ChannelFuture
这个接口只是将Future的方法的返回参数改为ChannelFuture,这样就可以使用一直点下去,有点像builder模式(ex: channelFuture.addListener(..).addListener(..).sync())。
public interface ChannelFuture extends Future<Void> {
//返回和ChannelFuture关联的Channel
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
}
知识兔三、Promise
Promise继承了Future,是可写的Future,因为 future 不支持写操作接口,netty 使用 promise 扩展了 future, 可以对异步操作结果进行设置。
public interface Promise<V> extends Future<V> {
// 标记该 future 成功及设置其执行结果,并且会通知所有的 listeners
// 如果操作失败,将抛出异常(失败指的是该 future 已经有了结果了,成功的结果,或者失败的结果)
Promise<V> setSuccess(V result);
// 和 setSuccess 方法一样,只不过如果失败,它不抛异常,返回 false
boolean trySuccess(V result);
// 标记该 future 失败,及其失败原因。
// 如果失败,将抛出异常(失败指的是已经有了结果了)
Promise<V> setFailure(Throwable cause);
// 标记该 future 失败,及其失败原因。
// 如果已经有结果,返回 false,不抛出异常
boolean tryFailure(Throwable cause);
// 标记该 future 不可以被取消
boolean setUncancellable();
// 同ChannelFuture一样,返回this
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
知识兔四、ChannelPromise
ChannelPromise继承了ChannelFuture和Promise,将功能聚合起来。同样的,方法返回值改成ChannelPromise以便调用。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
// 覆写 ChannelFuture 中的 channel() 方法,其实这个方法一点没变
@Override
Channel channel();
// 下面几个方法是覆写 Promise 中的接口,为了返回值类型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
// 到这里大家应该都熟悉了,下面几个方法的覆写也是为了得到 ChannelPromise 类型的实例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
// 我们忽略这个方法吧。
ChannelPromise unvoid();
}
知识兔