use task runner to dispatch messages - improve concurrency

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@615287 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-25 17:52:54 +00:00
parent 55810b3155
commit cce756605f
1 changed files with 14 additions and 12 deletions

View File

@ -51,7 +51,6 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
@ -88,10 +87,7 @@ public class Queue extends BaseDestination implements Task {
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() { public void run() {
try { wakeup();
taskRunner.wakeup();
} catch (InterruptedException e) {
}
}; };
}; };
@ -878,7 +874,7 @@ public class Queue extends BaseDestination implements Task {
*/ */
public boolean iterate() { public boolean iterate() {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { while (!messagesWaitingForSpace.isEmpty() &&!memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst(); Runnable op = messagesWaitingForSpace.removeFirst();
op.run(); op.run();
} }
@ -930,11 +926,7 @@ public class Queue extends BaseDestination implements Task {
synchronized(pagedInMessages) { synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId()); pagedInMessages.remove(reference.getMessageId());
} }
try { wakeup();
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
} }
protected boolean lockMessage(IndirectMessageReference r) { protected boolean lockMessage(IndirectMessageReference r) {
@ -953,7 +945,15 @@ public class Queue extends BaseDestination implements Task {
} }
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment(); destinationStatistics.getMessages().increment();
pageInMessages(false); wakeup();
}
final void wakeup() {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
} }
@ -1020,5 +1020,7 @@ public class Queue extends BaseDestination implements Task {
private void pageInMessages(boolean force) throws Exception { private void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force)); doDispatch(doPageIn(force));
} }
} }