thenCombine / thenAcceptBoth / runAfterBoth
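All three methods combine two CompletableFutures and run a task only after both have completed normally. They differ as follows: thenCombine passes the results of both tasks as arguments to the given function, and that function returns a value; thenAcceptBoth also passes both results as arguments but returns nothing; runAfterBoth takes no arguments and returns nothing. Note that if either of the two tasks completes exceptionally, the given task is not run and that exception becomes the result of the dependent stage. The test case is as follows: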
@Test
public void test7() throws Exception {
    // job1: sleeps 2 s, then yields 1.2
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
        return 1.2;
    });
    // job2: sleeps 1.5 s, then yields 3.2
    CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
        try { Thread.sleep(1500); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
        return 3.2;
    });
    // job3: receives both results and returns their sum
    CompletableFuture<Double> cf3 = cf.thenCombine(cf2, (a, b) -> {
        System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
        System.out.println("job3 param a->" + a + ",b->" + b);
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
        return a + b;
    });
    // job4: receives both results, returns nothing
    CompletableFuture<Void> cf4 = cf.thenAcceptBoth(cf2, (a, b) -> {
        System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());
        System.out.println("job4 param a->" + a + ",b->" + b);
        try { Thread.sleep(1500); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());
    });
    // job5: no arguments, no return value; runs after job3 and job4
    CompletableFuture<Void> cf5 = cf4.runAfterBoth(cf3, () -> {
        System.out.println(Thread.currentThread() + " start job5,time->" + System.currentTimeMillis());
        try { Thread.sleep(1000); } catch (InterruptedException e) { }
        System.out.println("cf5 do something");
        System.out.println(Thread.currentThread() + " exit job5,time->" + System.currentTimeMillis());
    });
    System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
    System.out.println("cf run result->" + cf.get());
    System.out.println("main thread start cf5.get(),time->" + System.currentTimeMillis());
    System.out.println("cf5 run result->" + cf5.get());
    System.out.println("main thread exit,time->" + System.currentTimeMillis());
}
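job1 and job2 start almost simultaneously, and job2 finishes before job1. Once job1 exits, job3 and job4 start almost simultaneously, and job4 exits first. When job3 finishes, job5 starts, and once job5 finishes the main thread exits.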
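The exception behavior noted for these methods can be checked with a minimal sketch. The class name CombineFailureDemo and the message "job2 failed" are made up for illustration; the sketch assumes the standard JDK behavior that join() reports the failure as a CompletionException wrapping the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CombineFailureDemo {
    // Returns a short description of how the combined stage finished.
    static String run() {
        CompletableFuture<Double> ok = CompletableFuture.supplyAsync(() -> 1.2);
        CompletableFuture<Double> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("job2 failed");
        });
        // The combining function is never called because one input failed.
        CompletableFuture<Double> sum = ok.thenCombine(failed, (a, b) -> a + b);
        try {
            sum.join();
            return "completed normally";
        } catch (CompletionException e) {
            // The original exception is available as the cause.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With get() instead of join(), the same failure would surface as an ExecutionException.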
applyToEither / acceptEither / runAfterEither
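All three methods combine two CompletableFutures and run a task as soon as either one has completed. They differ as follows: applyToEither passes the result of the task that has already completed as the argument, and the function returns a value; acceptEither also passes that result as the argument but returns nothing; runAfterEither takes no arguments and returns nothing. Note that if the task that completes first completes exceptionally, the given task is not run and that exception becomes the result of the dependent stage; when only one of the two tasks fails, the Javadoc makes no guarantee about whether the dependent stage completes normally or exceptionally. The test case is as follows: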
@Test
public void test8() throws Exception {
    // job1: sleeps 2 s, then yields 1.2
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
        return 1.2;
    });
    // job2: sleeps 1.5 s, then yields 3.2
    CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
        try { Thread.sleep(1500); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
        return 3.2;
    });
    // job3: receives the result of whichever task completed first and returns it
    CompletableFuture<Double> cf3 = cf.applyToEither(cf2, (result) -> {
        System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
        System.out.println("job3 param result->" + result);
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
        return result;
    });
    // job4: receives the result of whichever task completed first, returns nothing
    CompletableFuture<Void> cf4 = cf.acceptEither(cf2, (result) -> {
        System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());
        System.out.println("job4 param result->" + result);
        try { Thread.sleep(1500); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());
    });
    // job5: no arguments, no return value; runs after either job3 or job4
    CompletableFuture<Void> cf5 = cf4.runAfterEither(cf3, () -> {
        System.out.println(Thread.currentThread() + " start job5,time->" + System.currentTimeMillis());
        try { Thread.sleep(1000); } catch (InterruptedException e) { }
        System.out.println("cf5 do something");
        System.out.println(Thread.currentThread() + " exit job5,time->" + System.currentTimeMillis());
    });
    System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
    System.out.println("cf run result->" + cf.get());
    System.out.println("main thread start cf5.get(),time->" + System.currentTimeMillis());
    System.out.println("cf5 run result->" + cf5.get());
    System.out.println("main thread exit,time->" + System.currentTimeMillis());
}
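Because the test above relies on sleep timings, a deterministic sketch may make the "either" semantics easier to see. The class name EitherDemo is hypothetical, and the sketch assumes one future that is already complete and one that is never completed, so the outcome does not depend on scheduling.

```java
import java.util.concurrent.CompletableFuture;

public class EitherDemo {
    // Returns the value produced by the applyToEither function.
    static String run() {
        // Never completed in this sketch, so it cannot "win".
        CompletableFuture<Double> pending = new CompletableFuture<>();
        // Already complete, so its result is the one passed to the function.
        CompletableFuture<Double> done = CompletableFuture.completedFuture(3.2);
        return pending.applyToEither(done, r -> "first result: " + r).join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both futures must have a compatible result type, since the function receives whichever result arrives first.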
thenCompose
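After a task completes, thenCompose passes that task's result as the argument to the given function, and the function returns a new CompletableFuture instance. The CompletableFuture returned by thenCompose then takes its result from that instance: if the instance has already completed, its result is used right away; if it has not completed yet, the returned CompletableFuture completes once the new task has run to completion. The test case is as follows: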
@Test
public void test9() throws Exception {
    // job1: sleeps 2 s, then yields 1.2
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
        return 1.2;
    });
    // job2: runs after job1 and returns a new CompletableFuture (job3)
    CompletableFuture<String> cf2 = cf.thenCompose((param) -> {
        System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
            try { Thread.sleep(2000); } catch (InterruptedException e) { }
            System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
            return "job3 test";
        });
    });
    System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
    System.out.println("cf run result->" + cf.get());
    System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
    System.out.println("cf2 run result->" + cf2.get());
    System.out.println("main thread exit,time->" + System.currentTimeMillis());
}
After job1 finishes, job2 starts. When job2 finishes it returns job3, which then runs. Once job3 finishes, the main thread exits.
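To show why thenCompose is useful, the following sketch compares it with thenApply when the function itself returns a CompletableFuture. The class ComposeDemo and the helper describe are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {
    // Hypothetical asynchronous step that depends on the previous result.
    static CompletableFuture<String> describe(double value) {
        return CompletableFuture.supplyAsync(() -> "value=" + value);
    }

    static String run() {
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> 1.2);
        // thenApply wraps the returned future, giving a nested type.
        CompletableFuture<CompletableFuture<String>> nested = cf.thenApply(ComposeDemo::describe);
        // thenCompose flattens it into a single future.
        CompletableFuture<String> flat = cf.thenCompose(ComposeDemo::describe);
        return nested.join().join() + " / " + flat.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The nested variant needs two join() calls to reach the value, while the composed variant needs one.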
allOf / anyOf
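allOf returns a CompletableFuture that completes only after all of the given tasks have completed. If any one of them completes exceptionally, calling get on the returned CompletableFuture throws an exception; if all of them complete normally, get returns null.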
@Test
public void test11() throws Exception {
    // job1: sleeps 2 s, then yields 1.2
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
        return 1.2;
    });
    // job2: sleeps 1.5 s, then yields 3.2
    CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
        try { Thread.sleep(1500); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
        return 3.2;
    });
    // job3: sleeps 1.3 s, then yields 2.2
    CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
        try { Thread.sleep(1300); } catch (InterruptedException e) { }
        System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
        return 2.2;
    });
    // a is the (null) result of allOf, b is the exception if any task failed
    CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) -> {
        if (b != null) {
            System.out.println("error stack trace->");
            b.printStackTrace();
        } else {
            System.out.println("run succ,result->" + a);
        }
    });
    System.out.println("main thread start cf4.get(),time->" + System.currentTimeMillis());
    System.out.println("cf4 run result->" + cf4.get());
    System.out.println("main thread exit,time->" + System.currentTimeMillis());
}
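The main thread exits after job1, the last task to finish, has completed.
anyOf returns a CompletableFuture that completes as soon as any one of the given tasks has completed. Its get returns the result of the task that has already completed; if that task completed exceptionally, get throws an exception.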
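Since get on the allOf future returns null, the individual results have to be read from the original futures, and the text has no anyOf test. The following is a minimal sketch of both points. The class name AllOfAnyOfDemo and its method names are hypothetical, and the anyOf part uses one already completed future and one that is never completed so the outcome is deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AllOfAnyOfDemo {
    // Waits for all three futures, then reads each result with join().
    static List<Double> collectAll() {
        CompletableFuture<Double> a = CompletableFuture.supplyAsync(() -> 1.2);
        CompletableFuture<Double> b = CompletableFuture.supplyAsync(() -> 3.2);
        CompletableFuture<Double> c = CompletableFuture.supplyAsync(() -> 2.2);
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b, c);
        // By the time this function runs, join() on each input returns immediately.
        return all.thenApply(v -> Stream.of(a, b, c)
                .map(CompletableFuture::join)
                .collect(Collectors.toList()))
            .join();
    }

    // anyOf yields an Object because the inputs may have different types.
    static Object firstOf() {
        CompletableFuture<Double> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> done = CompletableFuture.completedFuture("fast");
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
        System.out.println(firstOf());
    }
}
```

The result of anyOf is typed as Object, so callers usually need a cast or an instanceof check before using it.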