Since some OS/JVM combinations handle threads more efficiently than others, using a thread pool to run our async tasks may not

be the most optimal solution.  Modified the TaskRunnerFactory so that it uses a system property to choose between the 
PooledTaskRunner or the DedicatedTaskRunner which now keeps a dedicated thread per task.

The default is still set to use the PooledTaskRunner, but we may change this if performance benchmarks indicate that DedicatedTaskRunner should be the default.

Also make the thread names a little more uniform so that when you use a debugger you can easily tell what each thread is doing.




git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-28 05:54:54 +00:00
parent 39a5bbd68f
commit e2aad41e6a
19 changed files with 187 additions and 54 deletions

View File

@ -87,7 +87,6 @@ import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
@ -97,8 +96,8 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
private final Executor asyncConnectionThread;
public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
private final ThreadPoolExecutor asyncConnectionThread;
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
private static final IdGenerator connectionIdGenerator = new IdGenerator();
@ -168,16 +167,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @param password
* @throws Exception
*/
protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
protected ActiveMQConnection(final Transport transport, JMSStatsImpl factoryStats)
throws Exception {
// Configure a single threaded executor who's core thread can timeout if idle
// Configure a single threaded executor who's core thread can timeout if idle
asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "Connection task");
Thread thread = new Thread(r, "AcitveMQ Connection Worker: "+transport);
thread.setDaemon(true);
return thread;
}});
// Todo: see if we can allow the core threads to timeout.
// asyncConnectionThread.allowCoreThreadTimeOut(true);
asyncConnectionThread.allowCoreThreadTimeOut(true);
this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
this.info.setManageable(true);

View File

@ -535,6 +535,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
synchronized public void dispose() throws JMSException {
if (!closed) {
executor.stop();
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
@ -559,6 +561,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
connection.removeSession(this);
this.transactionContext=null;
closed = true;
}
}

View File

@ -61,7 +61,7 @@ public class ActiveMQSessionExecutor implements Task {
}
private void wakeup() {
if( !dispatchedBySessionPool && hasUncomsumedMessages() ) {
if( taskRunner!=null && !dispatchedBySessionPool && hasUncomsumedMessages() ) {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
@ -101,7 +101,7 @@ public class ActiveMQSessionExecutor implements Task {
synchronized void start() {
if( !messageQueue.isRunning() ) {
messageQueue.start();
taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this);
taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId());
wakeup();
}
}

View File

@ -122,7 +122,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
if( taskRunnerFactory != null ) {
taskRunner = taskRunnerFactory.createTaskRunner( this );
taskRunner = taskRunnerFactory.createTaskRunner( this, "ActiveMQ Connection Dispatcher: "+System.identityHashCode(this) );
}
else {
taskRunner = null;
@ -145,6 +145,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if(disposed)
return;
disposed=true;
if( taskRunner!=null )
taskRunner.shutdown();
//
// Remove all logical connection associated with this connection
// from the broker.

View File

@ -300,4 +300,7 @@ public class TransportConnector implements Connector {
this.name = name;
}
public String toString() {
return getName();
}
}

View File

@ -103,7 +103,7 @@ public class TransportStatusDetector implements Service,Runnable{
}
public void start() throws Exception{
if(started.compareAndSet(false,true)){
runner=new Thread(this,"Transport Status Dector "+connector);
runner=new Thread(this,"ActiveMQ Transport Status Monitor: "+connector);
runner.setDaemon(true);
runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
runner.start();

View File

@ -46,7 +46,7 @@ public class CacheEvictionUsageListener implements UsageListener {
public boolean iterate() {
return evictMessages();
}
});
}, "Cache Evictor: "+System.identityHashCode(this));
}
private boolean evictMessages() {

View File

@ -120,7 +120,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
public boolean iterate() {
return doCheckpoint();
}
});
}, "ActiveMQ Journal Checkpoint Worker");
this.longTermPersistence = longTermPersistence;
}

View File

@ -120,7 +120,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
public boolean iterate() {
return doCheckpoint();
}
});
}, "ActiveMQ Checkpoint Worker");
this.longTermPersistence = longTermPersistence;
}

View File

@ -0,0 +1,109 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.thread;
/**
*
* @version $Revision: 1.1 $
*/
class DedicatedTaskRunner implements TaskRunner {
private final Task task;
private final Thread thread;
private final Object mutex = new Object();
private boolean threadTerminated;
private boolean pending;
private boolean shutdown;
public DedicatedTaskRunner(Task task, String name, int priority, boolean daemon) {
this.task = task;
thread = new Thread(name) {
public void run() {
runTask();
}
};
thread.setDaemon(daemon);
thread.setName(name);
thread.setPriority(priority);
thread.start();
}
/**
*/
public void wakeup() throws InterruptedException {
synchronized( mutex ) {
if( shutdown )
return;
pending=true;
mutex.notifyAll();
}
}
/**
* shut down the task
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException{
synchronized(mutex){
shutdown=true;
pending=true;
mutex.notifyAll();
// Wait till the thread stops.
if(!threadTerminated){
mutex.wait();
}
}
}
private void runTask() {
try {
while( true ) {
synchronized (mutex) {
pending=false;
if( shutdown ) {
return;
}
}
if( !task.iterate() ) {
// wait to be notified.
synchronized (mutex) {
while( !pending ) {
mutex.wait();
}
}
}
}
} catch (InterruptedException e) {
// Someone really wants this thread to die off.
} finally {
// Make sure we notify any waiting threads that thread
// has terminated.
synchronized (mutex) {
threadTerminated=true;
mutex.notifyAll();
}
}
}
}

View File

@ -37,7 +37,7 @@ public class DefaultThreadPools {
});
}
private static final TaskRunnerFactory defaultTaskRunnerFactory = new TaskRunnerFactory(defaultPool,10);
private static final TaskRunnerFactory defaultTaskRunnerFactory = new TaskRunnerFactory();
public static Executor getDefaultPool() {
return defaultPool;

View File

@ -22,7 +22,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.Executor;
*
* @version $Revision: 1.1 $
*/
class SimpleTaskRunner implements TaskRunner {
class PooledTaskRunner implements TaskRunner {
private final int maxIterationsPerRun;
private final Executor executor;
@ -33,7 +33,7 @@ class SimpleTaskRunner implements TaskRunner {
private boolean iterating;
private Thread runningThread;
public SimpleTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
this.executor = executor;
this.maxIterationsPerRun = maxIterationsPerRun;
this.task = task;

View File

@ -37,43 +37,42 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class TaskRunnerFactory {
private Executor executor;
private int maxIterationsPerRun = 1000;
private int maxIterationsPerRun;
private String name;
private int priority;
private boolean daemon;
public TaskRunnerFactory() {
setExecutor(createDefaultExecutor("ActiveMQ Task", Thread.NORM_PRIORITY, true));
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
}
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
this.name = name;
this.priority = priority;
this.daemon = daemon;
this.maxIterationsPerRun = maxIterationsPerRun;
setExecutor(createDefaultExecutor(name, priority, daemon));
// If your OS/JVM combination has a good thread model, you may want to avoid
// using a thread pool to run tasks and use a DedicatedTaskRunner instead.
if( "true".equals(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner")) ) {
executor = null;
} else {
executor = createDefaultExecutor();
}
}
public TaskRunnerFactory(Executor executor, int maxIterationsPerRun) {
this.executor = executor;
this.maxIterationsPerRun = maxIterationsPerRun;
}
public TaskRunner createTaskRunner(Task task) {
return new SimpleTaskRunner(executor, task, maxIterationsPerRun);
public TaskRunner createTaskRunner(Task task, String name) {
if( executor!=null ) {
return new PooledTaskRunner(executor, task, maxIterationsPerRun);
} else {
return new DedicatedTaskRunner(task, name, priority, daemon);
}
}
public Executor getExecutor() {
return executor;
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
public int getMaxIterationsPerRun() {
return maxIterationsPerRun;
}
public void setMaxIterationsPerRun(int maxIterationsPerRun) {
this.maxIterationsPerRun = maxIterationsPerRun;
}
protected Executor createDefaultExecutor(final String name, final int priority, final boolean daemon) {
protected Executor createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name);
@ -84,6 +83,7 @@ public class TaskRunnerFactory {
});
rc.allowCoreThreadTimeOut(true);
return rc;
}
}

View File

@ -69,7 +69,7 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor
protected void doStart() throws Exception {
log.info("Listening for connections at: " + getLocation());
runner = new Thread(this, toString());
runner = new Thread(this, "ActiveMQ Transport Server: "+toString());
runner.setDaemon(daemon);
runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
runner.start();

View File

@ -38,7 +38,7 @@ public abstract class TransportThreadSupport extends TransportSupport implements
}
protected void doStart() throws Exception {
runner = new Thread(this, toString());
runner = new Thread(this, "ActiveMQ Transport: "+toString());
runner.setDaemon(daemon);
runner.start();
}

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
@ -30,7 +31,6 @@ import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
@ -40,6 +40,7 @@ import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
@ -214,7 +215,7 @@ public class FailoverTransport implements CompositeTransport {
return !disposed;
}
});
}, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
}
private void handleTransportFailure(IOException e) throws InterruptedException {
@ -262,6 +263,7 @@ public class FailoverTransport implements CompositeTransport {
synchronized(sleepMutex){
sleepMutex.notifyAll();
}
reconnectTask.shutdown();
}
public long getInitialReconnectDelay() {

View File

@ -152,7 +152,7 @@ public class FanoutTransport implements CompositeTransport {
public boolean iterate() {
return doConnect();
}
});
}, "ActiveMQ Fanout Worker: "+System.identityHashCode(this));
}
/**
@ -294,7 +294,6 @@ public class FanoutTransport implements CompositeTransport {
log.debug("Stopped: "+this);
ss.throwFirstException();
}
reconnectTask.shutdown();
}

View File

@ -156,7 +156,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
* @return pretty print of this
*/
public String toString() {
return "TcpTransportServer@" + getLocation();
return ""+getLocation();
}
/**

View File

@ -32,6 +32,17 @@ public class TaskRunnerTest extends TestCase {
private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(TaskRunnerTest.class);
public void testWakeupPooled() throws InterruptedException, BrokenBarrierException {
System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "false");
doTestWakeup();
}
public void testWakeupDedicated() throws InterruptedException, BrokenBarrierException {
System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "true");
doTestWakeup();
}
/**
* Simulate multiple threads queuing work for the
* TaskRunner. The Task Runner dequeues the
@ -40,7 +51,7 @@ public class TaskRunnerTest extends TestCase {
* @throws InterruptedException
* @throws BrokenBarrierException
*/
public void testWakeup() throws InterruptedException, BrokenBarrierException {
public void doTestWakeup() throws InterruptedException, BrokenBarrierException {
final AtomicInteger iterations = new AtomicInteger(0);
final AtomicInteger counter = new AtomicInteger(0);
@ -64,7 +75,7 @@ public class TaskRunnerTest extends TestCase {
return true;
}
}
});
}, "Thread Name");
long start = System.currentTimeMillis();
final int WORKER_COUNT=5;
@ -96,6 +107,8 @@ public class TaskRunnerTest extends TestCase {
log.info("Dequeues/s: "+(1000.0*ENQUEUE_COUNT/(end-start)));
log.info("duration: "+((end-start)/1000.0));
assertTrue(b);
runner.shutdown();
}