A couple of tests to add to the patch provided
This commit is contained in:
Andy Taylor 2015-06-08 15:53:48 +01:00
parent c9a3202bc3
commit 67c28b1c68
1 changed files with 97 additions and 0 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -615,6 +617,101 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
} }
public void testRedeliveryRollbackWithDelayBlocking() throws Exception
{
redeliveryRollbackWithDelay(true);
}
public void testRedeliveryRollbackWithDelayNonBlocking() throws Exception
{
redeliveryRollbackWithDelay(false);
}
public void redeliveryRollbackWithDelay(final boolean blockingRedelivery) throws Exception {
connection.start();
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = sendSession.createProducer(destination);
producer.send(sendSession.createTextMessage("1st"));
producer.send(sendSession.createTextMessage("2nd"));
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(2000);
policy.setUseExponentialBackOff(false);
connection.setNonBlockingRedelivery(blockingRedelivery);
connection.start();
final CountDownLatch done = new CountDownLatch(3);
final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
final List<String> list = new ArrayList<>();
session.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
list.add(((ActiveMQTextMessage) message).getText());
if (done.getCount() == 3)
{
session.rollback();
}
done.countDown();
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
connection.createConnectionConsumer(
destination,
null,
new ServerSessionPool() {
@Override
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
@Override
public Session getSession() throws JMSException {
return session;
}
@Override
public void start() throws JMSException {
}
};
}
},
100,
false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
session.run();
return done.await(10, TimeUnit.MILLISECONDS);
}
}, 5000);
connection.close();
connections.remove(connection);
assertEquals(list.size(), 3);
if (blockingRedelivery) {
assertEquals("1st", list.get(0));
assertEquals("2nd", list.get(1));
assertEquals("1st", list.get(2));
} else {
assertEquals("1st", list.get(0));
assertEquals("1st", list.get(1));
assertEquals("2nd", list.get(2));
}
}
public void testInitialRedeliveryDelayZero() throws Exception { public void testInitialRedeliveryDelayZero() throws Exception {
// Receive a message with the JMS API // Receive a message with the JMS API