Java Delayed Tasks

Preface

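Delayed tasks come up often, and in some scenarios a task that is still pending has to be discarded. The examples below show two Java-based approaches to delayed tasks.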

DelayQueue

  1. First, wrap the task
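import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Data;

/**
 * @author cuishiying
 * @date 2021-06-22
 */
@Data
public class DelayTask implements Delayed {

    /** Absolute execution time, in epoch milliseconds. */
    private long exeTime;
    private ChatActionDTO action;

    /**
     * Delayed task.
     *
     * @param exeTime delay in seconds
     * @param action  the task
     */
    public DelayTask(long exeTime, ChatActionDTO action) {
        this.exeTime = System.currentTimeMillis() + (exeTime > 0 ? TimeUnit.SECONDS.toMillis(exeTime) : 0);
        this.action = action;
    }

    /**
     * Remaining delay, converted to the unit the caller asks for
     * (DelayQueue asks for nanoseconds, so returning raw milliseconds is wrong).
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(exeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Ordering of tasks in the queue: earliest execution time first.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof DelayTask) {
            // Long.compare returns 0 for equal times, which the compareTo contract requires
            return Long.compare(this.exeTime, ((DelayTask) o).exeTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}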
  2. Take the delayed task out of the queue
DelayTask delayTask = delayQueue.take();
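
To see the two steps working together, here is a minimal, self-contained sketch. It is not the original project code: `LabeledDelayTask` is a hypothetical stand-in for `DelayTask` that carries a `String` label instead of a `ChatActionDTO`, and it takes its delay in milliseconds so the demo finishes quickly. It also shows one way to discard a pending task, using `DelayQueue.remove`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DelayTask: a String label replaces ChatActionDTO.
class LabeledDelayTask implements Delayed {
    private final long exeTime; // absolute deadline, epoch milliseconds
    private final String label;

    LabeledDelayTask(long delayMillis, String label) {
        this.exeTime = System.currentTimeMillis() + Math.max(delayMillis, 0);
        this.label = label;
    }

    String getLabel() {
        return label;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Convert the remaining milliseconds into whatever unit the queue asks for
        return unit.convert(exeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof LabeledDelayTask) {
            return Long.compare(exeTime, ((LabeledDelayTask) other).exeTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class DelayQueueDemo {
    // Enqueues tasks out of order, discards one, and returns the labels
    // in the order in which take() releases them.
    static List<String> drainInExpiryOrder() throws InterruptedException {
        DelayQueue<LabeledDelayTask> queue = new DelayQueue<>();
        queue.put(new LabeledDelayTask(300, "third"));
        queue.put(new LabeledDelayTask(100, "first"));
        queue.put(new LabeledDelayTask(200, "second"));

        // Discard a task that is still waiting: remove() works whether or not it has expired
        LabeledDelayTask dropped = new LabeledDelayTask(150, "dropped");
        queue.put(dropped);
        queue.remove(dropped);

        List<String> order = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // take() blocks until the delay of the head task has expired
            order.add(queue.take().getLabel());
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainInExpiryOrder()); // prints [first, second, third]
    }
}
```

In a real service the `take()` loop would normally run on a dedicated consumer thread, since it blocks for as long as the queue has nothing ready.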

ScheduledExecutorService

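import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Assumption: Channel and TextWebSocketFrame are the Netty classes, as writeAndFlush suggests
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class ScheduledMessageHandler implements IMessageHandler {

    // Pending futures per channel id. Concurrent collections are used because
    // different channels may call into this handler from different threads.
    private final Map<String, List<ScheduledFuture<?>>> delayQueues = new ConcurrentHashMap<>();
    private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(30);

    @Override
    public void sendMessage(Channel channel, ChatDTO chat, Long requestTime, Integer chatId) {
        // Discard whatever is still pending for this channel before scheduling the new actions
        cancelDelayMessageFuture(channel);
        chat.getAction().stream()
                .sorted(Comparator.comparing(ChatActionDTO::getDelayTime).thenComparing(ChatActionDTO::getOrderIndex))
                .forEach(action -> {
                    ScheduledFuture<?> future = executor.schedule(() -> {
                        channel.writeAndFlush(new TextWebSocketFrame(JsonUtils.to(action)));
                    }, action.getDelayTime(), TimeUnit.SECONDS);
                    addDelayMessageFuture(channel, future);
                });
    }

    public void addDelayMessageFuture(Channel channel, ScheduledFuture<?> delayMessageFuture) {
        List<ScheduledFuture<?>> futures =
                delayQueues.computeIfAbsent(ChannelUtils.getChannelId(channel), k -> new CopyOnWriteArrayList<>());
        futures.add(delayMessageFuture);
    }

    public void cancelDelayMessageFuture(Channel channel) {
        // remove() returns the pending list directly, so no empty list is created just to be dropped
        List<ScheduledFuture<?>> futures = delayQueues.remove(ChannelUtils.getChannelId(channel));
        if (futures == null) {
            return;
        }
        for (ScheduledFuture<?> future : futures) {
            // isDone() is also true for cancelled futures, so one check covers both cases
            if (!future.isDone()) {
                future.cancel(false);
            }
        }
    }

}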
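
The schedule-then-cancel pattern used by the handler can be tried in isolation. The sketch below is an illustration under assumptions, not the project code: `ScheduleCancelDemo` and its labels are made up, and a plain list stands in for the channel. It also calls `setRemoveOnCancelPolicy(true)` on `ScheduledThreadPoolExecutor`, which the original does not do; with it, a cancelled task leaves the executor's queue immediately rather than staying there until its delay elapses.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduleCancelDemo {
    // Schedules two delayed tasks, cancels the second one before it fires,
    // and returns the labels of the tasks that actually ran.
    static List<String> runAndCancel() throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        // Remove cancelled tasks from the work queue right away
        executor.setRemoveOnCancelPolicy(true);

        List<String> sent = new CopyOnWriteArrayList<>(); // stands in for the channel
        ScheduledFuture<?> kept = executor.schedule(() -> {
            sent.add("kept");
        }, 100, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> dropped = executor.schedule(() -> {
            sent.add("dropped");
        }, 200, TimeUnit.MILLISECONDS);

        // cancel(false) never interrupts a running task; a task that has not started will not run
        if (!dropped.isDone()) {
            dropped.cancel(false);
        }

        // By default, delayed tasks that are already scheduled still run after shutdown()
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.SECONDS);
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCancel()); // prints [kept]
    }
}
```

Compared with `DelayQueue`, this approach needs no consumer thread of its own: the executor runs each task when it is due, and the returned `ScheduledFuture` is the handle used to discard it.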

Conclusion

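That is all for this article; thank you for reading. If you found it useful, please follow the WeChat official account 【当我遇上你】. Your support is my biggest motivation to keep writing.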