mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3885 - limiting number of threads used by session executor and provide a way to set custom task runner factory
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1363790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d301b41638
commit
c33231bd43
|
@ -110,6 +110,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
|
||||
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
|
||||
public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
|
||||
public static int DEFAULT_THREAD_POOL_SIZE = 1000;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
|
||||
|
||||
|
@ -200,6 +201,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
private boolean transactedIndividualAck = false;
|
||||
private boolean nonBlockingRedelivery = false;
|
||||
|
||||
private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
|
||||
|
||||
/**
|
||||
* Construct an <code>ActiveMQConnection</code>
|
||||
*
|
||||
|
@ -978,7 +981,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
public TaskRunnerFactory getSessionTaskRunner() {
|
||||
synchronized (this) {
|
||||
if (sessionTaskRunner == null) {
|
||||
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
|
||||
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
|
||||
}
|
||||
}
|
||||
return sessionTaskRunner;
|
||||
|
@ -2568,4 +2571,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
public RedeliveryPolicyMap getRedeliveryPolicyMap() {
|
||||
return redeliveryPolicyMap;
|
||||
}
|
||||
|
||||
public int getMaxThreadPoolSize() {
|
||||
return maxThreadPoolSize;
|
||||
}
|
||||
|
||||
public void setMaxThreadPoolSize(int maxThreadPoolSize) {
|
||||
this.maxThreadPoolSize = maxThreadPoolSize;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.jndi.JNDIBaseStorable;
|
|||
import org.apache.activemq.management.JMSStatsImpl;
|
||||
import org.apache.activemq.management.StatsCapable;
|
||||
import org.apache.activemq.management.StatsImpl;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
|
@ -67,6 +68,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
public static final String DEFAULT_PASSWORD = null;
|
||||
public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
|
||||
|
||||
|
||||
protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
|
||||
public Thread newThread(Runnable run) {
|
||||
Thread thread = new Thread(run);
|
||||
|
@ -127,6 +129,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
private boolean messagePrioritySupported = true;
|
||||
private boolean transactedIndividualAck = false;
|
||||
private boolean nonBlockingRedelivery = false;
|
||||
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
|
||||
private TaskRunnerFactory sessionTaskRunner;
|
||||
|
||||
// /////////////////////////////////////////////
|
||||
//
|
||||
|
@ -338,6 +342,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
connection.setMessagePrioritySupported(isMessagePrioritySupported());
|
||||
connection.setTransactedIndividualAck(isTransactedIndividualAck());
|
||||
connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
|
||||
connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
|
||||
connection.setSessionTaskRunner(getSessionTaskRunner());
|
||||
if (transportListener != null) {
|
||||
connection.addTransportListener(transportListener);
|
||||
}
|
||||
|
@ -1095,4 +1101,19 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
this.nonBlockingRedelivery = nonBlockingRedelivery;
|
||||
}
|
||||
|
||||
public int getMaxThreadPoolSize() {
|
||||
return maxThreadPoolSize;
|
||||
}
|
||||
|
||||
public void setMaxThreadPoolSize(int maxThreadPoolSize) {
|
||||
this.maxThreadPoolSize = maxThreadPoolSize;
|
||||
}
|
||||
|
||||
public TaskRunnerFactory getSessionTaskRunner() {
|
||||
return sessionTaskRunner;
|
||||
}
|
||||
|
||||
public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
|
||||
this.sessionTaskRunner = sessionTaskRunner;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.thread;
|
|||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -44,6 +45,7 @@ public class TaskRunnerFactory implements Executor {
|
|||
private AtomicLong id = new AtomicLong(0);
|
||||
private boolean dedicatedTaskRunner;
|
||||
private AtomicBoolean initDone = new AtomicBoolean(false);
|
||||
private int maxThreadPoolSize = Integer.MAX_VALUE;
|
||||
|
||||
public TaskRunnerFactory() {
|
||||
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
|
||||
|
@ -54,11 +56,16 @@ public class TaskRunnerFactory implements Executor {
|
|||
}
|
||||
|
||||
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
|
||||
this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
|
||||
this.name = name;
|
||||
this.priority = priority;
|
||||
this.daemon = daemon;
|
||||
this.maxIterationsPerRun = maxIterationsPerRun;
|
||||
this.dedicatedTaskRunner = dedicatedTaskRunner;
|
||||
this.maxThreadPoolSize = maxThreadPoolSize;
|
||||
}
|
||||
|
||||
public void init() {
|
||||
|
@ -103,7 +110,7 @@ public class TaskRunnerFactory implements Executor {
|
|||
}
|
||||
|
||||
protected ExecutorService createDefaultExecutor() {
|
||||
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
|
||||
thread.setDaemon(daemon);
|
||||
|
@ -161,4 +168,12 @@ public class TaskRunnerFactory implements Executor {
|
|||
public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
|
||||
this.dedicatedTaskRunner = dedicatedTaskRunner;
|
||||
}
|
||||
|
||||
public int getMaxThreadPoolSize() {
|
||||
return maxThreadPoolSize;
|
||||
}
|
||||
|
||||
public void setMaxThreadPoolSize(int maxThreadPoolSize) {
|
||||
this.maxThreadPoolSize = maxThreadPoolSize;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue