Java线程调度

概述

多线程是Java开发中的基本内容, 线程有自己的生命周期,如何根据其生命周期实现调度呢?下文见分晓。

一些业务中,任务需要创建、启动、暂停、恢复、停止、销毁等操作,映射到线程就是 createstartblockrunninginterruptestop

在笔者的业务中, 目标是实现一个并发调度服务,每个任务的并发控制用一个线程来处理。

业务实现

线程池创建

由于线程与任务关联,每个任务的执行只是调度,无IO密集和CPU密集之说,所以不需要核心线程。直接根据任务数动态的创建和回收线程即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author 当我遇上你
* @公众号 当我遇上你
* @since 2020-05-28
*/
@Configuration
public class BeanConfig {

/**
* 无核心线程
* 最大线程无限大
* 每个调度任务分配1个线程
* @return
*/
@Bean("executorService")
public ExecutorService executorService() {
ExecutorService executorService = Executors.newCachedThreadPool();
return executorService;
}

}

作为一个服务,线程池是不需要回收的,只需要回收线程即可。线程池的使用如下:

1
2
3
4
5
@Autowired
private ExecutorService executorService;

// 将实现了Runnable的线程加入线程池
executorService.submit(task);

核心实现(任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/**
* @author 当我遇上你
* @公众号 当我遇上你
* @since 2020-05-28
*/
@Slf4j
public class Task implements Callable<Job>, Serializable {

// 获取不到令牌的默认休眠时间
private final Integer DEFAULT_BLOCK_TIME = 1000 * 5;
private final Integer MIN_BLOCK_TIME = 500;
private final Integer MAX_BLOCK_TIME = 1000 * 60 * 10;

// 线程暂停标识
private volatile boolean suspend = false;

// 线程运行标识
private volatile boolean running = true;

// 线程锁
private final Object lock = new Object();

// 当前线程
private Thread currentThread;

// 当前任务信息
private Job job;

// 任务服务
private JobService jobService;

// 令牌服务
private TokenService tokenService;

// 呼叫服务
private CallService callService;

// 任务存储
private TaskStorage taskStorage;



private Task(Job job, JobService jobService, TokenService tokenService, CallService callService, TaskStorage taskStorage) {
this.job = job;
this.jobService = jobService;
this.tokenService = tokenService;
this.callService = callService;
this.taskStorage = taskStorage;
this.onCreate();
}

@Override
public Job call() throws Exception {

onStart();
currentThread = Thread.currentThread();
while (running && !Thread.currentThread().isInterrupted()) {

if (suspend) {
// 当前线程暂停
System.out.println("task:" + this.job.getJobName() + " suspend...");
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
break;
}
}
}

// 线程正常执行
System.out.println("\ntask:" + this.job.getJobName() + " running...");

// 获取到令牌
String token = tokenService.acquire(job);
if (StringUtils.isNotEmpty(token)) {

CallData callData = jobService.loadTaskData(job);

if (callData == null) {

// 如果没有数据,判断业务动作指令
Job taskInfo = jobService.getTaskInfo(job);
if (taskInfo.getTaskAction() == TaskActionEnum.SUSPEND.getValue()) {
System.out.println("所有任务数据执行完成...暂停任务");
this.suspend();
} else if (taskInfo.getTaskAction() == TaskActionEnum.STOP.getValue()) {
System.out.println("所有任务数据执行完成...关闭任务");
this.stop();
} else {
System.out.println("所有任务数据执行完成...销毁任务");
this.destroy();
}

} else {
// 发送给外呼机器人拨打电话, 携带token, 挂机时归还token
callData.setToken(token);
callService.call(job, callData);
continue;
}
}

// 获取不到令牌, 则休眠30s后继续尝试获取令牌
try {
long blockTime = job.getBlockTime();
if (blockTime < MIN_BLOCK_TIME || blockTime > MAX_BLOCK_TIME) {
blockTime = DEFAULT_BLOCK_TIME;
}
System.out.println("获取不到令牌,睡眠" + blockTime/1000 + "s后再次获取...");
Thread.sleep(blockTime);

} catch (InterruptedException e) {
// 线程中断,当前线程结束
break;
}
}
return null;
}

/**
* 获取任务信息
* @return
*/
public Job getJob() {
return job;
}

/**
* 暂停当前线程
*/
public void suspend() {
onPause();
if (currentThread == null) {
throw new RuntimeException("任务未启动");
}

this.suspend = true;
}

/**
* 唤起当前线程
*/
public void resume() {
onResume();
if (currentThread == null) {
throw new RuntimeException("任务未启动");
}
if (currentThread.isAlive() && !currentThread.isInterrupted()) {
System.out.println("当前任务正在执行, 无需重复操作...");
}
this.suspend = false;
synchronized (lock) {
lock.notifyAll();
}
}

/**
* 停止线程
*/
public void stop() {
onStop();
if (currentThread == null) {
System.out.println("任务未启动");
return;
// throw new RuntimeException("任务未启动");
}
if (!currentThread.isAlive() && currentThread.isInterrupted()) {
System.out.println("当前任务已停止, 无需重复操作...");
} else {
currentThread.interrupt();
currentThread = null;
}

}

/**
* 主动销毁资源
*/
public void destroy() {
onDestroy();
System.out.println("\n");
log.info("SchedulerService:stop:appName:[{}]:taskId:[{}]调度任务结束, 即将释放资源...", job.getAppName(), job.getTaskId());

// 清除任务基本信息
jobService.clearTaskInfo(job);

// 销毁令牌
tokenService.destroyToken(job);

// 停止任务
this.stop();

// 将内存中任务移除
taskStorage.removeTask(this);

log.info("SchedulerService:stop:appName:[{}]:taskId:[{}]调度任务结束, 资源释放完成...", job.getAppName(), job.getTaskId());
}

/**
* 线程创建回调
*/
private void onCreate() {
System.out.println("\nonCreate...");
}

/**
* 线程启动回调
*/
private void onStart() {
System.out.println("\nonStart...");
};
/**
* 线程暂停回调
*/
private void onPause() {
System.out.println("\nonPause...");
};
/**
* 线程唤起回调
*/
private void onResume() {
System.out.println("\nonResume...");
};
/**
* 线程终止回调
*/
private void onStop() {
System.out.println("\nonStop...");
};
/**
* 线程销毁回调
*/
private void onDestroy() {
System.out.println("\nonDestroy...");
};

public static Builder builder() {
return new Task.Builder();
}

public static class Builder {

// 任务基本信息
private Job job;

// 任务数据
private JobService jobService;

// 令牌管理
private TokenService tokenService;

// 外呼服务
private CallService callService;

// 任务存储
private TaskStorage taskStorage;

public Builder() {
}

public Builder setJob(Job job) {
this.job = job;
return this;
}

public Builder setJobService(JobService jobService) {
this.jobService = jobService;
return this;
}

public Builder setTokenService(TokenService tokenService) {
this.tokenService = tokenService;
return this;
}

public Builder setCallService(CallService callService) {
this.callService = callService;
return this;
}

public Builder setTaskStorage(TaskStorage taskStorage) {
this.taskStorage = taskStorage;
return this;
}

public Task build() {
return new Task(job, jobService, tokenService, callService, taskStorage);
}
}

public static void main(String[] args) throws Exception{

Job jobDetail = new Job("job1");

ExecutorService executorService = Executors.newCachedThreadPool();
Task task = Task.builder().setJob(jobDetail).build();

Map<Integer, Task> tasks = new HashMap<Integer, Task>();
tasks.put(1, task);

// 提交线程任务
Future<?> future = executorService.submit(task);

// 暂停
TimeUnit.SECONDS.sleep(2);
Task task1 = tasks.get(1);
task1.suspend();

// 恢复
TimeUnit.SECONDS.sleep(2);
task1.resume();

// 停止
TimeUnit.SECONDS.sleep(2);
task1.stop();

// 销毁线程池
executorService.shutdown();
}
}

调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author 当我遇上你
* @公众号 当我遇上你
* @since 2020-05-28
*/
// 线程我们维护在map中
public final static Map<Serializable, Task> taskMap = new ConcurrentHashMap<>();

// 启动任务
executorService.submit(task);
// 暂停任务(线程阻塞)
task.suspend();
// 任务恢复执行
task.resume();
// 停止任务(回收线程,不回收令牌等其他资源,可以再次start用新线程执行任务)
task.stop();
// 销毁任务(回收所以资源, 由于令牌等其他业务数据被回收,除非create否则无法再次start线程)
task.destroy()

最后

本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】支持。