Protected ActiveMQClient API against misuse.

1. Changed public fields in ActiveMQClient to private and added getters.

Exposing fields for thread pool sized allow to modify them in undesired ways.
I made these fields private and added corresponding getter methods.
In addition, I renamed the field 'globalThreadMaxPoolSize'
to 'globalThreadPoolSize' to be more consistent with the
'globalScheduledThreadPoolSize' field name.
I also adapted some tests to always call clearThreadPools after
the thread pool size configuration has been changed.

2. Protect against injecting null as thread pools

ActiveMQClient.injectPools allowed null as injected thread pools.
The effect was that internal threads pools were created,
but not shutdown correctly.
This commit is contained in:
Bernd Gutjahr 2016-04-15 09:14:30 +02:00 committed by Martyn Taylor
parent 2360fb4c9f
commit 1b5396c033
4 changed files with 26 additions and 14 deletions

View File

@ -46,9 +46,9 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
*/ */
public final class ActiveMQClient { public final class ActiveMQClient {
public static int globalThreadMaxPoolSize; private static int globalThreadPoolSize;
public static int globalScheduledThreadPoolSize; private static int globalScheduledThreadPoolSize;
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName(); public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
@ -195,13 +195,15 @@ public final class ActiveMQClient {
} }
/** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */ /** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */
public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPool) {
if (globalThreadPool == null || scheduledThreadPool == null)
throw new IllegalArgumentException("thread pools must not be null");
// We call clearThreadPools as that will shutdown any previously used executor // We call clearThreadPools as that will shutdown any previously used executor
clearThreadPools(); clearThreadPools();
ActiveMQClient.globalThreadPool = globalThreadPool; ActiveMQClient.globalThreadPool = globalThreadPool;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor; ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool;
injectedPools = true; injectedPools = true;
} }
@ -214,11 +216,11 @@ public final class ActiveMQClient {
} }
}); });
if (globalThreadMaxPoolSize == -1) { if (globalThreadPoolSize == -1) {
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
} }
else { else {
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory); globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadPoolSize, ActiveMQClient.globalThreadPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
} }
} }
return globalThreadPool; return globalThreadPool;
@ -238,8 +240,13 @@ public final class ActiveMQClient {
return globalScheduledThreadPool; return globalScheduledThreadPool;
} }
public static int getGlobalThreadPoolSize() {
return globalThreadPoolSize;
}
public static int getGlobalScheduledThreadPoolSize() {
return globalScheduledThreadPoolSize;
}
/** /**
* Initializes the global thread pools properties from System properties. This method will update the global * Initializes the global thread pools properties from System properties. This method will update the global
@ -273,7 +280,7 @@ public final class ActiveMQClient {
if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1) globalThreadMaxPoolSize = 2; if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1) globalThreadMaxPoolSize = 2;
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize; ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize; ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
} }
/** /**

View File

@ -54,7 +54,7 @@ public class ClientThreadPoolsTest {
System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY); System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY);
ActiveMQClient.initializeGlobalThreadPoolProperties(); ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools(); ActiveMQClient.clearThreadPools();
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize); Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.getGlobalThreadPoolSize());
} }
@Test @Test
@ -65,14 +65,15 @@ public class ClientThreadPoolsTest {
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize); System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize); System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
ActiveMQClient.initializeGlobalThreadPoolProperties(); ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize); testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
} }
@Test @Test
public void testShutdownPoolInUse() throws Exception { public void testShutdownPoolInUse() throws Exception {
ActiveMQClient.clearThreadPools();
ActiveMQClient.setGlobalThreadPoolProperties(10, 1); ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
ActiveMQClient.clearThreadPools();
final CountDownLatch inUse = new CountDownLatch(1); final CountDownLatch inUse = new CountDownLatch(1);
final CountDownLatch neverLeave = new CountDownLatch(1); final CountDownLatch neverLeave = new CountDownLatch(1);
@ -146,6 +147,7 @@ public class ClientThreadPoolsTest {
int testScheduleSize = 9; int testScheduleSize = 9;
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize); ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize); testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
} }
@ -156,6 +158,7 @@ public class ClientThreadPoolsTest {
int testScheduleSize = 9; int testScheduleSize = 9;
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize); ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize); testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
} }
@ -255,6 +258,7 @@ public class ClientThreadPoolsTest {
// Resets the global thread pool properties back to default. // Resets the global thread pool properties back to default.
System.setProperties(systemProperties); System.setProperties(systemProperties);
ActiveMQClient.initializeGlobalThreadPoolProperties(); ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
} }

View File

@ -103,11 +103,11 @@ public class InVMConnector extends AbstractConnector {
private static synchronized ExecutorService getInVMExecutor() { private static synchronized ExecutorService getInVMExecutor() {
if (threadPoolExecutor == null) { if (threadPoolExecutor == null) {
if (ActiveMQClient.globalThreadMaxPoolSize <= -1) { if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) {
threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory()); threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory());
} }
else { else {
threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.globalThreadMaxPoolSize); threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.getGlobalThreadPoolSize());
} }
} }
return threadPoolExecutor; return threadPoolExecutor;

View File

@ -74,11 +74,12 @@ public class CoreClientTest extends ActiveMQTestBase {
@Test @Test
public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception { public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception {
int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize; int originalScheduled = ActiveMQClient.getGlobalScheduledThreadPoolSize();
int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize; int originalGlobal = ActiveMQClient.getGlobalThreadPoolSize();
try { try {
ActiveMQClient.setGlobalThreadPoolProperties(2, 1); ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
ActiveMQClient.clearThreadPools();
ServerLocator locator = createNonHALocator(false); ServerLocator locator = createNonHALocator(false);
testCoreClient(true, locator); testCoreClient(true, locator);
} }