|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
|
|
package sun.nio.ch; |
|
|
|
import java.nio.channels.Channel; |
|
import java.nio.channels.AsynchronousChannelGroup; |
|
import java.nio.channels.spi.AsynchronousChannelProvider; |
|
import java.io.IOException; |
|
import java.io.FileDescriptor; |
|
import java.util.Queue; |
|
import java.util.concurrent.*; |
|
import java.util.concurrent.atomic.AtomicInteger; |
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
import java.security.PrivilegedAction; |
|
import java.security.AccessController; |
|
import java.security.AccessControlContext; |
|
import sun.security.action.GetIntegerAction; |
|
|
|
/** |
|
* Base implementation of AsynchronousChannelGroup |
|
*/ |
|
|
|
abstract class AsynchronousChannelGroupImpl |
|
extends AsynchronousChannelGroup implements Executor |
|
{ |
|
// number of internal threads handling I/O events when using an unbounded |
|
|
|
private static final int internalThreadCount = AccessController.doPrivileged( |
|
new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1)); |
|
|
|
|
|
private final ThreadPool pool; |
|
|
|
|
|
private final AtomicInteger threadCount = new AtomicInteger(); |
|
|
|
|
|
private ScheduledThreadPoolExecutor timeoutExecutor; |
|
|
|
// task queue for when using a fixed thread pool. In that case, a thread |
|
|
|
private final Queue<Runnable> taskQueue; |
|
|
|
|
|
private final AtomicBoolean shutdown = new AtomicBoolean(); |
|
private final Object shutdownNowLock = new Object(); |
|
private volatile boolean terminateInitiated; |
|
|
|
AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, |
|
ThreadPool pool) |
|
{ |
|
super(provider); |
|
this.pool = pool; |
|
|
|
if (pool.isFixedThreadPool()) { |
|
taskQueue = new ConcurrentLinkedQueue<>(); |
|
} else { |
|
taskQueue = null; |
|
} |
|
|
|
// use default thread factory as thread should not be visible to |
|
|
|
this.timeoutExecutor = (ScheduledThreadPoolExecutor) |
|
Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory()); |
|
this.timeoutExecutor.setRemoveOnCancelPolicy(true); |
|
} |
|
|
|
final ExecutorService executor() { |
|
return pool.executor(); |
|
} |
|
|
|
final boolean isFixedThreadPool() { |
|
return pool.isFixedThreadPool(); |
|
} |
|
|
|
final int fixedThreadCount() { |
|
if (isFixedThreadPool()) { |
|
return pool.poolSize(); |
|
} else { |
|
return pool.poolSize() + internalThreadCount; |
|
} |
|
} |
|
|
|
private Runnable bindToGroup(final Runnable task) { |
|
final AsynchronousChannelGroupImpl thisGroup = this; |
|
return new Runnable() { |
|
public void run() { |
|
Invoker.bindToGroup(thisGroup); |
|
task.run(); |
|
} |
|
}; |
|
} |
|
|
|
private void startInternalThread(final Runnable task) { |
|
AccessController.doPrivileged(new PrivilegedAction<>() { |
|
@Override |
|
public Void run() { |
|
// internal threads should not be visible to application so |
|
|
|
ThreadPool.defaultThreadFactory().newThread(task).start(); |
|
return null; |
|
} |
|
}); |
|
} |
|
|
|
protected final void startThreads(Runnable task) { |
|
if (!isFixedThreadPool()) { |
|
for (int i=0; i<internalThreadCount; i++) { |
|
startInternalThread(task); |
|
threadCount.incrementAndGet(); |
|
} |
|
} |
|
if (pool.poolSize() > 0) { |
|
task = bindToGroup(task); |
|
try { |
|
for (int i=0; i<pool.poolSize(); i++) { |
|
pool.executor().execute(task); |
|
threadCount.incrementAndGet(); |
|
} |
|
} catch (RejectedExecutionException x) { |
|
// nothing we can do |
|
} |
|
} |
|
} |
|
|
|
final int threadCount() { |
|
return threadCount.get(); |
|
} |
|
|
|
|
|
|
|
*/ |
|
final int threadExit(Runnable task, boolean replaceMe) { |
|
if (replaceMe) { |
|
try { |
|
if (Invoker.isBoundToAnyGroup()) { |
|
|
|
pool.executor().execute(bindToGroup(task)); |
|
} else { |
|
|
|
startInternalThread(task); |
|
} |
|
return threadCount.get(); |
|
} catch (RejectedExecutionException x) { |
|
// unable to replace |
|
} |
|
} |
|
return threadCount.decrementAndGet(); |
|
} |
|
|
|
|
|
|
|
*/ |
|
abstract void executeOnHandlerTask(Runnable task); |
|
|
|
|
|
|
|
|
|
|
|
*/ |
|
final void executeOnPooledThread(Runnable task) { |
|
if (isFixedThreadPool()) { |
|
executeOnHandlerTask(task); |
|
} else { |
|
pool.executor().execute(bindToGroup(task)); |
|
} |
|
} |
|
|
|
final void offerTask(Runnable task) { |
|
taskQueue.offer(task); |
|
} |
|
|
|
final Runnable pollTask() { |
|
return (taskQueue == null) ? null : taskQueue.poll(); |
|
} |
|
|
|
final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) { |
|
try { |
|
return timeoutExecutor.schedule(task, timeout, unit); |
|
} catch (RejectedExecutionException rej) { |
|
if (terminateInitiated) { |
|
|
|
return null; |
|
} |
|
throw new AssertionError(rej); |
|
} |
|
} |
|
|
|
@Override |
|
public final boolean isShutdown() { |
|
return shutdown.get(); |
|
} |
|
|
|
@Override |
|
public final boolean isTerminated() { |
|
return pool.executor().isTerminated(); |
|
} |
|
|
|
|
|
|
|
*/ |
|
abstract boolean isEmpty(); |
|
|
|
|
|
|
|
*/ |
|
abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo) |
|
throws IOException; |
|
|
|
|
|
|
|
*/ |
|
abstract void detachForeignChannel(Object key); |
|
|
|
|
|
|
|
*/ |
|
abstract void closeAllChannels() throws IOException; |
|
|
|
|
|
|
|
*/ |
|
abstract void shutdownHandlerTasks(); |
|
|
|
private void shutdownExecutors() { |
|
AccessController.doPrivileged( |
|
new PrivilegedAction<>() { |
|
public Void run() { |
|
pool.executor().shutdown(); |
|
timeoutExecutor.shutdown(); |
|
return null; |
|
} |
|
}, |
|
null, |
|
new RuntimePermission("modifyThread")); |
|
} |
|
|
|
@Override |
|
public final void shutdown() { |
|
if (shutdown.getAndSet(true)) { |
|
|
|
return; |
|
} |
|
// if there are channels in the group then shutdown will continue |
|
|
|
if (!isEmpty()) { |
|
return; |
|
} |
|
// initiate termination (acquire shutdownNowLock to ensure that other |
|
|
|
synchronized (shutdownNowLock) { |
|
if (!terminateInitiated) { |
|
terminateInitiated = true; |
|
shutdownHandlerTasks(); |
|
shutdownExecutors(); |
|
} |
|
} |
|
} |
|
|
|
@Override |
|
public final void shutdownNow() throws IOException { |
|
shutdown.set(true); |
|
synchronized (shutdownNowLock) { |
|
if (!terminateInitiated) { |
|
terminateInitiated = true; |
|
closeAllChannels(); |
|
shutdownHandlerTasks(); |
|
shutdownExecutors(); |
|
} |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
*/ |
|
final void detachFromThreadPool() { |
|
if (shutdown.getAndSet(true)) |
|
throw new AssertionError("Already shutdown"); |
|
if (!isEmpty()) |
|
throw new AssertionError("Group not empty"); |
|
shutdownHandlerTasks(); |
|
} |
|
|
|
@Override |
|
public final boolean awaitTermination(long timeout, TimeUnit unit) |
|
throws InterruptedException |
|
{ |
|
return pool.executor().awaitTermination(timeout, unit); |
|
} |
|
|
|
|
|
|
|
*/ |
|
@Override |
|
public final void execute(Runnable task) { |
|
SecurityManager sm = System.getSecurityManager(); |
|
if (sm != null) { |
|
// when a security manager is installed then the user's task |
|
|
|
final AccessControlContext acc = AccessController.getContext(); |
|
final Runnable delegate = task; |
|
task = new Runnable() { |
|
@Override |
|
public void run() { |
|
AccessController.doPrivileged(new PrivilegedAction<>() { |
|
@Override |
|
public Void run() { |
|
delegate.run(); |
|
return null; |
|
} |
|
}, acc); |
|
} |
|
}; |
|
} |
|
executeOnPooledThread(task); |
|
} |
|
} |