概述
以下案例不仅涵盖 AsyncContext,还包括其他异步实现方案(如 WebAsyncTask、Callable、DeferredResult、CompletableFuture 等)。
案例1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 @RestController public class WebAsyncController { private final WebAsyncService asyncService; private final static String ERROR_MESSAGE = "Task error" ; private final static String TIME_MESSAGE = "Task timeout" ; @Autowired @Qualifier("taskExecutor") private ThreadPoolTaskExecutor executor; @Autowired public WebAsyncController (WebAsyncService asyncService) { this .asyncService = asyncService; } @GetMapping("/completion") public WebAsyncTask<String> asyncTaskCompletion () { out.println(format("请求处理线程:%s" , currentThread().getName())); WebAsyncTask<String> asyncTask = new WebAsyncTask<>(5 * 1000L , () -> { out.println(format("异步工作线程:%s" , currentThread().getName())); sleep(3 * 1000L ); return asyncService.generateUUID(); }); asyncTask.onCompletion(() -> out.println("任务执行完成" )); out.println("继续处理其他事情" ); return asyncTask; } @GetMapping("/exception") public WebAsyncTask<String> asyncTaskException () { out.println(format("请求处理线程:%s" , currentThread().getName())); WebAsyncTask<String> asyncTask = new WebAsyncTask<>(5 * 1000L , () -> { out.println(format("异步工作线程:%s" , currentThread().getName())); sleep(3 * 1000L ); throw new Exception(ERROR_MESSAGE); }); asyncTask.onCompletion(() -> out.println("任务执行完成" )); asyncTask.onError(() -> { out.println("任务执行异常" ); return ERROR_MESSAGE; }); out.println("继续处理其他事情" ); return asyncTask; } @GetMapping("/timeout") public WebAsyncTask<String> asyncTaskTimeout () { out.println(format("请求处理线程:%s" , currentThread().getName())); WebAsyncTask<String> asyncTask = new WebAsyncTask<>(5 * 1000L , () -> { out.println(format("异步工作线程:%s" , currentThread().getName())); sleep(7 * 1000L ); return 
TIME_MESSAGE; }); asyncTask.onCompletion(() -> out.println("任务执行完成" )); asyncTask.onTimeout(() -> { out.println("任务执行超时" ); return TIME_MESSAGE; }); out.println("继续处理其他事情" ); return asyncTask; } @GetMapping("/threadPool") public WebAsyncTask<String> asyncTaskThreadPool () { return new WebAsyncTask<>(10 * 1000L , executor, () -> { out.println(format("异步工作线程:%s" , currentThread().getName())); return asyncService.generateUUID(); }); } }
1 2 3 4 5 6 7 8 9 10 11 12 @Configuration public class TaskConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor () { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5 ); taskExecutor.setMaxPoolSize(10 ); taskExecutor.setQueueCapacity(10 ); taskExecutor.setThreadNamePrefix("asyncTask" ); return taskExecutor; } }
1 2 3 4 5 6 @Service public class WebAsyncService { public String generateUUID () { return UUID.randomUUID().toString(); } }
案例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 @Slf4j @RestController @RequestMapping("/servlet") public class AsyncController { @RequestMapping("/origin") public void origin (HttpServletRequest request, HttpServletResponse response) throws Exception { Thread.sleep(100 ); response.getWriter().println("这是【正常】的请求返回" ); } @RequestMapping("/async") public void todoAsync (HttpServletRequest request,HttpServletResponse response) { AsyncContext asyncContext = request.startAsync(); asyncContext.addListener(new AsyncListener() { @Override public void onTimeout (AsyncEvent event) throws IOException { log.info("超时了:" ); } @Override public void onStartAsync (AsyncEvent event) throws IOException { log.info("线程开始" ); } @Override public void onError (AsyncEvent event) throws IOException { log.info("发生错误:" ,event.getThrowable()); } @Override public void onComplete (AsyncEvent event) throws IOException { log.info("执行完成" ); } }); asyncContext.setTimeout(200 ); asyncContext.start(new Runnable() { @Override public void run () { try { Thread.sleep(100 ); log.info("内部线程:" + Thread.currentThread().getName()); asyncContext.getResponse().setCharacterEncoding("utf-8" ); asyncContext.getResponse().setContentType("text/html;charset=UTF-8" ); asyncContext.getResponse().getWriter().println("这是【异步】的请求返回" ); } catch (Exception e) { log.error("异常:" ,e); } asyncContext.complete(); } }); log.info("线程:" + Thread.currentThread().getName()); } @RequestMapping("/order") public Callable<String> order () { log.info("主线程开始" ); Callable<String> result = new Callable<String>() { @Override public String call () throws Exception { log.info("副线程开始" ); 
Thread.sleep(1000 ); log.info("副线程返回" ); return "success" ; } }; log.info("主线程返回" ); return result; } @Autowired private MockQueue mockQueue; @Autowired private DeferredResultHolder deferredResultHolder; @RequestMapping("/order1") public DeferredResult<String> order1 () throws InterruptedException { log.info("主线程开始" ); String orderNumber= RandomStringUtils.randomNumeric(8 ); mockQueue.setPlaceOrder(orderNumber); DeferredResult<String> result=new DeferredResult<>(); deferredResultHolder.getMap().put(orderNumber,result); log.info("主线程返回" ); return result; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j @RestController public class DeferredResultController { public static ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(30 ); @RequestMapping("/deferredresult") public DeferredResult<String> deferredResult () { log.info("外部线程:" + Thread.currentThread().getName()); DeferredResult<String> result = new DeferredResult<String>(60 *1000L ); result.onTimeout(() -> { log.error("DeferredResult超时" ); result.setResult("超时了!" ); }); result.onCompletion(() -> log.info("调用完成" )); FIXED_THREAD_POOL.execute(() -> { log.info("内部线程:" + Thread.currentThread().getName()); result.setResult("DeferredResult!!" ); }); return result; } }
1 2 3 4 5 6 7 8 9 10 11 12 @Component public class DeferredResultHolder { private Map<String, DeferredResult<String>> map=new HashMap<String,DeferredResult<String>>(); public Map<String,DeferredResult<String>> getMap(){ return map; } public void setMap (Map<String,DeferredResult<String>> map) { this .map=map; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @Data @Component public class MockQueue { private String placeOrder; private String completeOrder; public void setPlaceOrder (String placeOrder) throws InterruptedException { new Thread(()->{ log.info("接到下单请求," +placeOrder); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } this .completeOrder=placeOrder; log.info("下单请求处理完毕," +placeOrder); }).start(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Slf4j @Component public class QueueListener implements ApplicationListener <ContextRefreshedEvent > { @Autowired private MockQueue mockQueue; @Autowired private DeferredResultHolder deferredResultHolder; @Override public void onApplicationEvent (ContextRefreshedEvent event) { new Thread(()->{ while (true ){ if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())){ String orderNumber=mockQueue.getCompleteOrder(); log.info("返回订单处理结果:" +orderNumber); deferredResultHolder.getMap().get(orderNumber).setResult("place order" ); mockQueue.setCompleteOrder(null ); }else { try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
案例3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @RestController @RequestMapping("/nacos") public class NacosLongPollingController extends HttpServlet { @Autowired private NacosLongPollingService nacosLongPollingService; @RequestMapping("/pull") @Override protected void doGet (HttpServletRequest req, HttpServletResponse resp) { String dataId = req.getParameter("dataId" ); if (StringUtils.isEmpty(dataId)) { throw new IllegalArgumentException("请求参数异常,dataId能为空" ); } nacosLongPollingService.doGet(dataId, req, resp); } @GetMapping("/push") public Result push (@RequestParam("dataId") String dataId, @RequestParam("data") String data) { if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(data)) { throw new IllegalArgumentException("请求参数异常,dataId和data均不能为空" ); } nacosLongPollingService.push(dataId, data); return ResultUtil.success(); } }
1 2 3 4 public interface NacosLongPollingService { void doGet (String dataId, HttpServletRequest req, HttpServletResponse resp) ; void push (String dataId, String data) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service public class NacosLongPollingServiceImpl implements NacosLongPollingService { final ScheduledExecutorService scheduler; final Queue<NacosPullTask> nacosPullTasks; public NacosLongPollingServiceImpl () { scheduler = new ScheduledThreadPoolExecutor(1 , r -> { Thread t = new Thread(r); t.setName("NacosLongPollingTask" ); t.setDaemon(true ); return t; }); nacosPullTasks = new ConcurrentLinkedQueue<>(); scheduler.scheduleAtFixedRate(() -> System.out.println("线程存活状态:" + new Date()), 0L , 60 , TimeUnit.SECONDS); } @Override public void doGet (String dataId, HttpServletRequest req, HttpServletResponse resp) { final AsyncContext asyncContext = req.startAsync(); scheduler.execute(new NacosPullTask(nacosPullTasks, scheduler, asyncContext, dataId, req, resp)); } @Override public void push (String dataId, String data) { scheduler.schedule(new NacosPushTask(dataId, data, nacosPullTasks), 0L , TimeUnit.MILLISECONDS); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 @Slf4j public class NacosPullTask implements Runnable { Queue<NacosPullTask> nacosPullTasks; ScheduledExecutorService scheduler; AsyncContext asyncContext; String dataId; HttpServletRequest req; HttpServletResponse resp; Future<?> asyncTimeoutFuture; public NacosPullTask (Queue<NacosPullTask> nacosPullTasks, ScheduledExecutorService scheduler, AsyncContext asyncContext, String dataId, HttpServletRequest req, HttpServletResponse resp) { this .nacosPullTasks = nacosPullTasks; this .scheduler = scheduler; this .asyncContext = asyncContext; this .dataId = dataId; this .req = req; this .resp = resp; } @Override public void run () { asyncTimeoutFuture = scheduler.schedule(() -> { log.info("10秒后开始执行长轮询任务:" + new Date()); nacosPullTasks.remove(NacosPullTask.this ); }, 10 , TimeUnit.SECONDS); nacosPullTasks.add(this ); } public void sendResponse (String result) { System.out.println("发送响应:" + new Date()); if (asyncTimeoutFuture != null ) { asyncTimeoutFuture.cancel(false ); } resp.setContentType("application/json; charset=utf-8" ); resp.setCharacterEncoding("utf-8" ); resp.setHeader("Pragma" , "no-cache" ); resp.setHeader("Cache-Control" , "no-cache,no-store" ); resp.setDateHeader("Expires" , 0 ); resp.setStatus(HttpServletResponse.SC_OK); sendJsonResult(result); } private void sendJsonResult (String result) { Result<String> pojoResult = new Result<>(); pojoResult.setCode(200 ); pojoResult.setSuccess(!StringUtils.isEmpty(result)); pojoResult.setData(result); PrintWriter writer = null ; try { writer = asyncContext.getResponse().getWriter(); writer.write(pojoResult.toString()); writer.flush(); } catch (IOException e) { e.printStackTrace(); } finally { asyncContext.complete(); if (null != writer) { writer.close(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class NacosPushTask implements Runnable { private String dataId; private String data; private Queue<NacosPullTask> nacosPullTasks; public NacosPushTask (String dataId, String data, Queue<NacosPullTask> nacosPullTasks) { this .dataId = dataId; this .data = data; this .nacosPullTasks = nacosPullTasks; } @Override public void run () { Iterator<NacosPullTask> iterator = nacosPullTasks.iterator(); while (iterator.hasNext()) { NacosPullTask nacosPullTask = iterator.next(); if (dataId.equals(nacosPullTask.dataId)) { iterator.remove(); nacosPullTask.sendResponse(data); break ; } } } }
1 2 3 4 5 6 @Data public class Result <T > { private T data; private Integer code; private Boolean success; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class ResultUtil { public static Result success () { Result result = new Result(); result.setCode(200 ); result.setSuccess(true ); return result; } public static Result success (Object data) { Result result = new Result(); result.setSuccess(true ); result.setCode(200 ); result.setData(data); return result; } }
案例4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class MyReadListener implements ReadListener { private ServletInputStream inputStream; private AsyncContext asyncContext; public MyReadListener (ServletInputStream input,AsyncContext context) { this .inputStream = input; this .asyncContext = context; } @Override public void onDataAvailable () throws IOException { System.out.println("数据可用时触发执行" ); } @Override public void onAllDataRead () throws IOException { try { Thread.sleep(3000 ); PrintWriter out = asyncContext.getResponse().getWriter(); out.write("数据读完了" ); out.flush(); System.out.println("数据读完了" ); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError (Throwable t) { System.out.println("数据 出错" ); t.printStackTrace(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @RestController @RequestMapping("/ex4") public class AsyncContextController { @GetMapping("/test") public void test (HttpServletRequest request, HttpServletResponse response) throws Exception { request.setCharacterEncoding("UTF-8" ); response.setContentType("text/html;charset=UTF-8" ); AsyncContext actx = request.startAsync(); actx.setTimeout(30 *3000 ); ServletInputStream in = request.getInputStream(); in.setReadListener(new MyReadListener(in,actx)); PrintWriter out = response.getWriter(); out.println("<h1>直接返回页面,不等异步处理结果了</h1>" ); out.flush(); } }
案例5
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j @RestController @RequestMapping("/ex5") public class AsyncContextController5 { @GetMapping("/test") public void test (HttpServletRequest request, HttpServletResponse response) throws Exception { request.setCharacterEncoding("UTF-8" ); response.setContentType("text/html;charset=UTF-8" ); long start = System.currentTimeMillis(); AsyncContext asyncContext = request.startAsync(); CompletableFuture.runAsync(() -> execute( asyncContext, asyncContext.getRequest(), asyncContext.getResponse()) ); log.info("总耗时:" + (System.currentTimeMillis() - start) + "ms" ); } private void execute (AsyncContext asyncContext, ServletRequest request, ServletResponse response) { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } try { response.getWriter().append("hello" ); } catch (IOException e) { e.printStackTrace(); } asyncContext.complete(); } }
案例6
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Slf4j @RestController @RequestMapping("/ex6") public class AsyncContextController6 { @GetMapping("/test") public void test (HttpServletRequest request, HttpServletResponse response) throws Exception { request.setCharacterEncoding("UTF-8" ); response.setContentType("text/html;charset=UTF-8" ); AsyncContext asyncContext = request.startAsync(); asyncContext.addListener(new AsyncListener() { @Override public void onTimeout (AsyncEvent event) throws IOException { log.info("超时了:" ); } @Override public void onStartAsync (AsyncEvent event) throws IOException { log.info("线程开始" ); } @Override public void onError (AsyncEvent event) throws IOException { log.info("发生错误:" ,event.getThrowable()); } @Override public void onComplete (AsyncEvent event) throws IOException { log.info("执行完成" ); } }); asyncContext.setTimeout(200 ); asyncContext.start(new Runnable() { @Override public void run () { try { Thread.sleep(100 ); log.info("内部线程:" + Thread.currentThread().getName()); asyncContext.getResponse().setCharacterEncoding("utf-8" ); asyncContext.getResponse().setContentType("text/html;charset=UTF-8" ); asyncContext.getResponse().getWriter().println("这是【异步】的请求返回" ); } catch (Exception e) { log.error("异常:" ,e); } asyncContext.complete(); } }); log.info("线程:" + Thread.currentThread().getName()); } }
参考