git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1451608 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-03-01 14:55:20 +00:00
parent d98c3e0249
commit d8aca93328
1 changed files with 23 additions and 16 deletions

View File

@ -23,7 +23,9 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -32,8 +34,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class RedeliveryRestartTest extends BrokerRestartTestSupport { public class RedeliveryRestartTest extends BrokerRestartTestSupport {
private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class); private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
@Override
protected void setUp() throws Exception {
setAutoFail(true);
setMaxTestTime(2 * 60 * 1000);
super.setUp();
}
@Override @Override
protected void configureBroker(BrokerService broker) throws Exception { protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker); super.configureBroker(broker);
@ -45,8 +56,8 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
public void testValidateRedeliveryFlagAfterRestart() throws Exception { public void testValidateRedeliveryFlagAfterRestart() throws Exception {
ConnectionFactory connectionFactory = ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + ")?jms.transactedIndividualAck=true"); + ")?jms.transactedIndividualAck=true");
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start(); connection.start();
@ -57,7 +68,7 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = null; TextMessage msg = null;
for (int i=0; i<5;i++) { for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(20000); msg = (TextMessage) consumer.receive(20000);
LOG.info("not redelivered? got: " + msg); LOG.info("not redelivered? got: " + msg);
assertNotNull("got the message", msg); assertNotNull("got the message", msg);
@ -70,10 +81,11 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
restartBroker(); restartBroker();
// make failover aware of the restarted auto assigned port // make failover aware of the restarted auto assigned port
((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class)).add(true, broker.getTransportConnectors().get(0).getPublishableConnectString()); connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
.getPublishableConnectString());
consumer = session.createConsumer(destination); consumer = session.createConsumer(destination);
for (int i=0; i<5;i++) { for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(4000); msg = (TextMessage) consumer.receive(4000);
LOG.info("redelivered? got: " + msg); LOG.info("redelivered? got: " + msg);
assertNotNull("got the message again", msg); assertNotNull("got the message again", msg);
@ -83,7 +95,7 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
session.commit(); session.commit();
// consume the rest that were not redeliveries // consume the rest that were not redeliveries
for (int i=0; i<5;i++) { for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(20000); msg = (TextMessage) consumer.receive(20000);
LOG.info("not redelivered? got: " + msg); LOG.info("not redelivered? got: " + msg);
assertNotNull("got the message", msg); assertNotNull("got the message", msg);
@ -96,8 +108,8 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
} }
public void testValidateRedeliveryFlagAfterRecovery() throws Exception { public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
ConnectionFactory connectionFactory = ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.transactedIndividualAck=true"); + "?jms.transactedIndividualAck=true");
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start(); connection.start();
@ -122,12 +134,11 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
broker = createRestartedBroker(); broker = createRestartedBroker();
broker.start(); broker.start();
connectionFactory = connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.transactedIndividualAck=true"); + "?jms.transactedIndividualAck=true");
connection = (ActiveMQConnection) connectionFactory.createConnection(); connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start(); connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED); session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination); consumer = session.createConsumer(destination);
msg = (TextMessage) consumer.receive(10000); msg = (TextMessage) consumer.receive(10000);
@ -139,9 +150,7 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
connection.close(); connection.close();
} }
private void populateDestination(final int nbMessages, private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
final String destinationName, javax.jms.Connection connection)
throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName); Destination destination = session.createQueue(destinationName);
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
@ -152,7 +161,6 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
session.close(); session.close();
} }
public static Test suite() { public static Test suite() {
return suite(RedeliveryRestartTest.class); return suite(RedeliveryRestartTest.class);
} }
@ -160,5 +168,4 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
public static void main(String[] args) { public static void main(String[] args) {
junit.textui.TestRunner.run(suite()); junit.textui.TestRunner.run(suite());
} }
} }