re-introduce optimize dispatch for queues

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634588 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-07 09:02:37 +00:00
parent 4c481ec6a9
commit 3f9d6e2ef8
3 changed files with 37 additions and 20 deletions

View File

@ -78,6 +78,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class Queue extends BaseDestination implements Task {
protected final Log log;
protected TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner;
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
@ -93,6 +94,7 @@ public class Queue extends BaseDestination implements Task {
private boolean useConsumerPriority=true;
private boolean strictOrderDispatch=false;
private QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch=false;
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
wakeup();
@ -109,11 +111,9 @@ public class Queue extends BaseDestination implements Task {
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.taskFactory=taskFactory;
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
this.dispatchSelector=new QueueDispatchSelector(destination);
}
public void initialize() throws Exception {
@ -133,17 +133,20 @@ public class Queue extends BaseDestination implements Task {
memoryUsage.setParent(systemUsage.getMemoryUsage());
}
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "QueueThread:"+destination);
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
});
this.taskRunner = new DeterministicTaskRunner(this.executor,this);
if (isOptimizedDispatch()) {
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
}else {
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "QueueThread:"+destination);
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
});
this.taskRunner = new DeterministicTaskRunner(this.executor,this);
}
super.initialize();
if (store != null) {
// Restore the persistent messages.
@ -583,6 +586,15 @@ public class Queue extends BaseDestination implements Task {
public void setStrictOrderDispatch(boolean strictOrderDispatch) {
this.strictOrderDispatch = strictOrderDispatch;
}
public boolean isOptimizedDispatch() {
return optimizedDispatch;
}
public void setOptimizedDispatch(boolean optimizedDispatch) {
this.optimizedDispatch = optimizedDispatch;
}
// Implementation methods
// -------------------------------------------------------------------------
@ -956,10 +968,14 @@ public class Queue extends BaseDestination implements Task {
}
protected void wakeup() {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
if (optimizedDispatch) {
iterate();
}else {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
}
}
@ -1075,4 +1091,5 @@ public class Queue extends BaseDestination implements Task {
private void removeFromConsumerList(Subscription sub) {
consumers.remove(sub);
}
}

View File

@ -32,7 +32,7 @@ import org.apache.activemq.thread.TaskRunnerFactory;
*/
public class TempQueue extends Queue{
private final ActiveMQTempDestination tempDest;
private TaskRunnerFactory taskFactory;
/**
* @param brokerService
@ -48,7 +48,6 @@ public class TempQueue extends Queue{
throws Exception {
super(brokerService, destination, store, parentStats, taskFactory);
this.tempDest = (ActiveMQTempDestination) destination;
this.taskFactory=taskFactory;
}
public void initialize() throws Exception {

View File

@ -86,6 +86,7 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setMinimumMessageSize((int) getMinimumMessageSize());
queue.setUseConsumerPriority(isUseConsumerPriority());
queue.setStrictOrderDispatch(isStrictOrderDispatch());
queue.setOptimizedDispatch(isOptimizedDispatch());
}
public void configure(Topic topic) {