- Fixing out of order dispatch.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@642067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-28 02:22:39 +00:00
parent b5df4babc0
commit 76ac7588e8
1 changed files with 8 additions and 6 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
@ -107,7 +108,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
private MessageListener messageListener;
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
private JMSConsumerStatsImpl stats;
private final String selector;
@ -330,7 +331,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/
public MessageListener getMessageListener() throws JMSException {
checkClosed();
return this.messageListener;
return this.messageListener.get();
}
/**
@ -354,19 +355,20 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
throw new JMSException(
"Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
this.messageListener = listener;
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
session.redispatch(this, unconsumedMessages);
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
@ -934,7 +936,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener;
MessageListener listener = this.messageListener.get();
try {
synchronized (unconsumedMessages.getMutex()) {
if (clearDispatchList) {
@ -1024,7 +1026,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @throws JMSException
*/
public boolean iterate() {
MessageListener listener = this.messageListener;
MessageListener listener = this.messageListener.get();
if (listener != null) {
MessageDispatch md = unconsumedMessages.dequeueNoWait();
if (md != null) {