This closes #263
This commit is contained in:
commit
f8741db4a3
|
@ -16,14 +16,26 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.api.core.client;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import java.net.URI;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.artemis.uri.ServerLocatorParser;
|
||||
|
||||
import java.net.URI;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
|
||||
/**
|
||||
* Utility class for creating ActiveMQ Artemis {@link ClientSessionFactory} objects.
|
||||
|
@ -34,6 +46,10 @@ import java.net.URI;
|
|||
*/
|
||||
public final class ActiveMQClient {
|
||||
|
||||
public static int globalThreadMaxPoolSize;
|
||||
|
||||
public static int globalScheduledThreadPoolSize;
|
||||
|
||||
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
|
||||
|
||||
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod();
|
||||
|
@ -102,6 +118,8 @@ public final class ActiveMQClient {
|
|||
|
||||
public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
|
||||
|
||||
public static final int DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE = 500;
|
||||
|
||||
public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
|
||||
|
||||
public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
|
||||
|
@ -114,6 +132,160 @@ public final class ActiveMQClient {
|
|||
|
||||
public static final String DEFAULT_CORE_PROTOCOL = "CORE";
|
||||
|
||||
public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
|
||||
|
||||
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";
|
||||
|
||||
private static ThreadPoolExecutor globalThreadPool;
|
||||
|
||||
private static boolean injectedPools = false;
|
||||
|
||||
private static ScheduledThreadPoolExecutor globalScheduledThreadPool;
|
||||
|
||||
|
||||
static {
|
||||
initializeGlobalThreadPoolProperties();
|
||||
}
|
||||
|
||||
public static synchronized void clearThreadPools() {
|
||||
clearThreadPools(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
public static synchronized void clearThreadPools(long time, TimeUnit unit) {
|
||||
|
||||
if (injectedPools) {
|
||||
globalThreadPool = null;
|
||||
globalScheduledThreadPool = null;
|
||||
injectedPools = false;
|
||||
return;
|
||||
}
|
||||
|
||||
if (globalThreadPool != null) {
|
||||
globalThreadPool.shutdown();
|
||||
try {
|
||||
if (!globalThreadPool.awaitTermination(time, unit)) {
|
||||
globalThreadPool.shutdownNow();
|
||||
ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client globalThreadPool in less than 10 seconds, interrupting it now");
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new ActiveMQInterruptedException(e);
|
||||
}
|
||||
finally {
|
||||
globalThreadPool = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (globalScheduledThreadPool != null) {
|
||||
globalScheduledThreadPool.shutdown();
|
||||
try {
|
||||
if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
|
||||
globalScheduledThreadPool.shutdownNow();
|
||||
ActiveMQClientLogger.LOGGER.warn("Couldn't finish the client scheduled in less than 10 seconds, interrupting it now");
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new ActiveMQInterruptedException(e);
|
||||
}
|
||||
finally {
|
||||
globalScheduledThreadPool = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */
|
||||
public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
|
||||
|
||||
// We call clearThreadPools as that will shutdown any previously used executor
|
||||
clearThreadPools();
|
||||
|
||||
ActiveMQClient.globalThreadPool = globalThreadPool;
|
||||
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor;
|
||||
injectedPools = true;
|
||||
}
|
||||
|
||||
public static synchronized ThreadPoolExecutor getGlobalThreadPool() {
|
||||
if (globalThreadPool == null) {
|
||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
if (globalThreadMaxPoolSize == -1) {
|
||||
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
|
||||
}
|
||||
else {
|
||||
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
|
||||
}
|
||||
}
|
||||
return globalThreadPool;
|
||||
}
|
||||
|
||||
public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool() {
|
||||
if (globalScheduledThreadPool == null) {
|
||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
globalScheduledThreadPool = new ScheduledThreadPoolExecutor(ActiveMQClient.globalScheduledThreadPoolSize, factory);
|
||||
}
|
||||
return globalScheduledThreadPool;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* (Re)Initializes the global thread pools properties from System properties. This method will update the global
|
||||
* thread pool configuration based on defined System properties (or defaults if they are not set) notifying
|
||||
* all globalThreadPoolListeners. The System properties key names are as follow:
|
||||
*
|
||||
* ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size"
|
||||
* ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size
|
||||
*
|
||||
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will defaul to 2.
|
||||
*
|
||||
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
|
||||
* thread pools have already been created, they will be updated with these new values.
|
||||
*/
|
||||
public static void initializeGlobalThreadPoolProperties() {
|
||||
|
||||
setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows programatically configuration of global thread pools properties. This method will update the global
|
||||
* thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
|
||||
*
|
||||
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
|
||||
* thread pools have already been created, they will be updated with these new values.
|
||||
*
|
||||
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will default to 2.
|
||||
*/
|
||||
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) {
|
||||
|
||||
if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2;
|
||||
|
||||
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
|
||||
ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize;
|
||||
|
||||
// if injected, we won't do anything with the pool as they're not ours
|
||||
if (!injectedPools) {
|
||||
// Right now I'm ignoring the corePool size on purpose as there's no way to have two values for the number of threads
|
||||
// this is basically a fixed size thread pool (although the pool grows on demand)
|
||||
getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize);
|
||||
getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize);
|
||||
|
||||
getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an ActiveMQConnectionFactory;
|
||||
*
|
||||
|
|
|
@ -16,9 +16,12 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.api.core.client;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
|
||||
|
||||
|
@ -784,6 +787,5 @@ public interface ServerLocator extends AutoCloseable {
|
|||
|
||||
String getOutgoingInterceptorList();
|
||||
|
||||
|
||||
|
||||
boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
|
||||
}
|
||||
|
|
|
@ -187,12 +187,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
|
||||
private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<>();
|
||||
|
||||
private static ExecutorService globalThreadPool;
|
||||
|
||||
private Executor startExecutor;
|
||||
|
||||
private static ScheduledExecutorService globalScheduledThreadPool;
|
||||
|
||||
private AfterConnectInternalListener afterConnectListener;
|
||||
|
||||
private String groupID;
|
||||
|
@ -208,68 +204,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
public static Runnable finalizeCallback = null;
|
||||
|
||||
public static synchronized void clearThreadPools() {
|
||||
|
||||
if (globalThreadPool != null) {
|
||||
globalThreadPool.shutdown();
|
||||
try {
|
||||
if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
throw new IllegalStateException("Couldn't finish the globalThreadPool");
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new ActiveMQInterruptedException(e);
|
||||
}
|
||||
finally {
|
||||
globalThreadPool = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (globalScheduledThreadPool != null) {
|
||||
globalScheduledThreadPool.shutdown();
|
||||
try {
|
||||
if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new ActiveMQInterruptedException(e);
|
||||
}
|
||||
finally {
|
||||
globalScheduledThreadPool = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static synchronized ExecutorService getGlobalThreadPool() {
|
||||
if (globalThreadPool == null) {
|
||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
globalThreadPool = Executors.newCachedThreadPool(factory);
|
||||
}
|
||||
|
||||
return globalThreadPool;
|
||||
}
|
||||
|
||||
private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
|
||||
if (globalScheduledThreadPool == null) {
|
||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
|
||||
|
||||
factory);
|
||||
}
|
||||
|
||||
return globalScheduledThreadPool;
|
||||
ActiveMQClient.clearThreadPools();
|
||||
}
|
||||
|
||||
private synchronized void setThreadPools() {
|
||||
|
@ -277,9 +212,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
return;
|
||||
}
|
||||
else if (useGlobalPools) {
|
||||
threadPool = getGlobalThreadPool();
|
||||
threadPool = ActiveMQClient.getGlobalThreadPool();
|
||||
|
||||
scheduledThreadPool = getGlobalScheduledThreadPool();
|
||||
scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
|
||||
}
|
||||
else {
|
||||
this.shutdownPool = true;
|
||||
|
@ -309,6 +244,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPool) {
|
||||
|
||||
if (threadPool == null || scheduledThreadPool == null) return false;
|
||||
|
||||
if (this.threadPool == null && this.scheduledThreadPool == null) {
|
||||
useGlobalPools = false;
|
||||
shutdownPool = false;
|
||||
this.threadPool = threadPool;
|
||||
this.scheduledThreadPool = scheduledThreadPool;
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void instantiateLoadBalancingPolicy() {
|
||||
if (connectionLoadBalancingPolicyClassName == null) {
|
||||
throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
|
||||
|
@ -409,10 +361,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
|
||||
useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
|
||||
|
||||
scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
|
||||
|
||||
threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
|
||||
|
||||
scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
|
||||
|
||||
retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
|
||||
|
||||
retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
|
||||
|
|
|
@ -0,0 +1,261 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ClientThreadPoolsTest {
|
||||
|
||||
private static Properties systemProperties;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
systemProperties = System.getProperties();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
System.clearProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY);
|
||||
System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY);
|
||||
ActiveMQClient.initializeGlobalThreadPoolProperties();
|
||||
ActiveMQClient.clearThreadPools();
|
||||
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSystemPropertyThreadPoolSettings() throws Exception {
|
||||
int threadPoolMaxSize = 100;
|
||||
int scheduledThreadPoolSize = 10;
|
||||
|
||||
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
|
||||
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
|
||||
ActiveMQClient.initializeGlobalThreadPoolProperties();
|
||||
|
||||
testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownPoolInUse() throws Exception {
|
||||
ActiveMQClient.clearThreadPools();
|
||||
ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
|
||||
|
||||
final CountDownLatch inUse = new CountDownLatch(1);
|
||||
final CountDownLatch neverLeave = new CountDownLatch(1);
|
||||
|
||||
ActiveMQClient.getGlobalThreadPool().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
System.err.println("Hello!");
|
||||
try {
|
||||
inUse.countDown();
|
||||
neverLeave.await();
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
neverLeave.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
||||
ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS);
|
||||
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInjectPools() throws Exception {
|
||||
ActiveMQClient.clearThreadPools();
|
||||
|
||||
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1,
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>());
|
||||
|
||||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);
|
||||
|
||||
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);
|
||||
|
||||
final CountDownLatch inUse = new CountDownLatch(1);
|
||||
final CountDownLatch neverLeave = new CountDownLatch(1);
|
||||
|
||||
ActiveMQClient.getGlobalThreadPool().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
System.err.println("Hello!");
|
||||
try {
|
||||
inUse.countDown();
|
||||
neverLeave.await();
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
neverLeave.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
||||
poolExecutor.shutdownNow();
|
||||
scheduledThreadPoolExecutor.shutdownNow();
|
||||
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
|
||||
|
||||
ActiveMQClient.clearThreadPools(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStaticPropertiesThreadPoolSettings() throws Exception {
|
||||
|
||||
int testMaxSize = 999;
|
||||
int testScheduleSize = 9;
|
||||
|
||||
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
|
||||
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallPool() throws Exception {
|
||||
|
||||
int testMaxSize = 2;
|
||||
int testScheduleSize = 9;
|
||||
|
||||
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
|
||||
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
|
||||
}
|
||||
|
||||
private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled) throws Exception {
|
||||
ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
|
||||
serverLocator.isUseGlobalPools();
|
||||
|
||||
Method setThreadPools = ServerLocatorImpl.class.getDeclaredMethod("setThreadPools");
|
||||
setThreadPools.setAccessible(true);
|
||||
setThreadPools.invoke(serverLocator);
|
||||
|
||||
// TODO: I would get this from the ActiveMQClient
|
||||
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
|
||||
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
|
||||
|
||||
threadPoolField.setAccessible(true);
|
||||
scheduledThreadPoolField.setAccessible(true);
|
||||
|
||||
ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool();
|
||||
|
||||
final CountDownLatch doneMax = new CountDownLatch(expectedMax);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final CountDownLatch latchTotal = new CountDownLatch(expectedMax * 3); // we will schedule 3 * max, so all runnables should execute
|
||||
final AtomicInteger errors = new AtomicInteger(0);
|
||||
|
||||
|
||||
// Set this to true if you need to debug why executions are not being performed.
|
||||
final boolean debugExecutions = false;
|
||||
|
||||
for (int i = 0; i < expectedMax * 3; i++) {
|
||||
final int localI = i;
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
if (debugExecutions) {
|
||||
System.out.println("runnable " + localI);
|
||||
}
|
||||
doneMax.countDown();
|
||||
latch.await();
|
||||
latchTotal.countDown();
|
||||
}
|
||||
catch (Exception e) {
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
finally {
|
||||
if (debugExecutions) {
|
||||
System.out.println("done " + localI);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Assert.assertTrue(doneMax.await(5, TimeUnit.SECONDS));
|
||||
latch.countDown();
|
||||
Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS));
|
||||
|
||||
|
||||
ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
|
||||
|
||||
// TODO: We need to figure out what to do with getCorePoolSize
|
||||
assertEquals(expectedMax, threadPool.getCorePoolSize());
|
||||
assertEquals(expectedMax, threadPool.getMaximumPoolSize());
|
||||
assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreadPoolInjection() throws Exception {
|
||||
|
||||
ServerLocator serverLocator = new ServerLocatorImpl(false);
|
||||
|
||||
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
|
||||
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(1);
|
||||
serverLocator.setThreadPools(threadPool, scheduledThreadPool);
|
||||
|
||||
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
|
||||
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
|
||||
|
||||
Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise");
|
||||
initialise.setAccessible(true);
|
||||
initialise.invoke(serverLocator);
|
||||
|
||||
threadPoolField.setAccessible(true);
|
||||
scheduledThreadPoolField.setAccessible(true);
|
||||
|
||||
ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator);
|
||||
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
|
||||
|
||||
assertEquals(threadPool, tpe);
|
||||
assertEquals(scheduledThreadPool, stpe);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
// Resets the global thread pool properties back to default.
|
||||
System.setProperties(systemProperties);
|
||||
ActiveMQClient.initializeGlobalThreadPoolProperties();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -2403,8 +2403,10 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
protected void closeAllSessionFactories() {
|
||||
synchronized (sessionFactories) {
|
||||
for (ClientSessionFactory sf : sessionFactories) {
|
||||
closeSessionFactory(sf);
|
||||
assert sf.isClosed();
|
||||
if (!sf.isClosed()) {
|
||||
closeSessionFactory(sf);
|
||||
assert sf.isClosed();
|
||||
}
|
||||
}
|
||||
sessionFactories.clear();
|
||||
}
|
||||
|
|
|
@ -16,8 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
|
@ -36,33 +42,61 @@ public class CoreClientTest extends ActiveMQTestBase {
|
|||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Test
|
||||
public void testCoreClientNetty() throws Exception {
|
||||
testCoreClient(true);
|
||||
testCoreClient(true, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoreClientInVM() throws Exception {
|
||||
testCoreClient(false);
|
||||
testCoreClient(false, null);
|
||||
}
|
||||
|
||||
private void testCoreClient(final boolean netty) throws Exception {
|
||||
@Test
|
||||
public void testCoreClientWithInjectedThreadPools() throws Exception {
|
||||
|
||||
ExecutorService threadPool = Executors.newCachedThreadPool();
|
||||
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
|
||||
|
||||
ServerLocator locator = createNonHALocator(false);
|
||||
boolean setThreadPools = locator.setThreadPools(threadPool, scheduledThreadPool);
|
||||
|
||||
assertTrue(setThreadPools);
|
||||
testCoreClient(true, locator);
|
||||
|
||||
threadPool.shutdown();
|
||||
scheduledThreadPool.shutdown();
|
||||
|
||||
threadPool.awaitTermination(60, TimeUnit.SECONDS);
|
||||
scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception {
|
||||
|
||||
int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize;
|
||||
int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize;
|
||||
|
||||
try {
|
||||
ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
|
||||
ServerLocator locator = createNonHALocator(false);
|
||||
testCoreClient(true, locator);
|
||||
}
|
||||
finally {
|
||||
// restoring original value otherwise future tests would be screwed up
|
||||
ActiveMQClient.setGlobalThreadPoolProperties(originalGlobal, originalScheduled);
|
||||
ActiveMQClient.clearThreadPools();
|
||||
}
|
||||
}
|
||||
|
||||
private void testCoreClient(final boolean netty, ServerLocator serverLocator) throws Exception {
|
||||
final SimpleString QUEUE = new SimpleString("CoreClientTestQueue");
|
||||
|
||||
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(netty), false));
|
||||
|
||||
server.start();
|
||||
ServerLocator locator = createNonHALocator(netty);
|
||||
|
||||
ServerLocator locator = serverLocator == null ? createNonHALocator(netty) : serverLocator;
|
||||
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
|
@ -105,5 +139,7 @@ public class CoreClientTest extends ActiveMQTestBase {
|
|||
|
||||
message2.acknowledge();
|
||||
}
|
||||
|
||||
sf.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue