This closes #468

This commit is contained in:
Martyn Taylor 2016-04-20 15:29:25 +01:00
commit 0a719e08ed
4 changed files with 44 additions and 40 deletions

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.api.core.client;
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -46,9 +48,9 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
*/
public final class ActiveMQClient {
public static int globalThreadMaxPoolSize;
private static int globalThreadPoolSize;
public static int globalScheduledThreadPoolSize;
private static int globalScheduledThreadPoolSize;
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
@ -136,11 +138,11 @@ public final class ActiveMQClient {
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";
private static ThreadPoolExecutor globalThreadPool;
private static ExecutorService globalThreadPool;
private static boolean injectedPools = false;
private static ScheduledThreadPoolExecutor globalScheduledThreadPool;
private static ScheduledExecutorService globalScheduledThreadPool;
static {
@ -195,17 +197,19 @@ public final class ActiveMQClient {
}
/** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */
public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
public static synchronized void injectPools(ExecutorService globalThreadPool, ScheduledExecutorService scheduledThreadPool) {
if (globalThreadPool == null || scheduledThreadPool == null)
throw new IllegalArgumentException("thread pools must not be null");
// We call clearThreadPools as that will shutdown any previously used executor
clearThreadPools();
ActiveMQClient.globalThreadPool = globalThreadPool;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool;
injectedPools = true;
}
public static synchronized ThreadPoolExecutor getGlobalThreadPool() {
public static synchronized ExecutorService getGlobalThreadPool() {
if (globalThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
@ -214,17 +218,17 @@ public final class ActiveMQClient {
}
});
if (globalThreadMaxPoolSize == -1) {
if (globalThreadPoolSize == -1) {
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
}
else {
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadPoolSize, ActiveMQClient.globalThreadPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
}
}
return globalThreadPool;
}
public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool() {
public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
if (globalScheduledThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
@ -238,52 +242,47 @@ public final class ActiveMQClient {
return globalScheduledThreadPool;
}
public static int getGlobalThreadPoolSize() {
return globalThreadPoolSize;
}
public static int getGlobalScheduledThreadPoolSize() {
return globalScheduledThreadPoolSize;
}
/**
* (Re)Initializes the global thread pools properties from System properties. This method will update the global
* thread pool configuration based on defined System properties (or defaults if they are not set) notifying
* all globalThreadPoolListeners. The System properties key names are as follow:
* Initializes the global thread pools properties from System properties. This method will update the global
* thread pool configuration based on defined System properties (or defaults if they are not set).
* The System properties key names are as follow:
*
* ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size"
* ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size
*
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will defaul to 2.
* The min value for max thread pool size is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
* A value of -1 configures an unbounded thread pool.
*
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
* thread pools have already been created, they will be updated with these new values.
* Note: If global thread pools have already been created, they will not be updated with these new values.
*/
public static void initializeGlobalThreadPoolProperties() {
setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
}
/**
* Allows programatically configuration of global thread pools properties. This method will update the global
* thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
*
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
* thread pools have already been created, they will be updated with these new values.
* Note: If global thread pools have already been created, they will not be updated with these new values.
*
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will default to 2.
* The min value for globalThreadMaxPoolSize is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
* A value of -1 configures an unbounded thread pool.
*/
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) {
if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2;
if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1) globalThreadMaxPoolSize = 2;
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize;
// if injected, we won't do anything with the pool as they're not ours
if (!injectedPools) {
// Right now I'm ignoring the corePool size on purpose as there's no way to have two values for the number of threads
// this is basically a fixed size thread pool (although the pool grows on demand)
getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize);
getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize);
getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize);
}
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
}
/**

View File

@ -54,7 +54,7 @@ public class ClientThreadPoolsTest {
System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize);
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.getGlobalThreadPoolSize());
}
@Test
@ -65,14 +65,15 @@ public class ClientThreadPoolsTest {
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
}
@Test
public void testShutdownPoolInUse() throws Exception {
ActiveMQClient.clearThreadPools();
ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
ActiveMQClient.clearThreadPools();
final CountDownLatch inUse = new CountDownLatch(1);
final CountDownLatch neverLeave = new CountDownLatch(1);
@ -146,6 +147,7 @@ public class ClientThreadPoolsTest {
int testScheduleSize = 9;
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
}
@ -156,6 +158,7 @@ public class ClientThreadPoolsTest {
int testScheduleSize = 9;
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
}
@ -174,7 +177,7 @@ public class ClientThreadPoolsTest {
threadPoolField.setAccessible(true);
scheduledThreadPoolField.setAccessible(true);
ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool();
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool();
final CountDownLatch doneMax = new CountDownLatch(expectedMax);
final CountDownLatch latch = new CountDownLatch(1);
@ -255,6 +258,7 @@ public class ClientThreadPoolsTest {
// Resets the global thread pool properties back to default.
System.setProperties(systemProperties);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
}

View File

@ -103,11 +103,11 @@ public class InVMConnector extends AbstractConnector {
private static synchronized ExecutorService getInVMExecutor() {
if (threadPoolExecutor == null) {
if (ActiveMQClient.globalThreadMaxPoolSize <= -1) {
if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) {
threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory());
}
else {
threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.globalThreadMaxPoolSize);
threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.getGlobalThreadPoolSize());
}
}
return threadPoolExecutor;

View File

@ -74,11 +74,12 @@ public class CoreClientTest extends ActiveMQTestBase {
@Test
public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception {
int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize;
int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize;
int originalScheduled = ActiveMQClient.getGlobalScheduledThreadPoolSize();
int originalGlobal = ActiveMQClient.getGlobalThreadPoolSize();
try {
ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
ActiveMQClient.clearThreadPools();
ServerLocator locator = createNonHALocator(false);
testCoreClient(true, locator);
}