线程池底层原理详解与源码分析(补充部分---ScheduledThreadPoolExecutor类分析)

乎语百科 339 0

【1】前言

  本篇幅是对 线程池底层原理详解与源码分析  的补充,默认你已经看完了上一篇对ThreadPoolExecutor类有了足够的了解。

【2】ScheduledThreadPoolExecutor的介绍

  1.ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

  2.构造函数展示

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), threadFactory, handler);
}

  3.通过构造函数我们可以看到,它的线程池本身就是调用ThreadPoolExecutor类的构造方法,因此也继承了ThreadPoolExecutor类所存在的隐患:

    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。(且CPU会变成100%)

  4.PS:既然隐患这么严重,使用原生的不太合适。正所谓,人无横财不富,马无夜草不肥,打不过就加入。ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,那就写个类继承它然后调用ThreadPoolExecutor的构造方法区解决掉创建线程数被写死为最大值的情况,然后了解一下DelayedWorkQueue(这个本质上也是优先级队列),继承一下也改写吧。毕竟自己的最合适不是吗。【毕竟我觉得这些都是大佬们留给菜鸡的底版,如拒绝策略不也是四个默认都没人用吗,都是要你根据自己的场景改】(毕竟我这猜测的原因是因为有了无尽队列,其实线程数设置为Integer.MAX_VALUE已经没有意义了)

【3】ScheduledThreadPoolExecutor的使用

  1)schedule(Runnable command, long delay, TimeUnit unit)

    方法说明:无返回值的延迟任务,有个严重的问题,就是没有办法获知task的执行结果  2)schedule(Callable callable, long delay, TimeUnit unit)

    方法说明:有返回值的延迟任务 :接收的是Callable实例,会返回一个ScheduleFuture对象,通过ScheduleFuture可以取消一个未执行的task,也可以获得这个task的执行结果  3)scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 

    方法说明: 固定频率周期任务:第一次执行的延迟根据initialDelay参数确定,以后每一次执行都间隔period时长  4)scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

    方法说明: 固定延迟周期任务 :scheduleWithFixedDelay的参数和scheduleAtFixedRate参数完全一致,它们的不同之处在于对period调度周期的解释。在scheduleAtFixedRate中,period指的两个任务开始执行的时间间隔,也就是当前任务的开始执行时间和下个任务的开始执行时间之间的间隔。而在scheduleWithFixedDelay中,period指的当前任务的结束执行时间到下个任务的开始执行时间。

【4】任务ScheduledFutureTask类源码分析

  1.构造方法展示

    代码展示

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
    ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) {
        super(r, result);
        this.time = triggerTime; //表示这个任务将要被执行的具体时间
        this.period = 0;  //表示任务执行的间隔周期
        this.sequenceNumber = sequenceNumber;  //表示这个任务被添加到ScheduledThreadPoolExecutor中的序号(采用AtomicLong原子类累加当做序号)
    }

    ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber) {
        super(r, result);
        this.time = triggerTime;
        this.period = period;
        this.sequenceNumber = sequenceNumber;
    }

    ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber) {
        super(callable);
        this.time = triggerTime;
        this.period = 0;
        this.sequenceNumber = sequenceNumber;
    }

 ...
}

    代码说明

      1.三个标注的参数是任务中主要的成员变量。

      2.其次,我们会发现callable的任务是没有间隔周期的:因为callable本身就是阻塞等待,而且周期性的也不合适。

      3.实现了RunnableScheduledFuture接口,其主要方法isPeriodic()用于判断是不是周期任务,又继承了RunnableFuture接口.

      4.ScheduledFutureTask又继承了FutureTask类,而FutureTask类实现了RunnableFuture接口。(故感觉RunnableFuture接口的那些方法挺重要的)

      5.RunnableFuture接口主要是由Runnable和Future两大接口组成(自己去看继承关系),主要有run()方法。

  2.ScheduledFutureTask类#run方法

    代码展示

// 重写FutureTask,如果是周期性任务需要重新放入队列
public void run() {
    // 检查当前状态 不能执行任务,则取消任务
    if (!canRunInCurrentRunState(this))
        cancel(false);
    //如果不是周期任务,调用FutureTask.run()执行任务(非周期任务直接执行)
    else if (!isPeriodic())
        super.run();
    // 周期性任务
    else if (super.runAndReset()) {
        //与run方法的不同就是正常完成后任务的状态不会变化,依旧是NEW,且返回值为成功或失败,不会设置result属性
        setNextRunTime(); //设置任务下次执行时间
        reExecutePeriodic(outerTask);
    }
}

    代码说明

      1.这里面很明显存在一个隐患,那就是没有捕捉异常,所以如果我们自定义的run()方法中如果没有捕捉异常的话,那么出现异常的时候我们容易两眼摸瞎。

      2.故使用定时任务的时候,自定义的run方法需要自行捕捉异常进行处理。

  3.ScheduledFutureTask类#setNextRunTime方法

    代码展示

//判断指定的任务是否为定期任务
private void setNextRunTime() {
    long p = period; //取出周期时间
    if (p > 0)
        time += p; //time是周期任务的下一次执行时间
    else
        time = triggerTime(-p);
}

// ScheduledThreadPoolExecutor中的方法
long triggerTime(long delay) {
   //delay 的值是否小于 Long.MAX_VALUE 的一半,是的话,当前时间+延迟时间
    return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

// ScheduledThreadPoolExecutor中的方法
private long overflowFree(long delay) {
    //获取队列中的首节点
    Delayed head = (Delayed) super.getQueue().peek();
    //获取的节点不为空,则进行后续处理
    if (head != null) {
        //从队列节点中获取延迟时间
        long headDelay = head.getDelay(NANOSECONDS);

        //如果从队列中获取的延迟时间小于0,并且传递的delay值减去从队列节点中获取延迟时间小于0
        if (headDelay < 0 && (delay - headDelay < 0))
            //将delay的值设置为Long.MAX_VALUE + headDelay(该数字为负数)
            delay = Long.MAX_VALUE + headDelay;
    }
    //返回延迟时间
    return delay;
}

    代码说明

      1.周期时间period有正有负,这是ScheduledThreadPoolExecutor的ScheduledAtFixedRate和ScheduledWithFixedDelay的方法区别,前者为正数,后者为负数。      2.正数时,下一次执行时间为原来的执行时间+周期,即以执行开始时间为基准。      3.负数时,不考虑溢出情况,下一次执行时间为当前时间+周期,即以执行结束时间为基准。如果溢出,下一次执行时间为Long.MAX_VALUE + headDelay。

    疑问说明(这一步有兴趣的需要自己去调试然后在核心方法处断点查看就可以了)

      其实只要当做作System.nanoTime() + delay就可以了,没必要关注overflowFree这一步,原因:

        1.如果执行了  Long.MAX_VALUE + headDelay ,triggerTime方法会获得负数,示例代码

executor.scheduleAtFixedRate(task, 20, 1244574199069500L, TimeUnit.NANOSECONDS);//任延迟取最大值 稳定定时器
executor.scheduleWithFixedDelay(task, 1, 9223272036854775807L, TimeUnit.NANOSECONDS); //任务+延迟

        2.如果不执行  Long.MAX_VALUE + headDelay ,triggerTime方法也有可能获得负数,示例代码:

executor.scheduleAtFixedRate(task, 20, 4611686018427387900L, TimeUnit.NANOSECONDS);
executor.scheduleWithFixedDelay(task, 1, 9223272036854775807L, TimeUnit.NANOSECONDS); 

        3.而且获得负数在compareTo这一步不影响排序。【可能是由于科技发展的缘故吧,现在Long.MAX_VALUE【9223372036854775807L】溢出了,就会变为-9223372036854775808L,对排序不影响】

【5】ScheduledThreadPoolExecutor类源码分析

  1.ScheduledThreadPoolExecutor的四种使用方法

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<Void> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,  TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit), sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0L)
        throw new IllegalArgumentException();

    //这里设置的-unit.toNanos(delay)是负数
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement());

    //这个方法是用于以后做扩展的
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  long initialDelay, long period,  TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0L)
        throw new IllegalArgumentException();

    //这里设置unit.toNanos(period)是正数
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,  triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement());

    //这个方法是用于以后做扩展的
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  2.ScheduledThreadPoolExecutor类#triggerTime方法

//获取初始的延迟执行时间(以纳秒的形式,相当于我在哪个时间点要执行)
private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
    return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

  3.ScheduledThreadPoolExecutor类#delayedExecute方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果处于非运行状态则拒绝任务(这个方法里面比较的是不是比关闭状态大)
    if (isShutdown())
        reject(task);
    else {
        //加入队列
        super.getQueue().add(task);
        //如果加入队列后canRunInCurrentRunState检测线程池,返回false则移除任务
        if (!canRunInCurrentRunState(task) && remove(task))
            task.cancel(false); //以不可中断方式执行完成执行中的调度任务
        else
            ensurePrestart();
    }
}

boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
    //如果处于运行状态返回true
    if (!isShutdown())
        return true;
    //处于停止状态,整理状态,销毁状态,三者之一返回false
    if (isStopped())
        return false;
    //处于关闭状态,返回run-after-shutdown参数
    return task.isPeriodic()
        ? continueExistingPeriodicTasksAfterShutdown //默认false
        : (executeExistingDelayedTasksAfterShutdown
           || task.getDelay(NANOSECONDS) <= 0);
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize) //保持工作者与核心线程数持平
        addWorker(null, true);
    else if (wc == 0) //即时核心线程是0,也至少会启动一个
        addWorker(null, false);
}

【6】DelayedWorkQueue类源码分析

  0.DelayedWorkQueue类#核心属性

private static final int INITIAL_CAPACITY = 16;  // 初始容量
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 控制并发和阻塞等待
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition(); //这个可以参考take方法与offer方法,个人觉得是采用中断方式唤醒持有锁的线程
private int size; // 节点数量
private Thread leader;//记录持有锁的线程(当等待的时候)

  1.DelayedWorkQueue类#add方法

public boolean add(Runnable e) {
    return offer(e);
}

public boolean offer(Runnable x) {
    //空值校验
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        int i = size;
        // 超过容量,扩容
        if (i >= queue.length)
            grow();
        size = i + 1; //更新当前节点数

        if (i == 0) {  //插入的是第一个节点(阻塞队列原本为空)
            queue[0] = e;
            setIndex(e, 0); //setIndex(e, 0)用于修改ScheduledFutureTask的heapIndex属性,表示该对象在队列里的下标
        } else {//阻塞队列非空
            siftUp(i, e); //在插入新节点后对堆进行调整,进行节点上移,保持其特性(节点的值小于子节点的值)不变
        }

        /**
         * 这里最好结合take方法理解一下
         * 队列头等于当前任务,说明了当前任务的等待时间是最小的。此时为什么要去清空leader?
         * leader代表的是某一个正在等待获取元素的线程句柄,
         * 在take的时候因为之前的头结点时间未到,不能拿,被休眠了一定时间(而这个时间就是距离之前那个队列头结点的可以出队列的时间差)。
         * 此时头结点换了,理应清空句柄,唤醒它,让它再次尝试去获取最新的头结点(就算是再次休眠,时间也会比之前的少)。
         */
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock(); //解锁
    }
    return true;
}

  2.DelayedWorkQueue类#siftUp方法

//其实把这个队列看作树结构会更容易理解(要理解数组与完全二叉树的关联)
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1; //父节点坐标
        RunnableScheduledFuture<?> e = queue[parent]; //获取父节点的值
        // 如果 节点>= 父节点,确定最终位置
        if (key.compareTo(e) >= 0)
            break;
        // 节点<父节点,将节点向上移动(就是将父节点放在k处)
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    //确定key的最后落脚处
    queue[k] = key;
    setIndex(key, k);
}

  3.ScheduledFutureTask类#compareTo方法

/**
 * compareTo 作用是加入元素到延迟队列后,内部建立或者调整堆时候会使用该元素的 compareTo 方法与队列里面其他元素进行比较,
 * 让最快要过期的元素放到队首。所以无论什么时候向队列里面添加元素,队首的的元素都是最即将过期的元素。
 * 如果时间相同,序列号小的排前面。
 */
public int compareTo(Delayed other) {
    if (other == this) // 如果2个指向的同一个对象,则返回0
        return 0;

    // other必须是ScheduledFutureTask类型的
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time; //两者之间的时间差
        if (diff < 0)
            return -1; //返回当前对象时间比目标对象小的标记【这个标记仅仅是标记,具体还要在上层方法逻辑中决定】
        else if (diff > 0)
            return 1;  //返回当前对象时间比目标对象大的标记
        // 时间相同,比较序列号
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }

    // 到这里,说明other不是ScheduledFutureTask类型的
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

  4.DelayedWorkQueue类#take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //加锁,响应中断
    try {
        // 死循环自旋
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0]; //头节点
            // 队列为null,则等待在条件上
            if (first == null)
                available.await();

            //队列非空
            else {
                //判断延时时间是否满足条件
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return finishPoll(first);
                // 头节点时间没到,还不能取出头节点
                first = null; // 等待的时候,不要持有头节点
                if (leader != null)
                    //已经存在leader线程,当前线程await阻塞
                    available.await();
                else {
                    //如果不存在leader线程,当前线程作为leader线程,并制定头结点的延迟时间作为阻塞时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        //leader线程阻塞结束
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //leader线程没有阻塞,可以找到头结点,唤醒阻塞线程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  5.DelayedWorkQueue类#grow方法

private void grow() {
    int oldCapacity = queue.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1); //新容量为原来的1.5倍
    if (newCapacity < 0) // overflow
        newCapacity = Integer.MAX_VALUE;
    queue = Arrays.copyOf(queue, newCapacity); //从旧数组 复制到 新数组
}

  6.DelayedWorkQueue类#remove方法

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock(); //加锁
    try {
        int i = indexOf(x); //定位x
        if (i < 0) //节点元素不存在
            return false;

        setIndex(queue[i], -1);
        int s = --size;
        //末节点作为替代节点
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;  //原本末节点处置空,便于GC
        if (s != i) {
            //下移,保证该节点的子孙节点保持特性
            siftDown(i, replacement);

            // queue[i] == replacement说明下移没有发生
            if (queue[i] == replacement)
                //上移,保证该节点的祖先节点保持特性
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock(); //加锁
    }
}

  7.DelayedWorkQueue类#siftDown方法

//情况说明:一般发生在队列头结点任务被取出了;这时候头结点空闲,会把队列【可看做是数组的情况会更好理解】末尾的元素【看作是树的话,上层数据要比下层的要小】放入头结点,然后向下转移,达到保持优先队列的情况。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1; //左子节点坐标
        RunnableScheduledFuture<?> c = queue[child]; //c表示左右子节点中的较小者,暂时是左
        int right = child + 1; //右子节点坐标
        //两者进行比较,且下标没有超出数据个数
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right]; //右节点更小的话要变更数据和记录下标
        //直至找到下层没有比自身小的元素时就停下
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

  8.DelayedWorkQueue类#finishPoll方法

// f是队列头节点(!!!)
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];  //取出队列尾节点的值(之后放到合适位置)
    queue[s] = null;  //置空,便于GC
    // 尾节点从0开始向下遍历调整顺序
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1); //设置f的heapIndex属性
    return f;
}
1244574199069500L

标签:

留言评论

  • 这篇文章还没有收到评论,赶紧来抢沙发吧~