AsyncContext使用案例

概述

以下案例不仅涵盖 AsyncContext,还包括 WebAsyncTask、Callable、DeferredResult 等其他异步实现方案。

案例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
 * Demonstrates Spring MVC's {@code WebAsyncTask} in four scenarios: normal completion,
 * a task that throws, a task that times out, and a task run on a custom thread pool.
 *
 * @author cc
 * @since 2020-12-18
 * @see <a href="https://juejin.cn/post/6844903621826445319">reference article</a>
 */
@RestController
public class WebAsyncController {
    private static final String ERROR_MESSAGE = "Task error";
    private static final String TIME_MESSAGE = "Task timeout";

    /** Timeout (5 s, in milliseconds) shared by the three endpoints that use the default executor. */
    private static final long TASK_TIMEOUT_MILLIS = 5 * 1000L;

    private final WebAsyncService asyncService;

    @Autowired
    @Qualifier("taskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Autowired
    public WebAsyncController(WebAsyncService asyncService) {
        this.asyncService = asyncService;
    }

    /** Prints {@code template} with the name of the calling thread substituted for {@code %s}. */
    private static void printCurrentThread(String template) {
        out.println(format(template, currentThread().getName()));
    }

    /**
     * Task that finishes within its timeout.
     * Try it at http://localhost:8080/completion
     *
     * @return a task that works for 3 s (timeout 5 s) and yields a generated UUID
     */
    @GetMapping("/completion")
    public WebAsyncTask<String> asyncTaskCompletion() {
        printCurrentThread("请求处理线程:%s");

        WebAsyncTask<String> task = new WebAsyncTask<>(TASK_TIMEOUT_MILLIS, () -> {
            printCurrentThread("异步工作线程:%s");
            // 3 s of simulated work, well inside the 5 s timeout.
            sleep(3 * 1000L);
            return asyncService.generateUUID();
        });

        // Completion callback registered on the task.
        task.onCompletion(() -> out.println("任务执行完成"));

        out.println("继续处理其他事情");
        return task;
    }

    /**
     * Task whose body throws.
     * Try it at http://localhost:8080/exception
     *
     * @return a task that works for 3 s and then throws; the error callback supplies
     *         {@code "Task error"} instead
     */
    @GetMapping("/exception")
    public WebAsyncTask<String> asyncTaskException() {
        printCurrentThread("请求处理线程:%s");

        WebAsyncTask<String> task = new WebAsyncTask<>(TASK_TIMEOUT_MILLIS, () -> {
            printCurrentThread("异步工作线程:%s");
            // 3 s of simulated work, then fail on purpose.
            sleep(3 * 1000L);
            throw new Exception(ERROR_MESSAGE);
        });

        task.onCompletion(() -> out.println("任务执行完成"));
        // Error callback: its return value becomes the result of the request.
        task.onError(() -> {
            out.println("任务执行异常");
            return ERROR_MESSAGE;
        });

        out.println("继续处理其他事情");
        return task;
    }

    /**
     * Task that exceeds its timeout.
     * Try it at http://localhost:8080/timeout
     *
     * @return a task that works for 7 s against a 5 s timeout; the timeout callback supplies
     *         {@code "Task timeout"}
     */
    @GetMapping("/timeout")
    public WebAsyncTask<String> asyncTaskTimeout() {
        printCurrentThread("请求处理线程:%s");

        WebAsyncTask<String> task = new WebAsyncTask<>(TASK_TIMEOUT_MILLIS, () -> {
            printCurrentThread("异步工作线程:%s");
            // 7 s of simulated work: longer than the 5 s timeout, so the task times out.
            sleep(7 * 1000L);
            return TIME_MESSAGE;
        });

        task.onCompletion(() -> out.println("任务执行完成"));
        // Timeout callback: its return value becomes the result of the request.
        task.onTimeout(() -> {
            out.println("任务执行超时");
            return TIME_MESSAGE;
        });

        out.println("继续处理其他事情");
        return task;
    }

    /**
     * Task executed on the injected {@code taskExecutor} pool with a 10 s timeout.
     * Try it at http://localhost:8080/threadPool
     *
     * @return a task that yields a generated UUID
     */
    @GetMapping("/threadPool")
    public WebAsyncTask<String> asyncTaskThreadPool() {
        return new WebAsyncTask<>(10 * 1000L, executor, () -> {
            printCurrentThread("异步工作线程:%s");
            return asyncService.generateUUID();
        });
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Spring configuration that declares the thread pool bean named {@code taskExecutor}.
 */
@Configuration
public class TaskConfiguration {

    /**
     * Creates the pool: 5 core threads, at most 10 threads, a queue capacity of 10, and
     * thread names starting with {@code asyncTask}.
     *
     * @return the configured executor
     */
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("asyncTask");
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(10);
        pool.setQueueCapacity(10);
        return pool;
    }
}
1
2
3
4
5
6
/**
 * Service that produces the payload returned by the async demo endpoints.
 */
@Service
public class WebAsyncService {

    /**
     * Generates a fresh random UUID.
     *
     * @return the UUID in its canonical string form
     */
    public String generateUUID() {
        final UUID generated = UUID.randomUUID();
        return generated.toString();
    }
}

案例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
* @author cc
* @description
* @since 2020-12-18
* https://lingmoumou.github.io/p/2020/01/30/1326f080/
*/
@Slf4j
@RestController
@RequestMapping("/servlet")
public class AsyncController {

/**
* http://localhost:8080/servlet/origin
* @param request
* @param response
* @throws Exception
*/
@RequestMapping("/origin")
public void origin(HttpServletRequest request,
HttpServletResponse response) throws Exception {

Thread.sleep(100);
response.getWriter().println("这是【正常】的请求返回");
}

/**
* http://localhost:8080/servlet/async
* @param request
* @param response
*/
@RequestMapping("/async")
public void todoAsync(HttpServletRequest request,HttpServletResponse response) {
AsyncContext asyncContext = request.startAsync();
asyncContext.addListener(new AsyncListener() {

@Override
public void onTimeout(AsyncEvent event) throws IOException {
log.info("超时了:");
//做一些超时后的相关操作
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
log.info("线程开始");
}

@Override
public void onError(AsyncEvent event) throws IOException {
log.info("发生错误:",event.getThrowable());
}

@Override
public void onComplete(AsyncEvent event) throws IOException {
log.info("执行完成");
//这里可以做一些清理资源的操作
}
});

//设置超时时间
asyncContext.setTimeout(200);

asyncContext.start(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
log.info("内部线程:" + Thread.currentThread().getName());
asyncContext.getResponse().setCharacterEncoding("utf-8");
asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
asyncContext.getResponse().getWriter().println("这是【异步】的请求返回");
} catch (Exception e) {
log.error("异常:",e);
}
//异步请求完成通知,此时整个请求才完成
//其实可以利用此特性 进行多条消息的推送把连接挂起。。
asyncContext.complete();
}
});
//此时之类 request的线程连接已经释放了
log.info("线程:" + Thread.currentThread().getName());
}

/**
* http://localhost:8080/servlet/order
* @return
*/
@RequestMapping("/order")
public Callable<String> order() {
log.info("主线程开始");
Callable<String> result = new Callable<String>() {
@Override
public String call() throws Exception {
log.info("副线程开始");
Thread.sleep(1000);
log.info("副线程返回");
return "success";
}
};
log.info("主线程返回");
return result;
}


@Autowired
private MockQueue mockQueue;

@Autowired
private DeferredResultHolder deferredResultHolder;

/**
* http://localhost:8080/servlet/order1
* @return
* @throws InterruptedException
*/
@RequestMapping("/order1")
public DeferredResult<String> order1() throws InterruptedException {
log.info("主线程开始");
String orderNumber= RandomStringUtils.randomNumeric(8);
mockQueue.setPlaceOrder(orderNumber);

DeferredResult<String> result=new DeferredResult<>();

deferredResultHolder.getMap().put(orderNumber,result);
log.info("主线程返回");
return result;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Demonstrates completing a {@code DeferredResult} from a worker thread of a fixed pool.
 */
@Slf4j
@RestController
public class DeferredResultController {

    /**
     * Shared 30-thread pool that completes the deferred results.
     * Declared {@code final} so that no other code can swap the pool out at runtime.
     */
    public static final ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(30);

    /**
     * Returns immediately with a {@code DeferredResult} (60 s timeout) that a pool thread
     * completes with {@code "DeferredResult!!"}.
     * Try it at http://localhost:8080/deferredresult
     *
     * @return the deferred result
     */
    @RequestMapping("/deferredresult")
    public DeferredResult<String> deferredResult() {
        log.info("外部线程:" + Thread.currentThread().getName());

        // 60 s timeout.
        DeferredResult<String> result = new DeferredResult<String>(60 * 1000L);

        // On timeout, supply a fallback value through the callback.
        result.onTimeout(() -> {
            log.error("DeferredResult超时");
            result.setResult("超时了!");
        });

        result.onCompletion(() -> log.info("调用完成"));

        FIXED_THREAD_POOL.execute(() -> {
            log.info("内部线程:" + Thread.currentThread().getName());
            // Publish the result from the worker thread.
            result.setResult("DeferredResult!!");
        });
        return result;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Holds the {@code DeferredResult} instances that are waiting for an order to finish,
 * keyed by order number.
 *
 * <p>The map is filled by request threads ({@code AsyncController.order1}) and read by the
 * background thread started in {@code QueueListener}, so the default instance is a
 * concurrent map rather than a plain {@code HashMap}.
 */
@Component
public class DeferredResultHolder {
    // Fully qualified so that no additional import is required.
    private Map<String, DeferredResult<String>> map = new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * @return the live map of pending results (not a copy)
     */
    public Map<String, DeferredResult<String>> getMap() {
        return map;
    }

    /**
     * Replaces the backing map. Callers that share the holder across threads should pass a
     * thread-safe map.
     *
     * @param map the new backing map
     */
    public void setMap(Map<String, DeferredResult<String>> map) {
        this.map = map;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Stand-in for a message queue: accepting an order starts a thread that "processes" it for
 * one second and then publishes the order number through {@code completeOrder}.
 */
@Slf4j
@Data
@Component
public class MockQueue {
    // NOTE(review): never assigned by setPlaceOrder below, so the generated getter returns
    // null unless something else sets it — confirm whether that is intended.
    private String placeOrder;

    // Written by the worker thread below and polled by the thread in QueueListener;
    // volatile makes the write visible to the polling thread.
    private volatile String completeOrder;

    /**
     * Simulates asynchronous order processing: returns immediately and, about one second
     * later, stores {@code placeOrder} into {@code completeOrder} from a new thread.
     *
     * @param placeOrder the order number to process
     * @throws InterruptedException never thrown by this implementation; kept so existing
     *                              callers that declare or catch it keep compiling
     */
    public void setPlaceOrder(String placeOrder) throws InterruptedException {
        new Thread(() -> {
            log.info("接到下单请求,"+placeOrder);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it; the order is still
                // published below so the waiting request is not left hanging.
                Thread.currentThread().interrupt();
                log.warn("下单处理被中断," + placeOrder, e);
            }
            this.completeOrder = placeOrder;
            log.info("下单请求处理完毕,"+placeOrder);
        }).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {

@Autowired
private MockQueue mockQueue;

@Autowired
private DeferredResultHolder deferredResultHolder;

@Override
public void onApplicationEvent(ContextRefreshedEvent event){
new Thread(()->{
while (true){
if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())){
String orderNumber=mockQueue.getCompleteOrder();
log.info("返回订单处理结果:"+orderNumber);
deferredResultHolder.getMap().get(orderNumber).setResult("place order");
mockQueue.setCompleteOrder(null);
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}

案例3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Long-polling demo built on Spring Boot and {@code AsyncContext}: {@code /nacos/pull} parks
 * the request and {@code /nacos/push} delivers data to a parked request with the same dataId.
 *
 * @author cc
 * @since 2020-12-18
 * @see <a href="https://blog.csdn.net/lovexiaotaozi/article/details/102775350">reference article</a>
 */
@RestController
@RequestMapping("/nacos")
public class NacosLongPollingController extends HttpServlet {
    @Autowired
    private NacosLongPollingService nacosLongPollingService;

    /**
     * Parks the request until data is pushed for the given dataId.
     * Try it at http://localhost:8080/nacos/pull?dataId=1
     *
     * @param req  request carrying the {@code dataId} parameter
     * @param resp response handed on to the long-polling service
     * @throws IllegalArgumentException if {@code dataId} is missing or empty
     */
    @RequestMapping("/pull")
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        String dataId = req.getParameter("dataId");
        if (StringUtils.isEmpty(dataId)) {
            // The message used to read "dataId能为空" ("dataId may be empty"), the opposite of
            // what this check enforces.
            throw new IllegalArgumentException("请求参数异常,dataId不能为空");
        }
        nacosLongPollingService.doGet(dataId, req, resp);
    }

    /**
     * Pushes {@code data} to a request parked under {@code dataId}. GET is used so the demo
     * can be driven from a browser; dataId distinguishes the requests of different
     * applications.
     *
     * @param dataId id of the waiting pull request(s)
     * @param data   payload to deliver
     * @return a success result
     * @throws IllegalArgumentException if either parameter is empty
     */
    @GetMapping("/push")
    public Result push(@RequestParam("dataId") String dataId, @RequestParam("data") String data) {
        if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(data)) {
            throw new IllegalArgumentException("请求参数异常,dataId和data均不能为空");
        }
        nacosLongPollingService.push(dataId, data);
        return ResultUtil.success();
    }
}
1
2
3
4
/**
 * Contract of the long-polling demo: one operation to park a pull request and one to push
 * data to parked requests.
 */
public interface NacosLongPollingService {
/**
 * Handles a long-polling pull for the given dataId.
 *
 * @param dataId identifier the pull request is registered under
 * @param req    the incoming HTTP request
 * @param resp   the HTTP response that will eventually carry the data
 */
void doGet(String dataId, HttpServletRequest req, HttpServletResponse resp);
/**
 * Pushes data for the given dataId.
 *
 * @param dataId identifier of the pull request(s) to answer
 * @param data   payload to deliver
 */
void push(String dataId, String data);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Long-polling service backed by one single-threaded scheduler and a queue of parked pull
 * tasks. Pull registration, timeouts and pushes all run on that scheduler thread.
 */
@Service
public class NacosLongPollingServiceImpl implements NacosLongPollingService {
    final ScheduledExecutorService scheduler;
    final Queue<NacosPullTask> nacosPullTasks;

    /**
     * Creates the queue and the scheduler, and schedules a heartbeat line that is printed
     * immediately and then every 60 seconds.
     */
    public NacosLongPollingServiceImpl() {
        nacosPullTasks = new ConcurrentLinkedQueue<>();
        scheduler = new ScheduledThreadPoolExecutor(1, NacosLongPollingServiceImpl::newWorkerThread);
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("线程存活状态:" + new Date()),
                0L, 60, TimeUnit.SECONDS);
    }

    /** Builds the daemon thread, named {@code NacosLongPollingTask}, used by the scheduler. */
    private static Thread newWorkerThread(Runnable work) {
        Thread worker = new Thread(work);
        worker.setName("NacosLongPollingTask");
        worker.setDaemon(true);
        return worker;
    }

    /**
     * Switches the request to async mode and hands a pull task to the scheduler.
     *
     * @param dataId identifier the pull is registered under
     * @param req    the incoming request
     * @param resp   the response to answer later
     */
    @Override
    public void doGet(String dataId, HttpServletRequest req, HttpServletResponse resp) {
        // startAsync() has to be called on the current HTTP thread; per the original author,
        // deferring it to the task thread makes the container send the response immediately.
        final AsyncContext asyncContext = req.startAsync();
        final NacosPullTask pullTask =
                new NacosPullTask(nacosPullTasks, scheduler, asyncContext, dataId, req, resp);
        scheduler.execute(pullTask);
    }

    /**
     * Schedules, with zero delay, a task that delivers {@code data} to a parked pull.
     *
     * @param dataId identifier of the pull to answer
     * @param data   payload to deliver
     */
    @Override
    public void push(String dataId, String data) {
        final NacosPushTask pushTask = new NacosPushTask(dataId, data, nacosPullTasks);
        scheduler.schedule(pushTask, 0L, TimeUnit.MILLISECONDS);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Slf4j
public class NacosPullTask implements Runnable {
Queue<NacosPullTask> nacosPullTasks;
ScheduledExecutorService scheduler;
AsyncContext asyncContext;
String dataId;
HttpServletRequest req;
HttpServletResponse resp;

Future<?> asyncTimeoutFuture;

public NacosPullTask(Queue<NacosPullTask> nacosPullTasks, ScheduledExecutorService scheduler,
AsyncContext asyncContext, String dataId, HttpServletRequest req, HttpServletResponse resp) {
this.nacosPullTasks = nacosPullTasks;
this.scheduler = scheduler;
this.asyncContext = asyncContext;
this.dataId = dataId;
this.req = req;
this.resp = resp;
}

@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(() -> {
log.info("10秒后开始执行长轮询任务:" + new Date());
//这里如果remove this会失败,内部类中的this指向的并非当前对象,而是匿名内部类对象
nacosPullTasks.remove(NacosPullTask.this);
//sendResponse(null);
}, 10, TimeUnit.SECONDS);
nacosPullTasks.add(this);
}

/**
* 发送响应
*
* @param result
*/
public void sendResponse(String result) {
System.out.println("发送响应:" + new Date());
//取消等待执行的任务,避免已经响完了,还有资源被占用
if (asyncTimeoutFuture != null) {
//设置为true会立即中断执行中的任务,false对执行中的任务无影响,但会取消等待执行的任务
asyncTimeoutFuture.cancel(false);
}

//设置页码编码
resp.setContentType("application/json; charset=utf-8");
resp.setCharacterEncoding("utf-8");

//禁用缓存
resp.setHeader("Pragma", "no-cache");
resp.setHeader("Cache-Control", "no-cache,no-store");
resp.setDateHeader("Expires", 0);
resp.setStatus(HttpServletResponse.SC_OK);
//输出Json流
sendJsonResult(result);
}

/**
* 发送响应流
*
* @param result
*/
private void sendJsonResult(String result) {
Result<String> pojoResult = new Result<>();
pojoResult.setCode(200);
pojoResult.setSuccess(!StringUtils.isEmpty(result));
pojoResult.setData(result);
PrintWriter writer = null;
try {
writer = asyncContext.getResponse().getWriter();
writer.write(pojoResult.toString());
writer.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
asyncContext.complete();
if (null != writer) {
writer.close();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Delivers pushed data to the first parked pull task whose dataId matches.
 */
public class NacosPushTask implements Runnable {
    private final String dataId;
    private final String data;
    private final Queue<NacosPullTask> nacosPullTasks;

    /**
     * @param dataId         identifier of the pull to answer
     * @param data           payload to deliver
     * @param nacosPullTasks shared queue of parked pull tasks
     */
    public NacosPushTask(String dataId, String data,
                         Queue<NacosPullTask> nacosPullTasks) {
        this.dataId = dataId;
        this.data = data;
        this.nacosPullTasks = nacosPullTasks;
    }

    /**
     * Scans the queue in iteration order; the first task with a matching dataId is removed
     * and answered, after which the scan stops.
     */
    @Override
    public void run() {
        for (Iterator<NacosPullTask> cursor = nacosPullTasks.iterator(); cursor.hasNext(); ) {
            NacosPullTask candidate = cursor.next();
            if (!dataId.equals(candidate.dataId)) {
                continue;
            }
            // An MD5 of the content could be compared here to detect real changes; the demo
            // skips that. Removing the task first makes sure a later request is never
            // answered through a task left over from this one.
            cursor.remove();
            // Deliver the change to the waiting client.
            candidate.sendResponse(data);
            return;
        }
    }
}
1
2
3
4
5
6
/**
 * Generic response envelope with a payload, a status code and a success flag.
 *
 * @param <T> type of the payload carried in {@code data}
 */
@Data
public class Result<T> {
// Payload; stays null when no data is set (e.g. ResultUtil.success() without arguments).
private T data;
// Status code; every caller visible in this file sets 200.
private Integer code;
// Whether the operation is reported as successful.
private Boolean success;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Factory methods for successful {@code Result} objects (code 200, success flag true).
 */
public class ResultUtil {

    /**
     * Builds a successful result without a payload.
     *
     * @param <T> payload type expected by the caller
     * @return a result with code 200, success {@code true} and {@code null} data
     */
    public static <T> Result<T> success() {
        return success(null);
    }

    /**
     * Builds a successful result carrying {@code data}.
     *
     * @param data payload to return; may be {@code null}
     * @param <T>  payload type
     * @return a result with code 200, success {@code true} and the given data
     */
    public static <T> Result<T> success(T data) {
        // Typed instead of raw, so callers get a checked Result<T> without unchecked warnings.
        Result<T> result = new Result<>();
        result.setSuccess(true);
        result.setCode(200);
        result.setData(data);
        return result;
    }
}

案例4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Non-blocking {@code ReadListener} for the async demo: drains the request body as it
 * arrives, writes a short message once everything has been read, and completes the async
 * context exactly once.
 */
public class MyReadListener implements ReadListener {
    private final ServletInputStream inputStream;
    private final AsyncContext asyncContext;

    // Guards complete(): onError can follow a failed onAllDataRead, and completing an
    // already completed context is illegal. Fully qualified so no extra import is needed.
    private final java.util.concurrent.atomic.AtomicBoolean completed =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    /**
     * @param input   the request's input stream, read in non-blocking mode
     * @param context the async context to write to and complete
     */
    public MyReadListener(ServletInputStream input, AsyncContext context) {
        this.inputStream = input;
        this.asyncContext = context;
    }

    /**
     * Called when data can be read without blocking. The data is consumed (and discarded);
     * the body has to be read here for the container to report the end of the input.
     */
    @Override
    public void onDataAvailable() throws IOException {
        System.out.println("数据可用时触发执行");
        byte[] buffer = new byte[1024];
        while (inputStream.isReady() && !inputStream.isFinished()) {
            if (inputStream.read(buffer) == -1) {
                break;
            }
        }
    }

    /**
     * Called once the whole request body has been read: simulates 3 seconds of processing,
     * writes the message and completes the request.
     */
    @Override
    public void onAllDataRead() throws IOException {
        try {
            Thread.sleep(3000); // pause 3 s to simulate slow processing
            PrintWriter out = asyncContext.getResponse().getWriter();
            out.write("数据读完了");
            out.flush();
            System.out.println("数据读完了");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Without complete() the request stays open until the async timeout expires.
            completeOnce();
        }
    }

    /**
     * Called when reading fails; reports the error and releases the request.
     *
     * @param t the failure
     */
    @Override
    public void onError(Throwable t) {
        System.out.println("数据 出错");
        t.printStackTrace();
        completeOnce();
    }

    /** Completes the async context the first time it is called and does nothing afterwards. */
    private void completeOnce() {
        if (completed.compareAndSet(false, true)) {
            asyncContext.complete();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Demo endpoint that pairs an {@code AsyncContext} with a non-blocking read listener.
 */
@RestController
@RequestMapping("/ex4")
public class AsyncContextController {

    /**
     * Starts async mode, registers {@code MyReadListener} on the request's input stream and
     * immediately writes a first chunk to the page.
     * Try it at http://localhost:8080/ex4/test
     *
     * @param request  the incoming request
     * @param response the response the first chunk is written to
     * @throws Exception if the encoding, the streams or the writer cannot be set up
     */
    @GetMapping("/test")
    public void test(HttpServletRequest request, HttpServletResponse response) throws Exception {
        request.setCharacterEncoding("UTF-8");
        response.setContentType("text/html;charset=UTF-8");

        // Obtain the AsyncContext from the request and set its timeout (90 000 ms).
        final AsyncContext asyncContext = request.startAsync();
        final long timeoutMillis = 30 * 3000;
        asyncContext.setTimeout(timeoutMillis);

        // Register the listener so the body is read without blocking.
        final ServletInputStream body = request.getInputStream();
        body.setReadListener(new MyReadListener(body, asyncContext));

        // Written straight away, without waiting for the async part to finish.
        final PrintWriter page = response.getWriter();
        page.println("<h1>直接返回页面,不等异步处理结果了</h1>");
        page.flush();
    }
}

案例5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
@RestController
@RequestMapping("/ex5")
public class AsyncContextController5 {

// http://localhost:8080/ex5/test
@GetMapping("/test")
public void test(HttpServletRequest request, HttpServletResponse response) throws Exception{
request.setCharacterEncoding("UTF-8");
response.setContentType("text/html;charset=UTF-8");

long start = System.currentTimeMillis();
AsyncContext asyncContext = request.startAsync();

CompletableFuture.runAsync(() -> execute(
asyncContext,
asyncContext.getRequest(),
asyncContext.getResponse())
);
log.info("总耗时:" + (System.currentTimeMillis() - start) + "ms");
}

private void execute(AsyncContext asyncContext, ServletRequest request, ServletResponse response) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("hello");
} catch (IOException e) {
e.printStackTrace();
}
asyncContext.complete();
}
}

案例6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Slf4j
@RestController
@RequestMapping("/ex6")
public class AsyncContextController6 {

// http://localhost:8080/ex6/test
@GetMapping("/test")
public void test(HttpServletRequest request, HttpServletResponse response) throws Exception{
request.setCharacterEncoding("UTF-8");
response.setContentType("text/html;charset=UTF-8");

AsyncContext asyncContext = request.startAsync();
asyncContext.addListener(new AsyncListener() {

@Override
public void onTimeout(AsyncEvent event) throws IOException {
log.info("超时了:");
//做一些超时后的相关操作
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
// TODO Auto-generated method stub
log.info("线程开始");
}

@Override
public void onError(AsyncEvent event) throws IOException {
log.info("发生错误:",event.getThrowable());
}

@Override
public void onComplete(AsyncEvent event) throws IOException {
log.info("执行完成");
//这里可以做一些清理资源的操作

}
});
//设置超时时间
asyncContext.setTimeout(200);
//也可以不使用start 进行异步调用
// new Thread(new Runnable() {
//
// @Override
// public void run() {
// 编写业务逻辑
//
// }
// }).start();

asyncContext.start(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
log.info("内部线程:" + Thread.currentThread().getName());
asyncContext.getResponse().setCharacterEncoding("utf-8");
asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
asyncContext.getResponse().getWriter().println("这是【异步】的请求返回");
} catch (Exception e) {
log.error("异常:",e);
}
//异步请求完成通知
//此时整个请求才完成
//其实可以利用此特性 进行多条消息的推送 把连接挂起。。
asyncContext.complete();
}
});
//此时之类 request的线程连接已经释放了
log.info("线程:" + Thread.currentThread().getName());
}
}

参考