https://issues.apache.org/jira/browse/AMQ-5513 - additional test for reconnect/rebalance case - indicate the absense of delivery information to the destination

This commit is contained in:
gtully 2015-01-23 15:23:35 +00:00
parent f38cb588d3
commit a2c5c22ec5
2 changed files with 39 additions and 1 deletions

View File

@ -1184,7 +1184,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
cs.getContext().getStopping().set(true); cs.getContext().getStopping().set(true);
try { try {
LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
processRemoveConnection(cs.getInfo().getConnectionId(), 0l); processRemoveConnection(cs.getInfo().getConnectionId(), -1);
} catch (Throwable ignore) { } catch (Throwable ignore) {
ignore.printStackTrace(); ignore.printStackTrace();
} }

View File

@ -32,6 +32,8 @@ import javax.jms.Topic;
import junit.framework.Test; import junit.framework.Test;
import junit.framework.TestCase; import junit.framework.TestCase;
import junit.framework.TestSuite; import junit.framework.TestSuite;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
/** /**
* *
@ -401,6 +403,42 @@ public class JmsRedeliveredTest extends TestCase {
session.close(); session.close();
} }
public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception {
connection.setClientID(getName());
connection.start();
Connection keepBrokerAliveConnection = createConnection();
keepBrokerAliveConnection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
final MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = createProducer(session, queue);
producer.send(createTextMessage(session));
session.commit();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1;
}
});
// whack the connection - like a rebalance or tcp drop
((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
Message msg = messageConsumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.close();
keepBrokerAliveConnection.close();
}
public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception { public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception {
connection.setClientID(getName()); connection.setClientID(getName());
connection.start(); connection.start();