创建线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.example.demo.thread;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolFactory {
public static ThreadPoolTaskExecutor springThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 5); executor.setQueueCapacity(500); executor.setThreadNamePrefix("ali-asr-"); executor.initialize(); return executor; }
public static ExecutorService newThreadPoolExecutor() { ThreadFactory threadFactory = new ThreadFactory() { private final AtomicInteger id = new AtomicInteger();
@Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("idea360" + id.getAndIncrement()); return thread; } }; BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(500); RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy(); return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectedExecutionHandler); }
public static ExecutorService newFixedThreadPool() { return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> { Thread t = new Thread(r); t.setName("idea360"); t.setDaemon(true); return t; }); }
public static ScheduledExecutorService newScheduledThreadPool() { return Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), r -> { Thread t = new Thread(r); t.setName("idea360"); t.setDaemon(true); return t; }); } }
|
基本示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
public class ThreadTest {
public static void main(String[] args) throws Exception { System.out.println("主线程开始工作:"); long startTime = System.currentTimeMillis(); ExecutorService executorService = ThreadPoolFactory.newThreadPoolExecutor();
futureTask(executorService);
long stopTime = System.currentTimeMillis(); System.out.println("主线程结束,耗时:"+(stopTime - startTime)+"ms"); executorService.shutdown(); }
public static void asyncThread(ExecutorService executorService) { executorService.execute(new RunnableTask()); }
private static void blockThread(ExecutorService executorService) throws ExecutionException, InterruptedException { Future<String> future = executorService.submit(new CallableTask()); String returnValue = future.get(); System.out.println("返回值: " + returnValue); }
public static void asyncUnBlockThread(ExecutorService executorService) { ExecutorService secondExecutorService = Executors.newFixedThreadPool(1); Future<String> future = executorService.submit(new CallableTask()); secondExecutorService.execute(() -> { try { String returnValue = future.get(); System.out.println("返回值: " + returnValue); } catch (InterruptedException|ExecutionException e) { e.printStackTrace(); } }); }
public static void futureTask(ExecutorService executorService) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(new CallableTask()); executorService.execute(futureTask); String returnValue = futureTask.get(); System.out.println("返回值: " + returnValue); }
public static void asyncTask(ExecutorService executorService) { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println("线程中的任务,开始运行!"); doSomeThing(); System.out.println("线程中的任务,结束执行!"); return "当我遇上你~"; }, executorService); try { String returnValue = supplyAsync.get(); System.out.println("返回值: " + returnValue); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } }
private static class RunnableTask implements Runnable { @Override public void run() { System.out.println("线程中的任务,开始运行!"); doSomeThing(); System.out.println("线程中的任务,结束执行!"); } }
private static class CallableTask implements Callable<String> { @Override public String call() throws Exception { System.out.println("线程中的任务,开始运行!"); doSomeThing(); System.out.println("线程中的任务,结束执行!"); return "当我遇上你~"; } }
private static void doSomeThing(){ try { System.out.println("正在执行!"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
|
定时调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
public class ScheduledTest {
public static void main(String[] args) throws Exception { System.out.println("主线程开始工作:"); ScheduledExecutorService executorService = ThreadPoolFactory.newScheduledThreadPool(); executorService.scheduleWithFixedDelay(new RunnableTask(), 1L, 2L, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10); }
private static class RunnableTask implements Runnable { @Override public void run() { System.out.println("线程中的任务,开始运行!"); doSomeThing(); System.out.println("线程中的任务,结束执行!"); } }
private static void doSomeThing(){ try { System.out.println("正在执行!"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
|
并发测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package com.example.demo.thread;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class LockTest {
private static final Logger log = LoggerFactory.getLogger(LockTest.class); public static int clientTotal = 5000; public static int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); }
private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
|
循环屏障
一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务。可以用于多线程计算数据,最后合并计算结果的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
public class CyclicBarrierTest {
private static final Logger log = LoggerFactory.getLogger(CyclicBarrierTest.class);
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) { final int threadNum = i; TimeUnit.MILLISECONDS.sleep(500); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); }
private static void race(int threadNum) throws Exception { log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
|
最后
本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】,您的支持是我写作的最大动力。