thenApply / thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,测试用例如下:
@Test public void test5() throws Exception { ForkJoinPool pool=new ForkJoinPool(); CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; },pool); CompletableFuture<String> cf2=cf.thenApply((result)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return "test:"+result; }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); System.out.println("run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
|
job1执行结束后,将job1的方法返回值作为入参传递到job2中并立即执行job2。
thenApplyAsync与thenApply的区别在于,前者是将job2提交到线程池中异步执行,实际执行job2的线程可能是另外一个线程,后者是由执行job1的线程立即执行job2,即两个job都是同一个线程执行的。
从输出可知,执行job1和job2是两个不同的线程。thenApplyAsync有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool()。 下述的多个方法,每个方法都有两个以Async结尾的方法,一个使用默认的Executor实现,一个使用指定的Executor实现,不带Async的方法是由触发该任务的线程执行该任务,带Async的方法是由触发该任务的线程将任务提交到线程池,执行任务的线程跟触发任务的线程不一定是同一个。
thenAccept / thenRun
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值;thenRun 的方法没有入参,也买有返回值,测试用例如下:
@Test public void test6() throws Exception { ForkJoinPool pool=new ForkJoinPool(); CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; },pool); CompletableFuture cf2=cf.thenApply((result)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return "test:"+result; }).thenAccept((result)-> { System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(result); System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); }).thenRun(()->{ System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("thenRun do something"); System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis()); }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); System.out.println("run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
|
exceptionally
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果,测试用例如下:
@Test public void test2() throws Exception { ForkJoinPool pool=new ForkJoinPool(); CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } },pool); CompletableFuture<Double> cf2= cf.exceptionally((param)->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("error stack trace->"); param.printStackTrace(); System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return -1.1; }); CompletableFuture cf3=cf.thenAccept((param)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("param->"+param); System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); }); System.out.println("main thread start,time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
|
抛出异常后,只有cf2执行了,cf3没有执行。将上述示例中的if(true) 改成if(false),就能正常执行。
cf2没有指定,其result就是cf执行的结果,理论上cf2.get应该立即返回的,此处是等待了cf3,即job2执行完成后才返回。
whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。测试用例如下:
@Test public void test10() throws Exception { CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(false){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } }); CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); }); System.out.println("main thread start wait,time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
|
handle
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。测试用例如下:
@Test public void test10() throws Exception { CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } }); CompletableFuture<String> cf2=cf.handle((a,b)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); if(b!=null){ return "run error"; }else{ return "run succ"; } }); System.out.println("main thread start wait,time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
|