ARTEMIS-4522 simplify test & complete implementation
This commit is contained in:
parent
ff2b76c252
commit
9baad30827
|
@ -39,6 +39,7 @@ public class ServerLocatorConfig {
|
||||||
public boolean useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
|
public boolean useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
|
||||||
public int threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
|
public int threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
|
||||||
public int scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
|
public int scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
|
||||||
|
public int flowControlThreadPoolMaxSize = ActiveMQClient.DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE;
|
||||||
public long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
|
public long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
|
||||||
public double retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
|
public double retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
|
||||||
public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
|
public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
|
||||||
|
@ -76,6 +77,7 @@ public class ServerLocatorConfig {
|
||||||
ackBatchSize = locator.ackBatchSize;
|
ackBatchSize = locator.ackBatchSize;
|
||||||
useGlobalPools = locator.useGlobalPools;
|
useGlobalPools = locator.useGlobalPools;
|
||||||
scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
|
scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
|
||||||
|
flowControlThreadPoolMaxSize = locator.flowControlThreadPoolMaxSize;
|
||||||
threadPoolMaxSize = locator.threadPoolMaxSize;
|
threadPoolMaxSize = locator.threadPoolMaxSize;
|
||||||
retryInterval = locator.retryInterval;
|
retryInterval = locator.retryInterval;
|
||||||
retryIntervalMultiplier = locator.retryIntervalMultiplier;
|
retryIntervalMultiplier = locator.retryIntervalMultiplier;
|
||||||
|
|
|
@ -51,7 +51,7 @@ public final class ActiveMQClient {
|
||||||
|
|
||||||
private static int globalScheduledThreadPoolSize;
|
private static int globalScheduledThreadPoolSize;
|
||||||
|
|
||||||
private static int flowControlThreadPoolSize;
|
private static int globalFlowControlThreadPoolSize;
|
||||||
|
|
||||||
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
|
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
|
||||||
|
|
||||||
|
@ -150,9 +150,11 @@ public final class ActiveMQClient {
|
||||||
|
|
||||||
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";
|
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";
|
||||||
|
|
||||||
|
public static final String FLOW_CONTROL_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.flowcontrol.thread.pool.core.size";
|
||||||
|
|
||||||
private static ExecutorService globalThreadPool;
|
private static ExecutorService globalThreadPool;
|
||||||
|
|
||||||
private static ExecutorService flowControlThreadPool;
|
private static ExecutorService globalFlowControlThreadPool;
|
||||||
|
|
||||||
private static boolean injectedPools = false;
|
private static boolean injectedPools = false;
|
||||||
|
|
||||||
|
@ -171,12 +173,13 @@ public final class ActiveMQClient {
|
||||||
if (injectedPools) {
|
if (injectedPools) {
|
||||||
globalThreadPool = null;
|
globalThreadPool = null;
|
||||||
globalScheduledThreadPool = null;
|
globalScheduledThreadPool = null;
|
||||||
flowControlThreadPool = null;
|
globalFlowControlThreadPool = null;
|
||||||
injectedPools = false;
|
injectedPools = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (globalThreadPool != null) {
|
if (globalThreadPool != null) {
|
||||||
|
System.out.println("Shutting down main pool...");
|
||||||
globalThreadPool.shutdownNow();
|
globalThreadPool.shutdownNow();
|
||||||
try {
|
try {
|
||||||
if (!globalThreadPool.awaitTermination(time, unit)) {
|
if (!globalThreadPool.awaitTermination(time, unit)) {
|
||||||
|
@ -191,6 +194,7 @@ public final class ActiveMQClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (globalScheduledThreadPool != null) {
|
if (globalScheduledThreadPool != null) {
|
||||||
|
System.out.println("Shutting down scheduled pool...");
|
||||||
globalScheduledThreadPool.shutdownNow();
|
globalScheduledThreadPool.shutdownNow();
|
||||||
try {
|
try {
|
||||||
if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
|
if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
|
||||||
|
@ -204,17 +208,18 @@ public final class ActiveMQClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (flowControlThreadPool != null) {
|
if (globalFlowControlThreadPool != null) {
|
||||||
flowControlThreadPool.shutdownNow();
|
System.out.println("Shutting down flow-control pool...");
|
||||||
|
globalFlowControlThreadPool.shutdownNow();
|
||||||
try {
|
try {
|
||||||
if (!flowControlThreadPool.awaitTermination(time, unit)) {
|
if (!globalFlowControlThreadPool.awaitTermination(time, unit)) {
|
||||||
flowControlThreadPool.shutdownNow();
|
globalFlowControlThreadPool.shutdownNow();
|
||||||
ActiveMQClientLogger.LOGGER.unableToProcessScheduledlIn10Sec();
|
ActiveMQClientLogger.LOGGER.unableToProcessGlobalFlowControlThreadPoolIn10Sec();
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new ActiveMQInterruptedException(e);
|
throw new ActiveMQInterruptedException(e);
|
||||||
} finally {
|
} finally {
|
||||||
flowControlThreadPool = null;
|
globalFlowControlThreadPool = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,8 +228,9 @@ public final class ActiveMQClient {
|
||||||
* Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call.
|
* Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call.
|
||||||
*/
|
*/
|
||||||
public static synchronized void injectPools(ExecutorService globalThreadPool,
|
public static synchronized void injectPools(ExecutorService globalThreadPool,
|
||||||
ScheduledExecutorService scheduledThreadPool) {
|
ScheduledExecutorService scheduledThreadPool,
|
||||||
if (globalThreadPool == null || scheduledThreadPool == null)
|
ExecutorService flowControlThreadPool) {
|
||||||
|
if (globalThreadPool == null || scheduledThreadPool == null || flowControlThreadPool == null)
|
||||||
throw new IllegalArgumentException("thread pools must not be null");
|
throw new IllegalArgumentException("thread pools must not be null");
|
||||||
|
|
||||||
// We call clearThreadPools as that will shutdown any previously used executor
|
// We call clearThreadPools as that will shutdown any previously used executor
|
||||||
|
@ -232,44 +238,31 @@ public final class ActiveMQClient {
|
||||||
|
|
||||||
ActiveMQClient.globalThreadPool = globalThreadPool;
|
ActiveMQClient.globalThreadPool = globalThreadPool;
|
||||||
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool;
|
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool;
|
||||||
|
ActiveMQClient.globalFlowControlThreadPool = flowControlThreadPool;
|
||||||
injectedPools = true;
|
injectedPools = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static synchronized ExecutorService getGlobalThreadPool() {
|
public static synchronized ExecutorService getGlobalThreadPool() {
|
||||||
if (globalThreadPool == null) {
|
globalThreadPool = internalGetGlobalThreadPool(globalThreadPool, "ActiveMQ-client-global-threads", ActiveMQClient.globalThreadPoolSize);
|
||||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
|
||||||
@Override
|
|
||||||
public ThreadFactory run() {
|
|
||||||
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (globalThreadPoolSize == -1) {
|
|
||||||
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
|
|
||||||
} else {
|
|
||||||
globalThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.globalThreadPoolSize, 60L, TimeUnit.SECONDS, factory);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return globalThreadPool;
|
return globalThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static synchronized ExecutorService getGlobalFlowControlThreadPool() {
|
||||||
|
globalFlowControlThreadPool = internalGetGlobalThreadPool(globalFlowControlThreadPool, "ActiveMQ-client-global-flow-control-threads", ActiveMQClient.globalFlowControlThreadPoolSize);
|
||||||
|
return globalFlowControlThreadPool;
|
||||||
|
}
|
||||||
|
|
||||||
public static synchronized ExecutorService getFlowControlThreadPool() {
|
private static synchronized ExecutorService internalGetGlobalThreadPool(ExecutorService executorService, String groupName, int poolSize) {
|
||||||
if (flowControlThreadPool == null) {
|
if (executorService == null) {
|
||||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
ThreadFactory factory = AccessController.doPrivileged((PrivilegedAction<ThreadFactory>) () -> new ActiveMQThreadFactory(groupName, true, ClientSessionFactoryImpl.class.getClassLoader()));
|
||||||
@Override
|
|
||||||
public ThreadFactory run() {
|
|
||||||
return new ActiveMQThreadFactory("ActiveMQ-client-flow-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (flowControlThreadPoolSize == -1) {
|
if (poolSize == -1) {
|
||||||
flowControlThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
|
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
|
||||||
} else {
|
} else {
|
||||||
flowControlThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.flowControlThreadPoolSize, 60L, TimeUnit.SECONDS, factory);
|
executorService = new ActiveMQThreadPoolExecutor(0, poolSize, 60L, TimeUnit.SECONDS, factory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return flowControlThreadPool;
|
return executorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
|
public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
|
||||||
|
@ -294,6 +287,10 @@ public final class ActiveMQClient {
|
||||||
return globalScheduledThreadPoolSize;
|
return globalScheduledThreadPoolSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static int getGlobalFlowControlThreadPoolSize() {
|
||||||
|
return globalFlowControlThreadPoolSize;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the global thread pools properties from System properties. This method will update the global
|
* Initializes the global thread pools properties from System properties. This method will update the global
|
||||||
* thread pool configuration based on defined System properties (or defaults if they are not set).
|
* thread pool configuration based on defined System properties (or defaults if they are not set).
|
||||||
|
@ -309,7 +306,7 @@ public final class ActiveMQClient {
|
||||||
*/
|
*/
|
||||||
public static void initializeGlobalThreadPoolProperties() {
|
public static void initializeGlobalThreadPoolProperties() {
|
||||||
|
|
||||||
setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
|
setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.FLOW_CONTROL_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -321,14 +318,14 @@ public final class ActiveMQClient {
|
||||||
* The min value for globalThreadMaxPoolSize is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
|
* The min value for globalThreadMaxPoolSize is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
|
||||||
* A value of -1 configures an unbounded thread pool.
|
* A value of -1 configures an unbounded thread pool.
|
||||||
*/
|
*/
|
||||||
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) {
|
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize, int globalFlowControlThreadPoolSize) {
|
||||||
|
|
||||||
if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1)
|
if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1)
|
||||||
globalThreadMaxPoolSize = 2;
|
globalThreadMaxPoolSize = 2;
|
||||||
|
|
||||||
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
|
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
|
||||||
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
|
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
|
||||||
ActiveMQClient.flowControlThreadPoolSize = DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE;
|
ActiveMQClient.globalFlowControlThreadPoolSize = globalFlowControlThreadPoolSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -561,6 +561,26 @@ public interface ServerLocator extends AutoCloseable {
|
||||||
*/
|
*/
|
||||||
ServerLocator setThreadPoolMaxSize(int threadPoolMaxSize);
|
ServerLocator setThreadPoolMaxSize(int threadPoolMaxSize);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the maximum size of the flow-control thread pool.
|
||||||
|
* <p>
|
||||||
|
* Default value is {@link ActiveMQClient#DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE}.
|
||||||
|
*
|
||||||
|
* @return the maximum size of the flow-control thread pool.
|
||||||
|
*/
|
||||||
|
int getFlowControlThreadPoolMaxSize();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the maximum size of the flow-control thread pool.
|
||||||
|
* <p>
|
||||||
|
* This setting is relevant only if this factory does not use global pools.
|
||||||
|
* Value must be -1 (for unlimited thread pool) or greater than 0.
|
||||||
|
*
|
||||||
|
* @param flowControlThreadPoolMaxSize maximum size of the flow-control thread pool.
|
||||||
|
* @return this ServerLocator
|
||||||
|
*/
|
||||||
|
ServerLocator setFlowControlThreadPoolMaxSize(int flowControlThreadPoolMaxSize);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the time to retry connections created by this factory after failure.
|
* Returns the time to retry connections created by this factory after failure.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -847,7 +867,7 @@ public interface ServerLocator extends AutoCloseable {
|
||||||
|
|
||||||
String getOutgoingInterceptorList();
|
String getOutgoingInterceptorList();
|
||||||
|
|
||||||
boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
|
boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor, Executor flowControlThreadPool);
|
||||||
|
|
||||||
/** This will only instantiate internal objects such as the topology */
|
/** This will only instantiate internal objects such as the topology */
|
||||||
void initialize() throws ActiveMQException;
|
void initialize() throws ActiveMQException;
|
||||||
|
|
|
@ -347,4 +347,7 @@ public interface ActiveMQClientLogger {
|
||||||
|
|
||||||
@LogMessage(id = 214034, value = "{} has negative counts {}\n{}", level = LogMessage.Level.ERROR)
|
@LogMessage(id = 214034, value = "{} has negative counts {}\n{}", level = LogMessage.Level.ERROR)
|
||||||
void negativeRefCount(String message, String count, String debugString);
|
void negativeRefCount(String message, String count, String debugString);
|
||||||
|
|
||||||
|
@LogMessage(id = 214035, value = "Couldn't finish the client globalFlowControlThreadPool in less than 10 seconds, interrupting it now", level = LogMessage.Level.WARN)
|
||||||
|
void unableToProcessGlobalFlowControlThreadPoolIn10Sec();
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,12 +175,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
final int reconnectAttempts,
|
final int reconnectAttempts,
|
||||||
final Executor threadPool,
|
final Executor threadPool,
|
||||||
final ScheduledExecutorService scheduledThreadPool,
|
final ScheduledExecutorService scheduledThreadPool,
|
||||||
final Executor flowControlPool,
|
final Executor flowControlThreadPool,
|
||||||
final List<Interceptor> incomingInterceptors,
|
final List<Interceptor> incomingInterceptors,
|
||||||
final List<Interceptor> outgoingInterceptors) {
|
final List<Interceptor> outgoingInterceptors) {
|
||||||
this(serverLocator, new Pair<>(connectorConfig, null),
|
this(serverLocator, new Pair<>(connectorConfig, null),
|
||||||
locatorConfig, reconnectAttempts, threadPool,
|
locatorConfig, reconnectAttempts, threadPool,
|
||||||
scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);
|
scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -190,12 +190,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
final int reconnectAttempts,
|
final int reconnectAttempts,
|
||||||
final Executor threadPool,
|
final Executor threadPool,
|
||||||
final ScheduledExecutorService scheduledThreadPool,
|
final ScheduledExecutorService scheduledThreadPool,
|
||||||
final Executor flowControlPool,
|
final Executor flowControlThreadPool,
|
||||||
final List<Interceptor> incomingInterceptors,
|
final List<Interceptor> incomingInterceptors,
|
||||||
final List<Interceptor> outgoingInterceptors) {
|
final List<Interceptor> outgoingInterceptors) {
|
||||||
this(serverLocator, connectorConfig,
|
this(serverLocator, connectorConfig,
|
||||||
locatorConfig, reconnectAttempts, threadPool,
|
locatorConfig, reconnectAttempts, threadPool,
|
||||||
scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors, null);
|
scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
|
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
|
||||||
|
@ -204,7 +204,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
final int reconnectAttempts,
|
final int reconnectAttempts,
|
||||||
final Executor threadPool,
|
final Executor threadPool,
|
||||||
final ScheduledExecutorService scheduledThreadPool,
|
final ScheduledExecutorService scheduledThreadPool,
|
||||||
final Executor flowControlPoll,
|
final Executor flowControlThreadPool,
|
||||||
final List<Interceptor> incomingInterceptors,
|
final List<Interceptor> incomingInterceptors,
|
||||||
final List<Interceptor> outgoingInterceptors,
|
final List<Interceptor> outgoingInterceptors,
|
||||||
final TransportConfiguration[] connectorConfigs) {
|
final TransportConfiguration[] connectorConfigs) {
|
||||||
|
@ -256,7 +256,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
|
|
||||||
orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
|
orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
|
||||||
|
|
||||||
flowControlExecutor = new OrderedExecutorFactory(flowControlPoll).getExecutor();
|
flowControlExecutor = new OrderedExecutorFactory(flowControlThreadPool).getExecutor();
|
||||||
|
|
||||||
closeExecutor = orderedExecutorFactory.getExecutor();
|
closeExecutor = orderedExecutorFactory.getExecutor();
|
||||||
|
|
||||||
|
|
|
@ -137,7 +137,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
|
|
||||||
private transient Executor threadPool;
|
private transient Executor threadPool;
|
||||||
|
|
||||||
private transient Executor flowControlPool;
|
private transient Executor flowControlThreadPool;
|
||||||
private transient ScheduledExecutorService scheduledThreadPool;
|
private transient ScheduledExecutorService scheduledThreadPool;
|
||||||
|
|
||||||
private transient DiscoveryGroup discoveryGroup;
|
private transient DiscoveryGroup discoveryGroup;
|
||||||
|
@ -196,51 +196,53 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
} else if (config.useGlobalPools) {
|
} else if (config.useGlobalPools) {
|
||||||
threadPool = ActiveMQClient.getGlobalThreadPool();
|
threadPool = ActiveMQClient.getGlobalThreadPool();
|
||||||
|
|
||||||
flowControlPool = ActiveMQClient.getFlowControlThreadPool(); //TODO add option for config
|
flowControlThreadPool = ActiveMQClient.getGlobalFlowControlThreadPool();
|
||||||
|
|
||||||
scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
|
scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
|
||||||
} else {
|
} else {
|
||||||
this.shutdownPool = true;
|
this.shutdownPool = true;
|
||||||
|
|
||||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
ThreadFactory factory = getThreadFactory("ActiveMQ-client-factory-threads-");
|
||||||
@Override
|
|
||||||
public ThreadFactory run() {
|
|
||||||
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true,
|
|
||||||
ServerLocatorImpl.class.getClassLoader());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (config.threadPoolMaxSize == -1) {
|
if (config.threadPoolMaxSize == -1) {
|
||||||
threadPool = Executors.newCachedThreadPool(factory);
|
threadPool = Executors.newCachedThreadPool(factory);
|
||||||
} else {
|
} else {
|
||||||
threadPool = new ActiveMQThreadPoolExecutor(0, config.threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
|
threadPool = new ActiveMQThreadPoolExecutor(0, config.threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
factory = getThreadFactory("ActiveMQ-client-factory-flow-control-threads-");
|
||||||
@Override
|
if (config.flowControlThreadPoolMaxSize == -1) {
|
||||||
public ThreadFactory run() {
|
flowControlThreadPool = Executors.newCachedThreadPool(factory);
|
||||||
return new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true,
|
} else {
|
||||||
ClientSessionFactoryImpl.class.getClassLoader());
|
flowControlThreadPool = new ActiveMQThreadPoolExecutor(0, config.flowControlThreadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
flowControlPool = ActiveMQClient.getFlowControlThreadPool(); //TODO add option for config;
|
factory = getThreadFactory("ActiveMQ-client-factory-pinger-threads-");
|
||||||
scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory);
|
scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory);
|
||||||
}
|
}
|
||||||
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
|
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ThreadFactory getThreadFactory(String groupName) {
|
||||||
|
return AccessController.doPrivileged(new PrivilegedAction<>() {
|
||||||
|
@Override
|
||||||
|
public ThreadFactory run() {
|
||||||
|
return new ActiveMQThreadFactory(groupName + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPool) {
|
public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPool, Executor flowControlThreadPool) {
|
||||||
|
|
||||||
if (threadPool == null || scheduledThreadPool == null)
|
if (threadPool == null || scheduledThreadPool == null)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (this.threadPool == null && this.scheduledThreadPool == null) {
|
if (this.threadPool == null && this.scheduledThreadPool == null && this.flowControlThreadPool == null) {
|
||||||
config.useGlobalPools = false;
|
config.useGlobalPools = false;
|
||||||
shutdownPool = false;
|
shutdownPool = false;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.scheduledThreadPool = scheduledThreadPool;
|
this.scheduledThreadPool = scheduledThreadPool;
|
||||||
|
this.flowControlThreadPool = flowControlThreadPool;
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -632,7 +634,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
|
|
||||||
initialize();
|
initialize();
|
||||||
|
|
||||||
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors);
|
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, flowControlThreadPool,incomingInterceptors, outgoingInterceptors);
|
||||||
|
|
||||||
addToConnecting(factory);
|
addToConnecting(factory);
|
||||||
try {
|
try {
|
||||||
|
@ -708,7 +710,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
// try each factory in the list until we find one which works
|
// try each factory in the list until we find one which works
|
||||||
|
|
||||||
try {
|
try {
|
||||||
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
|
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
|
||||||
try {
|
try {
|
||||||
addToConnecting(factory);
|
addToConnecting(factory);
|
||||||
// We always try to connect here with only one attempt,
|
// We always try to connect here with only one attempt,
|
||||||
|
@ -1134,6 +1136,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getFlowControlThreadPoolMaxSize() {
|
||||||
|
return config.flowControlThreadPoolMaxSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerLocatorImpl setFlowControlThreadPoolMaxSize(final int flowControlThreadPoolMaxSize) {
|
||||||
|
checkWrite();
|
||||||
|
this.config.flowControlThreadPoolMaxSize = flowControlThreadPoolMaxSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getRetryInterval() {
|
public long getRetryInterval() {
|
||||||
return config.retryInterval;
|
return config.retryInterval;
|
||||||
|
@ -1461,6 +1475,19 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
throw new ActiveMQInterruptedException(e);
|
throw new ActiveMQInterruptedException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (flowControlThreadPool != null) {
|
||||||
|
ExecutorService executorService = (ExecutorService) flowControlThreadPool;
|
||||||
|
executorService.shutdown();
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
|
||||||
|
ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new ActiveMQInterruptedException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
synchronized (stateGuard) {
|
synchronized (stateGuard) {
|
||||||
state = STATE.CLOSED;
|
state = STATE.CLOSED;
|
||||||
|
@ -1811,7 +1838,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||||
connectors = new ArrayList<>();
|
connectors = new ArrayList<>();
|
||||||
if (initialConnectors != null) {
|
if (initialConnectors != null) {
|
||||||
for (TransportConfiguration initialConnector : initialConnectors) {
|
for (TransportConfiguration initialConnector : initialConnectors) {
|
||||||
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);
|
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool,flowControlThreadPool, incomingInterceptors, outgoingInterceptors);
|
||||||
|
|
||||||
connectors.add(new Connector(initialConnector, factory));
|
connectors.add(new Connector(initialConnector, factory));
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,18 +62,20 @@ public class ClientThreadPoolsTest {
|
||||||
public void testSystemPropertyThreadPoolSettings() throws Exception {
|
public void testSystemPropertyThreadPoolSettings() throws Exception {
|
||||||
int threadPoolMaxSize = 100;
|
int threadPoolMaxSize = 100;
|
||||||
int scheduledThreadPoolSize = 10;
|
int scheduledThreadPoolSize = 10;
|
||||||
|
int flowControlThreadPoolMaxSize = 99;
|
||||||
|
|
||||||
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
|
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
|
||||||
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
|
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
|
||||||
|
System.setProperty(ActiveMQClient.FLOW_CONTROL_THREAD_POOL_SIZE_PROPERTY_KEY, "" + flowControlThreadPoolMaxSize);
|
||||||
ActiveMQClient.initializeGlobalThreadPoolProperties();
|
ActiveMQClient.initializeGlobalThreadPoolProperties();
|
||||||
ActiveMQClient.clearThreadPools();
|
ActiveMQClient.clearThreadPools();
|
||||||
|
|
||||||
testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
|
testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize, flowControlThreadPoolMaxSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testShutdownPoolInUse() throws Exception {
|
public void testShutdownPoolInUse() throws Exception {
|
||||||
ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
|
ActiveMQClient.setGlobalThreadPoolProperties(10, 1, 2);
|
||||||
ActiveMQClient.clearThreadPools();
|
ActiveMQClient.clearThreadPools();
|
||||||
|
|
||||||
final CountDownLatch inUse = new CountDownLatch(1);
|
final CountDownLatch inUse = new CountDownLatch(1);
|
||||||
|
@ -103,9 +105,11 @@ public class ClientThreadPoolsTest {
|
||||||
|
|
||||||
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
||||||
|
|
||||||
|
ThreadPoolExecutor flowControlPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
||||||
|
|
||||||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
|
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
|
||||||
|
|
||||||
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);
|
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor, flowControlPoolExecutor);
|
||||||
|
|
||||||
final CountDownLatch inUse = new CountDownLatch(1);
|
final CountDownLatch inUse = new CountDownLatch(1);
|
||||||
final CountDownLatch neverLeave = new CountDownLatch(1);
|
final CountDownLatch neverLeave = new CountDownLatch(1);
|
||||||
|
@ -126,6 +130,7 @@ public class ClientThreadPoolsTest {
|
||||||
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
||||||
poolExecutor.shutdownNow();
|
poolExecutor.shutdownNow();
|
||||||
scheduledThreadPoolExecutor.shutdownNow();
|
scheduledThreadPoolExecutor.shutdownNow();
|
||||||
|
flowControlPoolExecutor.shutdownNow();
|
||||||
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
|
Assert.assertTrue(neverLeave.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
Assert.assertTrue(inUse.await(10, TimeUnit.SECONDS));
|
||||||
|
@ -139,10 +144,11 @@ public class ClientThreadPoolsTest {
|
||||||
|
|
||||||
int testMaxSize = 999;
|
int testMaxSize = 999;
|
||||||
int testScheduleSize = 9;
|
int testScheduleSize = 9;
|
||||||
|
int testFlowControlMaxSize = 888;
|
||||||
|
|
||||||
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
|
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize, testFlowControlMaxSize);
|
||||||
ActiveMQClient.clearThreadPools();
|
ActiveMQClient.clearThreadPools();
|
||||||
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
|
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize, testFlowControlMaxSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -150,13 +156,14 @@ public class ClientThreadPoolsTest {
|
||||||
|
|
||||||
int testMaxSize = 2;
|
int testMaxSize = 2;
|
||||||
int testScheduleSize = 9;
|
int testScheduleSize = 9;
|
||||||
|
int testFlowControlMaxSize = 8;
|
||||||
|
|
||||||
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
|
ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize, testFlowControlMaxSize);
|
||||||
ActiveMQClient.clearThreadPools();
|
ActiveMQClient.clearThreadPools();
|
||||||
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
|
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize, testFlowControlMaxSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled) throws Exception {
|
private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled, int expectedFlowControl) throws Exception {
|
||||||
ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
|
ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
|
||||||
serverLocator.isUseGlobalPools();
|
serverLocator.isUseGlobalPools();
|
||||||
|
|
||||||
|
@ -167,9 +174,11 @@ public class ClientThreadPoolsTest {
|
||||||
// TODO: I would get this from the ActiveMQClient
|
// TODO: I would get this from the ActiveMQClient
|
||||||
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
|
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
|
||||||
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
|
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
|
||||||
|
Field flowControlThreadPoolField = ServerLocatorImpl.class.getDeclaredField("flowControlThreadPool");
|
||||||
|
|
||||||
threadPoolField.setAccessible(true);
|
threadPoolField.setAccessible(true);
|
||||||
scheduledThreadPoolField.setAccessible(true);
|
scheduledThreadPoolField.setAccessible(true);
|
||||||
|
flowControlThreadPoolField.setAccessible(true);
|
||||||
|
|
||||||
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool();
|
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool();
|
||||||
|
|
||||||
|
@ -210,9 +219,11 @@ public class ClientThreadPoolsTest {
|
||||||
Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
|
ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
|
||||||
|
ThreadPoolExecutor flowControlThreadPool = (ThreadPoolExecutor) flowControlThreadPoolField.get(serverLocator);
|
||||||
|
|
||||||
assertEquals(expectedMax, threadPool.getMaximumPoolSize());
|
assertEquals(expectedMax, threadPool.getMaximumPoolSize());
|
||||||
assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
|
assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
|
||||||
|
assertEquals(expectedFlowControl, flowControlThreadPool.getMaximumPoolSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -222,21 +233,26 @@ public class ClientThreadPoolsTest {
|
||||||
|
|
||||||
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
|
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
|
||||||
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(1);
|
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(1);
|
||||||
serverLocator.setThreadPools(threadPool, scheduledThreadPool);
|
ThreadPoolExecutor flowControlThreadPool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
|
||||||
|
serverLocator.setThreadPools(threadPool, scheduledThreadPool, flowControlThreadPool);
|
||||||
|
|
||||||
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
|
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
|
||||||
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
|
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
|
||||||
|
Field flowControlThreadPoolField = ServerLocatorImpl.class.getDeclaredField("flowControlThreadPool");
|
||||||
|
|
||||||
serverLocator.initialize();
|
serverLocator.initialize();
|
||||||
|
|
||||||
threadPoolField.setAccessible(true);
|
threadPoolField.setAccessible(true);
|
||||||
scheduledThreadPoolField.setAccessible(true);
|
scheduledThreadPoolField.setAccessible(true);
|
||||||
|
flowControlThreadPoolField.setAccessible(true);
|
||||||
|
|
||||||
ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator);
|
ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPoolField.get(serverLocator);
|
||||||
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
|
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
|
||||||
|
ThreadPoolExecutor fctpe = (ThreadPoolExecutor) flowControlThreadPoolField.get(serverLocator);
|
||||||
|
|
||||||
assertEquals(threadPool, tpe);
|
assertEquals(threadPool, tpe);
|
||||||
assertEquals(scheduledThreadPool, stpe);
|
assertEquals(scheduledThreadPool, stpe);
|
||||||
|
assertEquals(flowControlThreadPool, fctpe);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -237,7 +237,7 @@ public class ClusterController implements ActiveMQComponent {
|
||||||
serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
|
serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
|
||||||
//this is used for replication so need to use the server packet decoder
|
//this is used for replication so need to use the server packet decoder
|
||||||
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
|
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
|
||||||
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
|
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool(), server.getThreadPool());
|
||||||
if (connector != null) {
|
if (connector != null) {
|
||||||
serverLocator.setClusterTransportConfiguration(connector);
|
serverLocator.setClusterTransportConfiguration(connector);
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,7 +157,7 @@ public class BackupActivationNoReconnectTest {
|
||||||
final ServerLocatorConfig locatorConfig = Mockito.mock(ServerLocatorConfig.class);
|
final ServerLocatorConfig locatorConfig = Mockito.mock(ServerLocatorConfig.class);
|
||||||
final int reconnectAttempts = 1;
|
final int reconnectAttempts = 1;
|
||||||
final Executor threadPool = Mockito.mock(Executor.class);
|
final Executor threadPool = Mockito.mock(Executor.class);
|
||||||
final Executor flowControlPool = Mockito.mock(Executor.class);
|
final Executor flowControlThreadPool = Mockito.mock(Executor.class);
|
||||||
final ScheduledExecutorService scheduledThreadPool = Mockito.mock(ScheduledExecutorService.class);
|
final ScheduledExecutorService scheduledThreadPool = Mockito.mock(ScheduledExecutorService.class);
|
||||||
final ClientProtocolManager clientProtocolManager = Mockito.mock(ClientProtocolManager.class);
|
final ClientProtocolManager clientProtocolManager = Mockito.mock(ClientProtocolManager.class);
|
||||||
when(serverLocator.newProtocolManager()).thenReturn(clientProtocolManager);
|
when(serverLocator.newProtocolManager()).thenReturn(clientProtocolManager);
|
||||||
|
@ -165,7 +165,7 @@ public class BackupActivationNoReconnectTest {
|
||||||
Map<String, Object> urlParams = new HashMap<>();
|
Map<String, Object> urlParams = new HashMap<>();
|
||||||
urlParams.put("port", serverSocket.getLocalPort());
|
urlParams.put("port", serverSocket.getLocalPort());
|
||||||
when(connectorConfig.getCombinedParams()).thenReturn(urlParams);
|
when(connectorConfig.getCombinedParams()).thenReturn(urlParams);
|
||||||
ClientSessionFactoryImpl sessionFactory = new ClientSessionFactoryImpl(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool,null, null);
|
ClientSessionFactoryImpl sessionFactory = new ClientSessionFactoryImpl(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, flowControlThreadPool,null, null);
|
||||||
when(clusterControl.getSessionFactory()).thenReturn(sessionFactory);
|
when(clusterControl.getSessionFactory()).thenReturn(sessionFactory);
|
||||||
when(clientProtocolManager.isAlive()).thenReturn(true);
|
when(clientProtocolManager.isAlive()).thenReturn(true);
|
||||||
|
|
||||||
|
|
|
@ -1,215 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test for slow consumer handling getting influenced by thread pool exhaustion.
|
|
||||||
* See ARTEMIS-4522.
|
|
||||||
* <p/>
|
|
||||||
* Test is easily reproducing the issue described in ARTEMIS-4522 with settings of 2000 {@link #NUM_OF_MESSAGES}, 100 {@link #CONSUMER_COUNT} and 50 {@link #THREAD_POOL_SIZE_CLIENT} on an 8 core machine.
|
|
||||||
* Fewer messages and fewer consumers make the probability lower for the issue to appear.
|
|
||||||
* More client-threads make the issue less likely to appear.
|
|
||||||
* Theory: {@link #THREAD_POOL_SIZE_CLIENT} being larger than {@link #CONSUMER_COUNT} will make the issue impossible to appear.
|
|
||||||
* <p/>
|
|
||||||
* With 2000 messages, the test usually runs into the issue of ClientConsumerImpl#startSlowConsumer reaching its 10-second timeout (or taking significant long between 1 and 10 seconds)
|
|
||||||
* on pendingFlowControl#await after about 1200-1500 messages.
|
|
||||||
* Visible in log as 10 second pause before next bulk of messages get processed.
|
|
||||||
*/
|
|
||||||
public class ConsumerSlowConsumerTest extends ActiveMQTestBase {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
|
||||||
private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue");
|
|
||||||
|
|
||||||
private ServerLocator locator;
|
|
||||||
|
|
||||||
private static final int WINDOW_SIZE = 0;
|
|
||||||
|
|
||||||
private static final int NUM_OF_MESSAGES = 2_000;
|
|
||||||
private static final int CONSUMER_COUNT = 500;
|
|
||||||
private static final int THREAD_POOL_SIZE_CLIENT = 5;
|
|
||||||
|
|
||||||
private volatile boolean stopped = false;
|
|
||||||
|
|
||||||
private final Set<ClientConsumerImpl> consumers = ConcurrentHashMap.newKeySet();
|
|
||||||
private final Set<ClientSession> sessions = ConcurrentHashMap.newKeySet();
|
|
||||||
|
|
||||||
|
|
||||||
protected boolean isNetty() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
|
|
||||||
locator = createFactory(isNetty());
|
|
||||||
locator.setConsumerWindowSize(WINDOW_SIZE);
|
|
||||||
locator.setUseGlobalPools(false);
|
|
||||||
locator.setThreadPoolMaxSize(THREAD_POOL_SIZE_CLIENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSlowConsumer() throws Exception {
|
|
||||||
ActiveMQServer messagingService = createServer(false, isNetty());
|
|
||||||
|
|
||||||
messagingService.start();
|
|
||||||
messagingService.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
|
|
||||||
|
|
||||||
ClientSessionFactory cf = createSessionFactory(locator);
|
|
||||||
|
|
||||||
AtomicInteger sentMessages = new AtomicInteger(0);
|
|
||||||
sendMessages(cf, sentMessages);
|
|
||||||
|
|
||||||
AtomicInteger receivedMessages = new AtomicInteger(0);
|
|
||||||
createConsumers(cf, receivedMessages);
|
|
||||||
|
|
||||||
final long startTime = System.currentTimeMillis();
|
|
||||||
// allow for duration of 5ms per message (neglecting concurrency, which makes it even a lot faster)
|
|
||||||
// typical runtime for 1000 messages 100 consumers and 50 threads without issues is 160ms, so deadline is very generous
|
|
||||||
final long deadLine = System.currentTimeMillis() + sentMessages.get() * 5L;
|
|
||||||
int counter = 0;
|
|
||||||
while (receivedMessages.get() < sentMessages.get() && System.currentTimeMillis() < deadLine) {
|
|
||||||
Thread.sleep(5);
|
|
||||||
counter++;
|
|
||||||
if (counter % 1000 == 0) {
|
|
||||||
logger.info("Waiting for " + (sentMessages.get() - receivedMessages.get()) + " more messages...");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final long endTime = System.currentTimeMillis();
|
|
||||||
stopped = true; // signal stop to potentially still running consumer-creation thread
|
|
||||||
|
|
||||||
// check amount of sent and received messages
|
|
||||||
if (receivedMessages.get() < sentMessages.get()) {
|
|
||||||
logger.error("Received only " + receivedMessages.get() + " messages out of " + sentMessages.get());
|
|
||||||
} else {
|
|
||||||
logger.info("Received all " + receivedMessages.get() + " messages");
|
|
||||||
}
|
|
||||||
assertEquals(sentMessages.get(), receivedMessages.get());
|
|
||||||
|
|
||||||
final long duration = endTime - startTime;
|
|
||||||
logger.info("Test took " + duration + " ms");
|
|
||||||
|
|
||||||
long expectedDuration = NUM_OF_MESSAGES * 10;
|
|
||||||
|
|
||||||
assertTrue("Test took " + duration + " ms, expected " + expectedDuration + " ms", duration < expectedDuration);
|
|
||||||
|
|
||||||
cleanup(cf, messagingService);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cleanup(ClientSessionFactory cf, ActiveMQServer messagingService) throws Exception {
|
|
||||||
consumers.parallelStream().forEach(c -> {
|
|
||||||
try {
|
|
||||||
c.close();
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
//ignore
|
|
||||||
}
|
|
||||||
});
|
|
||||||
logger.info("Closed " + consumers.size() + " consumers");
|
|
||||||
sessions.parallelStream().forEach(s -> {
|
|
||||||
try {
|
|
||||||
s.close();
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
//ignore
|
|
||||||
}
|
|
||||||
});
|
|
||||||
logger.info("Closed " + sessions.size() + " sessions");
|
|
||||||
|
|
||||||
cf.close();
|
|
||||||
messagingService.stop();
|
|
||||||
|
|
||||||
logger.info("Cleaned up.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMessages(ClientSessionFactory cf, AtomicInteger sentMessages) throws ActiveMQException {
|
|
||||||
logger.info("Creating " + NUM_OF_MESSAGES + " messages...");
|
|
||||||
|
|
||||||
try (ClientSession sendingSession = cf.createSession(false, true, true);
|
|
||||||
ClientProducer producer = sendingSession.createProducer(QUEUE)) {
|
|
||||||
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
|
|
||||||
ClientMessage message = createTextMessage(sendingSession, "m" + i);
|
|
||||||
producer.send(message);
|
|
||||||
sentMessages.incrementAndGet();
|
|
||||||
}
|
|
||||||
logger.info("Created " + NUM_OF_MESSAGES + " messages");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createConsumers(ClientSessionFactory cf, AtomicInteger receivedMessages) throws ActiveMQException {
|
|
||||||
Thread consumerCreator = new Thread() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
logger.info("Creating " + CONSUMER_COUNT + " consumers...");
|
|
||||||
try {
|
|
||||||
for (int i = 0; i < CONSUMER_COUNT; i++) {
|
|
||||||
if (stopped) {
|
|
||||||
logger.info("Stopping consumer creation, since test has ended already. Created " + i + " out of " + CONSUMER_COUNT + " consumers so far.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ClientSession session = cf.createSession(false, true, true);
|
|
||||||
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(QUEUE);
|
|
||||||
sessions.add(session);
|
|
||||||
consumers.add(consumer);
|
|
||||||
|
|
||||||
assertEquals(WINDOW_SIZE == 0 ? 0 : WINDOW_SIZE / 2, consumer.getClientWindowSize());
|
|
||||||
String consumerId = "[" + i + "]";
|
|
||||||
consumer.setMessageHandler(message -> {
|
|
||||||
Thread.yield(); // simulate processing and yield to other threads
|
|
||||||
receivedMessages.incrementAndGet();
|
|
||||||
//instanceLog.info(consumerId + "\t- Received message: " + message.getMessageID());
|
|
||||||
});
|
|
||||||
|
|
||||||
session.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Created all " + CONSUMER_COUNT + " consumers.");
|
|
||||||
} catch (Exception ex) {
|
|
||||||
logger.error("Error creating consumers!", ex);
|
|
||||||
//fail("Error creating consumers!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
consumerCreator.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -71,18 +71,21 @@ public class CoreClientTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
|
ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
|
||||||
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
|
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
|
||||||
|
ExecutorService flowControlThreadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
|
||||||
|
|
||||||
ServerLocator locator = createNonHALocator(false);
|
ServerLocator locator = createNonHALocator(false);
|
||||||
boolean setThreadPools = locator.setThreadPools(threadPool, scheduledThreadPool);
|
boolean setThreadPools = locator.setThreadPools(threadPool, scheduledThreadPool, flowControlThreadPool);
|
||||||
|
|
||||||
assertTrue(setThreadPools);
|
assertTrue(setThreadPools);
|
||||||
testCoreClient(true, locator);
|
testCoreClient(true, locator);
|
||||||
|
|
||||||
threadPool.shutdown();
|
threadPool.shutdown();
|
||||||
scheduledThreadPool.shutdown();
|
scheduledThreadPool.shutdown();
|
||||||
|
flowControlThreadPool.shutdown();
|
||||||
|
|
||||||
threadPool.awaitTermination(60, TimeUnit.SECONDS);
|
threadPool.awaitTermination(60, TimeUnit.SECONDS);
|
||||||
scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS);
|
scheduledThreadPool.awaitTermination(60, TimeUnit.SECONDS);
|
||||||
|
flowControlThreadPool.awaitTermination(60, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -90,15 +93,16 @@ public class CoreClientTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
int originalScheduled = ActiveMQClient.getGlobalScheduledThreadPoolSize();
|
int originalScheduled = ActiveMQClient.getGlobalScheduledThreadPoolSize();
|
||||||
int originalGlobal = ActiveMQClient.getGlobalThreadPoolSize();
|
int originalGlobal = ActiveMQClient.getGlobalThreadPoolSize();
|
||||||
|
int originalFlowControl = ActiveMQClient.getGlobalFlowControlThreadPoolSize();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
|
ActiveMQClient.setGlobalThreadPoolProperties(2, 1, 3);
|
||||||
ActiveMQClient.clearThreadPools();
|
ActiveMQClient.clearThreadPools();
|
||||||
ServerLocator locator = createNonHALocator(false);
|
ServerLocator locator = createNonHALocator(false);
|
||||||
testCoreClient(true, locator);
|
testCoreClient(true, locator);
|
||||||
} finally {
|
} finally {
|
||||||
// restoring original value otherwise future tests would be screwed up
|
// restoring original value otherwise future tests would be screwed up
|
||||||
ActiveMQClient.setGlobalThreadPoolProperties(originalGlobal, originalScheduled);
|
ActiveMQClient.setGlobalThreadPoolProperties(originalGlobal, originalScheduled, originalFlowControl);
|
||||||
ActiveMQClient.clearThreadPools();
|
ActiveMQClient.clearThreadPools();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -52,7 +53,6 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY;
|
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY;
|
||||||
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE;
|
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE;
|
||||||
|
@ -98,6 +98,43 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
||||||
locator = createFactory(isNetty);
|
locator = createFactory(isNetty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSlowConsumerWithSmallThreadPool() throws Exception {
|
||||||
|
final int MESSAGE_COUNT = 2;
|
||||||
|
CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
|
||||||
|
server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
/*
|
||||||
|
* Even though the threadPoolMaxSize is 1 the client shouldn't stall on flow control due to the independent flow
|
||||||
|
* control thread pool.
|
||||||
|
*/
|
||||||
|
ServerLocator locator = createInVMNonHALocator()
|
||||||
|
.setConsumerWindowSize(0)
|
||||||
|
.setUseGlobalPools(false)
|
||||||
|
.setThreadPoolMaxSize(1);
|
||||||
|
|
||||||
|
ClientSessionFactory cf = createSessionFactory(locator);
|
||||||
|
try (ClientSession session = cf.createSession(true, true);
|
||||||
|
ClientProducer producer = session.createProducer(getName())) {
|
||||||
|
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||||
|
producer.send(session.createMessage(true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try (ClientSession session = cf.createSession(true, true);
|
||||||
|
ClientConsumer consumer = session.createConsumer(getName())) {
|
||||||
|
consumer.setMessageHandler((m) -> {
|
||||||
|
latch.countDown();
|
||||||
|
try {
|
||||||
|
m.acknowledge();
|
||||||
|
} catch (ActiveMQException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
session.start();
|
||||||
|
assertTrue("All messages should be received within the timeout", latch.await(1, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
assertEquals(0, server.locateQueue(getName()).getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSlowConsumerKilled() throws Exception {
|
public void testSlowConsumerKilled() throws Exception {
|
||||||
ClientSessionFactory sf = createSessionFactory(locator);
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
|
Loading…
Reference in New Issue