Guava ListenableFuture 源码解析

Java的Future虽然已经提供异步操作,但是不能直接回调。Guava对Future进行了增强,核心接口就是ListenableFuture。

guava对JDK的异步增强可以通过看MoreExecutor和Futures两个类的源码入手。

ListenableFuture继承了Future,额外新增了一个方法,listener是任务结束后的回调方法,executor是执行回调方法的执行器(通常是线程池)。guava中对future的增强就是在addListener这个方法上进行了各种各样的封装,所以addListener是核心方法。

jdk的FutureTask类是对Future接口的实现,guava中ListenableFutureTask继承了FutureTask并实现了ListenableFuture。

addListener

// 无返回值
@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}
public void add(Runnable runnable, Executor executor) {

checkNotNull(runnable, "Runnable was null.");
checkNotNull(executor, "Executor was null.");
synchronized (this) {
if (!executed) {
//如果还没执完,放入任务队列中(单链表),等待被执行。
runnables = new RunnableExecutorPair(runnable, executor, runnables);
return;
}
}
executeListener(runnable, executor);
}

如果task已经执行完成,执行executeListener方法

// 执行task的回调任务
private static void executeListener(Runnable runnable, Executor executor) {
try {
//这里指的是执行task的所有回调函数,因为回调函数也是一种任务,这里的runnable是执行task的回调任务。
executor.execute(runnable);
} catch (RuntimeException e) {
log.log(
Level.SEVERE,
"RuntimeException while executing runnable " + runnable + " with executor " + executor,
e);
}
}

如果task还没被执行,则把回调函数保存到队列中,这个队列是一个单链表,等待task执行完,再依次执行这个队列所有等待的回调函数。这个单链表的节点类:

为什么要用链表呢?因为一个task任务可以有多个回调函数。

private static final class RunnableExecutorPair {
final Runnable runnable;
final Executor executor;
@Nullable RunnableExecutorPair next;

RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
this.runnable = runnable;
this.executor = executor;
this.next = next;
}
}

重点!实际上listener模式只是重写了FutureTask的done方法,因为在futureTask中任务执行后在finishCompletion方法会调用done方法。

@Override
protected void done() {
executionList.execute(); //执行任务队列
}
public void execute() {
// Lock while we update our state so the add method above will finish adding any listeners
// before we start to run them.
//在我们开始执行run方法前,先完成了增加listen监听。
RunnableExecutorPair list;
synchronized (this) {
if (executed) {
return;
}
executed = true;
list = runnables;
runnables = null; // allow GC to free listeners even if this stays around for a while.
}

//将任务链表反转,存到reversedList中,因为按先来先服务的原则,先添加进来的runnable一般先执行。
RunnableExecutorPair reversedList = null;
while (list != null) { // 反转单链表
RunnableExecutorPair tmp = list;
list = list.next;
tmp.next = reversedList;
reversedList = tmp;
}
//执行任务链表,执行监听作用
while (reversedList != null) {
executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}

这里指的是某项task执行完之后,会调用它的所有回调函数列表,按照先监听先服务的原则,依次执行该task的所有回调函数。

addCallback(future, futureCallback, pool)

// 有返回值
public static <V> void addCallback(
final ListenableFuture<V> future,
final FutureCallback<? super V> callback,
Executor executor) {
Preconditions.checkNotNull(callback);
future.addListener(new CallbackListener<V>(future, callback), executor);
}

底层用CallbackListener封装了一下,还是调用了addListener方法。

为什么要封装一下呢?因为要拿到task任务的返回值。

private static final class CallbackListener<V> implements Runnable {
final Future<V> future;
final FutureCallback<? super V> callback;

CallbackListener(Future<V> future, FutureCallback<? super V> callback) {
this.future = future;
this.callback = callback;
}

@Override
public void run() {
final V value;
try {
//在这里拿到任务执行完成后的返回值
value = getDone(future);
} catch (ExecutionException e) {
callback.onFailure(e.getCause());
return;
} catch (RuntimeException | Error e) {
callback.onFailure(e);
return;
}
callback.onSuccess(value);
}
}

在这个run方法里面就调用getDone等待返回值,如果有异常就调用callback.onFailure,没有异常就调动callback.onSuccess(value)。

public static <V> V getDone(Future<V> future) throws ExecutionException {
checkState(future.isDone(), "Future was expected to be done: %s", future);
return getUninterruptibly(future);
}

public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
boolean interrupted = false;
try {
while (true) {
try {
//future.get()就是到等待任务执行结束
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

listener中的回调函数在哪执行。

private final class TrustedFutureInterruptibleTask extends InterruptibleTask<V> {
private final Callable<V> callable;

@Override
void afterRanInterruptibly(V result, Throwable error) {
if (error == null) {
TrustedListenableFutureTask.this.set(result);
} else {
setException(error);
}
}
}

在任务执行完之后,这里回调执行调用了set方法:

protected boolean set(@Nullable V value) {
Object valueToSet = value == null ? NULL : value;
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
return true;
}
return false;
}

调用complete方法,执行该task的所有回调任务:

// 在这里插入代码片
Author: Tunan
Link: http://yerias.github.io/2021/09/05/java/29/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.