Updated the TransportConnector and TransportConnection to use a thread pool when initalizing and destroying connection to better support fast

connect and disconnect use cases.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@916780 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2010-02-26 18:40:31 +00:00
parent 662324caac
commit 1f521da77e
5 changed files with 53 additions and 30 deletions

View File

@ -77,6 +77,7 @@ import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -91,6 +92,8 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
/**
* @version $Revision: 1.8 $
*/
@ -908,8 +911,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
cs.getContext().getStopping().set(true);
}
try {
new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
@Override
getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
serviceLock.writeLock().lock();
try {
@ -922,7 +924,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
serviceLock.writeLock().unlock();
}
}
}.start();
});
} catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
stopped.countDown();

View File

@ -21,6 +21,7 @@ import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@ -32,6 +33,9 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -202,9 +206,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
// Starting the connection could block due to
// wireformat negotiation, so start it in an async thread.
Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) {
getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
try {
Connection connection = createConnection(transport);
@ -214,8 +216,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
onAcceptError(e);
}
}
};
startThread.start();
});
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);

View File

@ -26,24 +26,24 @@ import java.util.concurrent.ThreadFactory;
*/
public final class DefaultThreadPools {
private static final Executor DEFAULT_POOL;
static {
DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
thread.setDaemon(true);
return thread;
}
});
}
// private static final Executor DEFAULT_POOL;
// static {
// DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
// public Thread newThread(Runnable runnable) {
// Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
// thread.setDaemon(true);
// return thread;
// }
// });
// }
private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
private DefaultThreadPools() {
}
public static Executor getDefaultPool() {
return DEFAULT_POOL;
}
// public static Executor getDefaultPool() {
// return DEFAULT_POOL;
// }
public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
return DEFAULT_TASK_RUNNER_FACTORY;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -31,7 +32,7 @@ import java.util.concurrent.TimeUnit;
*
* @version $Revision: 1.5 $
*/
public class TaskRunnerFactory {
public class TaskRunnerFactory implements Executor {
private ExecutorService executor;
private int maxIterationsPerRun;
@ -80,6 +81,18 @@ public class TaskRunnerFactory {
}
}
public void execute(Runnable runnable) {
execute(runnable, "ActiveMQ Task");
}
public void execute(Runnable runnable, String name) {
if (executor != null) {
executor.execute(runnable);
} else {
new Thread(runnable, name).start();
}
}
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {

View File

@ -20,8 +20,11 @@ import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* The SelectorManager will manage one Selector and the thread that checks the
@ -36,17 +39,21 @@ public final class SelectorManager {
public static final SelectorManager SINGLETON = new SelectorManager();
private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread rc = new Thread(r);
rc.setName("NIO Transport Thread");
return rc;
}
});
private Executor selectorExecutor = createDefaultExecutor();
private Executor channelExecutor = selectorExecutor;
private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
private int maxChannelsPerWorker = 64;
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "ActiveMQ NIO Worker");
}
});
// rc.allowCoreThreadTimeOut(true);
return rc;
}
public static SelectorManager getInstance() {
return SINGLETON;
}