Java 8 CompletableFuture

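1. Thread pool

By default, asynchronous tasks run on the ForkJoinPool common pool (ForkJoinPool.commonPool()). A custom thread pool can be supplied instead: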

ThreadPoolExecutor threadPool1 = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
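
As a minimal sketch of how a custom pool is passed in, the following assumes a small fixed pool whose threads are named "demo-pool-worker". The class name, method name, and thread name are hypothetical and chosen only for illustration. The executor is given as the second argument of supplyAsync, and the task reports which thread ran it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CustomPoolDemo {
    // Runs a supplier on a dedicated pool and returns the name of the executing thread.
    static String runOnCustomPool() {
        // "demo-pool-worker" is a hypothetical thread name used only in this sketch
        ExecutorService pool = Executors.newFixedThreadPool(
                2, runnable -> new Thread(runnable, "demo-pool-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // These pool threads are non-daemon, so shut the pool down when done
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool());
    }
}
```

Shutting the pool down matters here: unlike the common pool, an executor created this way keeps the JVM alive until it is stopped.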

2. Basic usage

2.1 CompletableFuture.runAsync();

Runs a task that returns no value on the thread pool. An Executor can be passed as the second argument.

Example

CompletableFuture.runAsync(() -> {
    System.out.println("runAsync -> start");
    System.out.println("runAsync -> finish");
});
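
One detail the snippet above leaves open is waiting for the task. The common pool normally uses daemon threads, so a short-lived main method may exit before the task prints anything. A minimal sketch, with a hypothetical class name and an AtomicBoolean flag added only to make the effect observable:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class RunAsyncJoinDemo {
    // Starts a task with runAsync and waits for it before returning.
    static boolean runAndWait() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> ran.set(true));
        future.join(); // blocks until the task has finished; the value is always null
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + runAndWait());
    }
}
```
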
2.2 CompletableFuture.supplyAsync();

Runs a task that returns a value on the thread pool. An Executor can be passed as the second argument.

Example

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync -> start");
    System.out.println("supplyAsync -> finish");
    return "supplyAsync:result";
});
// get() blocks until the task finishes and declares checked exceptions
String result = future.get();
System.out.println(result);
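
Since get() blocks indefinitely, it is often paired with a timeout. The sketch below uses the overload get(long, TimeUnit) and maps each checked exception to a string. The class name, the helper names, and the returned strings are hypothetical; the never-completed future stands in for a slow task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GetWithTimeoutDemo {
    // Waits at most timeoutMillis for the result and reports what happened.
    static String fetch(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause(); // the task itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    static String demo() {
        CompletableFuture<String> ready = CompletableFuture.completedFuture("supplyAsync:result");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        return fetch(ready, 1000) + " / " + fetch(pending, 50);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```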

3. Advanced usage => callbacks

3.1 thenApply();

Runs an action after the previous task completes. The function receives the previous task's result and returns a new value. It runs on the thread that completed the previous task, or on the calling thread if that task has already finished when thenApply is registered.

Example

CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    System.out.println("First Task -> finish");
    return "First Task Result";
}).thenApply((result) -> {
    System.out.println("Pre Task Result Value : " + result);
    System.out.println("Second Task -> start");
    System.out.println("Second Task -> finish");
    return "Second Task Result";
});
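
To make the threading rule concrete, here is a small sketch that registers thenApply on a future that is already complete. In that case the callback runs synchronously on the calling thread. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class ThenApplyThreadDemo {
    // Returns the name of the thread that executed the thenApply callback.
    static String callbackThread() {
        // Already completed, so there is no pool thread left to run the callback
        CompletableFuture<String> done = CompletableFuture.completedFuture("First Task Result");
        return done.thenApply(result -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThread());
    }
}
```

When the previous task is still running at registration time, the callback instead runs on whichever thread finishes that task, so the two names can differ.
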
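3.2 thenApplyAsync();

Same as thenApply, except that the function is submitted to a thread pool: the default asynchronous executor (normally ForkJoinPool.commonPool()), or the Executor passed as the second argument. It may therefore run on a different thread from the previous task, and with a dedicated Executor it runs on one of that executor's threads.

Example

CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    System.out.println("First Task -> finish");
    return "First Task Result";
}).thenApplyAsync((result) -> {
    System.out.println("Pre Task Result Value : " + result);
    System.out.println("Second Task -> start");
    System.out.println("Second Task -> finish");
    return "Second Task Result";
});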
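
A sketch of the two-argument form, assuming a single-thread executor whose thread is named "callback-pool" (a hypothetical name, as are the class and method names). Even though the source future is already complete, the callback is handed to the executor rather than run on the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThenApplyAsyncDemo {
    // Returns the name of the thread that executed the thenApplyAsync callback.
    static String callbackThread() {
        // "callback-pool" is a hypothetical thread name used only in this sketch
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "callback-pool"));
        try {
            return CompletableFuture.completedFuture("First Task Result")
                    .thenApplyAsync(result -> Thread.currentThread().getName(), callbackPool)
                    .join();
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThread());
    }
}
```
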
3.3 thenAccept();

Runs an action after the previous task completes. The action receives the previous task's result but returns nothing, so the resulting future is a CompletableFuture<Void>.

Example

CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    System.out.println("First Task -> finish");
    return "First Task Result";
}).thenAccept((result) -> {
    System.out.println("Pre Task Result Value : " + result);
    System.out.println("Second Task -> start");
    System.out.println("Second Task -> finish");
});
3.4 thenRun();

Runs an action after the previous task completes. The action neither receives the previous task's result nor returns a value.

Example

CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    System.out.println("First Task -> finish");
    return "First Task Result";
}).thenRun(() -> {
    System.out.println("Second Task -> start");
    System.out.println("Second Task -> finish");
});
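
The difference between thenApply, thenAccept, and thenRun shows up in the type of the future each one returns. The following sketch puts the three side by side; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class CallbackKindsDemo {
    // Joins the futures returned by thenApply, thenAccept and thenRun.
    static String describe() {
        CompletableFuture<String> source = CompletableFuture.completedFuture("First Task Result");

        // Function: consumes the result and produces a new one
        CompletableFuture<Integer> applied = source.thenApply(String::length);
        // Consumer: consumes the result, produces nothing
        CompletableFuture<Void> accepted = source.thenAccept(result -> { });
        // Runnable: neither consumes nor produces
        CompletableFuture<Void> ran = source.thenRun(() -> { });

        return applied.join() + "/" + accepted.join() + "/" + ran.join();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints 17/null/null
    }
}
```
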
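3.5 exceptionally();

Handles an exception thrown by an earlier task and returns a fallback value of the same type as that task's result. When the failure comes from a task such as supplyAsync, the exception passed in is a CompletionException whose cause is the original exception. Because the failure is replaced by the fallback value, the final xxx.get() does not throw.

public class CompletionException extends RuntimeException {}

Example

// logger is assumed to be an SLF4J-style logger defined in the enclosing class
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    int a = 1 / 0; // throws ArithmeticException
    System.out.println("First Task -> finish");
    return "First Task Result";
}).exceptionally((exception) -> {
    System.out.println("exceptionally -> start");
    logger.info("Caught exception:", exception);
    System.out.println("exceptionally -> finish");
    return "Exception";
});
System.out.println(future.get()); // prints "Exception"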
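
Because the exception usually arrives wrapped, a handler that wants the original error typically unwraps it first. A minimal sketch, with a hypothetical class name, method name, and fallback text:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionallyUnwrapDemo {
    // Divides asynchronously and falls back to a message naming the root cause.
    static String divide(int a, int b) {
        return CompletableFuture
                .supplyAsync(() -> String.valueOf(a / b))
                .exceptionally(ex -> {
                    // Unwrap CompletionException if present; otherwise use ex as-is
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "fallback (" + cause.getClass().getSimpleName() + ")";
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divide(6, 3)); // prints 2
        System.out.println(divide(1, 0)); // prints fallback (ArithmeticException)
    }
}
```

The defensive check covers futures that were failed directly with completeExceptionally, where the handler receives the raw exception instead of a wrapper.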

3.6 whenComplete();

Receives both the result and the exception of the preceding task (one of the two is null). It does not change the outcome: if the task failed, the final xxx.get() still throws an ExecutionException.

To run the callback on a pool thread, use whenCompleteAsync();

Example

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    int a = 1 / 0; // throws ArithmeticException
    System.out.println("First Task -> finish");
    return "First Task Result";
}).thenApply((result) -> {
    // skipped, because the first task failed
    System.out.println("Second Task -> start");
    System.out.println("Second Task -> finish");
    return "Second Task Result";
}).whenComplete((result, exception) -> {
    logger.info("whenComplete -> result:{}", result);
    logger.info("whenComplete -> exception:", exception);
});
System.out.println(future.get()); // throws ExecutionException
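
The sketch below checks both points about a failed chain: the thenApply step is skipped, and get() still throws after whenComplete has observed the failure. The class name, the flag, and the IllegalStateException with message "boom" are hypothetical choices for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class WhenCompleteDemo {
    // Builds a failing chain and reports what get() throws.
    static String outcome() {
        AtomicBoolean secondTaskRan = new AtomicBoolean(false);
        CompletableFuture<String> future = CompletableFuture
                .<String>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .thenApply(result -> { secondTaskRan.set(true); return result + "!"; })
                .whenComplete((result, ex) -> { /* observes only; outcome is unchanged */ });
        try {
            future.get();
            return "no exception";
        } catch (ExecutionException e) {
            // get() wraps the original exception in an ExecutionException
            return e.getCause().getClass().getSimpleName()
                    + ", secondTaskRan=" + secondTaskRan.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome());
    }
}
```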

3.7 handle();

Similar to whenComplete, except that the handle callback returns a value. Whatever the preceding task produced, a result or an exception, the final result is the value returned by this callback.

To run the callback on a pool thread, use handleAsync();

Example

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    int a = 1 / 0; // throws ArithmeticException
    System.out.println("First Task -> finish");
    return "First Task Result";
}).handle((result, exception) -> {
    logger.info("handle -> result:{}", result);
    logger.info("handle -> exception:", exception);
    return "handle Result";
});
System.out.println(future.get()); // prints "handle Result"
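
In practice a handle callback usually branches on whether the exception is null. A minimal sketch with a hypothetical parsing task; the class name, method name, and messages are assumptions made for illustration:

```java
import java.util.concurrent.CompletableFuture;

final class HandleDemo {
    // Parses text asynchronously and maps success and failure to one String result.
    static String parse(String text) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                // exactly one of value / ex is meaningful
                .handle((value, ex) -> ex == null ? "parsed " + value : "invalid input")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parse("42"));  // prints parsed 42
        System.out.println(parse("abc")); // prints invalid input
    }
}
```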

4. Advanced usage => composition

4.1 Continue after both CompletableFutures complete
  • thenCombine();
  • thenAcceptBoth();
  • runAfterBoth();

All three methods continue only after both CompletableFutures have completed. They differ as follows:

  • thenCombine(); receives the results of both CompletableFutures and returns a value

    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("First Task -> start");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println("First Task -> finish");
        return "First Task Result";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Second Task -> start");
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println("Second Task -> finish");
        return "Second Task Result";
    });
    // The first lambda parameter is the result of the future thenCombine is called on
    CompletableFuture<String> task3 = task1.thenCombine(task2, (taskResult1, taskResult2) -> {
        System.out.println("Combine Task -> start");
        System.out.println("Combine Task -> finish");
        return "Combine Task Result";
    });
    System.out.println(task3.get());
  • thenAcceptBoth(); receives the results of both CompletableFutures and returns no value

    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("First Task -> start");
        System.out.println("First Task -> finish");
        return "First Task Result";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Second Task -> start");
        System.out.println("Second Task -> finish");
        return "Second Task Result";
    });
    // The first lambda parameter is the result of the future thenAcceptBoth is called on
    CompletableFuture<Void> task3 = task1.thenAcceptBoth(task2, (taskResult1, taskResult2) -> {
        System.out.println("Combine Task -> start");
        System.out.println("Combine Task -> finish");
    });
    System.out.println(task3.get()); // prints null, since task3 is a CompletableFuture<Void>
  • runAfterBoth(); receives nothing and returns no value

    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("First Task -> start");
        System.out.println("First Task -> finish");
        return "First Task Result";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Second Task -> start");
        System.out.println("Second Task -> finish");
        return "Second Task Result";
    });
    CompletableFuture<Void> task3 = task2.runAfterBoth(task1, () -> {
        System.out.println("Combine Task -> start");
        System.out.println("Combine Task -> finish");
    });
    System.out.println(task3.get()); // prints null, since task3 is a CompletableFuture<Void>
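
The thenCombine callback receives both results, and the order of its parameters follows the call: the first parameter belongs to the future the method is called on, the second to the future passed as an argument. A minimal sketch that actually uses both values (the class and method names are hypothetical):

```java
import java.util.concurrent.CompletableFuture;

final class ThenCombineDemo {
    // Joins the results of two independent tasks into one string.
    static String combined() {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "First Task Result");
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Second Task Result");
        // r1 comes from task1 (the receiver), r2 from task2 (the argument)
        return task1.thenCombine(task2, (r1, r2) -> r1 + " + " + r2).join();
    }

    public static void main(String[] args) {
        System.out.println(combined()); // prints First Task Result + Second Task Result
    }
}
```
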
4.2 Continue as soon as either CompletableFuture completes
  • applyToEither();
  • acceptEither();
  • runAfterEither();

All three methods fire as soon as one of the two CompletableFutures completes. They differ as follows:

  • applyToEither(); receives the result of whichever task completes first and returns a value

    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("First Task -> start");
        System.out.println("First Task -> finish");
        return "First Task Result";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Second Task -> start");
        System.out.println("Second Task -> finish");
        return "Second Task Result";
    });
    CompletableFuture<String> task3 = task2.applyToEither(task1, (result) -> {
        System.out.println("Either Task -> start");
        System.out.println("Either Task -> finish");
        return "Either Task";
    });
    System.out.println(task3.get());
  • acceptEither(); receives the result of whichever task completes first and returns no value

    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("First Task -> start");
        System.out.println("First Task -> finish");
        return "First Task Result";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Second Task -> start");
        System.out.println("Second Task -> finish");
        return "Second Task Result";
    });
    CompletableFuture<Void> task3 = task2.acceptEither(task1, (result) -> {
        System.out.println("Either Task -> start");
        System.out.println("Either Task -> finish");
    });
    System.out.println(task3.get()); // prints null, since task3 is a CompletableFuture<Void>
  • runAfterEither(); receives nothing and returns no value

    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("First Task -> start");
        System.out.println("First Task -> finish");
        return "First Task Result";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Second Task -> start");
        System.out.println("Second Task -> finish");
        return "Second Task Result";
    });
    CompletableFuture<Void> task3 = task2.runAfterEither(task1, () -> {
        System.out.println("Either Task -> start");
        System.out.println("Either Task -> finish");
    });
    System.out.println(task3.get()); // prints null, since task3 is a CompletableFuture<Void>
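
With two tasks that both finish almost instantly, which one "wins" is not predictable. The sketch below makes the behavior deterministic by pairing a future that is never completed with one that is already complete, so the callback must receive the latter's result. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class EitherDemo {
    // Applies the callback to whichever of the two futures completes first.
    static String firstOf(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, winner -> "winner: " + winner).join();
    }

    static String demo() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> ready = CompletableFuture.completedFuture("Second Task Result");
        return firstOf(pending, ready);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints winner: Second Task Result
    }
}
```
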
4.3 thenCompose();

After a task completes, thenCompose passes its result to the given function. That function returns a new CompletableFuture, and the future returned by thenCompose completes with the inner future's result, so no nested CompletableFuture<CompletableFuture<T>> is produced.

Example

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    System.out.println("First Task -> finish");
    return "First Task Result";
});
CompletableFuture<String> compose = task1.thenCompose((x) -> {
    System.out.println("Compose");
    return CompletableFuture.supplyAsync(() -> "new CompletableFuture From Compose");
});
String s = compose.get();
System.out.println(s);
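
The contrast with thenApply may make the purpose clearer: when the callback itself returns a future, thenApply nests it while thenCompose flattens it. In this sketch, findUserId and loadProfile are hypothetical asynchronous lookups invented for illustration, as is the class name.

```java
import java.util.concurrent.CompletableFuture;

final class ThenComposeDemo {
    // Hypothetical async lookup: derives an id from the name
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(name::length);
    }

    // Hypothetical async lookup that depends on the id
    static CompletableFuture<String> loadProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> "profile#" + userId);
    }

    static String profileOf(String name) {
        // thenApply wraps the inner future: two joins are needed to reach the value
        CompletableFuture<CompletableFuture<String>> nested =
                findUserId(name).thenApply(ThenComposeDemo::loadProfile);
        // thenCompose flattens it: one join is enough
        CompletableFuture<String> flat =
                findUserId(name).thenCompose(ThenComposeDemo::loadProfile);
        return nested.join().join().equals(flat.join()) ? flat.join() : "mismatch";
    }

    public static void main(String[] args) {
        System.out.println(profileOf("alice")); // prints profile#5
    }
}
```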

4.4 allOf();

allOf returns a CompletableFuture that completes only after all of the given tasks have completed. If any task fails, calling get() on the returned future throws an exception; if all of them succeed, get() returns null.

Example

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("First Task -> start");
    System.out.println("First Task -> finish");
    return "First Task Result";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Second Task -> start");
    System.out.println("Second Task -> finish");
    return "Second Task Result";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Third Task -> start");
    System.out.println("Third Task -> finish");
    return "Third Task Result";
});

CompletableFuture<Void> all = CompletableFuture.allOf(task1, task2, task3);
Void v = all.get(); // always null; read the results from task1..task3
System.out.println(v);
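
Since allOf itself yields null, the individual results have to be read from the original futures. A common idiom, sketched here with hypothetical class and method names, is to chain a thenApply that joins each task once all of them are done:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AllOfCollectDemo {
    // Waits for all tasks, then gathers their results in input order.
    static List<String> collect(List<CompletableFuture<String>> tasks) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
        return all
                // every task is complete at this point, so join() does not block
                .thenApply(ignored -> tasks.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    static String demo() {
        List<CompletableFuture<String>> tasks = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "First"),
                CompletableFuture.supplyAsync(() -> "Second"),
                CompletableFuture.supplyAsync(() -> "Third"));
        return String.join(",", collect(tasks));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints First,Second,Third
    }
}
```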

4.5 anyOf();

anyOf returns a CompletableFuture that completes as soon as any one of the given tasks completes. Its get() returns the result of that task, or throws an exception if that task failed.

Example

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
    logger.info("First Task -> start");
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    logger.info("First Task -> finish");
    return "First Task Result";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
    logger.info("Second Task -> start");
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    logger.info("Second Task -> finish");
    return "Second Task Result";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
    logger.info("Third Task -> start");
    logger.info("Third Task -> finish");
    return "Third Task Result";
});

CompletableFuture<Object> any = CompletableFuture.anyOf(task1, task2, task3)
        .whenComplete((result, exception) -> {
            logger.info("any -> whenComplete.result={}", result);
            logger.info("any -> whenComplete.exception", exception);
        });
// anyOf is typed as Object, so the result has to be cast
String v = (String) any.get();
System.out.println(v);
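
The failure case is easy to overlook: if the first task to complete has failed, anyOf fails too, even when the other tasks would later succeed. The sketch below shows both outcomes deterministically by using futures that are completed by hand. The class name, method names, and the IllegalStateException with message "boom" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AnyOfDemo {
    // The only completed future succeeds, so anyOf yields its value.
    static String firstResult() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> ready = CompletableFuture.completedFuture("Third Task Result");
        Object value = CompletableFuture.anyOf(pending, ready).join();
        return String.valueOf(value);
    }

    // The only completed future failed, so anyOf fails with the same cause.
    static String firstFailure() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            return String.valueOf(CompletableFuture.anyOf(failed, pending).join());
        } catch (CompletionException e) {
            return "failed with " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
        System.out.println(firstFailure());
    }
}
```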

5. Practical example

TODO …


Unless otherwise stated, all articles on this blog are licensed under CC BY-SA 4.0. Please credit the source when reposting!