resolve: https://issues.apache.org/activemq/browse/AMQ-2772 - patch applied with thanks and new test case created to protect the change

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@953696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-06-11 13:40:49 +00:00
parent 2b6f36dbfd
commit e771b88479
2 changed files with 56 additions and 2 deletions

View File

@ -1852,7 +1852,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public void transportInterupted() {
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0) - connectionConsumers.size());
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
}

View File

@ -36,6 +36,8 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
@ -291,6 +293,58 @@ public class FailoverTransactionTest {
connection.close();
}
@Test
// https://issues.apache.org/activemq/browse/AMQ-2772
public void testFailoverWithConnectionConsumer() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
public Session getSession() throws JMSException {
return poolSession;
}
public void start() throws JMSException {
connectionConsumerGotOne.countDown();
poolSession.run();
}
};
}
}, 1);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer;
TextMessage message;
final int count = 10;
for (int i=0; i<count; i++) {
producer = session.createProducer(destination);
message = session.createTextMessage("Test message: " + count);
producer.send(message);
producer.close();
}
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false);
session.commit();
for (int i=0; i<count-1; i++) {
assertNotNull("we got all the message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
assertTrue("connectionconsumer got a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
@Test
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order of state tracker recovery, do a few times