/* | 
|
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. | 
|
 * | 
|
 * This code is free software; you can redistribute it and/or modify it | 
|
 * under the terms of the GNU General Public License version 2 only, as | 
|
 * published by the Free Software Foundation.  Oracle designates this | 
|
 * particular file as subject to the "Classpath" exception as provided | 
|
 * by Oracle in the LICENSE file that accompanied this code. | 
|
 * | 
|
 * This code is distributed in the hope that it will be useful, but WITHOUT | 
|
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | 
|
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License | 
|
 * version 2 for more details (a copy is included in the LICENSE file that | 
|
 * accompanied this code). | 
|
 * | 
|
 * You should have received a copy of the GNU General Public License version | 
|
 * 2 along with this work; if not, write to the Free Software Foundation, | 
|
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. | 
|
 * | 
|
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA | 
|
 * or visit www.oracle.com if you need additional information or have any | 
|
 * questions. | 
|
*/  | 
|
/*  | 
|
* This file is available under and governed by the GNU General Public  | 
|
* License version 2 only, as published by the Free Software Foundation.  | 
|
* However, the following notice accompanied the original version of this  | 
|
* file:  | 
|
*  | 
|
* Written by Doug Lea with assistance from members of JCP JSR-166  | 
|
* Expert Group and released to the public domain, as explained at  | 
|
* http://creativecommons.org/publicdomain/zero/1.0/  | 
|
*/  | 
|
package java.util.concurrent;  | 
|
import static java.util.concurrent.TimeUnit.NANOSECONDS;  | 
|
import java.util.concurrent.atomic.AtomicLong;  | 
|
import java.util.concurrent.locks.Condition;  | 
|
import java.util.concurrent.locks.ReentrantLock;  | 
|
import java.util.*;  | 
|
/** | 
|
 * A {@link ThreadPoolExecutor} that can additionally schedule | 
|
 * commands to run after a given delay, or to execute | 
|
 * periodically. This class is preferable to {@link java.util.Timer} | 
|
 * when multiple worker threads are needed, or when the additional | 
|
 * flexibility or capabilities of {@link ThreadPoolExecutor} (which | 
|
 * this class extends) are required. | 
|
 * | 
|
 * <p>Delayed tasks execute no sooner than they are enabled, but | 
|
 * without any real-time guarantees about when, after they are | 
|
 * enabled, they will commence. Tasks scheduled for exactly the same | 
|
 * execution time are enabled in first-in-first-out (FIFO) order of | 
|
 * submission. | 
|
 * | 
|
 * <p>When a submitted task is cancelled before it is run, execution | 
|
 * is suppressed. By default, such a cancelled task is not | 
|
 * automatically removed from the work queue until its delay | 
|
 * elapses. While this enables further inspection and monitoring, it | 
|
 * may also cause unbounded retention of cancelled tasks. To avoid | 
|
 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which | 
|
 * causes tasks to be immediately removed from the work queue at | 
|
 * time of cancellation. | 
|
 * | 
|
 * <p>Successive executions of a task scheduled via | 
|
 * {@code scheduleAtFixedRate} or | 
|
 * {@code scheduleWithFixedDelay} do not overlap. While different | 
|
 * executions may be performed by different threads, the effects of | 
|
 * prior executions <a | 
|
 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> | 
|
 * those of subsequent ones. | 
|
 * | 
|
 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few | 
|
 * of the inherited tuning methods are not useful for it. In | 
|
 * particular, because it acts as a fixed-sized pool using | 
|
 * {@code corePoolSize} threads and an unbounded queue, adjustments | 
|
 * to {@code maximumPoolSize} have no useful effect. Additionally, it | 
|
 * is almost never a good idea to set {@code corePoolSize} to zero or | 
|
 * use {@code allowCoreThreadTimeOut} because this may leave the pool | 
|
 * without threads to handle tasks once they become eligible to run. | 
|
 * | 
|
 * <p><b>Extension notes:</b> This class overrides the | 
|
 * {@link ThreadPoolExecutor#execute(Runnable) execute} and | 
|
 * {@link AbstractExecutorService#submit(Runnable) submit} | 
|
 * methods to generate internal {@link ScheduledFuture} objects to | 
|
 * control per-task delays and scheduling.  To preserve | 
|
 * functionality, any further overrides of these methods in | 
|
 * subclasses must invoke superclass versions, which effectively | 
|
 * disables additional task customization.  However, this class | 
|
 * provides alternative protected extension method | 
|
 * {@code decorateTask} (one version each for {@code Runnable} and | 
|
 * {@code Callable}) that can be used to customize the concrete task | 
|
 * types used to execute commands entered via {@code execute}, | 
|
 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, | 
|
 * and {@code scheduleWithFixedDelay}.  By default, a | 
|
 * {@code ScheduledThreadPoolExecutor} uses a task type extending | 
|
 * {@link FutureTask}. However, this may be modified or replaced using | 
|
 * subclasses of the form: | 
|
 * | 
|
 *  <pre> {@code | 
|
 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { | 
|
 * | 
|
 *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } | 
|
 * | 
|
 *   protected <V> RunnableScheduledFuture<V> decorateTask( | 
|
 *                Runnable r, RunnableScheduledFuture<V> task) { | 
|
 *       return new CustomTask<V>(r, task); | 
|
 *   } | 
|
 * | 
|
 *   protected <V> RunnableScheduledFuture<V> decorateTask( | 
|
 *                Callable<V> c, RunnableScheduledFuture<V> task) { | 
|
 *       return new CustomTask<V>(c, task); | 
|
 *   } | 
|
 *   // ... add constructors, etc. | 
|
 * }}</pre> | 
|
 * | 
|
 * @since 1.5 | 
|
 * @author Doug Lea | 
|
*/  | 
|
public class ScheduledThreadPoolExecutor  | 
|
extends ThreadPoolExecutor  | 
|
implements ScheduledExecutorService {  | 
|
/*  | 
|
* This class specializes ThreadPoolExecutor implementation by  | 
|
*  | 
|
* 1. Using a custom task type, ScheduledFutureTask for  | 
|
* tasks, even those that don't require scheduling (i.e.,  | 
|
* those submitted using ExecutorService execute, not  | 
|
* ScheduledExecutorService methods) which are treated as  | 
|
* delayed tasks with a delay of zero.  | 
|
*  | 
|
* 2. Using a custom queue (DelayedWorkQueue), a variant of  | 
|
* unbounded DelayQueue. The lack of capacity constraint and  | 
|
* the fact that corePoolSize and maximumPoolSize are  | 
|
* effectively identical simplifies some execution mechanics  | 
|
* (see delayedExecute) compared to ThreadPoolExecutor.  | 
|
*  | 
|
* 3. Supporting optional run-after-shutdown parameters, which  | 
|
* leads to overrides of shutdown methods to remove and cancel  | 
|
* tasks that should NOT be run after shutdown, as well as  | 
|
* different recheck logic when task (re)submission overlaps  | 
|
* with a shutdown.  | 
|
*  | 
|
* 4. Task decoration methods to allow interception and  | 
|
* instrumentation, which are needed because subclasses cannot  | 
|
* otherwise override submit methods to get this effect. These  | 
|
* don't have any impact on pool control logic though.  | 
|
*/  | 
|
    /** | 
|
     * False if should cancel/suppress periodic tasks on shutdown. | 
|
*/  | 
|
private volatile boolean continueExistingPeriodicTasksAfterShutdown;  | 
|
    /** | 
|
     * False if should cancel non-periodic tasks on shutdown. | 
|
*/  | 
|
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;  | 
|
    /** | 
|
     * True if ScheduledFutureTask.cancel should remove from queue | 
|
*/  | 
|
private volatile boolean removeOnCancel = false;  | 
|
    /** | 
|
     * Sequence number to break scheduling ties, and in turn to | 
|
     * guarantee FIFO order among tied entries. | 
|
*/  | 
|
private static final AtomicLong sequencer = new AtomicLong();  | 
|
    /** | 
|
     * Returns current nanosecond time. | 
|
*/  | 
|
    final long now() { | 
|
return System.nanoTime();  | 
|
}  | 
|
private class ScheduledFutureTask<V>  | 
|
extends FutureTask<V> implements RunnableScheduledFuture<V> {  | 
|
        /** Sequence number to break ties FIFO */ | 
|
private final long sequenceNumber;  | 
|
        /** The time the task is enabled to execute in nanoTime units */ | 
|
private long time;  | 
|
        /** | 
|
         * Period in nanoseconds for repeating tasks.  A positive | 
|
         * value indicates fixed-rate execution.  A negative value | 
|
         * indicates fixed-delay execution.  A value of 0 indicates a | 
|
         * non-repeating task. | 
|
*/  | 
|
private final long period;  | 
|
        /** The actual task to be re-enqueued by reExecutePeriodic */ | 
|
RunnableScheduledFuture<V> outerTask = this;  | 
|
        /** | 
|
         * Index into delay queue, to support faster cancellation. | 
|
*/  | 
|
int heapIndex;  | 
|
        /** | 
|
         * Creates a one-shot action with given nanoTime-based trigger time. | 
|
*/  | 
|
ScheduledFutureTask(Runnable r, V result, long ns) {  | 
|
super(r, result);  | 
|
this.time = ns;  | 
|
this.period = 0;  | 
|
this.sequenceNumber = sequencer.getAndIncrement();  | 
|
}  | 
|
        /** | 
|
         * Creates a periodic action with given nano time and period. | 
|
*/  | 
|
ScheduledFutureTask(Runnable r, V result, long ns, long period) {  | 
|
super(r, result);  | 
|
this.time = ns;  | 
|
this.period = period;  | 
|
this.sequenceNumber = sequencer.getAndIncrement();  | 
|
}  | 
|
        /** | 
|
         * Creates a one-shot action with given nanoTime-based trigger time. | 
|
*/  | 
|
ScheduledFutureTask(Callable<V> callable, long ns) {  | 
|
super(callable);  | 
|
this.time = ns;  | 
|
this.period = 0;  | 
|
this.sequenceNumber = sequencer.getAndIncrement();  | 
|
}  | 
|
public long getDelay(TimeUnit unit) {  | 
|
return unit.convert(time - now(), NANOSECONDS);  | 
|
}  | 
|
public int compareTo(Delayed other) {  | 
|
if (other == this) // compare zero if same object  | 
|
return 0;  | 
|
if (other instanceof ScheduledFutureTask) {  | 
|
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;  | 
|
long diff = time - x.time;  | 
|
if (diff < 0)  | 
|
return -1;  | 
|
else if (diff > 0)  | 
|
return 1;  | 
|
else if (sequenceNumber < x.sequenceNumber)  | 
|
return -1;  | 
|
else  | 
|
return 1;  | 
|
}  | 
|
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);  | 
|
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;  | 
|
}  | 
|
        /** | 
|
         * Returns {@code true} if this is a periodic (not a one-shot) action. | 
|
         * | 
|
         * @return {@code true} if periodic | 
|
*/  | 
|
        public boolean isPeriodic() { | 
|
return period != 0;  | 
|
}  | 
|
        /** | 
|
         * Sets the next time to run for a periodic task. | 
|
*/  | 
|
        private void setNextRunTime() { | 
|
long p = period;  | 
|
if (p > 0)  | 
|
time += p;  | 
|
else  | 
|
time = triggerTime(-p);  | 
|
}  | 
|
        public boolean cancel(boolean mayInterruptIfRunning) { | 
|
boolean cancelled = super.cancel(mayInterruptIfRunning);  | 
|
if (cancelled && removeOnCancel && heapIndex >= 0)  | 
|
remove(this);  | 
|
return cancelled;  | 
|
}  | 
|
        /** | 
|
         * Overrides FutureTask version so as to reset/requeue if periodic. | 
|
*/  | 
|
        public void run() { | 
|
boolean periodic = isPeriodic();  | 
|
if (!canRunInCurrentRunState(periodic))  | 
|
cancel(false);  | 
|
else if (!periodic)  | 
|
ScheduledFutureTask.super.run();  | 
|
else if (ScheduledFutureTask.super.runAndReset()) {  | 
|
setNextRunTime();  | 
|
reExecutePeriodic(outerTask);  | 
|
}  | 
|
}  | 
|
}  | 
|
    /** | 
|
     * Returns true if can run a task given current run state | 
|
     * and run-after-shutdown parameters. | 
|
     * | 
|
     * @param periodic true if this task periodic, false if delayed | 
|
*/  | 
|
    boolean canRunInCurrentRunState(boolean periodic) { | 
|
return isRunningOrShutdown(periodic ?  | 
|
continueExistingPeriodicTasksAfterShutdown :  | 
|
executeExistingDelayedTasksAfterShutdown);  | 
|
}  | 
|
    /** | 
|
     * Main execution method for delayed or periodic tasks.  If pool | 
|
     * is shut down, rejects the task. Otherwise adds task to queue | 
|
     * and starts a thread, if necessary, to run it.  (We cannot | 
|
     * prestart the thread to run the task because the task (probably) | 
|
     * shouldn't be run yet.)  If the pool is shut down while the task | 
|
     * is being added, cancel and remove it if required by state and | 
|
     * run-after-shutdown parameters. | 
|
     * | 
|
     * @param task the task | 
|
*/  | 
|
private void delayedExecute(RunnableScheduledFuture<?> task) {  | 
|
if (isShutdown())  | 
|
reject(task);  | 
|
        else { | 
|
super.getQueue().add(task);  | 
|
if (isShutdown() &&  | 
|
!canRunInCurrentRunState(task.isPeriodic()) &&  | 
|
remove(task))  | 
|
task.cancel(false);  | 
|
else  | 
|
ensurePrestart();  | 
|
}  | 
|
}  | 
|
    /** | 
|
     * Requeues a periodic task unless current run state precludes it. | 
|
     * Same idea as delayedExecute except drops task rather than rejecting. | 
|
     * | 
|
     * @param task the task | 
|
*/  | 
|
void reExecutePeriodic(RunnableScheduledFuture<?> task) {  | 
|
if (canRunInCurrentRunState(true)) {  | 
|
super.getQueue().add(task);  | 
|
if (!canRunInCurrentRunState(true) && remove(task))  | 
|
task.cancel(false);  | 
|
else  | 
|
ensurePrestart();  | 
|
}  | 
|
}  | 
|
    /** | 
|
     * Cancels and clears the queue of all tasks that should not be run | 
|
     * due to shutdown policy.  Invoked within super.shutdown. | 
|
*/  | 
|
    @Override void onShutdown() { | 
|
BlockingQueue<Runnable> q = super.getQueue();  | 
|
boolean keepDelayed =  | 
|
getExecuteExistingDelayedTasksAfterShutdownPolicy();  | 
|
boolean keepPeriodic =  | 
|
getContinueExistingPeriodicTasksAfterShutdownPolicy();  | 
|
if (!keepDelayed && !keepPeriodic) {  | 
|
for (Object e : q.toArray())  | 
|
if (e instanceof RunnableScheduledFuture<?>)  | 
|
((RunnableScheduledFuture<?>) e).cancel(false);  | 
|
q.clear();  | 
|
}  | 
|
        else { | 
|
            // Traverse snapshot to avoid iterator exceptions | 
|
for (Object e : q.toArray()) {  | 
|
if (e instanceof RunnableScheduledFuture) {  | 
|
RunnableScheduledFuture<?> t =  | 
|
(RunnableScheduledFuture<?>)e;  | 
|
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||  | 
|
t.isCancelled()) { // also remove if already cancelled  | 
|
if (q.remove(t))  | 
|
t.cancel(false);  | 
|
}  | 
|
}  | 
|
}  | 
|
}  | 
|
tryTerminate();  | 
|
}  | 
|
    /** | 
|
     * Modifies or replaces the task used to execute a runnable. | 
|
     * This method can be used to override the concrete | 
|
     * class used for managing internal tasks. | 
|
     * The default implementation simply returns the given task. | 
|
     * | 
|
     * @param runnable the submitted Runnable | 
|
     * @param task the task created to execute the runnable | 
|
     * @param <V> the type of the task's result | 
|
     * @return a task that can execute the runnable | 
|
     * @since 1.6 | 
|
*/  | 
|
protected <V> RunnableScheduledFuture<V> decorateTask(  | 
|
Runnable runnable, RunnableScheduledFuture<V> task) {  | 
|
return task;  | 
|
}  | 
|
    /** | 
|
     * Modifies or replaces the task used to execute a callable. | 
|
     * This method can be used to override the concrete | 
|
     * class used for managing internal tasks. | 
|
     * The default implementation simply returns the given task. | 
|
     * | 
|
     * @param callable the submitted Callable | 
|
     * @param task the task created to execute the callable | 
|
     * @param <V> the type of the task's result | 
|
     * @return a task that can execute the callable | 
|
     * @since 1.6 | 
|
*/  | 
|
protected <V> RunnableScheduledFuture<V> decorateTask(  | 
|
Callable<V> callable, RunnableScheduledFuture<V> task) {  | 
|
return task;  | 
|
}  | 
|
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size. The superclass is configured with a maximum
     * pool size of {@code Integer.MAX_VALUE}, a keep-alive time of zero
     * and a new {@code DelayedWorkQueue}.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize,
              Integer.MAX_VALUE,
              0L,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
|
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size and thread factory. The superclass is
     * configured with a maximum pool size of {@code Integer.MAX_VALUE},
     * a keep-alive time of zero and a new {@code DelayedWorkQueue}.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize,
              Integer.MAX_VALUE,
              0L,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(),
              threadFactory);
    }
|
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size and rejection handler. The superclass is
     * configured with a maximum pool size of {@code Integer.MAX_VALUE},
     * a keep-alive time of zero and a new {@code DelayedWorkQueue}.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize,
              Integer.MAX_VALUE,
              0L,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(),
              handler);
    }
|
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size, thread factory and rejection handler. The
     * superclass is configured with a maximum pool size of
     * {@code Integer.MAX_VALUE}, a keep-alive time of zero and a new
     * {@code DelayedWorkQueue}.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize,
              Integer.MAX_VALUE,
              0L,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(),
              threadFactory,
              handler);
    }
|
    /** | 
|
     * Returns the trigger time of a delayed action. | 
|
*/  | 
|
private long triggerTime(long delay, TimeUnit unit) {  | 
|
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));  | 
|
}  | 
|
    /** | 
|
     * Returns the trigger time of a delayed action. | 
|
*/  | 
|
    long triggerTime(long delay) { | 
|
return now() +  | 
|
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  | 
|
}  | 
|
    /** | 
|
     * Constrains the values of all delays in the queue to be within | 
|
     * Long.MAX_VALUE of each other, to avoid overflow in compareTo. | 
|
     * This may occur if a task is eligible to be dequeued, but has | 
|
     * not yet been, while some other task is added with a delay of | 
|
     * Long.MAX_VALUE. | 
|
*/  | 
|
    private long overflowFree(long delay) { | 
|
Delayed head = (Delayed) super.getQueue().peek();  | 
|
if (head != null) {  | 
|
long headDelay = head.getDelay(NANOSECONDS);  | 
|
if (headDelay < 0 && (delay - headDelay < 0))  | 
|
delay = Long.MAX_VALUE + headDelay;  | 
|
}  | 
|
return delay;  | 
|
}  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
*/  | 
|
public ScheduledFuture<?> schedule(Runnable command,  | 
|
long delay,  | 
|
TimeUnit unit) {  | 
|
if (command == null || unit == null)  | 
|
throw new NullPointerException();  | 
|
RunnableScheduledFuture<?> t = decorateTask(command,  | 
|
new ScheduledFutureTask<Void>(command, null,  | 
|
triggerTime(delay, unit)));  | 
|
delayedExecute(t);  | 
|
return t;  | 
|
}  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
*/  | 
|
public <V> ScheduledFuture<V> schedule(Callable<V> callable,  | 
|
long delay,  | 
|
TimeUnit unit) {  | 
|
if (callable == null || unit == null)  | 
|
throw new NullPointerException();  | 
|
RunnableScheduledFuture<V> t = decorateTask(callable,  | 
|
new ScheduledFutureTask<V>(callable,  | 
|
triggerTime(delay, unit)));  | 
|
delayedExecute(t);  | 
|
return t;  | 
|
}  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
     * @throws IllegalArgumentException   {@inheritDoc} | 
|
*/  | 
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  | 
|
long initialDelay,  | 
|
long period,  | 
|
TimeUnit unit) {  | 
|
if (command == null || unit == null)  | 
|
throw new NullPointerException();  | 
|
if (period <= 0)  | 
|
throw new IllegalArgumentException();  | 
|
ScheduledFutureTask<Void> sft =  | 
|
new ScheduledFutureTask<Void>(command,  | 
|
null,  | 
|
triggerTime(initialDelay, unit),  | 
|
unit.toNanos(period));  | 
|
RunnableScheduledFuture<Void> t = decorateTask(command, sft);  | 
|
sft.outerTask = t;  | 
|
delayedExecute(t);  | 
|
return t;  | 
|
}  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
     * @throws IllegalArgumentException   {@inheritDoc} | 
|
*/  | 
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,  | 
|
long initialDelay,  | 
|
long delay,  | 
|
TimeUnit unit) {  | 
|
if (command == null || unit == null)  | 
|
throw new NullPointerException();  | 
|
if (delay <= 0)  | 
|
throw new IllegalArgumentException();  | 
|
ScheduledFutureTask<Void> sft =  | 
|
new ScheduledFutureTask<Void>(command,  | 
|
null,  | 
|
triggerTime(initialDelay, unit),  | 
|
unit.toNanos(-delay));  | 
|
RunnableScheduledFuture<Void> t = decorateTask(command, sft);  | 
|
sft.outerTask = t;  | 
|
delayedExecute(t);  | 
|
return t;  | 
|
}  | 
|
    /** | 
|
     * Executes {@code command} with zero required delay. | 
|
     * This has effect equivalent to | 
|
     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. | 
|
     * Note that inspections of the queue and of the list returned by | 
|
     * {@code shutdownNow} will access the zero-delayed | 
|
     * {@link ScheduledFuture}, not the {@code command} itself. | 
|
     * | 
|
     * <p>A consequence of the use of {@code ScheduledFuture} objects is | 
|
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always | 
|
     * called with a null second {@code Throwable} argument, even if the | 
|
     * {@code command} terminated abruptly.  Instead, the {@code Throwable} | 
|
     * thrown by such a task can be obtained via {@link Future#get}. | 
|
     * | 
|
     * @throws RejectedExecutionException at discretion of | 
|
     *         {@code RejectedExecutionHandler}, if the task | 
|
     *         cannot be accepted for execution because the | 
|
     *         executor has been shut down | 
|
     * @throws NullPointerException {@inheritDoc} | 
|
*/  | 
|
public void execute(Runnable command) {  | 
|
schedule(command, 0, NANOSECONDS);  | 
|
}  | 
|
// Override AbstractExecutorService methods  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
*/  | 
|
public Future<?> submit(Runnable task) {  | 
|
return schedule(task, 0, NANOSECONDS);  | 
|
}  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
*/  | 
|
public <T> Future<T> submit(Runnable task, T result) {  | 
|
return schedule(Executors.callable(task, result), 0, NANOSECONDS);  | 
|
}  | 
|
    /** | 
|
     * @throws RejectedExecutionException {@inheritDoc} | 
|
     * @throws NullPointerException       {@inheritDoc} | 
|
*/  | 
|
public <T> Future<T> submit(Callable<T> task) {  | 
|
return schedule(task, 0, NANOSECONDS);  | 
|
}  | 
|
    /** | 
|
     * Sets the policy on whether to continue executing existing | 
|
     * periodic tasks even when this executor has been {@code shutdown}. | 
|
     * In this case, these tasks will only terminate upon | 
|
     * {@code shutdownNow} or after setting the policy to | 
|
     * {@code false} when already shutdown. | 
|
     * This value is by default {@code false}. | 
|
     * | 
|
     * @param value if {@code true}, continue after shutdown, else don't | 
|
     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy | 
|
*/  | 
|
    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { | 
|
continueExistingPeriodicTasksAfterShutdown = value;  | 
|
if (!value && isShutdown())  | 
|
onShutdown();  | 
|
}  | 
|
    /** | 
|
     * Gets the policy on whether to continue executing existing | 
|
     * periodic tasks even when this executor has been {@code shutdown}. | 
|
     * In this case, these tasks will only terminate upon | 
|
     * {@code shutdownNow} or after setting the policy to | 
|
     * {@code false} when already shutdown. | 
|
     * This value is by default {@code false}. | 
|
     * | 
|
     * @return {@code true} if will continue after shutdown | 
|
     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy | 
|
*/  | 
|
    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { | 
|
return continueExistingPeriodicTasksAfterShutdown;  | 
|
}  | 
|
    /** | 
|
     * Sets the policy on whether to execute existing delayed | 
|
     * tasks even when this executor has been {@code shutdown}. | 
|
     * In this case, these tasks will only terminate upon | 
|
     * {@code shutdownNow}, or after setting the policy to | 
|
     * {@code false} when already shutdown. | 
|
     * This value is by default {@code true}. | 
|
     * | 
|
     * @param value if {@code true}, execute after shutdown, else don't | 
|
     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy | 
|
*/  | 
|
    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { | 
|
executeExistingDelayedTasksAfterShutdown = value;  | 
|
if (!value && isShutdown())  | 
|
onShutdown();  | 
|
}  | 
|
    /** | 
|
     * Gets the policy on whether to execute existing delayed | 
|
     * tasks even when this executor has been {@code shutdown}. | 
|
     * In this case, these tasks will only terminate upon | 
|
     * {@code shutdownNow}, or after setting the policy to | 
|
     * {@code false} when already shutdown. | 
|
     * This value is by default {@code true}. | 
|
     * | 
|
     * @return {@code true} if will execute after shutdown | 
|
     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy | 
|
*/  | 
|
    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { | 
|
return executeExistingDelayedTasksAfterShutdown;  | 
|
}  | 
|
    /** | 
|
     * Sets the policy on whether cancelled tasks should be immediately | 
|
     * removed from the work queue at time of cancellation.  This value is | 
|
     * by default {@code false}. | 
|
     * | 
|
     * @param value if {@code true}, remove on cancellation, else don't | 
|
     * @see #getRemoveOnCancelPolicy | 
|
     * @since 1.7 | 
|
*/  | 
|
    public void setRemoveOnCancelPolicy(boolean value) { | 
|
removeOnCancel = value;  | 
|
}  | 
|
    /** | 
|
     * Gets the policy on whether cancelled tasks should be immediately | 
|
     * removed from the work queue at time of cancellation.  This value is | 
|
     * by default {@code false}. | 
|
     * | 
|
     * @return {@code true} if cancelled tasks are immediately removed | 
|
     *         from the queue | 
|
     * @see #setRemoveOnCancelPolicy | 
|
     * @since 1.7 | 
|
*/  | 
|
    public boolean getRemoveOnCancelPolicy() { | 
|
return removeOnCancel;  | 
|
}  | 
|
    /** | 
|
     * Initiates an orderly shutdown in which previously submitted | 
|
     * tasks are executed, but no new tasks will be accepted. | 
|
     * Invocation has no additional effect if already shut down. | 
|
     * | 
|
     * <p>This method does not wait for previously submitted tasks to | 
|
     * complete execution.  Use {@link #awaitTermination awaitTermination} | 
|
     * to do that. | 
|
     * | 
|
     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} | 
|
     * has been set {@code false}, existing delayed tasks whose delays | 
|
     * have not yet elapsed are cancelled.  And unless the {@code | 
|
     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set | 
|
     * {@code true}, future executions of existing periodic tasks will | 
|
     * be cancelled. | 
|
     * | 
|
     * @throws SecurityException {@inheritDoc} | 
|
*/  | 
|
    public void shutdown() { | 
|
super.shutdown();  | 
|
}  | 
|
    /** | 
|
     * Attempts to stop all actively executing tasks, halts the | 
|
     * processing of waiting tasks, and returns a list of the tasks | 
|
     * that were awaiting execution. | 
|
     * | 
|
     * <p>This method does not wait for actively executing tasks to | 
|
     * terminate.  Use {@link #awaitTermination awaitTermination} to | 
|
     * do that. | 
|
     * | 
|
     * <p>There are no guarantees beyond best-effort attempts to stop | 
|
     * processing actively executing tasks.  This implementation | 
|
     * cancels tasks via {@link Thread#interrupt}, so any task that | 
|
     * fails to respond to interrupts may never terminate. | 
|
     * | 
|
     * @return list of tasks that never commenced execution. | 
|
     *         Each element of this list is a {@link ScheduledFuture}, | 
|
     *         including those tasks submitted using {@code execute}, | 
|
     *         which are for scheduling purposes used as the basis of a | 
|
     *         zero-delay {@code ScheduledFuture}. | 
|
     * @throws SecurityException {@inheritDoc} | 
|
*/  | 
|
public List<Runnable> shutdownNow() {  | 
|
return super.shutdownNow();  | 
|
}  | 
|
    /** | 
|
     * Returns the task queue used by this executor.  Each element of | 
|
     * this queue is a {@link ScheduledFuture}, including those | 
|
     * tasks submitted using {@code execute} which are for scheduling | 
|
     * purposes used as the basis of a zero-delay | 
|
     * {@code ScheduledFuture}.  Iteration over this queue is | 
|
     * <em>not</em> guaranteed to traverse tasks in the order in | 
|
     * which they will execute. | 
|
     * | 
|
     * @return the task queue | 
|
*/  | 
|
public BlockingQueue<Runnable> getQueue() {  | 
|
return super.getQueue();  | 
|
}  | 
|
    /** | 
|
     * Specialized delay queue. To mesh with TPE declarations, this | 
|
     * class must be declared as a BlockingQueue<Runnable> even though | 
|
     * it can only hold RunnableScheduledFutures. | 
|
*/  | 
|
static class DelayedWorkQueue extends AbstractQueue<Runnable>  | 
|
implements BlockingQueue<Runnable> {  | 
|
/*  | 
|
* A DelayedWorkQueue is based on a heap-based data structure  | 
|
* like those in DelayQueue and PriorityQueue, except that  | 
|
* every ScheduledFutureTask also records its index into the  | 
|
* heap array. This eliminates the need to find a task upon  | 
|
* cancellation, greatly speeding up removal (down from O(n)  | 
|
* to O(log n)), and reducing garbage retention that would  | 
|
* otherwise occur by waiting for the element to rise to top  | 
|
* before clearing. But because the queue may also hold  | 
|
* RunnableScheduledFutures that are not ScheduledFutureTasks,  | 
|
* we are not guaranteed to have such indices available, in  | 
|
* which case we fall back to linear search. (We expect that  | 
|
* most tasks will not be decorated, and that the faster cases  | 
|
* will be much more common.)  | 
|
*  | 
|
* All heap operations must record index changes -- mainly  | 
|
* within siftUp and siftDown. Upon removal, a task's  | 
|
* heapIndex is set to -1. Note that ScheduledFutureTasks can  | 
|
* appear at most once in the queue (this need not be true for  | 
|
* other kinds of tasks or work queues), so are uniquely  | 
|
* identified by heapIndex.  | 
|
*/  | 
|
private static final int INITIAL_CAPACITY = 16;  | 
|
private RunnableScheduledFuture<?>[] queue =  | 
|
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];  | 
|
private final ReentrantLock lock = new ReentrantLock();  | 
|
private int size = 0;  | 
|
        /** | 
|
         * Thread designated to wait for the task at the head of the | 
|
         * queue.  This variant of the Leader-Follower pattern | 
|
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to | 
|
         * minimize unnecessary timed waiting.  When a thread becomes | 
|
         * the leader, it waits only for the next delay to elapse, but | 
|
         * other threads await indefinitely.  The leader thread must | 
|
         * signal some other thread before returning from take() or | 
|
         * poll(...), unless some other thread becomes leader in the | 
|
         * interim.  Whenever the head of the queue is replaced with a | 
|
         * task with an earlier expiration time, the leader field is | 
|
         * invalidated by being reset to null, and some waiting | 
|
         * thread, but not necessarily the current leader, is | 
|
         * signalled.  So waiting threads must be prepared to acquire | 
|
         * and lose leadership while waiting. | 
|
*/  | 
|
private Thread leader = null;  | 
|
        /** | 
|
         * Condition signalled when a newer task becomes available at the | 
|
         * head of the queue or a new thread may need to become leader. | 
|
*/  | 
|
private final Condition available = lock.newCondition();  | 
|
        /** | 
|
         * Sets f's heapIndex if it is a ScheduledFutureTask. | 
|
*/  | 
|
private void setIndex(RunnableScheduledFuture<?> f, int idx) {  | 
|
if (f instanceof ScheduledFutureTask)  | 
|
((ScheduledFutureTask)f).heapIndex = idx;  | 
|
}  | 
|
        /** | 
|
         * Sifts element added at bottom up to its heap-ordered spot. | 
|
         * Call only when holding lock. | 
|
*/  | 
|
private void siftUp(int k, RunnableScheduledFuture<?> key) {  | 
|
while (k > 0) {  | 
|
int parent = (k - 1) >>> 1;  | 
|
RunnableScheduledFuture<?> e = queue[parent];  | 
|
if (key.compareTo(e) >= 0)  | 
|
break;  | 
|
queue[k] = e;  | 
|
setIndex(e, k);  | 
|
k = parent;  | 
|
}  | 
|
queue[k] = key;  | 
|
setIndex(key, k);  | 
|
}  | 
|
        /** | 
|
         * Sifts element added at top down to its heap-ordered spot. | 
|
         * Call only when holding lock. | 
|
*/  | 
|
private void siftDown(int k, RunnableScheduledFuture<?> key) {  | 
|
int half = size >>> 1;  | 
|
while (k < half) {  | 
|
int child = (k << 1) + 1;  | 
|
RunnableScheduledFuture<?> c = queue[child];  | 
|
int right = child + 1;  | 
|
if (right < size && c.compareTo(queue[right]) > 0)  | 
|
c = queue[child = right];  | 
|
if (key.compareTo(c) <= 0)  | 
|
break;  | 
|
queue[k] = c;  | 
|
setIndex(c, k);  | 
|
k = child;  | 
|
}  | 
|
queue[k] = key;  | 
|
setIndex(key, k);  | 
|
}  | 
|
        /** | 
|
         * Resizes the heap array.  Call only when holding lock. | 
|
*/  | 
|
        private void grow() { | 
|
int oldCapacity = queue.length;  | 
|
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%  | 
|
if (newCapacity < 0) // overflow  | 
|
newCapacity = Integer.MAX_VALUE;  | 
|
queue = Arrays.copyOf(queue, newCapacity);  | 
|
}  | 
|
        /** | 
|
         * Finds index of given object, or -1 if absent. | 
|
*/  | 
|
private int indexOf(Object x) {  | 
|
if (x != null) {  | 
|
if (x instanceof ScheduledFutureTask) {  | 
|
int i = ((ScheduledFutureTask) x).heapIndex;  | 
|
// Sanity check; x could conceivably be a  | 
|
                    // ScheduledFutureTask from some other pool. | 
|
if (i >= 0 && i < size && queue[i] == x)  | 
|
return i;  | 
|
                } else { | 
|
for (int i = 0; i < size; i++)  | 
|
if (x.equals(queue[i]))  | 
|
return i;  | 
|
}  | 
|
}  | 
|
return -1;  | 
|
}  | 
|
public boolean contains(Object x) {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
return indexOf(x) != -1;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public boolean remove(Object x) {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
int i = indexOf(x);  | 
|
if (i < 0)  | 
|
return false;  | 
|
setIndex(queue[i], -1);  | 
|
int s = --size;  | 
|
RunnableScheduledFuture<?> replacement = queue[s];  | 
|
queue[s] = null;  | 
|
if (s != i) {  | 
|
siftDown(i, replacement);  | 
|
if (queue[i] == replacement)  | 
|
siftUp(i, replacement);  | 
|
}  | 
|
return true;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
        public int size() { | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
return size;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
        public boolean isEmpty() { | 
|
return size() == 0;  | 
|
}  | 
|
        public int remainingCapacity() { | 
|
return Integer.MAX_VALUE;  | 
|
}  | 
|
public RunnableScheduledFuture<?> peek() {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
return queue[0];  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public boolean offer(Runnable x) {  | 
|
if (x == null)  | 
|
throw new NullPointerException();  | 
|
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
int i = size;  | 
|
if (i >= queue.length)  | 
|
grow();  | 
|
size = i + 1;  | 
|
if (i == 0) {  | 
|
queue[0] = e;  | 
|
setIndex(e, 0);  | 
|
                } else { | 
|
siftUp(i, e);  | 
|
}  | 
|
if (queue[0] == e) {  | 
|
leader = null;  | 
|
available.signal();  | 
|
}  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
return true;  | 
|
}  | 
|
public void put(Runnable e) {  | 
|
offer(e);  | 
|
}  | 
|
public boolean add(Runnable e) {  | 
|
return offer(e);  | 
|
}  | 
|
public boolean offer(Runnable e, long timeout, TimeUnit unit) {  | 
|
return offer(e);  | 
|
}  | 
|
        /** | 
|
         * Performs common bookkeeping for poll and take: Replaces | 
|
         * first element with last and sifts it down.  Call only when | 
|
         * holding lock. | 
|
         * @param f the task to remove and return | 
|
*/  | 
|
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {  | 
|
int s = --size;  | 
|
RunnableScheduledFuture<?> x = queue[s];  | 
|
queue[s] = null;  | 
|
if (s != 0)  | 
|
siftDown(0, x);  | 
|
setIndex(f, -1);  | 
|
return f;  | 
|
}  | 
|
public RunnableScheduledFuture<?> poll() {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
RunnableScheduledFuture<?> first = queue[0];  | 
|
if (first == null || first.getDelay(NANOSECONDS) > 0)  | 
|
return null;  | 
|
else  | 
|
return finishPoll(first);  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public RunnableScheduledFuture<?> take() throws InterruptedException {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lockInterruptibly();  | 
|
            try { | 
|
                for (;;) { | 
|
RunnableScheduledFuture<?> first = queue[0];  | 
|
if (first == null)  | 
|
available.await();  | 
|
                    else { | 
|
long delay = first.getDelay(NANOSECONDS);  | 
|
if (delay <= 0)  | 
|
return finishPoll(first);  | 
|
first = null; // don't retain ref while waiting  | 
|
if (leader != null)  | 
|
available.await();  | 
|
                        else { | 
|
Thread thisThread = Thread.currentThread();  | 
|
leader = thisThread;  | 
|
                            try { | 
|
available.awaitNanos(delay);  | 
|
                            } finally { | 
|
if (leader == thisThread)  | 
|
leader = null;  | 
|
}  | 
|
}  | 
|
}  | 
|
}  | 
|
            } finally { | 
|
if (leader == null && queue[0] != null)  | 
|
available.signal();  | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)  | 
|
throws InterruptedException {  | 
|
long nanos = unit.toNanos(timeout);  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lockInterruptibly();  | 
|
            try { | 
|
                for (;;) { | 
|
RunnableScheduledFuture<?> first = queue[0];  | 
|
if (first == null) {  | 
|
if (nanos <= 0)  | 
|
return null;  | 
|
else  | 
|
nanos = available.awaitNanos(nanos);  | 
|
                    } else { | 
|
long delay = first.getDelay(NANOSECONDS);  | 
|
if (delay <= 0)  | 
|
return finishPoll(first);  | 
|
if (nanos <= 0)  | 
|
return null;  | 
|
first = null; // don't retain ref while waiting  | 
|
if (nanos < delay || leader != null)  | 
|
nanos = available.awaitNanos(nanos);  | 
|
                        else { | 
|
Thread thisThread = Thread.currentThread();  | 
|
leader = thisThread;  | 
|
                            try { | 
|
long timeLeft = available.awaitNanos(delay);  | 
|
nanos -= delay - timeLeft;  | 
|
                            } finally { | 
|
if (leader == thisThread)  | 
|
leader = null;  | 
|
}  | 
|
}  | 
|
}  | 
|
}  | 
|
            } finally { | 
|
if (leader == null && queue[0] != null)  | 
|
available.signal();  | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
        public void clear() { | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
for (int i = 0; i < size; i++) {  | 
|
RunnableScheduledFuture<?> t = queue[i];  | 
|
if (t != null) {  | 
|
queue[i] = null;  | 
|
setIndex(t, -1);  | 
|
}  | 
|
}  | 
|
size = 0;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
        /** | 
|
         * Returns first element only if it is expired. | 
|
         * Used only by drainTo.  Call only when holding lock. | 
|
*/  | 
|
private RunnableScheduledFuture<?> peekExpired() {  | 
|
            // assert lock.isHeldByCurrentThread(); | 
|
RunnableScheduledFuture<?> first = queue[0];  | 
|
return (first == null || first.getDelay(NANOSECONDS) > 0) ?  | 
|
null : first;  | 
|
}  | 
|
public int drainTo(Collection<? super Runnable> c) {  | 
|
if (c == null)  | 
|
throw new NullPointerException();  | 
|
if (c == this)  | 
|
throw new IllegalArgumentException();  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
RunnableScheduledFuture<?> first;  | 
|
int n = 0;  | 
|
while ((first = peekExpired()) != null) {  | 
|
c.add(first); // In this order, in case add() throws.  | 
|
finishPoll(first);  | 
|
++n;  | 
|
}  | 
|
return n;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public int drainTo(Collection<? super Runnable> c, int maxElements) {  | 
|
if (c == null)  | 
|
throw new NullPointerException();  | 
|
if (c == this)  | 
|
throw new IllegalArgumentException();  | 
|
if (maxElements <= 0)  | 
|
return 0;  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
RunnableScheduledFuture<?> first;  | 
|
int n = 0;  | 
|
while (n < maxElements && (first = peekExpired()) != null) {  | 
|
c.add(first); // In this order, in case add() throws.  | 
|
finishPoll(first);  | 
|
++n;  | 
|
}  | 
|
return n;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public Object[] toArray() {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
return Arrays.copyOf(queue, size, Object[].class);  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
        @SuppressWarnings("unchecked") | 
|
public <T> T[] toArray(T[] a) {  | 
|
final ReentrantLock lock = this.lock;  | 
|
lock.lock();  | 
|
            try { | 
|
if (a.length < size)  | 
|
return (T[]) Arrays.copyOf(queue, size, a.getClass());  | 
|
System.arraycopy(queue, 0, a, 0, size);  | 
|
if (a.length > size)  | 
|
a[size] = null;  | 
|
return a;  | 
|
            } finally { | 
|
lock.unlock();  | 
|
}  | 
|
}  | 
|
public Iterator<Runnable> iterator() {  | 
|
return new Itr(Arrays.copyOf(queue, size));  | 
|
}  | 
|
        /** | 
|
         * Snapshot iterator that works off copy of underlying q array. | 
|
*/  | 
|
private class Itr implements Iterator<Runnable> {  | 
|
final RunnableScheduledFuture<?>[] array;  | 
|
            int cursor = 0;     // index of next element to return | 
|
            int lastRet = -1;   // index of last element, or -1 if no such | 
|
Itr(RunnableScheduledFuture<?>[] array) {  | 
|
this.array = array;  | 
|
}  | 
|
            public boolean hasNext() { | 
|
return cursor < array.length;  | 
|
}  | 
|
public Runnable next() {  | 
|
if (cursor >= array.length)  | 
|
throw new NoSuchElementException();  | 
|
lastRet = cursor;  | 
|
return array[cursor++];  | 
|
}  | 
|
            public void remove() { | 
|
if (lastRet < 0)  | 
|
throw new IllegalStateException();  | 
|
DelayedWorkQueue.this.remove(array[lastRet]);  | 
|
lastRet = -1;  | 
|
}  | 
|
}  | 
|
}  | 
|
}  |