task-scheduler
请实现一个定时任务调度器,有很多任务,每个任务都有一个时间戳,任务会在该时间点开始执行。
定时执行任务是一个很常见的需求,例如 Uber 打车 48 小时后自动好评,淘宝购物 15 天后默认好评,等等。
方案 1: PriorityBlockingQueue + Polling
我们很快可以想到第一个办法:
- 用一个java.util.concurrent.PriorityBlockingQueue来作为优先队列。因为我们需要一个优先队列,又需要线程安全,用PriorityBlockingQueue再合适不过了。你也可以手工实现一个自己的PriorityBlockingQueue,用java.util.PriorityQueue+ReentrantLock,用一把锁把这个队列保护起来,就是线程安全的啦
- 对于生产者,可以用一个while(true),造一些随机任务塞进去
- 对于消费者,起一个线程,在 while(true)里每隔几秒检查一下队列,如果有任务,则取出来执行。
这个方案的确可行,总结起来就是轮询(polling)。轮询通常有个很大的缺点,就是时间间隔不好设置,间隔太长,任务无法及时处理,间隔太短,会很耗 CPU。
方案 2: PriorityBlockingQueue + 时间差
可以把方案 1 改进一下,while(true)里的逻辑变成:
- 偷看一下堆顶的元素,但并不取出来,如果该任务过期了,则取出来
- 如果没过期,则计算一下时间差,然后 sleep()该时间差
不再是 sleep() 一个固定间隔了,消除了轮询的缺点。
稍等!这个方案其实有个致命的缺陷,导致它比 PiorityBlockingQueue + Polling 更加不可用,这个缺点是什么呢?。。。假设当前堆顶的任务在 100 秒后执行,消费者线程 peek()偷看到了后,开始 sleep 100 秒,这时候一个新的任务插了进来,该任务在 10 秒后应该执行,但是由于消费者线程要睡眠 100 秒,这个新任务无法及时处理。
方案 3: DelayQueue
方案 2 虽然已经不错了,但是还可以优化一下,Java 里有一个DelayQueue,完全符合题目的要求。DelayQueue 设计得非常巧妙,可以看做是一个特化版的PriorityBlockingQueue,它把计算时间差并让消费者等待该时间差的功能集成进了队列,消费者不需要关心时间差的事情了,直接在while(true)里不断take()就行了。
DelayQueue 的实现原理见下面的代码。
import java.util.PriorityQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class DelayQueue<E extends Delayed> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private final Condition available = lock.newCondition();
    private Thread leader = null;
    public DelayQueue() {}
    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean put(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
}
这个代码中有几个要点要注意一下。
1. put()方法
if (q.peek() == e) {
    leader = null;
    available.signal();
}
如果第一个元素等于刚刚插入进去的元素,说明刚才队列是空的。现在队列里有了一个任务,那么就应该唤醒所有在等待的消费者线程,避免了方案 2 的缺点。将leader重置为 null,这些消费者之间互相竞争,自然有一个会被选为 leader。
2. 线程 leader 的作用
leader这个成员有啥作用?DelayQueue 的设计其实是一个 Leader/Follower 模式,leader就是指向 Leader 线程的。该模式可以减少不必要的等待时间,当一个线程是 Leader 时,它只需要一个时间差;其他 Follower 线程则无限等待。比如头节点任务还有 5 秒就要开始了,那么 Leader 线程会 sleep 5 秒,不需要傻傻地等待固定时间间隔。
想象一下有个多个消费者线程用 take 方法去取任务,内部先加锁,然后每个线程都去 peek 头节点。如果 leader 不为空说明已经有线程在取了,让当前消费者无限等待。
if (leader != null)
   available.await();
如果为空说明没有其他消费者去取任务,设置 leader 为当前消费者,并让改消费者等待指定的时间,
else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
         available.awaitNanos(delay);
    } finally {
         if (leader == thisThread)
             leader = null;
    }
}
下次循环会走如下分支,取到任务结束,
if (delay <= 0)
    return q.poll();
3. take()方法中为什么释放 first
first = null; // don't retain ref while waiting
我们可以看到 Doug Lea 后面写的注释,那么这行代码有什么用呢?
如果删除这行代码,会发生什么呢?假设现在有 3 个消费者线程,
- 线程 A 进来获取 first,然后进入 else 的 else ,设置了 leader 为当前线程 A,并让 A 等待一段时间
- 线程 B 进来获取 first, 进入 else 的阻塞操作,然后无限期等待,这时线程 B 是持有 first 引用的
- 线程 A 等待指定时间后被唤醒,获取对象成功,出队,这个对象理应被 GC 回收,但是它还被线程 B 持有着,GC 链可达,所以不能回收这个 first
- 只要线程 B 无限期的睡眠,那么这个本该被回收的对象就不能被 GC 销毁掉,那么就会造成内存泄露
Task 对象
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Task implements Delayed {
    private String name;
    private long startTime;  // milliseconds
    public Task(String name, long delay) {
        this.name = name;
        this.startTime = System.currentTimeMillis() + delay;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return (int)(this.startTime - ((Task) o).startTime);
    }
    @Override
    public String toString() {
        return "task " + name + " at " + startTime;
    }
}
JDK 中有一个接口java.util.concurrent.Delayed,可以用于表示具有过期时间的元素,刚好可以拿来表示任务这个概念。
生产者
import java.util.Random;
import java.util.UUID;
public class TaskProducer implements Runnable {
    private final Random random = new Random();
    private DelayQueue<Task> q;
    public TaskProducer(DelayQueue<Task> q) {
        this.q = q;
    }
    @Override
    public void run() {
        while (true) {
            try {
                int delay = random.nextInt(10000);
                Task task = new Task(UUID.randomUUID().toString(), delay);
                System.out.println("Put " + task);
                q.put(task);
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
生产者很简单,就是一个死循环,不断地产生一些是时间随机的任务。
消费者
public class TaskConsumer implements Runnable {
    private DelayQueue<Task> q;
    public TaskConsumer(DelayQueue<Task> q) {
        this.q = q;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Task task = q.take();
                System.out.println("Take " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
当 DelayQueue 里没有任务时,TaskConsumer会无限等待,直到被唤醒,因此它不会消耗 CPU。
定时任务调度器
public class TaskScheduler {
    public static void main(String[] args) {
        DelayQueue<Task> queue = new DelayQueue<>();
        new Thread(new TaskProducer(queue), "Producer Thread").start();
        new Thread(new TaskConsumer(queue), "Consumer Thread").start();
    }
}
DelayQueue 这个方案,每个消费者线程只需要等待所需要的时间差,因此响应速度更快。它内部用了一个优先队列,所以插入和删除的时间复杂度都是$$\log n$$。
JDK 里还有一个ScheduledThreadPoolExecutor,原理跟 DelayQueue 类似,封装的更完善,平时工作中可以用它,不过面试中,还是拿 DelayQueue 来讲吧,它封装得比较薄,容易讲清楚原理。
方案 4: 时间轮(HashedWheelTimer)
时间轮(HashedWheelTimer)其实很简单,就是一个循环队列,如下图所示,
  
上图是一个长度为 8 的循环队列,假设该时间轮精度为秒,即每秒走一格,像手表那样,走完一圈就是 8 秒。每个格子指向一个任务集合,时间轮无限循环,每转到一个格子,就扫描该格子下面的所有任务,把时间到期的任务取出来执行。
举个例子,假设指针当前正指向格子 0,来了一个任务需要 4 秒后执行,那么这个任务就会放在格子 4 下面,如果来了一个任务需要 20 秒后执行怎么?由于这个循环队列转一圈只需要 8 秒,这个任务需要多转 2 圈,所以这个任务的位置虽然依旧在格子 4(20%8+0=4)下面,不过需要多转 2 圈后才执行。因此每个任务需要有一个字段记录需圈数,每转一圈就减 1,减到 0 则立刻取出来执行。
怎么实现时间轮呢?Netty 中已经有了一个时间轮的实现, HashedWheelTimer.java,可以参考它的源代码。
时间轮的优点是性能高,插入和删除的时间复杂度都是 O(1)。Linux 内核中的定时器采用的就是这个方案。
Follow up: 如何设计一个分布式的定时任务调度器呢? 答: Redis ZSet, RabbitMQ 等
参考资料
- java.util.concurrent.DelayQueue
- HashedWheelTimer.java - Github
- delayQueue 原理理解之源码解析 - 简书
- 细说延时任务的处理 - 简书
- 延迟任务的实现总结 - nick hao - 博客园
- 定时器(Timer)的实现
- java.util.concurrent.DelayQueue Example
- HashedWheelTimer 原理 - ZimZz - 博客园
- Hash 算法系列-具体算法(HashedWheelTimer) - CSDN
- java Disruptor 工作原理,谁能用一个比喻形容下? - 知乎
- 1 分钟实现“延迟消息”功能 - 58 沈剑
- Linux 下定时器的实现方式分析 - IBM
- 1 分钟了解 Leader-Follower 线程模型 - 58 沈剑
