ARTEMIS-312 Allow configurable of, and inject of client global thread pools

This commit is contained in:
Martyn Taylor 2015-12-11 17:53:25 +00:00 committed by Clebert Suconic
parent f4235a6b9f
commit 0e8f2f39af
6 changed files with 517 additions and 92 deletions

View File

@ -16,14 +16,26 @@
*/ */
package org.apache.activemq.artemis.api.core.client; package org.apache.activemq.artemis.api.core.client;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy; import org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.uri.ServerLocatorParser; import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import java.net.URI;
/** /**
* Utility class for creating ActiveMQ Artemis {@link ClientSessionFactory} objects. * Utility class for creating ActiveMQ Artemis {@link ClientSessionFactory} objects.
@ -34,6 +46,10 @@ import java.net.URI;
*/ */
public final class ActiveMQClient { public final class ActiveMQClient {
public static int globalThreadMaxPoolSize;
public static int globalScheduledThreadPoolSize;
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName(); public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod(); public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod();
@ -102,6 +118,8 @@ public final class ActiveMQClient {
public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1; public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
public static final int DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE = 500;
public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5; public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false; public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
@ -114,6 +132,160 @@ public final class ActiveMQClient {
public static final String DEFAULT_CORE_PROTOCOL = "CORE"; public static final String DEFAULT_CORE_PROTOCOL = "CORE";
public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";
private static ThreadPoolExecutor globalThreadPool;
private static boolean injectedPools = false;
private static ScheduledThreadPoolExecutor globalScheduledThreadPool;
static {
initializeGlobalThreadPoolProperties();
}
public static synchronized void clearThreadPools() {
clearThreadPools(10, TimeUnit.SECONDS);
}
public static synchronized void clearThreadPools(long time, TimeUnit unit) {
if (injectedPools) {
globalThreadPool = null;
globalScheduledThreadPool = null;
injectedPools = false;
return;
}
if (globalThreadPool != null) {
globalThreadPool.shutdown();
try {
if (!globalThreadPool.awaitTermination(time, unit)) {
globalThreadPool.shutdownNow();
ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client globalThreadPool in less than 10 seconds, interrupting it now");
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
finally {
globalThreadPool = null;
}
}
if (globalScheduledThreadPool != null) {
globalScheduledThreadPool.shutdown();
try {
if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
globalScheduledThreadPool.shutdownNow();
ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client scheduled in less than 10 seconds, interrupting it now");
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
finally {
globalScheduledThreadPool = null;
}
}
}
/** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */
public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
// We call clearThreadPools as that will shutdown any previously used executor
clearThreadPools();
ActiveMQClient.globalThreadPool = globalThreadPool;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor;
injectedPools = true;
}
public static synchronized ThreadPoolExecutor getGlobalThreadPool() {
if (globalThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
if (globalThreadMaxPoolSize == -1) {
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
}
else {
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
}
}
return globalThreadPool;
}
public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool() {
if (globalScheduledThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
globalScheduledThreadPool = new ScheduledThreadPoolExecutor(ActiveMQClient.globalScheduledThreadPoolSize, factory);
}
return globalScheduledThreadPool;
}
/**
* (Re)Initializes the global thread pools properties from System properties. This method will update the global
* thread pool configuration based on defined System properties (or defaults if they are not set) notifying
* all globalThreadPoolListeners. The System properties key names are as follow:
*
* ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size"
* ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size
*
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will defaul to 2.
*
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
* thread pools have already been created, they will be updated with these new values.
*/
public static void initializeGlobalThreadPoolProperties() {
setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
}
/**
* Allows programatically configuration of global thread pools properties. This method will update the global
* thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
*
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
* thread pools have already been created, they will be updated with these new values.
*
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will default to 2.
*/
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) {
if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2;
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize;
// if injected, we won't do anything with the pool as they're not ours
if (!injectedPools) {
// Right now I'm ignoring the corePool size on purpose as there's no way to have two values for the number of threads
// this is basically a fixed size thread pool (although the pool grows on demand)
getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize);
getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize);
getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize);
}
}
/** /**
* Creates an ActiveMQConnectionFactory; * Creates an ActiveMQConnectionFactory;
* *

View File

@ -16,9 +16,12 @@
*/ */
package org.apache.activemq.artemis.api.core.client; package org.apache.activemq.artemis.api.core.client;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
@ -784,6 +787,5 @@ public interface ServerLocator extends AutoCloseable {
String getOutgoingInterceptorList(); String getOutgoingInterceptorList();
boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
} }

View File

@ -187,12 +187,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<>(); private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<>();
private static ExecutorService globalThreadPool;
private Executor startExecutor; private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
private AfterConnectInternalListener afterConnectListener; private AfterConnectInternalListener afterConnectListener;
private String groupID; private String groupID;
@ -208,68 +204,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public static Runnable finalizeCallback = null; public static Runnable finalizeCallback = null;
public static synchronized void clearThreadPools() { public static synchronized void clearThreadPools() {
ActiveMQClient.clearThreadPools();
if (globalThreadPool != null) {
globalThreadPool.shutdown();
try {
if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("Couldn't finish the globalThreadPool");
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
finally {
globalThreadPool = null;
}
}
if (globalScheduledThreadPool != null) {
globalScheduledThreadPool.shutdown();
try {
if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
finally {
globalScheduledThreadPool = null;
}
}
}
private static synchronized ExecutorService getGlobalThreadPool() {
if (globalThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
globalThreadPool = Executors.newCachedThreadPool(factory);
}
return globalThreadPool;
}
private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
if (globalScheduledThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
factory);
}
return globalScheduledThreadPool;
} }
private synchronized void setThreadPools() { private synchronized void setThreadPools() {
@ -277,9 +212,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return; return;
} }
else if (useGlobalPools) { else if (useGlobalPools) {
threadPool = getGlobalThreadPool(); threadPool = ActiveMQClient.getGlobalThreadPool();
scheduledThreadPool = getGlobalScheduledThreadPool(); scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
} }
else { else {
this.shutdownPool = true; this.shutdownPool = true;
@ -309,6 +244,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
} }
@Override
public synchronized boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPool) {
if (threadPool == null || scheduledThreadPool == null) return false;
if (this.threadPool == null && this.scheduledThreadPool == null) {
useGlobalPools = false;
shutdownPool = false;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
return true;
}
else {
return false;
}
}
private void instantiateLoadBalancingPolicy() { private void instantiateLoadBalancingPolicy() {
if (connectionLoadBalancingPolicyClassName == null) { if (connectionLoadBalancingPolicyClassName == null) {
throw new IllegalStateException("Please specify a load balancing policy class name on the session factory"); throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
@ -409,10 +361,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS; useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE; threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL; retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER; retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ClientThreadPoolsTest {
private static Properties systemProperties;
@BeforeClass
public static void setup() {
systemProperties = System.getProperties();
}
@AfterClass
public static void tearDown() {
System.clearProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY);
System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize);
}
@Test
public void testSystemPropertyThreadPoolSettings() throws Exception {
int threadPoolMaxSize = 100;
int scheduledThreadPoolSize = 10;
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
ActiveMQClient.initializeGlobalThreadPoolProperties();
testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
}
@Test
public void testShutdownPoolInUse() throws Exception {
ActiveMQClient.clearThreadPools();
ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
final CountDownLatch inUse = new CountDownLatch(1);
final CountDownLatch neverLeave = new CountDownLatch(1);
ActiveMQClient.getGlobalThreadPool().execute(new Runnable() {
@Override
public void run() {
System.err.println("Hello!");
try {
inUse.countDown();
neverLeave.await();
}
catch (Exception e) {
e.printStackTrace();
neverLeave.countDown();
}
}
});
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS);
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
}
@Test
public void testInjectPools() throws Exception {
ActiveMQClient.clearThreadPools();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);
final CountDownLatch inUse = new CountDownLatch(1);
final CountDownLatch neverLeave = new CountDownLatch(1);
ActiveMQClient.getGlobalThreadPool().execute(new Runnable() {
@Override
public void run() {
System.err.println("Hello!");
try {
inUse.countDown();
neverLeave.await();
}
catch (Exception e) {
e.printStackTrace();
neverLeave.countDown();
}
}
});
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
poolExecutor.shutdownNow();
scheduledThreadPoolExecutor.shutdownNow();
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS);
}
@Test
public void testStaticPropertiesThreadPoolSettings() throws Exception {
int testMaxSize = 999;
int testScheduleSize = 9;
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
}
@Test
public void testSmallPool() throws Exception {
int testMaxSize = 2;
int testScheduleSize = 9;
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
}
private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled) throws Exception {
ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
serverLocator.isUseGlobalPools();
Method setThreadPools = ServerLocatorImpl.class.getDeclaredMethod("setThreadPools");
setThreadPools.setAccessible(true);
setThreadPools.invoke(serverLocator);
// TODO: I would get this from the ActiveMQClient
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
threadPoolField.setAccessible(true);
scheduledThreadPoolField.setAccessible(true);
ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool();
final CountDownLatch doneMax = new CountDownLatch(expectedMax);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latchTotal = new CountDownLatch(expectedMax * 3); // we will schedule 3 * max, so all runnables should execute
final AtomicInteger errors = new AtomicInteger(0);
// Set this to true if you need to debug why executions are not being performed.
final boolean debugExecutions = false;
for (int i = 0; i < expectedMax * 3; i++) {
final int localI = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
if (debugExecutions) {
System.out.println("runnable " + localI);
}
doneMax.countDown();
latch.await();
latchTotal.countDown();
}
catch (Exception e) {
errors.incrementAndGet();
}
finally {
if (debugExecutions) {
System.out.println("done " + localI);
}
}
}
});
}
Assert.assertTrue(doneMax.await(5, TimeUnit.SECONDS));
latch.countDown();
Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS));
ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
// TODO: We need to figure out what to do with getCorePoolSize
assertEquals(expectedMax, threadPool.getCorePoolSize());
assertEquals(expectedMax, threadPool.getMaximumPoolSize());
assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
}
@Test
public void testThreadPoolInjection() throws Exception {
ServerLocator serverLocator = new ServerLocatorImpl(false);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(1);
serverLocator.setThreadPools(threadPool, scheduledThreadPool);
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise");
initialise.setAccessible(true);
initialise.invoke(serverLocator);
threadPoolField.setAccessible(true);
scheduledThreadPoolField.setAccessible(true);
ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator);
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
assertEquals(threadPool, tpe);
assertEquals(scheduledThreadPool, stpe);
}
@After
public void cleanup() {
// Resets the global thread pool properties back to default.
System.setProperties(systemProperties);
ActiveMQClient.initializeGlobalThreadPoolProperties();
}
}

View File

@ -2403,8 +2403,10 @@ public abstract class ActiveMQTestBase extends Assert {
protected void closeAllSessionFactories() { protected void closeAllSessionFactories() {
synchronized (sessionFactories) { synchronized (sessionFactories) {
for (ClientSessionFactory sf : sessionFactories) { for (ClientSessionFactory sf : sessionFactories) {
closeSessionFactory(sf); if (!sf.isClosed()) {
assert sf.isClosed(); closeSessionFactory(sf);
assert sf.isClosed();
}
} }
sessionFactories.clear(); sessionFactories.clear();
} }

View File

@ -16,8 +16,14 @@
*/ */
package org.apache.activemq.artemis.tests.integration.client; package org.apache.activemq.artemis.tests.integration.client;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -36,33 +42,61 @@ public class CoreClientTest extends ActiveMQTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Test @Test
public void testCoreClientNetty() throws Exception { public void testCoreClientNetty() throws Exception {
testCoreClient(true); testCoreClient(true, null);
} }
@Test @Test
public void testCoreClientInVM() throws Exception { public void testCoreClientInVM() throws Exception {
testCoreClient(false); testCoreClient(false, null);
} }
private void testCoreClient(final boolean netty) throws Exception { @Test
public void testCoreClientWithInjectedThreadPools() throws Exception {
ExecutorService threadPool = Executors.newCachedThreadPool();
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
ServerLocator locator = createNonHALocator(false);
boolean setThreadPools = locator.setThreadPools(threadPool, scheduledThreadPool);
assertTrue(setThreadPools);
testCoreClient(true, locator);
threadPool.shutdown();
scheduledThreadPool.shutdown();
threadPool.awaitTermination(60, TimeUnit.SECONDS);
scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS);
}
@Test
public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception {
int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize;
int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize;
try {
ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
ServerLocator locator = createNonHALocator(false);
testCoreClient(true, locator);
}
finally {
// restoring original value otherwise future tests would be screwed up
ActiveMQClient.setGlobalThreadPoolProperties(originalGlobal, originalScheduled);
ActiveMQClient.clearThreadPools();
}
}
private void testCoreClient(final boolean netty, ServerLocator serverLocator) throws Exception {
final SimpleString QUEUE = new SimpleString("CoreClientTestQueue"); final SimpleString QUEUE = new SimpleString("CoreClientTestQueue");
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(netty), false)); ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(netty), false));
server.start(); server.start();
ServerLocator locator = createNonHALocator(netty);
ServerLocator locator = serverLocator == null ? createNonHALocator(netty) : serverLocator;
ClientSessionFactory sf = createSessionFactory(locator); ClientSessionFactory sf = createSessionFactory(locator);
@ -105,5 +139,7 @@ public class CoreClientTest extends ActiveMQTestBase {
message2.acknowledge(); message2.acknowledge();
} }
sf.close();
} }
} }