Update the tests so that they're not dependent on port 61616

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1084797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-03-23 22:58:18 +00:00
parent 5cf486c975
commit 883eed0e38
4 changed files with 180 additions and 156 deletions

View File

@ -52,55 +52,58 @@ import org.junit.After;
import org.junit.Test; import org.junit.Test;
public class FailoverConsumerOutstandingCommitTest { public class FailoverConsumerOutstandingCommitTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class);
private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
private static final String MESSAGE_TEXT = "Test message ";
private String url = "tcp://localhost:61616";
final int prefetch = 10;
BrokerService broker;
public void startCleanBroker() throws Exception {
startBroker(true);
}
@After
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class);
broker = new BrokerService(); private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
broker.addConnector(url); private static final String MESSAGE_TEXT = "Test message ";
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); private static final String TRANSPORT_URI = "tcp://localhost:0";
PolicyMap policyMap = new PolicyMap(); private String url;
PolicyEntry defaultEntry = new PolicyEntry(); final int prefetch = 10;
BrokerService broker;
// optimizedDispatche and sync dispatch ensure that the dispatch happens
// before the commit reply that the consumer.clearDispatchList is waiting for. @After
defaultEntry.setOptimizedDispatch(true); public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = new BrokerService();
broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
// optimizedDispatche and sync dispatch ensure that the dispatch happens
// before the commit reply that the consumer.clearDispatchList is waiting for.
defaultEntry.setOptimizedDispatch(true);
policyMap.setDefaultEntry(defaultEntry); policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap); broker.setDestinationPolicy(policyMap);
return broker;
}
@Test url = broker.getTransportConnectors().get(0).getConnectUri().toString();
public void testFailoverConsumerDups() throws Exception {
doTestFailoverConsumerDups(true); return broker;
} }
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { @Test
public void testFailoverConsumerDups() throws Exception {
doTestFailoverConsumerDups(true);
}
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
broker = createBroker(true); broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[] { broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() { new BrokerPluginSupport() {
@Override @Override
@ -108,7 +111,7 @@ public class FailoverConsumerOutstandingCommitTest {
TransactionId xid, boolean onePhase) throws Exception { TransactionId xid, boolean onePhase) throws Exception {
// so commit will hang as if reply is lost // so commit will hang as if reply is lost
context.setDontSendReponse(true); context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() { public void run() {
LOG.info("Stopping broker before commit..."); LOG.info("Stopping broker before commit...");
try { try {
@ -122,17 +125,17 @@ public class FailoverConsumerOutstandingCommitTest {
} }
}); });
broker.start(); broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories); cf.setWatchTopicAdvisories(watchTopicAdvisories);
cf.setDispatchAsync(false); cf.setDispatchAsync(false);
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start(); connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch);
final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@ -144,9 +147,9 @@ public class FailoverConsumerOutstandingCommitTest {
public void onMessage(Message message) { public void onMessage(Message message) {
LOG.info("consume one and commit"); LOG.info("consume one and commit");
assertNotNull("got message", message); assertNotNull("got message", message);
try { try {
consumerSession.commit(); consumerSession.commit();
} catch (JMSException e) { } catch (JMSException e) {
@ -157,7 +160,7 @@ public class FailoverConsumerOutstandingCommitTest {
LOG.info("done commit"); LOG.info("done commit");
} }
}); });
// may block if broker shutodwn happens quickly // may block if broker shutodwn happens quickly
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() { public void run() {
@ -171,15 +174,15 @@ public class FailoverConsumerOutstandingCommitTest {
LOG.info("producer done"); LOG.info("producer done");
} }
}); });
// will be stopped by the plugin // will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
assertTrue("another message was recieved after failover", messagesReceived.await(20, TimeUnit.SECONDS)); assertTrue("another message was recieved after failover", messagesReceived.await(20, TimeUnit.SECONDS));
connection.close(); connection.close();
} }
@ -187,12 +190,12 @@ public class FailoverConsumerOutstandingCommitTest {
public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception { public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(false); doTestFailoverConsumerOutstandingSendTx(false);
} }
@Test @Test
public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception { public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(true); doTestFailoverConsumerOutstandingSendTx(true);
} }
public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception { public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
final boolean watchTopicAdvisories = true; final boolean watchTopicAdvisories = true;
broker = createBroker(true); broker = createBroker(true);
@ -233,7 +236,7 @@ public class FailoverConsumerOutstandingCommitTest {
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME final Queue destination = producerSession.createQueue(QUEUE_NAME
+ "?consumer.prefetchSize=" + prefetch); + "?consumer.prefetchSize=" + prefetch);
final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal" final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal"
+ "?consumer.prefetchSize=" + prefetch); + "?consumer.prefetchSize=" + prefetch);
@ -280,7 +283,7 @@ public class FailoverConsumerOutstandingCommitTest {
// will be stopped by the plugin // will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
@ -291,8 +294,8 @@ public class FailoverConsumerOutstandingCommitTest {
assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText()); assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText());
assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS)); assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText()); assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
connection.close(); connection.close();
} }
@ -312,28 +315,28 @@ public class FailoverConsumerOutstandingCommitTest {
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final MessageConsumer testConsumer = consumerSession.createConsumer(destination); final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
assertNull("no message yet", testConsumer.receiveNoWait()); assertNull("no message yet", testConsumer.receiveNoWait());
produceMessage(producerSession, destination, 1); produceMessage(producerSession, destination, 1);
producerSession.close(); producerSession.close();
// consume then rollback after restart // consume then rollback after restart
Message msg = testConsumer.receive(5000); Message msg = testConsumer.receive(5000);
assertNotNull(msg); assertNotNull(msg);
// restart with outstanding delivered message // restart with outstanding delivered message
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
consumerSession.rollback(); consumerSession.rollback();
// receive again // receive again
msg = testConsumer.receive(10000); msg = testConsumer.receive(10000);
assertNotNull("got message again after rollback", msg); assertNotNull("got message again after rollback", msg);
consumerSession.commit(); consumerSession.commit();
// close before sweep // close before sweep
consumerSession.close(); consumerSession.close();
msg = receiveMessage(cf, destination); msg = receiveMessage(cf, destination);

View File

@ -51,51 +51,55 @@ import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2573 // see https://issues.apache.org/activemq/browse/AMQ-2573
public class FailoverConsumerUnconsumedTest { public class FailoverConsumerUnconsumedTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class); private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class);
private static final String QUEUE_NAME = "FailoverWithUnconsumed"; private static final String QUEUE_NAME = "FailoverWithUnconsumed";
private String url = "tcp://localhost:61616"; private static final String TRANSPORT_URI = "tcp://localhost:0";
final int prefetch = 10; private String url;
BrokerService broker; final int prefetch = 10;
BrokerService broker;
public void startCleanBroker() throws Exception {
startBroker(true); @After
} public void stopBroker() throws Exception {
if (broker != null) {
@After broker.stop();
public void stopBroker() throws Exception { }
if (broker != null) { }
broker.stop();
} public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
} broker = createBroker(deleteAllMessagesOnStartup);
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start(); broker.start();
} }
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService(); return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
broker.addConnector(url); }
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
return broker;
}
@Test public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
public void testFailoverConsumerDups() throws Exception { broker = new BrokerService();
doTestFailoverConsumerDups(true); broker.addConnector(bindAddress);
} broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
@Test this.url = broker.getTransportConnectors().get(0).getConnectUri().toString();
return broker;
}
@Test
public void testFailoverConsumerDups() throws Exception {
doTestFailoverConsumerDups(true);
}
@Test
public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception { public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception {
doTestFailoverConsumerDups(false); doTestFailoverConsumerDups(false);
} }
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
final int maxConsumers = 4; final int maxConsumers = 4;
broker = createBroker(true); broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[] { broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() { new BrokerPluginSupport() {
int consumerCount; int consumerCount;
@ -106,7 +110,7 @@ public class FailoverConsumerUnconsumedTest {
final ConsumerInfo info) throws Exception { final ConsumerInfo info) throws Exception {
if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1:0)) { if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1:0)) {
context.setDontSendReponse(true); context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() { public void run() {
LOG.info("Stopping broker on consumer: " + info.getConsumerId()); LOG.info("Stopping broker on consumer: " + info.getConsumerId());
try { try {
@ -122,13 +126,13 @@ public class FailoverConsumerUnconsumedTest {
} }
}); });
broker.start(); broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories); cf.setWatchTopicAdvisories(watchTopicAdvisories);
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start(); connection.start();
final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
@ -136,9 +140,9 @@ public class FailoverConsumerUnconsumedTest {
for (int i=0; i<maxConsumers -1; i++) { for (int i=0; i<maxConsumers -1; i++) {
testConsumers.add(new TestConsumer(consumerSession, destination, connection)); testConsumers.add(new TestConsumer(consumerSession, destination, connection));
} }
produceMessage(consumerSession, destination, maxConsumers * prefetch); produceMessage(consumerSession, destination, maxConsumers * prefetch);
assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() { assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
int totalUnconsumed = 0; int totalUnconsumed = 0;
@ -146,14 +150,14 @@ public class FailoverConsumerUnconsumedTest {
long unconsumed = testConsumer.unconsumedSize(); long unconsumed = testConsumer.unconsumedSize();
LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumed); LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumed);
totalUnconsumed += unconsumed; totalUnconsumed += unconsumed;
} }
return totalUnconsumed == (maxConsumers-1) * prefetch; return totalUnconsumed == (maxConsumers-1) * prefetch;
} }
})); }));
final CountDownLatch commitDoneLatch = new CountDownLatch(1); final CountDownLatch commitDoneLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() { public void run() {
try { try {
LOG.info("add last consumer..."); LOG.info("add last consumer...");
@ -165,7 +169,7 @@ public class FailoverConsumerUnconsumedTest {
} }
} }
}); });
// will be stopped by the plugin // will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
@ -182,11 +186,11 @@ public class FailoverConsumerUnconsumedTest {
} }
})); }));
broker = createBroker(false); broker = createBroker(false, this.url);
broker.start(); broker.start();
assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// each should again get prefetch messages - all unconsumed deliveries should be rolledback // each should again get prefetch messages - all unconsumed deliveries should be rolledback
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() { assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
@ -195,14 +199,14 @@ public class FailoverConsumerUnconsumedTest {
long unconsumed = testConsumer.unconsumedSize(); long unconsumed = testConsumer.unconsumedSize();
LOG.info(testConsumer.getConsumerId() + " after restart: unconsumed: " + unconsumed); LOG.info(testConsumer.getConsumerId() + " after restart: unconsumed: " + unconsumed);
totalUnconsumed += unconsumed; totalUnconsumed += unconsumed;
} }
return totalUnconsumed == (maxConsumers) * prefetch; return totalUnconsumed == (maxConsumers) * prefetch;
} }
})); }));
connection.close(); connection.close();
} }
private void produceMessage(final Session producerSession, Queue destination, long count) private void produceMessage(final Session producerSession, Queue destination, long count)
throws JMSException { throws JMSException {
MessageProducer producer = producerSession.createProducer(destination); MessageProducer producer = producerSession.createProducer(destination);
@ -212,22 +216,22 @@ public class FailoverConsumerUnconsumedTest {
} }
producer.close(); producer.close();
} }
// allow access to unconsumedMessages // allow access to unconsumedMessages
class TestConsumer extends ActiveMQMessageConsumer { class TestConsumer extends ActiveMQMessageConsumer {
TestConsumer(Session consumerSession, Destination destination, ActiveMQConnection connection) throws Exception { TestConsumer(Session consumerSession, Destination destination, ActiveMQConnection connection) throws Exception {
super((ActiveMQSession) consumerSession, super((ActiveMQSession) consumerSession,
new ConsumerId(new SessionId(connection.getConnectionInfo().getConnectionId(),1), nextGen()), new ConsumerId(new SessionId(connection.getConnectionInfo().getConnectionId(),1), nextGen()),
ActiveMQMessageTransformation.transformDestination(destination), null, "", ActiveMQMessageTransformation.transformDestination(destination), null, "",
prefetch, -1, false, false, true, null); prefetch, -1, false, false, true, null);
} }
public int unconsumedSize() { public int unconsumedSize() {
return unconsumedMessages.size(); return unconsumedMessages.size();
} }
} }
static long idGen = 100; static long idGen = 100;
private static long nextGen() { private static long nextGen() {
idGen -=5; idGen -=5;

View File

@ -48,14 +48,11 @@ public class FailoverPrefetchZeroTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverPrefetchZeroTest.class); private static final Logger LOG = LoggerFactory.getLogger(FailoverPrefetchZeroTest.class);
private static final String QUEUE_NAME = "FailoverPrefetchZero"; private static final String QUEUE_NAME = "FailoverPrefetchZero";
private String url = "tcp://localhost:61616"; private static final String TRANSPORT_URI = "tcp://localhost:0";
private String url;
final int prefetch = 0; final int prefetch = 0;
BrokerService broker; BrokerService broker;
public void startCleanBroker() throws Exception {
startBroker(true);
}
@After @After
public void stopBroker() throws Exception { public void stopBroker() throws Exception {
if (broker != null) { if (broker != null) {
@ -69,9 +66,16 @@ public class FailoverPrefetchZeroTest {
} }
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
broker.addConnector(url); broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
url = broker.getTransportConnectors().get(0).getConnectUri().toString();
return broker; return broker;
} }
@ -135,7 +139,7 @@ public class FailoverPrefetchZeroTest {
// will be stopped by the plugin // will be stopped by the plugin
assertTrue("pull completed on broker", pullDone.await(30, TimeUnit.SECONDS)); assertTrue("pull completed on broker", pullDone.await(30, TimeUnit.SECONDS));
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
assertTrue("receive completed through failover", receiveDone.await(30, TimeUnit.SECONDS)); assertTrue("receive completed through failover", receiveDone.await(30, TimeUnit.SECONDS));

View File

@ -60,17 +60,14 @@ public class FailoverTransactionTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class); private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
private static final String QUEUE_NAME = "FailoverWithTx"; private static final String QUEUE_NAME = "FailoverWithTx";
private String url = "tcp://localhost:61616"; private static final String TRANSPORT_URI = "tcp://localhost:0";
private String url;
BrokerService broker; BrokerService broker;
public static Test suite() { public static Test suite() {
return suite(FailoverTransactionTest.class); return suite(FailoverTransactionTest.class);
} }
public void startCleanBroker() throws Exception {
startBroker(true);
}
public void setUp() throws Exception { public void setUp() throws Exception {
super.setMaxTestTime(20 * 60 * 1000); // some boxes can be real slow super.setMaxTestTime(20 * 60 * 1000); // some boxes can be real slow
super.setAutoFail(true); super.setAutoFail(true);
@ -87,17 +84,33 @@ public class FailoverTransactionTest extends TestSupport {
} }
} }
private void startCleanBroker() throws Exception {
startBroker(true);
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup); broker = createBroker(deleteAllMessagesOnStartup);
broker.start(); broker.start();
} }
public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
broker.start();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
broker.setUseJmx(false); broker.setUseJmx(false);
broker.setAdvisorySupport(false); broker.setAdvisorySupport(false);
broker.addConnector(url); broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
url = broker.getTransportConnectors().get(0).getConnectUri().toString();
return broker; return broker;
} }
@ -114,7 +127,7 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit // restart to force failover and connection state recovery before the commit
broker.stop(); broker.stop();
startBroker(false); startBroker(false, url);
session.commit(); session.commit();
assertNotNull("we got the message", consumer.receive(20000)); assertNotNull("we got the message", consumer.receive(20000));
@ -182,7 +195,7 @@ public class FailoverTransactionTest extends TestSupport {
// will be stopped by the plugin // will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
broker.start(); broker.start();
@ -202,7 +215,7 @@ public class FailoverTransactionTest extends TestSupport {
broker.waitUntilStopped(); broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages.."); LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
broker.start(); broker.start();
@ -285,7 +298,7 @@ public class FailoverTransactionTest extends TestSupport {
// will be stopped by the plugin // will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
LOG.info("restarting...."); LOG.info("restarting....");
broker.start(); broker.start();
@ -309,7 +322,7 @@ public class FailoverTransactionTest extends TestSupport {
broker.waitUntilStopped(); broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with second restart.."); LOG.info("Checking for remaining/hung messages with second restart..");
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
broker.start(); broker.start();
@ -430,7 +443,7 @@ public class FailoverTransactionTest extends TestSupport {
broker.waitUntilStopped(); broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with restart.."); LOG.info("Checking for remaining/hung messages with restart..");
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
broker.start(); broker.start();
@ -462,7 +475,7 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit // restart to force failover and connection state recovery before the commit
broker.stop(); broker.stop();
startBroker(false); startBroker(false, url);
session.commit(); session.commit();
@ -493,7 +506,7 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit // restart to force failover and connection state recovery before the commit
broker.stop(); broker.stop();
startBroker(false); startBroker(false, url);
session.commit(); session.commit();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -543,7 +556,7 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit // restart to force failover and connection state recovery before the commit
broker.stop(); broker.stop();
startBroker(false); startBroker(false, url);
session.commit(); session.commit();
for (int i = 0; i < count - 1; i++) { for (int i = 0; i < count - 1; i++) {
@ -671,7 +684,7 @@ public class FailoverTransactionTest extends TestSupport {
// will be stopped by the plugin // will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
broker.start(); broker.start();
@ -708,7 +721,7 @@ public class FailoverTransactionTest extends TestSupport {
broker.waitUntilStopped(); broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages.."); LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false); broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
broker.start(); broker.start();
@ -744,7 +757,7 @@ public class FailoverTransactionTest extends TestSupport {
assertNotNull(msg); assertNotNull(msg);
broker.stop(); broker.stop();
broker = createBroker(false); broker = createBroker(false, url);
// use empty jdbc store so that default wait(0) for redeliveries will timeout after failover // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC); setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
broker.start(); broker.start();
@ -756,7 +769,7 @@ public class FailoverTransactionTest extends TestSupport {
} }
broker.stop(); broker.stop();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000)); assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
@ -784,7 +797,7 @@ public class FailoverTransactionTest extends TestSupport {
assertNotNull("got message just produced", msg); assertNotNull("got message just produced", msg);
broker.stop(); broker.stop();
broker = createBroker(false); broker = createBroker(false, url);
// use empty jdbc store so that wait for re-deliveries occur when failover resumes // use empty jdbc store so that wait for re-deliveries occur when failover resumes
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC); setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
broker.start(); broker.start();
@ -803,7 +816,7 @@ public class FailoverTransactionTest extends TestSupport {
}); });
broker.stop(); broker.stop();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS)); assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
@ -836,7 +849,7 @@ public class FailoverTransactionTest extends TestSupport {
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1")); MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
broker.stop(); broker.stop();
broker = createBroker(false); broker = createBroker(false, url);
broker.start(); broker.start();
final CountDownLatch commitDone = new CountDownLatch(1); final CountDownLatch commitDone = new CountDownLatch(1);
@ -851,7 +864,7 @@ public class FailoverTransactionTest extends TestSupport {
try { try {
consumerSession.commit(); consumerSession.commit();
} catch (JMSException ex) { } catch (JMSException ex) {
exceptions.add(ex); exceptions.add(ex);
} finally { } finally {
commitDone.countDown(); commitDone.countDown();
} }