Java的Future虽然已经提供异步操作,但是不能直接回调。Guava对Future进行了增强,核心接口就是ListenableFuture。
guava对JDK的异步增强可以通过看MoreExecutor和Futures两个类的源码入手。
ListenableFuture继承了Future,额外新增了一个方法,listener是任务结束后的回调方法,executor是执行回调方法的执行器(通常是线程池)。guava中对future的增强就是在addListener这个方法上进行了各种各样的封装,所以addListener是核心方法。
jdk的FutureTask类是对Future接口的实现,guava中ListenableFutureTask继承了FutureTask并实现了ListenableFuture。
addListener @Override public void addListener (Runnable listener, Executor exec) { executionList.add(listener, exec); } public void add (Runnable runnable, Executor executor) { checkNotNull(runnable, "Runnable was null." ); checkNotNull(executor, "Executor was null." ); synchronized (this ) { if (!executed) { runnables = new RunnableExecutorPair(runnable, executor, runnables); return ; } } executeListener(runnable, executor); }
如果task已经执行完成,执行executeListener方法
private static void executeListener (Runnable runnable, Executor executor) { try { executor.execute(runnable); } catch (RuntimeException e) { log.log( Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); } }
如果task还没被执行,则把回调函数保存到队列中,这个队列是一个单链表,等待task执行完,再依次执行这个队列所有等待的回调函数。这个单链表的节点类:
为什么要用链表呢?因为一个task任务可以有多个回调函数。
private static final class RunnableExecutorPair { final Runnable runnable; final Executor executor; @Nullable RunnableExecutorPair next; RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) { this .runnable = runnable; this .executor = executor; this .next = next; } }
重点!实际上listener模式只是重写了FutureTask的done方法,因为在futureTask中任务执行后在finishCompletion方法会调用done方法。
@Override protected void done () { executionList.execute(); } public void execute () { RunnableExecutorPair list; synchronized (this ) { if (executed) { return ; } executed = true ; list = runnables; runnables = null ; } RunnableExecutorPair reversedList = null ; while (list != null ) { RunnableExecutorPair tmp = list; list = list.next; tmp.next = reversedList; reversedList = tmp; } while (reversedList != null ) { executeListener(reversedList.runnable, reversedList.executor); reversedList = reversedList.next; } }
这里指的是某项task执行完之后,会调用它的所有回调函数列表,按照先监听先服务的原则,依次执行该task的所有回调函数。
addCallback(future, futureCallback, pool) public static <V> void addCallback ( final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) { Preconditions.checkNotNull(callback); future.addListener(new CallbackListener<V>(future, callback), executor); }
底层用CallbackListener封装了一下,还是调用了addListener方法。
为什么要封装一下呢?因为要拿到task任务的返回值。
private static final class CallbackListener <V > implements Runnable { final Future<V> future; final FutureCallback<? super V> callback; CallbackListener(Future<V> future, FutureCallback<? super V> callback) { this .future = future; this .callback = callback; } @Override public void run () { final V value; try { value = getDone(future); } catch (ExecutionException e) { callback.onFailure(e.getCause()); return ; } catch (RuntimeException | Error e) { callback.onFailure(e); return ; } callback.onSuccess(value); } }
在这个run方法里面就调用getDone等待返回值,如果有异常就调用callback.onFailure,没有异常就调动callback.onSuccess(value)。
public static <V> V getDone (Future<V> future) throws ExecutionException { checkState(future.isDone(), "Future was expected to be done: %s" , future); return getUninterruptibly(future); } public static <V> V getUninterruptibly (Future<V> future) throws ExecutionException { boolean interrupted = false ; try { while (true ) { try { return future.get(); } catch (InterruptedException e) { interrupted = true ; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }
listener中的回调函数在哪执行。
private final class TrustedFutureInterruptibleTask extends InterruptibleTask <V > { private final Callable<V> callable; @Override void afterRanInterruptibly (V result, Throwable error) { if (error == null ) { TrustedListenableFutureTask.this .set(result); } else { setException(error); } } }
在任务执行完之后,这里回调执行调用了set方法:
protected boolean set (@Nullable V value) { Object valueToSet = value == null ? NULL : value; if (ATOMIC_HELPER.casValue(this , null , valueToSet)) { complete(this ); return true ; } return false ; }
调用complete方法,执行该task的所有回调任务: