Patch for AMQ-1093 to avoid a deadlock if the transport is being reconnected from inside a MessageListener which is calling a send(), lets make the explicit clear of the consumer dispatch list asynchronous and within the existing mutex

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@487235 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-12-14 15:23:47 +00:00
parent e500f2ecc4
commit 419ed2ee91
1 changed files with 23 additions and 20 deletions

View File

@ -17,24 +17,7 @@
*/
package org.apache.activemq;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.*;
import org.apache.activemq.management.JMSConsumerStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
@ -47,6 +30,12 @@ import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.IllegalStateException;
import javax.jms.*;
import javax.jms.Message;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -121,6 +110,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService = null;
private MessageTransformer transformer;
private boolean clearDispatchList;
/**
* Create a MessageConsumer
@ -569,7 +559,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
void clearMessagesInProgress(){
unconsumedMessages.clear();
// we are called from inside the transport reconnection logic
// which involves us clearing all the connections' consumers
// dispatch lists and clearing them
// so rather than trying to grab a mutex (which could be already
// owned by the message listener calling the send) we will just set
// a flag so that the list can be cleared as soon as the
// dispatch thread is ready to flush the dispatch list
clearDispatchList= true;
}
void deliverAcks(){
@ -859,7 +856,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageListener listener = this.messageListener;
try {
synchronized(unconsumedMessages.getMutex()){
if (!unconsumedMessages.isClosed()) {
if (clearDispatchList) {
// we are reconnecting so lets flush the in progress messages
clearDispatchList = false;
unconsumedMessages.clear();
}
if (!unconsumedMessages.isClosed()) {
if (listener != null && unconsumedMessages.isRunning() ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);