概述
多线程是Java开发中的基本内容, 线程有自己的生命周期,如何根据其生命周期实现调度呢?下文见分晓。
一些业务中,任务需要创建、启动、暂停、恢复、停止、销毁等操作,映射到线程就是 create、start、block、running、interrupt、stop。
在笔者的业务中, 目标是实现一个并发调度服务,每个任务的并发控制用一个线程来处理。
业务实现
线程池创建
由于线程与任务关联,每个任务的执行只是调度,无 IO 密集和 CPU 密集之说,所以不需要核心线程,直接根据任务数动态地创建和回收线程即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
/**
 * Spring configuration that publishes the thread pool on which scheduling tasks run.
 */
@Configuration
public class BeanConfig {

    /**
     * Registers a cached thread pool under the bean name {@code executorService}.
     *
     * <p>A cached pool keeps no core threads: it creates a thread when a task is
     * submitted and no idle thread is available, and reclaims threads that stay idle.
     *
     * @return the shared executor used to run tasks
     */
    @Bean("executorService")
    public ExecutorService executorService() {
        return Executors.newCachedThreadPool();
    }
}
|
作为一个服务,线程池是不需要回收的,只需要回收线程即可。线程池的使用如下:
1 2 3 4 5
| @Autowired private ExecutorService executorService; // injected ExecutorService (the pool bean declared in BeanConfig)
executorService.submit(task); // hands the task to the pool; the pool invokes it on one of its threads
|
核心实现(任务)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
|
/**
 * A long-running scheduling task whose lifecycle is controlled from other threads.
 *
 * <p>{@link #call()} loops on the executing thread: it acquires a token, loads the next
 * piece of call data and dispatches it. Controller threads drive the lifecycle through
 * {@link #suspend()}, {@link #resume()}, {@link #stop()} and {@link #destroy()}.
 *
 * <p>Thread-safety: the {@code suspend}, {@code running} and {@code currentThread} fields
 * are volatile so that writes by controller threads are visible to the worker and vice
 * versa. Parking and waking the worker is coordinated through the {@code lock} monitor.
 *
 * <p>NOTE(review): the class is declared {@link Serializable} but holds a plain
 * {@code Object} lock and a {@code Thread} reference, neither of which is serializable;
 * confirm whether Java serialization of a Task is actually required.
 */
@Slf4j
public class Task implements Callable<Job>, Serializable {

    /** Fallback pause between token attempts, in milliseconds. */
    private static final long DEFAULT_BLOCK_TIME = 1000 * 5;
    /** Smallest job-supplied pause that is accepted, in milliseconds. */
    private static final long MIN_BLOCK_TIME = 500;
    /** Largest job-supplied pause that is accepted, in milliseconds. */
    private static final long MAX_BLOCK_TIME = 1000 * 60 * 10;

    /** True while the worker should park instead of processing. */
    private volatile boolean suspend = false;
    /** Cleared by {@link #stop()} so the loop ends even if an interrupt was swallowed. */
    private volatile boolean running = true;
    /** Monitor on which the worker waits while suspended. */
    private final Object lock = new Object();
    /** Thread currently executing {@link #call()}; null before start and after stop. */
    private volatile Thread currentThread;

    private final Job job;
    private final JobService jobService;
    private final TokenService tokenService;
    private final CallService callService;
    private final TaskStorage taskStorage;

    private Task(Job job, JobService jobService, TokenService tokenService,
                 CallService callService, TaskStorage taskStorage) {
        this.job = job;
        this.jobService = jobService;
        this.tokenService = tokenService;
        this.callService = callService;
        this.taskStorage = taskStorage;
        this.onCreate();
    }

    /**
     * Runs the scheduling loop until the task is stopped or the thread is interrupted.
     *
     * @return always {@code null}
     * @throws Exception if one of the collaborating services throws
     */
    @Override
    public Job call() throws Exception {
        onStart();
        // Re-arm the flag so a task that was stopped earlier can be submitted again.
        running = true;
        currentThread = Thread.currentThread();

        while (running && !Thread.currentThread().isInterrupted()) {
            if (!awaitResume()) {
                break;
            }

            System.out.println("\ntask:" + this.job.getJobName() + " running...");

            String token = tokenService.acquire(job);
            if (StringUtils.isNotEmpty(token)) {
                CallData callData = jobService.loadTaskData(job);
                if (callData != null) {
                    callData.setToken(token);
                    callService.call(job, callData);
                } else {
                    handleJobDrained();
                }
                // Re-evaluate the loop condition right away: after stop/destroy it ends the
                // loop, after suspend it parks, otherwise the next item is processed.
                continue;
            }

            if (!sleepBeforeRetry()) {
                break;
            }
        }
        return null;
    }

    /**
     * Parks the calling thread while the task is suspended.
     *
     * <p>The flag is re-checked in a loop while holding the monitor, so a {@link #resume()}
     * that happens between the check and the wait cannot be missed and a spurious wakeup
     * does not resume the task.
     *
     * @return {@code false} if the thread was interrupted while waiting
     */
    private boolean awaitResume() {
        if (!suspend) {
            return true;
        }
        System.out.println("task:" + this.job.getJobName() + " suspend...");
        synchronized (lock) {
            try {
                while (suspend) {
                    lock.wait();
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Applies the configured follow-up action once no more call data is available. */
    private void handleJobDrained() {
        Job taskInfo = jobService.getTaskInfo(job);
        // NOTE(review): if getTaskAction()/getValue() return boxed types, '==' compares
        // references — confirm they are primitives or switch to equals().
        if (taskInfo.getTaskAction() == TaskActionEnum.SUSPEND.getValue()) {
            System.out.println("所有任务数据执行完成...暂停任务");
            this.suspend();
        } else if (taskInfo.getTaskAction() == TaskActionEnum.STOP.getValue()) {
            System.out.println("所有任务数据执行完成...关闭任务");
            this.stop();
        } else {
            System.out.println("所有任务数据执行完成...销毁任务");
            this.destroy();
        }
    }

    /**
     * Sleeps before the next token attempt, using the job's block time when it lies
     * within the accepted range and the default otherwise.
     *
     * @return {@code false} if the sleep was interrupted
     */
    private boolean sleepBeforeRetry() {
        long blockTime = job.getBlockTime();
        if (blockTime < MIN_BLOCK_TIME || blockTime > MAX_BLOCK_TIME) {
            blockTime = DEFAULT_BLOCK_TIME;
        }
        System.out.println("获取不到令牌,睡眠" + blockTime/1000 + "s后再次获取...");
        try {
            Thread.sleep(blockTime);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** @return the job this task schedules */
    public Job getJob() {
        return job;
    }

    /**
     * Asks the worker to park at the start of its next loop iteration.
     *
     * @throws IllegalStateException if the task has not been started
     */
    public void suspend() {
        requireStarted();
        onPause();
        this.suspend = true;
    }

    /**
     * Wakes a suspended worker; reports and does nothing if the task is not suspended.
     *
     * @throws IllegalStateException if the task has not been started
     */
    public void resume() {
        requireStarted();
        if (!suspend) {
            System.out.println("当前任务正在执行, 无需重复操作...");
            return;
        }
        onResume();
        synchronized (lock) {
            // Flip the flag under the monitor so the waiter observes it when it wakes.
            this.suspend = false;
            lock.notifyAll();
        }
    }

    /**
     * Ends the scheduling loop: clears the running flag and interrupts the worker so a
     * pending wait or sleep returns immediately. Does nothing if the task never started.
     */
    public void stop() {
        // Snapshot the field: a concurrent stop() may null it between check and use.
        Thread worker = currentThread;
        if (worker == null) {
            System.out.println("任务未启动");
            return;
        }
        onStop();
        if (!worker.isAlive() && worker.isInterrupted()) {
            System.out.println("当前任务已停止, 无需重复操作...");
        } else {
            running = false;
            worker.interrupt();
            currentThread = null;
        }
    }

    /**
     * Stops the task and releases everything associated with it: the stored task info,
     * the token, and the task's entry in the task storage.
     */
    public void destroy() {
        onDestroy();
        System.out.println("\n");
        log.info("SchedulerService:stop:appName:[{}]:taskId:[{}]调度任务结束, 即将释放资源...", job.getAppName(), job.getTaskId());
        jobService.clearTaskInfo(job);
        tokenService.destroyToken(job);
        this.stop();
        taskStorage.removeTask(this);
        log.info("SchedulerService:stop:appName:[{}]:taskId:[{}]调度任务结束, 资源释放完成...", job.getAppName(), job.getTaskId());
    }

    /** Rejects lifecycle operations that need a running worker thread. */
    private void requireStarted() {
        if (currentThread == null) {
            throw new IllegalStateException("任务未启动");
        }
    }

    private void onCreate() {
        System.out.println("\nonCreate...");
    }

    private void onStart() {
        System.out.println("\nonStart...");
    }

    private void onPause() {
        System.out.println("\nonPause...");
    }

    private void onResume() {
        System.out.println("\nonResume...");
    }

    private void onStop() {
        System.out.println("\nonStop...");
    }

    private void onDestroy() {
        System.out.println("\nonDestroy...");
    }

    /** @return a new builder for assembling a {@link Task} */
    public static Builder builder() {
        return new Task.Builder();
    }

    /** Fluent builder collecting the job and the services a {@link Task} works with. */
    public static class Builder {

        private Job job;
        private JobService jobService;
        private TokenService tokenService;
        private CallService callService;
        private TaskStorage taskStorage;

        public Builder() {
        }

        public Builder setJob(Job job) {
            this.job = job;
            return this;
        }

        public Builder setJobService(JobService jobService) {
            this.jobService = jobService;
            return this;
        }

        public Builder setTokenService(TokenService tokenService) {
            this.tokenService = tokenService;
            return this;
        }

        public Builder setCallService(CallService callService) {
            this.callService = callService;
            return this;
        }

        public Builder setTaskStorage(TaskStorage taskStorage) {
            this.taskStorage = taskStorage;
            return this;
        }

        /** @return a task wired with whatever has been set on this builder */
        public Task build() {
            return new Task(job, jobService, tokenService, callService, taskStorage);
        }
    }

    /**
     * Small demo: starts a task, then suspends, resumes and stops it two seconds apart.
     *
     * <p>NOTE(review): only the job is set on the builder here, while {@code call()}
     * dereferences the token service; wire the services before running this for real.
     */
    public static void main(String[] args) throws Exception {
        Job jobDetail = new Job("job1");
        ExecutorService executorService = Executors.newCachedThreadPool();
        Task task = Task.builder().setJob(jobDetail).build();

        Map<Integer, Task> tasks = new HashMap<>();
        tasks.put(1, task);

        executorService.submit(task);

        TimeUnit.SECONDS.sleep(2);
        Task task1 = tasks.get(1);
        task1.suspend();

        TimeUnit.SECONDS.sleep(2);
        task1.resume();

        TimeUnit.SECONDS.sleep(2);
        task1.stop();

        executorService.shutdown();
    }
}
|
调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
// Registry of tasks keyed by a serializable id, held in a ConcurrentHashMap.
public static final Map<Serializable, Task> taskMap = new ConcurrentHashMap<>();
// Start: the pool invokes the task's call() loop on one of its threads.
executorService.submit(task);
// Pause: raises the suspend flag; the loop parks at its next iteration.
task.suspend();
// Resume: clears the suspend flag and notifies the parked thread.
task.resume();
// Stop: interrupts the thread that is running the task.
task.stop();
// Destroy: clears task info, destroys the token, stops the task and removes it from storage.
task.destroy();
|
最后
本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】支持。