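Overview
Multithreading is a fundamental part of Java development. A thread has its own lifecycle, so how can work be scheduled around that lifecycle? This article walks through one approach.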
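In some business scenarios a task needs to be created, started, paused, resumed, stopped, and destroyed. Mapped onto a thread, these operations correspond to create, start, block, running, interrupt, and stop.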
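In my case, the goal was to implement a concurrent scheduling service in which the concurrency control of each task is handled by one dedicated thread.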
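To make the mapping between task operations and thread states concrete, here is a minimal sketch that records what `Thread.getState()` reports as a thread is created, blocked, released, and finished. The class `LifecycleDemo` and its method `observeStates` are hypothetical names used only for illustration; they are not part of the scheduling service described in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: records the JVM-level states one thread passes through.
final class LifecycleDemo {

    static List<Thread.State> observeStates() {
        List<Thread.State> seen = new ArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                gate.await();                       // block until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        seen.add(worker.getState());                // created but not started
        worker.start();

        // Poll until the worker is parked on the latch.
        while (worker.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        seen.add(worker.getState());

        gate.countDown();                           // release the worker
        try {
            worker.join();                          // wait for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add(worker.getState());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates());        // prints [NEW, WAITING, TERMINATED]
    }
}
```

Note that the JVM has no dedicated "paused" state: a suspended task simply shows up as `WAITING` (or `TIMED_WAITING` while sleeping), which is why the task class later in this article tracks its own flags.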
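Implementation
Creating the thread pool
Each thread is bound to one task, and a task only performs scheduling, so the workload is neither IO-bound nor CPU-bound and no core threads are needed. Threads can simply be created and reclaimed dynamically according to the number of tasks.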
|
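import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BeanConfig {

    @Bean("executorService")
    public ExecutorService executorService() {
        return Executors.newCachedThreadPool();
    }
}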
|
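For reference, `Executors.newCachedThreadPool()` is documented as a pool with zero core threads, an effectively unbounded maximum, and a 60-second idle timeout, which matches the "no core threads, create and reclaim on demand" reasoning above. The sketch below spells the same configuration out with `ThreadPoolExecutor`, so that the idle timeout could be tuned if needed. The class `PoolSketch` and the method `newOnDemandPool` are hypothetical names, not something the original service defines.

```java
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: an explicit equivalent of a cached thread pool.
final class PoolSketch {

    static ThreadPoolExecutor newOnDemandPool(long keepAliveSeconds) {
        return new ThreadPoolExecutor(
                0,                                  // no core threads
                Integer.MAX_VALUE,                  // one thread per submitted task if needed
                keepAliveSeconds, TimeUnit.SECONDS, // idle threads are reclaimed after this
                new SynchronousQueue<Runnable>());  // hand tasks straight to a thread
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newOnDemandPool(60L);
        Future<String> result = pool.submit(() -> "done");
        System.out.println(result.get() + ", core threads = " + pool.getCorePoolSize());
        pool.shutdown();
    }
}
```

Because the maximum is unbounded, this design assumes the number of tasks is itself bounded by the business; otherwise a fixed upper limit would be the safer choice.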
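Because this runs as a long-lived service, the thread pool itself never needs to be shut down; only its idle threads need to be reclaimed. The pool is used as follows: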
|
@Autowired
private ExecutorService executorService;

executorService.submit(task);
|
Core implementation (the task)

|
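import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
// StringUtils is assumed to be org.apache.commons.lang3.StringUtils.

@Slf4j
public class Task implements Callable<Job>, Serializable {

    // Bounds, in milliseconds, for the sleep between token attempts.
    private static final long DEFAULT_BLOCK_TIME = 1000 * 5;
    private static final long MIN_BLOCK_TIME = 500;
    private static final long MAX_BLOCK_TIME = 1000 * 60 * 10;

    // Set by suspend() and cleared by resume(); checked by the worker loop.
    private volatile boolean suspend = false;
    // Cleared when the worker loop should end.
    private volatile boolean running = true;
    // Monitor the worker thread waits on while suspended.
    private final Object lock = new Object();
    // Thread executing call(); volatile because control methods run on other threads.
    private volatile Thread currentThread;

    private Job job;
    private JobService jobService;
    private TokenService tokenService;
    private CallService callService;
    private TaskStorage taskStorage;

    private Task(Job job, JobService jobService, TokenService tokenService,
                 CallService callService, TaskStorage taskStorage) {
        this.job = job;
        this.jobService = jobService;
        this.tokenService = tokenService;
        this.callService = callService;
        this.taskStorage = taskStorage;
        this.onCreate();
    }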
    @Override
    public Job call() throws Exception {
        onStart();
        currentThread = Thread.currentThread();
        while (running && !Thread.currentThread().isInterrupted()) {
            if (suspend) {
                System.out.println("task:" + this.job.getJobName() + " suspend...");
                synchronized (lock) {
                    try {
                        // Re-check the flag in a loop: wait() may wake up spuriously.
                        while (suspend) {
                            lock.wait();
                        }
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
            System.out.println("\ntask:" + this.job.getJobName() + " running...");
            String token = tokenService.acquire(job);
            if (StringUtils.isNotEmpty(token)) {
                CallData callData = jobService.loadTaskData(job);
                if (callData == null) {
                    // No data left: apply the action configured for this task.
                    Job taskInfo = jobService.getTaskInfo(job);
                    if (taskInfo.getTaskAction() == TaskActionEnum.SUSPEND.getValue()) {
                        System.out.println("All task data processed... suspending task");
                        this.suspend();
                    } else if (taskInfo.getTaskAction() == TaskActionEnum.STOP.getValue()) {
                        System.out.println("All task data processed... stopping task");
                        this.stop();
                    } else {
                        System.out.println("All task data processed... destroying task");
                        this.destroy();
                    }
                    // Re-evaluate the loop condition instead of falling through to the token sleep.
                    continue;
                } else {
                    callData.setToken(token);
                    callService.call(job, callData);
                    continue;
                }
            }
            try {
                long blockTime = job.getBlockTime();
                if (blockTime < MIN_BLOCK_TIME || blockTime > MAX_BLOCK_TIME) {
                    blockTime = DEFAULT_BLOCK_TIME;
                }
                System.out.println("Could not acquire a token, retrying after sleeping " + blockTime / 1000 + "s...");
                Thread.sleep(blockTime);
            } catch (InterruptedException e) {
                break;
            }
        }
        return null;
    }
    public Job getJob() {
        return job;
    }

    public void suspend() {
        onPause();
        if (currentThread == null) {
            throw new RuntimeException("Task has not been started");
        }
        this.suspend = true;
    }

    public void resume() {
        onResume();
        if (currentThread == null) {
            throw new RuntimeException("Task has not been started");
        }
        if (!suspend) {
            System.out.println("Task is already running, nothing to do...");
            return;
        }
        // Clear the flag and notify under the monitor the worker waits on.
        synchronized (lock) {
            this.suspend = false;
            lock.notifyAll();
        }
    }

    public void stop() {
        onStop();
        // Work on a local copy: the field may be cleared by a concurrent stop().
        Thread thread = currentThread;
        if (thread == null) {
            System.out.println("Task has not been started");
            return;
        }
        if (!thread.isAlive()) {
            System.out.println("Task has already stopped, nothing to do...");
        } else {
            running = false;
            thread.interrupt(); // wakes the worker from wait() or sleep()
            currentThread = null;
        }
    }
    public void destroy() {
        onDestroy();
        System.out.println("\n");
        log.info("SchedulerService:stop:appName:[{}]:taskId:[{}] scheduling task finished, about to release resources...", job.getAppName(), job.getTaskId());
        jobService.clearTaskInfo(job);
        tokenService.destroyToken(job);
        this.stop();
        taskStorage.removeTask(this);
        log.info("SchedulerService:stop:appName:[{}]:taskId:[{}] scheduling task finished, resources released...", job.getAppName(), job.getTaskId());
    }
    // Lifecycle hooks; here they only print a trace line.
    private void onCreate() {
        System.out.println("\nonCreate...");
    }

    private void onStart() {
        System.out.println("\nonStart...");
    }

    private void onPause() {
        System.out.println("\nonPause...");
    }

    private void onResume() {
        System.out.println("\nonResume...");
    }

    private void onStop() {
        System.out.println("\nonStop...");
    }

    private void onDestroy() {
        System.out.println("\nonDestroy...");
    }
    public static Builder builder() {
        return new Task.Builder();
    }

    public static class Builder {
        private Job job;
        private JobService jobService;
        private TokenService tokenService;
        private CallService callService;
        private TaskStorage taskStorage;

        public Builder() {
        }

        public Builder setJob(Job job) {
            this.job = job;
            return this;
        }

        public Builder setJobService(JobService jobService) {
            this.jobService = jobService;
            return this;
        }

        public Builder setTokenService(TokenService tokenService) {
            this.tokenService = tokenService;
            return this;
        }

        public Builder setCallService(CallService callService) {
            this.callService = callService;
            return this;
        }

        public Builder setTaskStorage(TaskStorage taskStorage) {
            this.taskStorage = taskStorage;
            return this;
        }

        public Task build() {
            return new Task(job, jobService, tokenService, callService, taskStorage);
        }
    }
    public static void main(String[] args) throws Exception {
        Job jobDetail = new Job("job1");

        ExecutorService executorService = Executors.newCachedThreadPool();
        // NOTE: no services are set on the builder here, so call() fails with a
        // NullPointerException at tokenService.acquire(job). Supply real or stub
        // services before running this demo.
        Task task = Task.builder().setJob(jobDetail).build();

        Map<Integer, Task> tasks = new HashMap<>();
        tasks.put(1, task);

        Future<?> future = executorService.submit(task);

        TimeUnit.SECONDS.sleep(2);
        Task task1 = tasks.get(1);
        task1.suspend();

        TimeUnit.SECONDS.sleep(2);
        task1.resume();

        TimeUnit.SECONDS.sleep(2);
        task1.stop();

        executorService.shutdown();
    }
}
|
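The `Task` class above depends on project services, so it cannot be run in isolation. As a rough illustration of the same pause/resume/stop mechanism, the sketch below strips the business logic down to a counter. Everything in it (`PausableWorker`, `ticks`, `demo`) is a hypothetical stand-in, and it guards the suspend flag with the monitor rather than with `volatile`, which is one possible variation and not necessarily how the original service does it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, service-free stand-in for the Task class above.
final class PausableWorker implements Runnable {
    private final Object lock = new Object();
    private boolean suspended = false;              // guarded by lock
    private volatile boolean running = true;
    final AtomicInteger ticks = new AtomicInteger();

    @Override
    public void run() {
        while (running) {
            synchronized (lock) {
                // Loop on the condition: wait() may return spuriously.
                while (suspended && running) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;                     // treat interruption as a stop request
                    }
                }
            }
            if (!running) {
                break;
            }
            ticks.incrementAndGet();                // placeholder for one unit of work
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    void suspend() {
        synchronized (lock) { suspended = true; }
    }

    void resume() {
        synchronized (lock) { suspended = false; lock.notifyAll(); }
    }

    void stop() {
        running = false;
        synchronized (lock) { lock.notifyAll(); }   // wake the worker if it is suspended
    }

    // Runs one suspend/resume/stop cycle; true if no work happened while suspended.
    static boolean demo() {
        PausableWorker w = new PausableWorker();
        Thread t = new Thread(w);
        t.start();
        try {
            while (w.ticks.get() == 0) Thread.sleep(1);                   // first unit of work done
            w.suspend();
            while (t.getState() != Thread.State.WAITING) Thread.sleep(1); // parked in wait()
            int frozen = w.ticks.get();
            Thread.sleep(50);
            boolean paused = w.ticks.get() == frozen;
            w.resume();
            while (w.ticks.get() == frozen) Thread.sleep(1);              // work has resumed
            w.stop();
            t.join();
            return paused && !t.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("paused cleanly: " + demo());
    }
}
```

Here `stop()` ends the loop through the flag plus a `notifyAll()`, so it works even without interrupting the thread; the article's `Task` relies on `interrupt()` instead, which has the advantage of also cutting short a long `sleep`.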
Scheduling
|
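// Registry of tasks, so a task can be looked up and controlled later.
public static final Map<Serializable, Task> taskMap = new ConcurrentHashMap<>();

executorService.submit(task); // start
task.suspend();               // pause
task.resume();                // resume
task.stop();                  // stop
task.destroy();               // stop and release resources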
|
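The snippet above only lists the calls. As one way to tie the map and the pool together, the sketch below keeps a `Future` per task id and refuses to submit the same id twice. This is a variation, not the author's code: `TaskRegistry`, `submitOnce`, and the string ids are hypothetical, and it stops a task through `Future.cancel(true)`, which interrupts the worker thread, rather than through `Task.stop()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical registry: maps a task id to the Future of its submitted task.
final class TaskRegistry {
    private final Map<String, Future<?>> running = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    TaskRegistry(ExecutorService pool) {
        this.pool = pool;
    }

    // Submits the task unless the id is already registered; returns true if submitted.
    boolean submitOnce(String taskId, Runnable task) {
        boolean[] created = {false};
        running.computeIfAbsent(taskId, id -> {
            created[0] = true;
            return pool.submit(task);
        });
        return created[0];
    }

    // Removes the id and interrupts its thread; false if unknown or already finished.
    boolean stop(String taskId) {
        Future<?> future = running.remove(taskId);
        return future != null && future.cancel(true);
    }

    int size() {
        return running.size();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        TaskRegistry registry = new TaskRegistry(pool);
        Runnable sleeper = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted by stop(): just finish
            }
        };
        System.out.println("first submit:  " + registry.submitOnce("job1", sleeper));
        System.out.println("second submit: " + registry.submitOnce("job1", sleeper));
        System.out.println("stopped:       " + registry.stop("job1"));
        pool.shutdown();
    }
}
```

Since the `Task` loop in this article already exits on interruption, cancelling its `Future` would end it as well, though the `onStop()` hook would not be invoked that way.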
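Conclusion
That is all for this article; thank you for reading. If you found it useful, please follow the WeChat official account 【当我遇上你】 to show your support.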