From ce5d2a9e9e075aad14c2a7fe1a19473f3419bb40 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 4 Sep 2013 14:16:51 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4205 Allow for tuning to thread pool keep alive times as system properties. --- .../org/apache/activemq/thread/TaskRunnerFactory.java | 11 ++++++++--- .../activemq/transport/AbstractInactivityMonitor.java | 8 +++++--- .../activemq/transport/nio/SelectorManager.java | 9 +++++++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 1a0358c12b..86f4f3fac0 100755 --- a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -47,10 +47,10 @@ public class TaskRunnerFactory implements Executor { private String name; private int priority; private boolean daemon; - private AtomicLong id = new AtomicLong(0); + private final AtomicLong id = new AtomicLong(0); private boolean dedicatedTaskRunner; private long shutdownAwaitTermination = 30000; - private AtomicBoolean initDone = new AtomicBoolean(false); + private final AtomicBoolean initDone = new AtomicBoolean(false); private int maxThreadPoolSize = Integer.MAX_VALUE; private RejectedExecutionHandler rejectedTaskHandler = null; @@ -140,6 +140,7 @@ public class TaskRunnerFactory implements Executor { } } + @Override public void execute(Runnable runnable) { execute(runnable, name); } @@ -164,7 +165,8 @@ public class TaskRunnerFactory implements Executor { } protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + @Override public Thread newThread(Runnable runnable) { String threadName = name + "-" + id.incrementAndGet(); Thread thread = new Thread(runnable, threadName); @@ -253,4 +255,7 @@ public class TaskRunnerFactory implements Executor { this.shutdownAwaitTermination = shutdownAwaitTermination; } + private static int getDefaultKeepAliveTime() { + return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30); + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java index d64f35e76c..a7976451e4 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java @@ -444,10 +444,12 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { }; private ThreadPoolExecutor createExecutor() { - // TODO: This value of 10 seconds seems to low, see discussion at - // http://activemq.2283324.n4.nabble.com/InactivityMonitor-Creating-too-frequent-threads-tp4656752.html;cid=1348142445209-351 - ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), factory); + ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue(), factory); exec.allowCoreThreadTimeOut(true); return exec; } + + private static int getDefaultKeepAliveTime() { + return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30); + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index c7049193dd..9be523120b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -39,14 +39,15 @@ public final class SelectorManager { private Executor selectorExecutor = createDefaultExecutor(); private Executor channelExecutor = selectorExecutor; - private LinkedList freeWorkers = new LinkedList(); + private final LinkedList freeWorkers = new LinkedList(); private int maxChannelsPerWorker = 1024; protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { private long i = 0; + @Override public Thread newThread(Runnable runnable) { this.i++; final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i); @@ -57,6 +58,10 @@ public final class SelectorManager { return rc; } + private static int getDefaultKeepAliveTime() { + return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30); + } + public static SelectorManager getInstance() { return SINGLETON; }