git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1303544 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-03-21 19:55:30 +00:00
parent b96bb1ff4b
commit 5cd9ebaeb7
3 changed files with 47 additions and 25 deletions

View File

@ -31,6 +31,8 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -2371,12 +2373,34 @@ public class BrokerService implements Service {
protected synchronized ThreadPoolExecutor getExecutor() { protected synchronized ThreadPoolExecutor getExecutor() {
if (this.executor == null) { if (this.executor == null) {
this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
private long i = 0;
@Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Usage Async Task"); this.i++;
Thread thread = new Thread(runnable, "BrokerService.worker." + this.i);
thread.setDaemon(true); thread.setDaemon(true);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Error in thread '{}'", t.getName(), e);
}
});
return thread; return thread;
} }
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
}); });
} }
return this.executor; return this.executor;

View File

@ -103,7 +103,7 @@ public class TaskRunnerFactory implements Executor {
} }
protected ExecutorService createDefaultExecutor() { protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet()); Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
thread.setDaemon(daemon); thread.setDaemon(daemon);
@ -111,7 +111,6 @@ public class TaskRunnerFactory implements Executor {
return thread; return thread;
} }
}); });
// rc.allowCoreThreadTimeOut(true);
return rc; return rc;
} }

View File

@ -32,8 +32,6 @@ import java.util.concurrent.TimeUnit;
* *
* We may need to consider running more than one thread to check the selector if * We may need to consider running more than one thread to check the selector if
* servicing the selector takes too long. * servicing the selector takes too long.
*
* @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
*/ */
public final class SelectorManager { public final class SelectorManager {
@ -45,12 +43,17 @@ public final class SelectorManager {
private int maxChannelsPerWorker = 1024; private int maxChannelsPerWorker = 1024;
protected ExecutorService createDefaultExecutor() { protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
private long i = 0;
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
return new Thread(runnable, "ActiveMQ NIO Worker"); this.i++;
final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
return t;
} }
}); });
// rc.allowCoreThreadTimeOut(true);
return rc; return rc;
} }
@ -60,11 +63,9 @@ public final class SelectorManager {
public interface Listener { public interface Listener {
void onSelect(SelectorSelection selector); void onSelect(SelectorSelection selector);
void onError(SelectorSelection selection, Throwable error); void onError(SelectorSelection selection, Throwable error);
} }
public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
throws IOException { throws IOException {
@ -78,7 +79,6 @@ public final class SelectorManager {
worker.retain(); worker.retain();
selection = new SelectorSelection(worker, socketChannel, listener); selection = new SelectorSelection(worker, socketChannel, listener);
} }
} else { } else {
// Worker starts /w retain count of 1 // Worker starts /w retain count of 1
SelectorWorker worker = new SelectorWorker(this); SelectorWorker worker = new SelectorWorker(this);
@ -125,5 +125,4 @@ public final class SelectorManager {
public void setSelectorExecutor(Executor selectorExecutor) { public void setSelectorExecutor(Executor selectorExecutor) {
this.selectorExecutor = selectorExecutor; this.selectorExecutor = selectorExecutor;
} }
} }