前言
很多时候需要做一些延时任务, 但是有些场景可能会丢弃任务, 如下示例演示基于java的延时任务方案。
DelayQueue
- 首先将任务包装起来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Data public class DelayTask implements Delayed {
private long exeTime; private ChatActionDTO action;
public DelayTask(long exeTime, ChatActionDTO action) { this.exeTime = System.currentTimeMillis() + (exeTime > 0 ? TimeUnit.SECONDS.toMillis(exeTime) : 0); this.action = action; }
@Override public long getDelay(TimeUnit unit) { return exeTime - System.currentTimeMillis(); }
@Override public int compareTo(Delayed o) { DelayTask t = (DelayTask) o; if (this.exeTime - t.exeTime <= 0) { return -1; } else { return 1; } } }
|
- 取出延时任务
1
| DelayTask delayTask = delayQueue.take();
|
ScheduledExecutorService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class ScheduledMessageHandler implements IMessageHandler{
private Map<String, List<ScheduledFuture<?>>> delayQueues = new HashMap<>(); private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(30);
@Override public void sendMessage(Channel channel, ChatDTO chat, Long requestTime, Integer chatId) { cancelDelayMessageFuture(channel); chat.getAction().stream().sorted(Comparator.comparing(ChatActionDTO::getDelayTime).thenComparing(ChatActionDTO::getOrderIndex)).forEach(action -> { ScheduledFuture<?> future = executor.schedule(() -> { channel.writeAndFlush(new TextWebSocketFrame(JsonUtils.to(action))); }, action.getDelayTime(), TimeUnit.SECONDS); addDelayMessageFuture(channel, future); }); }
public void addDelayMessageFuture(Channel channel, ScheduledFuture<?> delayMessageFuture){ List<ScheduledFuture<?>> futures = delayQueues.computeIfAbsent(ChannelUtils.getChannelId(channel), k -> new ArrayList<>()); futures.add(delayMessageFuture); }
public void cancelDelayMessageFuture(Channel channel){ List<ScheduledFuture<?>> futures = delayQueues.computeIfAbsent(ChannelUtils.getChannelId(channel), k -> new ArrayList<>()); for(ScheduledFuture<?> future : futures){ if(!future.isCancelled() && !future.isDone()){ future.cancel(false); } } delayQueues.remove(ChannelUtils.getChannelId(channel)); }
}
|
最后
本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】,您的支持是我写作的最大动力。