https://issues.apache.org/jira/browse/AMQ-3985 - ActiveMQConnection temp advisory consumer should use asyncDispatch - can cause deadlock with slow consumers. Fix with test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1375595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-08-21 14:59:41 +00:00
parent dfbdbf6a50
commit ba9037ca65
2 changed files with 64 additions and 0 deletions

View File

@ -45,6 +45,7 @@ public class AdvisoryConsumer implements ActiveMQDispatcher {
info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
info.setPrefetchSize(1000); info.setPrefetchSize(1000);
info.setNoLocal(true); info.setNoLocal(true);
info.setDispatchAsync(true);
this.connection.addDispatcher(info.getConsumerId(), this); this.connection.addDispatcher(info.getConsumerId(), this);
this.connection.syncSendPacket(this.info); this.connection.syncSendPacket(this.info);

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
@ -23,6 +24,9 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
@ -38,12 +42,17 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.vm.VMTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @version * @version
*/ */
public class JmsTempDestinationTest extends TestCase { public class JmsTempDestinationTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
private Connection connection; private Connection connection;
private ActiveMQConnectionFactory factory; private ActiveMQConnectionFactory factory;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>()); protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
@ -293,4 +302,58 @@ public class JmsTempDestinationTest extends TestCase {
assertTrue("failed to throw an exception", true); assertTrue("failed to throw an exception", true);
} }
} }
public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
Connection connection = advisoryConnFactory.createConnection();
connections.add(connection);
connection.start();
final CountDownLatch done = new CountDownLatch(1);
final AtomicBoolean ok = new AtomicBoolean(true);
final AtomicBoolean first = new AtomicBoolean(true);
VMTransport t = ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class);
t.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
// block first dispatch for a while so broker backs up, but other connection should be able to proceed
if (first.compareAndSet(true, false)) {
try {
ok.set(done.await(35, TimeUnit.SECONDS));
LOG.info("Done waiting: " + ok.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
connection = factory.createConnection();
connections.add(connection);
((ActiveMQConnection)connection).setWatchTopicAdvisories(false);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i=0; i<2500; i++) {
TemporaryQueue queue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(queue);
consumer.close();
queue.delete();
}
LOG.info("Done with work: " + ok.get());
done.countDown();
assertTrue("ok", ok.get());
}
} }