线程池笔记

创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.example.demo.thread;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author cuishiying
* @date 2021-01-22
*/
public class ThreadPoolFactory {

/**
* spring线程池
*/
public static ThreadPoolTaskExecutor springThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); // 核心线程数
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 5); // 最大线程数
executor.setQueueCapacity(500); // 任务队列容量
executor.setThreadNamePrefix("ali-asr-");
executor.initialize();
return executor;
}

/**
* Java自定义线程池
*/
public static ExecutorService newThreadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger id = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("idea360" + id.getAndIncrement());
return thread;
}
};
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(500);
// 超过队列策略:丢弃
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectedExecutionHandler);
}

/**
* Java线程池工具
*/
public static ExecutorService newFixedThreadPool() {
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
Thread t = new Thread(r);
t.setName("idea360");
t.setDaemon(true);
return t;
});
}

/**
* Java线程池工具:调度线程池
*/
public static ScheduledExecutorService newScheduledThreadPool() {
return Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
Thread t = new Thread(r);
t.setName("idea360");
t.setDaemon(true);
return t;
});
}
}

基本示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* @author cuishiying
* @date 2021-01-22
*/
public class ThreadTest {

public static void main(String[] args) throws Exception {
System.out.println("主线程开始工作:");
long startTime = System.currentTimeMillis();
ExecutorService executorService = ThreadPoolFactory.newThreadPoolExecutor();

futureTask(executorService);

long stopTime = System.currentTimeMillis();
System.out.println("主线程结束,耗时:"+(stopTime - startTime)+"ms");
executorService.shutdown();
}

/**
* 主线程开始工作:
* 主线程结束,耗时:7ms
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
*/
public static void asyncThread(ExecutorService executorService) {
executorService.execute(new RunnableTask());
}

/**
* 主线程开始工作:
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
* 返回值: 当我遇上你~
* 主线程结束,耗时:1072ms
*/
private static void blockThread(ExecutorService executorService) throws ExecutionException, InterruptedException {
Future<String> future = executorService.submit(new CallableTask());
String returnValue = future.get();
System.out.println("返回值: " + returnValue);
}

/**
* 主线程开始工作:
* 线程中的任务,开始运行!
* 正在执行!
* 主线程结束,耗时:61ms
* 线程中的任务,结束执行!
* 返回值: 当我遇上你~
*/
public static void asyncUnBlockThread(ExecutorService executorService) {
ExecutorService secondExecutorService = Executors.newFixedThreadPool(1);
Future<String> future = executorService.submit(new CallableTask());
secondExecutorService.execute(() -> {
try {
String returnValue = future.get();
System.out.println("返回值: " + returnValue);
} catch (InterruptedException|ExecutionException e) {
e.printStackTrace();
}
});
}

/**
* 主线程开始工作:
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
* 返回值: 当我遇上你~
* 主线程结束,耗时:1046ms
*/
public static void futureTask(ExecutorService executorService) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new CallableTask());
executorService.execute(futureTask);
String returnValue = futureTask.get();
System.out.println("返回值: " + returnValue);
}

/**
* Java8异步任务
* 主线程开始工作:
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
* 返回值: 当我遇上你~
* 主线程结束,耗时:1086ms
*/
public static void asyncTask(ExecutorService executorService) {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("线程中的任务,开始运行!");
doSomeThing();
System.out.println("线程中的任务,结束执行!");
return "当我遇上你~";
}, executorService);
try {
String returnValue = supplyAsync.get();// Blocking
System.out.println("返回值: " + returnValue);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}

private static class RunnableTask implements Runnable {
@Override
public void run() {
System.out.println("线程中的任务,开始运行!");
doSomeThing();
System.out.println("线程中的任务,结束执行!");
}
}

private static class CallableTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("线程中的任务,开始运行!");
doSomeThing();
System.out.println("线程中的任务,结束执行!");
return "当我遇上你~";
}
}

private static void doSomeThing(){
try {
System.out.println("正在执行!");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

定时调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* @author cuishiying
* @date 2021-01-22
*/
public class ScheduledTest {

/**
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
* 线程中的任务,开始运行!
* 正在执行!
* 线程中的任务,结束执行!
*/
public static void main(String[] args) throws Exception {
System.out.println("主线程开始工作:");
ScheduledExecutorService executorService = ThreadPoolFactory.newScheduledThreadPool();
executorService.scheduleWithFixedDelay(new RunnableTask(), 1L, 2L, TimeUnit.SECONDS);

// 主线程执行结束后子线程就不会再执行了
TimeUnit.SECONDS.sleep(10);
}

private static class RunnableTask implements Runnable {
@Override
public void run() {
System.out.println("线程中的任务,开始运行!");
doSomeThing();
System.out.println("线程中的任务,结束执行!");
}
}

private static void doSomeThing(){
try {
System.out.println("正在执行!");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

并发测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.example.demo.thread;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author cuishiying
* @date 2021-01-22
*/
public class LockTest {

private static final Logger log = LoggerFactory.getLogger(LockTest.class);
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
// 初始计数
public static int count = 0;
// juc lock
private final static Lock lock = new ReentrantLock();

public static void main(String[] args) throws Exception {
// 线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 控制并发
final Semaphore semaphore = new Semaphore(threadTotal);
// 闭锁(让主线程等待子线程5000个任务执行完毕)
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
// 此处能够同时获取200个令牌, 然后等待令牌释放
semaphore.acquire();
add();
// 令牌释放后其他任务才能继续执行, 直到5000任务执行完毕
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
// 计数器-1
countDownLatch.countDown();
});
}
// 阻塞主线程, 等待子线程执行完毕(countDownLatch计数器变为0)
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}

private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}

循环屏障

一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务。可以用于多线程计算数据,最后合并计算结果的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* @author cuishiying
* @date 2021-01-22
*/
public class CyclicBarrierTest {

private static final Logger log = LoggerFactory.getLogger(CyclicBarrierTest.class);

private static CyclicBarrier barrier = new CyclicBarrier(5);

/**
* 14:34:20.041 [pool-1-thread-1] INFO com.example.demo.thread.CyclicBarrierTest - 0 is ready
* 14:34:20.543 [pool-1-thread-2] INFO com.example.demo.thread.CyclicBarrierTest - 1 is ready
* 14:34:21.048 [pool-1-thread-3] INFO com.example.demo.thread.CyclicBarrierTest - 2 is ready
* 14:34:21.552 [pool-1-thread-4] INFO com.example.demo.thread.CyclicBarrierTest - 3 is ready
* 14:34:22.054 [pool-1-thread-5] INFO com.example.demo.thread.CyclicBarrierTest - 4 is ready
* 14:34:22.055 [pool-1-thread-5] INFO com.example.demo.thread.CyclicBarrierTest - 4 continue
* 14:34:22.055 [pool-1-thread-1] INFO com.example.demo.thread.CyclicBarrierTest - 0 continue
* 14:34:22.055 [pool-1-thread-2] INFO com.example.demo.thread.CyclicBarrierTest - 1 continue
* 14:34:22.055 [pool-1-thread-3] INFO com.example.demo.thread.CyclicBarrierTest - 2 continue
* 14:34:22.055 [pool-1-thread-4] INFO com.example.demo.thread.CyclicBarrierTest - 3 continue
* 14:34:22.555 [pool-1-thread-6] INFO com.example.demo.thread.CyclicBarrierTest - 5 is ready
* 14:34:23.058 [pool-1-thread-5] INFO com.example.demo.thread.CyclicBarrierTest - 6 is ready
* 14:34:23.561 [pool-1-thread-3] INFO com.example.demo.thread.CyclicBarrierTest - 7 is ready
* 14:34:24.065 [pool-1-thread-4] INFO com.example.demo.thread.CyclicBarrierTest - 8 is ready
* 14:34:24.571 [pool-1-thread-1] INFO com.example.demo.thread.CyclicBarrierTest - 9 is ready
* 14:34:24.571 [pool-1-thread-1] INFO com.example.demo.thread.CyclicBarrierTest - 9 continue
* 14:34:24.571 [pool-1-thread-6] INFO com.example.demo.thread.CyclicBarrierTest - 5 continue
* 14:34:24.571 [pool-1-thread-5] INFO com.example.demo.thread.CyclicBarrierTest - 6 continue
* 14:34:24.571 [pool-1-thread-4] INFO com.example.demo.thread.CyclicBarrierTest - 8 continue
* 14:34:24.571 [pool-1-thread-3] INFO com.example.demo.thread.CyclicBarrierTest - 7 continue
*/
public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
TimeUnit.MILLISECONDS.sleep(500);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
log.info("{} is ready", threadNum);
// 所有线程阻塞, 直到barrier计数器变为0
barrier.await();
log.info("{} continue", threadNum);
}
}

最后

本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】,您的支持是我写作的最大动力。