From 5cd9ebaeb7f82e2828e6ccf0931b12aab51dddab Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 21 Mar 2012 19:55:30 +0000 Subject: [PATCH] fis for: https://issues.apache.org/jira/browse/AMQ-3718 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1303544 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 38 +++++++++++++++---- .../activemq/thread/TaskRunnerFactory.java | 9 ++--- .../transport/nio/SelectorManager.java | 25 ++++++------ 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 64175f2b26..523ba1c65d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -31,6 +31,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -2371,13 +2373,35 @@ public class BrokerService implements Service { protected synchronized ThreadPoolExecutor getExecutor() { if (this.executor == null) { - this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Usage Async Task"); - thread.setDaemon(true); - return thread; - } - }); + this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + + private long i = 0; + + @Override + public Thread newThread(Runnable runnable) { + this.i++; + Thread thread = new Thread(runnable, "BrokerService.worker." + this.i); + thread.setDaemon(true); + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOG.error("Error in thread '{}'", t.getName(), e); + } + }); + return thread; + } + }, new RejectedExecutionHandler() { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().offer(r, 60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); + } + + throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); + } + }); } return this.executor; } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 3bc93abd2c..486ac0551c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -48,9 +48,9 @@ public class TaskRunnerFactory implements Executor { public TaskRunnerFactory() { this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); } - + private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { - this(name,priority,daemon,maxIterationsPerRun,false); + this(name,priority,daemon,maxIterationsPerRun,false); } public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) { @@ -92,7 +92,7 @@ public class TaskRunnerFactory implements Executor { public void execute(Runnable runnable) { execute(runnable, "ActiveMQ Task"); } - + public void execute(Runnable runnable, String name) { init(); if (executor != null) { @@ -103,7 +103,7 @@ public class TaskRunnerFactory implements Executor { } protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet()); thread.setDaemon(daemon); @@ -111,7 +111,6 @@ public class TaskRunnerFactory implements Executor { return thread; } }); - // rc.allowCoreThreadTimeOut(true); return rc; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index 9782f3b04d..c7049193dd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -29,11 +29,9 @@ import java.util.concurrent.TimeUnit; /** * The SelectorManager will manage one Selector and the thread that checks the * selector. - * + * * We may need to consider running more than one thread to check the selector if * servicing the selector takes too long. - * - * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $ */ public final class SelectorManager { @@ -43,28 +41,31 @@ public final class SelectorManager { private Executor channelExecutor = selectorExecutor; private LinkedList freeWorkers = new LinkedList(); private int maxChannelsPerWorker = 1024; - + protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + + private long i = 0; + public Thread newThread(Runnable runnable) { - return new Thread(runnable, "ActiveMQ NIO Worker"); + this.i++; + final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i); + return t; } }); - // rc.allowCoreThreadTimeOut(true); + return rc; } - + public static SelectorManager getInstance() { return SINGLETON; } public interface Listener { void onSelect(SelectorSelection selector); - void onError(SelectorSelection selection, Throwable error); } - public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) throws IOException { @@ -78,7 +79,6 @@ public final class SelectorManager { worker.retain(); selection = new SelectorSelection(worker, socketChannel, listener); } - } else { // Worker starts /w retain count of 1 SelectorWorker worker = new SelectorWorker(this); @@ -86,7 +86,7 @@ public final class SelectorManager { selection = new SelectorSelection(worker, socketChannel, listener); } } - + return selection; } @@ -125,5 +125,4 @@ public final class SelectorManager { public void setSelectorExecutor(Executor selectorExecutor) { this.selectorExecutor = selectorExecutor; } - }