This closes #211

This commit is contained in:
Clebert Suconic 2015-10-22 10:14:42 -04:00
commit aa4cac64ba
9 changed files with 85 additions and 85 deletions

View File

@ -36,6 +36,14 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
private final AccessControlContext acc; private final AccessControlContext acc;
/**
* Construct a new instance. The access control context of the calling thread will be the one used to create
* new threads if a security manager is installed.
*
* @param groupName the name of the thread group to assign threads to by default
* @param daemon whether the created threads should be daemon threads
* @param tccl the context class loader of newly created threads
*/
public ActiveMQThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) { public ActiveMQThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) {
group = new ThreadGroup(groupName + "-" + System.identityHashCode(this)); group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
@ -45,12 +53,12 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
this.daemon = daemon; this.daemon = daemon;
this.acc = (System.getSecurityManager() == null) ? null : AccessController.getContext(); this.acc = AccessController.getContext();
} }
public Thread newThread(final Runnable command) { public Thread newThread(final Runnable command) {
// create a thread in a privileged block if running with Security Manager // create a thread in a privileged block if running with Security Manager
if (acc != null && System.getSecurityManager() != null) { if (acc != null) {
return AccessController.doPrivileged(new ThreadCreateAction(command), acc); return AccessController.doPrivileged(new ThreadCreateAction(command), acc);
} }
else { else {
@ -76,7 +84,6 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
t.setDaemon(daemon); t.setDaemon(daemon);
t.setPriority(threadPriority); t.setPriority(threadPriority);
t.setContextClassLoader(tccl); t.setContextClassLoader(tccl);
return t; return t;
} }

View File

@ -242,7 +242,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private static synchronized ExecutorService getGlobalThreadPool() { private static synchronized ExecutorService getGlobalThreadPool() {
if (globalThreadPool == null) { if (globalThreadPool == null) {
ThreadFactory factory = new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, getThisClassLoader()); ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
globalThreadPool = Executors.newCachedThreadPool(factory); globalThreadPool = Executors.newCachedThreadPool(factory);
} }
@ -252,11 +257,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() { private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
if (globalScheduledThreadPool == null) { if (globalScheduledThreadPool == null) {
ThreadFactory factory = new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, getThisClassLoader()); ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
factory); factory);
} }
return globalScheduledThreadPool; return globalScheduledThreadPool;
@ -274,7 +284,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
else { else {
this.shutdownPool = true; this.shutdownPool = true;
ThreadFactory factory = new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, getThisClassLoader()); ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
if (threadPoolMaxSize == -1) { if (threadPoolMaxSize == -1) {
threadPool = Executors.newCachedThreadPool(factory); threadPool = Executors.newCachedThreadPool(factory);
@ -283,21 +298,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory); threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
} }
factory = new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true, getThisClassLoader()); factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
} }
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return ClientSessionFactoryImpl.class.getClassLoader();
}
});
}
private void instantiateLoadBalancingPolicy() { private void instantiateLoadBalancingPolicy() {
if (connectionLoadBalancingPolicyClassName == null) { if (connectionLoadBalancingPolicyClassName == null) {
throw new IllegalStateException("Please specify a load balancing policy class name on the session factory"); throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");

View File

@ -44,14 +44,6 @@ public class SharedNioEventLoopGroup extends NioEventLoopGroup {
super(numThreads, factory); super(numThreads, factory);
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return ClientSessionFactoryImpl.class.getClassLoader();
}
});
}
public static synchronized void forceShutdown() { public static synchronized void forceShutdown() {
if (instance != null) { if (instance != null) {
instance.shutdown(); instance.shutdown();
@ -68,7 +60,12 @@ public class SharedNioEventLoopGroup extends NioEventLoopGroup {
} }
} }
else { else {
instance = new SharedNioEventLoopGroup(numThreads, new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, getThisClassLoader())); instance = new SharedNioEventLoopGroup(numThreads, AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
} }
instance.nioChannelFactoryCount.incrementAndGet(); instance.nioChannelFactoryCount.incrementAndGet();
return instance; return instance;

View File

@ -112,7 +112,12 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
} }
if (isSupportsCallbacks()) { if (isSupportsCallbacks()) {
writeExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this), true, AbstractSequentialFileFactory.getThisClassLoader())); writeExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this), true, AbstractSequentialFileFactory.class.getClassLoader());
}
}));
} }
} }
@ -177,13 +182,4 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
return Arrays.asList(fileNames); return Arrays.asList(fileNames);
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return AbstractSequentialFileFactory.class.getClassLoader();
}
});
}
} }

View File

@ -186,7 +186,12 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
this.running.set(true); this.running.set(true);
pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), true, AIOSequentialFileFactory.getThisClassLoader())); pollerExecutor = Executors.newCachedThreadPool(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), true, AIOSequentialFileFactory.class.getClassLoader());
}
}));
pollerExecutor.execute(new PollerRunnable()); pollerExecutor.execute(new PollerRunnable());
} }
@ -431,15 +436,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
} }
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return AIOSequentialFileFactory.class.getClassLoader();
}
});
}
@Override @Override
public String toString() { public String toString() {
return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped + return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped +

View File

@ -1813,7 +1813,12 @@ public class JournalStorageManager implements StorageManager {
cleanupIncompleteFiles(); cleanupIncompleteFiles();
singleThreadExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, getThisClassLoader())); singleThreadExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, JournalStorageManager.class.getClassLoader());
}
}));
bindingsJournal.start(); bindingsJournal.start();
@ -2287,14 +2292,6 @@ public class JournalStorageManager implements StorageManager {
} }
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return JournalStorageManager.class.getClassLoader();
}
});
}
// Inner Classes // Inner Classes
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------

View File

@ -266,7 +266,12 @@ public class NettyAcceptor implements Acceptor {
threadsToUse = this.nioRemotingThreads; threadsToUse = this.nioRemotingThreads;
} }
channelClazz = NioServerSocketChannel.class; channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(threadsToUse, new ActiveMQThreadFactory("activemq-netty-threads", true, getThisClassLoader())); eventLoopGroup = new NioEventLoopGroup(threadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
} }
bootstrap = new ServerBootstrap(); bootstrap = new ServerBootstrap();
@ -682,13 +687,4 @@ public class NettyAcceptor implements Acceptor {
cancelled = true; cancelled = true;
} }
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return ClientSessionFactoryImpl.class.getClassLoader();
}
});
}
} }

View File

@ -188,21 +188,20 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
return; return;
} }
ClassLoader tccl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
}
});
// The remoting service maintains it's own thread pool for handling remoting traffic // The remoting service maintains it's own thread pool for handling remoting traffic
// If OIO each connection will have it's own thread // If OIO each connection will have it's own thread
// If NIO these are capped at nio-remoting-threads which defaults to num cores * 3 // If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
// This needs to be a different thread pool to the main thread pool especially for OIO where we may need // This needs to be a different thread pool to the main thread pool especially for OIO where we may need
// to support many hundreds of connections, but the main thread pool must be kept small for better performance // to support many hundreds of connections, but the main thread pool must be kept small for better performance
ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() + ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
"-" + @Override
System.identityHashCode(this), false, tccl); public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
"-" +
System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
}
});
threadPool = Executors.newCachedThreadPool(tFactory); threadPool = Executors.newCachedThreadPool(tFactory);

View File

@ -1405,7 +1405,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* Executor based on the provided Thread pool. Otherwise we create a new ThreadPool. * Executor based on the provided Thread pool. Otherwise we create a new ThreadPool.
*/ */
if (serviceRegistry.getExecutorService() == null) { if (serviceRegistry.getExecutorService() == null) {
ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, getThisClassLoader()); ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
}
});
if (configuration.getThreadPoolMaxSize() == -1) { if (configuration.getThreadPoolMaxSize() == -1) {
threadPool = Executors.newCachedThreadPool(tFactory); threadPool = Executors.newCachedThreadPool(tFactory);
} }
@ -1423,7 +1428,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* Scheduled ExecutorService otherwise we create a new one. * Scheduled ExecutorService otherwise we create a new one.
*/ */
if (serviceRegistry.getScheduledExecutorService() == null) { if (serviceRegistry.getScheduledExecutorService() == null) {
ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, getThisClassLoader()); ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, ClientSessionFactoryImpl.class.getClassLoader());
}
});
scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory);
} }
else { else {
@ -1784,15 +1794,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
private static ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return ClientSessionFactoryImpl.class.getClassLoader();
}
});
}
/** /**
* Check if journal directory exists or create it (if configured to do so) * Check if journal directory exists or create it (if configured to do so)
*/ */