java8异步任务CompletableFuture

前言

场景接 Java导入包含图片的Excel。从Excel中获取的图片bytes我们需要上传到阿里云的oss服务, 然后将图片的url回填到Excel的图片字段。

基本使用

java8新特性~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.example;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* @author cuishiying
* @date 2021-01-22
*/
public class CompletableFutureTest {

// 获取一个已经完成的 CompletableFuture 对象
@Test
void completedFuture() {
CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("当我遇上你");
assertTrue(completableFuture.isDone());
try {
assertEquals("当我遇上你", completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

// 异步任务获取结果
@Test
void supplyAsync() {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "当我遇上你");
try {
assertEquals("当我遇上你", supplyAsync.get()); // Blocking
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}

// 异步任务获取结果+自定义线程池
@Test
void supplyAsyncWithExecutor() {
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "当我遇上你", threadPool);
try {
assertEquals("当我遇上你", supplyAsync.get()); // Blocking
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}

// 异步结果聚合
@Test
void returnList() {
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<CompletableFuture<String>> futureList = Arrays.asList("a", "b").stream()
.map(i -> CompletableFuture.supplyAsync(() -> "当我遇上你:" + i, threadPool))
.collect(Collectors.toList());
// join 操作等待所有异步操作的结果
List<String> stringList = futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// [当我遇上你:a, 当我遇上你:b]
System.out.println(stringList);
}

// 1. 创建异步执行任务, 2. 执行成功逻辑(同步), 3. 执行异常逻辑
@Test
void thenAccept() {
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<CompletableFuture<String>> futureList = Stream.of("a", "b").map(i -> CompletableFuture
.supplyAsync(() -> "当我遇上你:" + i, threadPool)
// 返回
.thenApply((result) -> "Hello:" + result)
// 不返回结果
// .thenAccept((result) -> System.out.println(result))
.exceptionally(Throwable::getMessage)).collect(Collectors.toList());
List<String> stringList = futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// [Hello:当我遇上你:a, Hello:当我遇上你:b]
System.out.println(stringList);
}
}

最后

本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】,您的支持是我写作的最大动力。