2018/4/16 15:09:59当前位置推荐好文程序员浏览文章

线程池就是维持几个工作线程,而后从任务队列中获取任务执行。所以要实现延时或者者定时执行任务,就要做到以下三点:

  1. 任务要可以返回它的延时时间和能否为定时任务。
  2. 任务队列要根据任务的延时时间进行排序。这个我们在上一章DelayedWorkQueue原理分析中已经讲解过了。
  3. 假如是定时任务,任务执行完成之后,还能再次执行它。

所以要分析ScheduledThreadPoolExecutor原理关键就是对它的任务类ScheduledFutureTask的分析。

    private class ScheduledFutureTask<V>            extends FutureTask<V> implements RunnableScheduledFuture<V> {}

FutureTask类我们已经在以前的文章分析过了,而RunnableScheduledFuture接口的作消耗是什么呢?

一. RunnableScheduledFuture接口

我们知道要实现延时或者者定时执行任务,任务要可以返回它的延时时间,任务能否为定时任务,任务可以根据延时时间排序。所以能想象出RunnableScheduledFuture接口中的方法了。

public interface Comparable<T> {    // 比较两个实例的大小    public int compareTo(T o);}/  实现Comparable接口,说明Delayed实例能进行比较 /public interface Delayed extends Comparable<Delayed> {    /      @return 返回剩余的延时时间     /    long getDelay(TimeUnit unit);}/  实现了Delayed接口,能返回延时时间以及可以够根据延时时间进行比较 /public interface ScheduledFuture<V> extends Delayed, Future<V> {}// 继承自Runnable和Future接口。能当做Runnable实例使消耗public interface RunnableFuture<V> extends Runnable, Future<V> {    // 运行任务    void run();}public interface RunnableScheduledFuture<V> extends                  RunnableFuture<V>, ScheduledFuture<V> {    /      能否为周期定时任务      @return 返回true,表示是定时任务     /    boolean isPeriodic();}

RunnableScheduledFuture接口中,最重要的就是这四个方法:

  1. compareTo(T o): 能比较任务的延时时间,进行排序消耗的。
  2. getDelay(TimeUnit unit): 返回任务剩余的延时时间
  3. run(): 运行任务。假如是定时任务,任务完成之后,还能继续执行。
  4. isPeriodic(): 能否为周期定时任务

二. 何时创立ScheduledFutureTask任务

何时创立ScheduledFutureTask任务,就是ScheduledExecutorService接口四个方法会创立ScheduledFutureTask任务实例。

2.1 创立延时任务

    // 给定的推迟时间delay之后,才会执行任务comm   public ScheduledFuture<?> schedule(Runnable command,                                       long delay,                                       TimeUnit unit) {        if (comm== null || unit == null)            throw new NullPointerException();        /          创立一个延时任务ScheduledFutureTask实例。          triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。          decorateTask方法是让子类可以够修饰ScheduledFutureTask任务实例,本类中没做解决。         /        RunnableScheduledFuture<?> t = decorateTask(command,            new ScheduledFutureTask<Void>(command, null,                                          triggerTime(delay, unit)));        // 延时执行任务        delayedExecute(t);        return t;    }    // 给定的推迟时间delay之后,才会执行任务callable    public <V> ScheduledFuture<V> schedule(Callable<V> callable,                                           long delay,                                           TimeUnit unit) {        if (callable == null || unit == null)            throw new NullPointerException();        /          创立一个延时任务ScheduledFutureTask实例。          triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。          decorateTask方法是让子类可以够修饰ScheduledFutureTask任务实例,本类中没做解决。         /        RunnableScheduledFuture<V> t = decorateTask(callable,            new ScheduledFutureTask<V>(callable,                                       triggerTime(delay, unit)));        // 延时执行任务        delayedExecute(t);        return t;    }

这两个方法的流程是一样的,只不过一个是Runnable类型的任务,一个Callable类型的任务。

  1. 调消耗triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
  2. 创立一个延时任务ScheduledFutureTask实例。
  3. 调消耗decorateTask方法是让子类可以够修饰ScheduledFutureTask任务实例。
  4. 调消耗delayedExecute(t)方法延时执行任务。

2.2 创立延时定时任务

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                  long initialDelay,                                                  long period,                                                  TimeUnit unit) {        if (comm== null || unit == null)            throw new NullPointerException();        if (period <= 0)            throw new IllegalArgumentException();        /          创立一个延时定时任务ScheduledFutureTask实例。          triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。         /        ScheduledFutureTask<Void> sft =            new ScheduledFutureTask<Void>(command,                                          null,                                          triggerTime(initialDelay, unit),                                          unit.toNanos(period));        // decorateTask方法是让子类可以够修饰ScheduledFutureTask任务实例,本类中没做解决。        RunnableScheduledFuture<Void> t = decorateTask(command, sft);        // outerTask作消耗就是下一次定时执行的任务,在reExecutePeriodic方法中需要        sft.outerTask = t;        // 延时执行任务        delayedExecute(t);        return t;    }    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                     long initialDelay,                                                     long delay,                                                     TimeUnit unit) {        if (comm== null || unit == null)            throw new NullPointerException();        if (delay <= 0)            throw new IllegalArgumentException();        /          创立一个延时定时任务ScheduledFutureTask实例。          triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。         /        ScheduledFutureTask<Void> sft =            new ScheduledFutureTask<Void>(command,                                          null,                                          triggerTime(initialDelay, unit),                                          unit.toNanos(-delay));        // decorateTask方法是让子类可以够修饰ScheduledFutureTask任务实例,本类中没做解决。        RunnableScheduledFuture<Void> t = decorateTask(command, sft);        // outerTask作消耗就是下一次定时执行的任务,在reExecutePeriodic方法中需要        sft.outerTask = t;        // 延时执行任务        delayedExecute(t);        return t;    }

我们知道scheduleAtFixedRate方法是固定周期时间去执行任务,而scheduleWithFixedDelay方法是在任务完成之后,延时delay时间再去执行任务。
但是我们发现这两个方法几乎是一模一样的,唯一不同地就是创立延时定时任务ScheduledFutureTask实例时,一个传递的是正数period,一个传递的是负数-delay。

  1. 调消耗triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
  2. 创立一个延时定时任务ScheduledFutureTask实例。
  3. 调消耗decorateTask方法是让子类可以够修饰ScheduledFutureTask任务实例。
  4. 设置outerTask,作消耗就是下一次定时执行的任务,在reExecutePeriodic方法中需要。
  5. 延时执行任务。

2.3 triggerTime方法

    /      返回延时任务开始的时间     /    private long triggerTime(long delay, TimeUnit unit) {        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));    }    /      返回延时任务开始的时间。利消耗当前时间加上给定的延时时间     /    long triggerTime(long delay) {        return now() +            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));    }

2.4 decorateTask方法

    /      让子类可以够修饰或者者替换任务task。      这里只是简单地返回task     /    protected <V> RunnableScheduledFuture<V> decorateTask(        Runnable runnable, RunnableScheduledFuture<V> task) {        return task;    }

2.5 delayedExecute方法

    private void delayedExecute(RunnableScheduledFuture<?> task) {        // 假如线程池不是RUNNING状态,那么调消耗reject(task)方法,        // 拒绝执行任务task        if (isShutdown())            reject(task);        else {            // 将任务增加到任务队列中,会根据任务的延时时间进行排序            super.getQueue().add(task);            /              假如线程池不是RUNNING状态,那么就判断可以不可以在当前状态下运行,              主要就是可以不可以在SHUTDOWN状态下运行。              假如不可以在当前状态下运行,那么就调消耗remove方法,              从任务队列中移除刚刚增加的任务task。                           只有移除成功了,才能调消耗task.cancel(false)方法取消任务,              否则这个延时任务task都还要执行。             /            if (isShutdown() &&                !canRunInCurrentRunState(task.isPeriodic()) &&                remove(task))                task.cancel(false);            else                // 预先启动工作线程,确保线程池中有工作线程。                ensurePrestart();        }    }

这个方法的主要作消耗就是将任务增加到任务队列中,由于这里任务队列是优先级队列DelayedWorkQueue,它会根据任务的延时时间进行排序。

  1. 假如线程池不是RUNNING状态,不可以执行延时任务task,那么调消耗reject(task)方法,拒绝执行任务task。
  2. 将任务增加到任务队列中,会根据任务的延时时间进行排序。
  3. 由于是多线程并发环境,就必需判断在增加任务的过程中,线程池状态能否被别的线程更改了,那么即可可以要取消任务了。
  4. 将任务增加到任务队列后,还要确保线程池中有工作线程,不然任务也不为执行。所以ensurePrestart()方法预先启动工作线程,确保线程池中有工作线程。

与ThreadPoolExecutor类中execute方法执行任务方法不同:

  1. 由于是延时任务task,所以不可以将任务当成工作线程第一个任务,只可以将任务增加到任务队列中,等待着工作线程来执行。
  2. 任务队列DelayedWorkQueue容量几乎是无限的,所以也不需要最大池。除非核心池数量是0,那么必需创立一个工作线程来运行任务,否则线程池的线程数不可可以超过核心池数量。
  3. 当线程池状态被别的线程改变时,取消任务的判断条件不同。当线程池状态不是RUNNING,而且不可以在当前状态下运行,那么就调消耗remove方法,移除刚刚增加的任务task。只有移除任务成功,才能调消耗task.cancel(false)方法取消任务,否则这个延时任务task都还要执行。

2.6 canRunInCurrentRunState方法

    // 判断可以不可以在当前状态下运行    boolean canRunInCurrentRunState(boolean periodic) {        // 调消耗父类ThreadPoolExecutor中的isRunningOrShutdown方法        return isRunningOrShutdown(periodic ?                                   continueExistingPeriodicTasksAfterShutdown :                                   executeExistingDelayedTasksAfterShutdown);    }    /      是ThreadPoolExecutor中的方法      @param shutdownOK 表示线程池可以在SHUTDOWN状态下运行     /    final boolean isRunningOrShutdown(boolean shutdownOK) {        // 线程状态        int rs = runStateOf(ctl.get());        // 当线程是RUNNING状态,或者者是SHUTDOWN状态且shutdownOK也为true        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);    }

canRunInCurrentRunState方法就是返回线程池可以不可以在当前状态下运行。

注: continueExistingPeriodicTasksAfterShutdown与executeExistingDelayedTasksAfterShutdown都是提供给外部设置。

2.7 ensurePrestart方法

    void ensurePrestart() {        // 线程池中的线程数量        int wc = workerCountOf(ctl.get());        // 假如小于核心池数量,就创立新的工作线程        if (wc < corePoolSize)            addWorker(null, true);        // 说明corePoolSize数量是0,必需创立一个工作线程来执行任务        else if (wc == 0)            addWorker(null, false);    }

ensurePrestart方法作消耗:预先启动工作线程,确保线程池中有工作线程。

三. ScheduledFutureTask类

ScheduledFutureTask是一个延时定时任务,它能返回任务剩余延时时间,能被周期性地执行。

3.1 重要成员属性

        / 是一个序列,每次创立任务的时候,都会自增。 /        private final long sequenceNumber;        / 任务可以够开始执行的时间 /        private long time;        /          任务周期执行的时间          0表示不是一个周期定时任务          正数表示固定周期时间去执行任务          负数表示任务完成之后,延时period时间再去执行任务         /        private final long period;        / 表示再次执行的任务,在reExecutePeriodic中调消耗 /        RunnableScheduledFuture<V> outerTask = this;        /          表示在任务队列中的索引位置,消耗来支持快速从队列中删除任务。         /        int heapIndex;

属性说明:

  1. sequenceNumber: 是一个序列,每次创立任务的时候,都会自增。
  2. time: 任务可以够开始执行的时间。
  3. period: 任务周期执行的时间。0表示不是一个周期定时任务。
  4. outerTask: 表示再次执行的任务,在reExecutePeriodic中调消耗
  5. heapIndex: 表示在任务队列中的索引位置,消耗来支持快速从队列中删除任务。

3.2 构造函数

3.2.1 创立延时任务

        /          创立延时任务         /        ScheduledFutureTask(Runnable r, V result, long ns) {            // 调消耗父类的方法            super(r, result);            // 任务开始的时间            this.time = ns;            // period是0,不是一个周期定时任务            this.period = 0;            // 每次创立任务的时候,sequenceNumber都会自增            this.sequenceNumber = sequencer.getAndIncrement();        }         /          创立延时任务         /        ScheduledFutureTask(Callable<V> callable, long ns) {            // 调消耗父类的方法            super(callable);            // 任务开始的时间            this.time = ns;            // period是0,不是一个周期定时任务            this.period = 0;            // 每次创立任务的时候,sequenceNumber都会自增            this.sequenceNumber = sequencer.getAndIncrement();        }

3.2.2 创立延时定时任务

        /          创立延时定时任务         /        ScheduledFutureTask(Runnable r, V result, long ns, long period) {            // 调消耗父类的方法            super(r, result);            // 任务开始的时间            this.time = ns;            // 周期定时时间            this.period = period;            // 每次创立任务的时候,sequenceNumber都会自增            this.sequenceNumber = sequencer.getAndIncrement();        }

3.3 运行任务run方法

        public void run() {            // 能否是周期任务            boolean periodic = isPeriodic();            // 假如不可以在当前状态下运行,那么就要取消任务            if (!canRunInCurrentRunState(periodic))                cancel(false);            // 假如只是延时任务,那么就调消耗run方法,运行任务。            else if (!periodic)                ScheduledFutureTask.super.run();            // 假如是周期定时任务,调消耗runAndReset方法,运行任务。            // 这个方法不会改变任务的状态,所以能反复执行。            else if (ScheduledFutureTask.super.runAndReset()) {                // 设置周期任务下一次执行的开始时间time                setNextRunTime();                // 重新执行任务outerTask                reExecutePeriodic(outerTask);            }        }

这个方法会在ThreadPoolExecutor的runWorker方法中调消耗,而且这个方法调消耗,说明一定已经到了任务的开始时间time了。

  1. 先判断当前线程状态可以不可以运行任务,假如不可以,就调消耗cancel()方法取消本任务。
  2. 假如任务只是一个延时任务,那么调消耗父类的run()运行任务,改变任务的状态,表示任务已经运行完成了。
  3. 假如任务只是一个周期定时任务,那么就任务必需可以够反复执行,那么就不可以调消耗run()方法,它会改变任务的状态。而是调消耗runAndReset()方法,只是简单地运行任务,而不会改变任务状态。
  4. 设置周期任务下一次执行的开始时间time,并重新执行任务。

setNextRunTime设置任务下一次执行的开始时间time的方法:

        /          设置任务下一次执行的开始时间time         /        private void setNextRunTime() {            // 周期时间            long p = period;            if (p > 0)                time += p;            else                time = triggerTime(-p);        }

我们发现当p大于0的时候,调消耗time += p,那么即可可以出现计算后的time可可以小于当前时间,由于任务执行时间超过了周期时间p,所以将这个任务增加到任务队列中,它就会立即执行。
而当p小于0的时候,调消耗time = triggerTime(-p)方法,就是在当前时间上,再加上-p的延时时间,所以这个任务增加到任务队列中,必需延时-p时间后,才可以执行。

reExecutePeriodic重新执行任务的方法:

    /      重新执行任务task     /    void reExecutePeriodic(RunnableScheduledFuture<?> task) {        // 判断当前线程池状态可以不可以运行任务        if (canRunInCurrentRunState(true)) {            // 将任务增加到任务队列,会根据任务延时时间进行排序            super.getQueue().add(task);            // 假如线程池状态改变了,当前状态不可以运行任务,那么就尝试移除任务,            // 移除成功,就取消任务。            if (!canRunInCurrentRunState(true) && remove(task))                task.cancel(false);            else                // 预先启动工作线程,确保线程池中有工作线程。                ensurePrestart();        }    }

这个方法与delayedExecute方法很像,都是将任务增加到任务队列中。

  1. 假如当前线程池状态可以够运行任务,那么任务增加到任务队列。
  2. 假如在在增加任务的过程中,线程池状态能否被别的线程更改了,那么就要进行判断,能否需要取消任务。
  3. 调消耗ensurePrestart()方法,预先启动工作线程,确保线程池中有工作线程。

3.4 其余重要方法

3.4.1 取消任务

      public boolean cancel(boolean mayInterruptIfRunning) {            // 调消耗父类的cancel方法取消任务            boolean cancelled = super.cancel(mayInterruptIfRunning);            // 将任务从任务队列中移除            if (cancelled && removeOnCancel && heapIndex >= 0)                remove(this);            return cancelled;        }

3.4.2 getDelay方法

        // 返回任务剩余延时时间        public long getDelay(TimeUnit unit) {            return unit.convert(time - now(), NANOSECONDS);        }

3.4.3 compareTo方法

        public int compareTo(Delayed other) {            if (other == this)                return 0;            // 假如是ScheduledFutureTask实例            if (other instanceof ScheduledFutureTask) {                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;                // 比较任务开始时间                long diff = time - x.time;                if (diff < 0)                    return -1;                else if (diff > 0)                    return 1;                // 比较那么任务是新创立的,由于新创立的sequenceNumber小一点。                else if (sequenceNumber < x.sequenceNumber)                    return -1;                else                    return 1;            }            // 比较任务的剩余延时时间            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;        }

3.4.4 isPeriodic方法

       /          能否是周期定时任务         /        public boolean isPeriodic() {            return period != 0;        }

四. 创立延时定时线程池

4.1 构造函数

    public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());    }    public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory);    }    public ScheduledThreadPoolExecutor(int corePoolSize,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), handler);    }    public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory, handler);    }

都是调消耗父类ThreadPoolExecutor构造函数的方法,唯一要注意的地方就是任务队列只可以是DelayedWorkQueue实例,消耗户没有办法更换ScheduledThreadPoolExecutor的任务队列属性。

4.2 创立单个线程的定时线程池

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {        return new DelegatedScheduledExecutorService            (new ScheduledThreadPoolExecutor(1));    }    public static ScheduledExecutorService newSingleThreadScheduledExecutor(                 ThreadFactory threadFactory) {        return new DelegatedScheduledExecutorService            (new ScheduledThreadPoolExecutor(1, threadFactory));    }

4.3 创立固定数量的定时线程池

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {        return new ScheduledThreadPoolExecutor(corePoolSize);    }    public static ScheduledExecutorService newScheduledThreadPool(            int corePoolSize, ThreadFactory threadFactory) {        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);    }
网友评论