From c33231bd439f02e8651d28f16ad273fa5eed384d Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 20 Jul 2012 14:09:21 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3885 - limiting number of threads used by session executor and provide a way to set custom task runner factory git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1363790 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 13 +++++++++++- .../activemq/ActiveMQConnectionFactory.java | 21 +++++++++++++++++++ .../activemq/thread/TaskRunnerFactory.java | 17 ++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 20ddf7732c..0eefd8e25e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -110,6 +110,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; + public static int DEFAULT_THREAD_POOL_SIZE = 1000; private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); @@ -200,6 +201,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean transactedIndividualAck = false; private boolean nonBlockingRedelivery = false; + private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE; + /** * Construct an ActiveMQConnection * @@ -978,7 +981,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public TaskRunnerFactory getSessionTaskRunner() { synchronized (this) { if (sessionTaskRunner == null) { - sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); + sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); } } return sessionTaskRunner; @@ -2568,4 +2571,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public RedeliveryPolicyMap getRedeliveryPolicyMap() { return redeliveryPolicyMap; } + + public int getMaxThreadPoolSize() { + return maxThreadPoolSize; + } + + public void setMaxThreadPoolSize(int maxThreadPoolSize) { + this.maxThreadPoolSize = maxThreadPoolSize; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 19802ac197..9f1ae372ce 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -41,6 +41,7 @@ import org.apache.activemq.jndi.JNDIBaseStorable; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; @@ -67,6 +68,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public static final String DEFAULT_PASSWORD = null; public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; + protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { public Thread newThread(Runnable run) { Thread thread = new Thread(run); @@ -127,6 +129,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean messagePrioritySupported = true; private boolean transactedIndividualAck = false; private boolean nonBlockingRedelivery = false; + private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; + private TaskRunnerFactory sessionTaskRunner; // ///////////////////////////////////////////// // @@ -338,6 +342,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setMessagePrioritySupported(isMessagePrioritySupported()); connection.setTransactedIndividualAck(isTransactedIndividualAck()); connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); + connection.setMaxThreadPoolSize(getMaxThreadPoolSize()); + connection.setSessionTaskRunner(getSessionTaskRunner()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -1095,4 +1101,19 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne this.nonBlockingRedelivery = nonBlockingRedelivery; } + public int getMaxThreadPoolSize() { + return maxThreadPoolSize; + } + + public void setMaxThreadPoolSize(int maxThreadPoolSize) { + this.maxThreadPoolSize = maxThreadPoolSize; + } + + public TaskRunnerFactory getSessionTaskRunner() { + return sessionTaskRunner; + } + + public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { + this.sessionTaskRunner = sessionTaskRunner; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 486ac0551c..285ee8c02a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -18,6 +18,7 @@ package org.apache.activemq.thread; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -44,6 +45,7 @@ public class TaskRunnerFactory implements Executor { private AtomicLong id = new AtomicLong(0); private boolean dedicatedTaskRunner; private AtomicBoolean initDone = new AtomicBoolean(false); + private int maxThreadPoolSize = Integer.MAX_VALUE; public TaskRunnerFactory() { this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); @@ -54,11 +56,16 @@ public class TaskRunnerFactory implements Executor { } public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) { + this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE); + } + + public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) { this.name = name; this.priority = priority; this.daemon = daemon; this.maxIterationsPerRun = maxIterationsPerRun; this.dedicatedTaskRunner = dedicatedTaskRunner; + this.maxThreadPoolSize = maxThreadPoolSize; } public void init() { @@ -103,7 +110,7 @@ public class TaskRunnerFactory implements Executor { } protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet()); thread.setDaemon(daemon); @@ -161,4 +168,12 @@ public class TaskRunnerFactory implements Executor { public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { this.dedicatedTaskRunner = dedicatedTaskRunner; } + + public int getMaxThreadPoolSize() { + return maxThreadPoolSize; + } + + public void setMaxThreadPoolSize(int maxThreadPoolSize) { + this.maxThreadPoolSize = maxThreadPoolSize; + } }