From 883eed0e385e9695a3746f1dede0a15c9fc24427 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 23 Mar 2011 22:58:18 +0000 Subject: [PATCH] Update the tests so that they're not dependent on port 61616 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1084797 13f79535-47bb-0310-9956-ffa450edef68 --- ...FailoverConsumerOutstandingCommitTest.java | 137 +++++++++--------- .../FailoverConsumerUnconsumedTest.java | 122 ++++++++-------- .../failover/FailoverPrefetchZeroTest.java | 18 ++- .../failover/FailoverTransactionTest.java | 59 +++++--- 4 files changed, 180 insertions(+), 156 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index a071d13a50..ff38fd9ebc 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -52,55 +52,58 @@ import org.junit.After; import org.junit.Test; public class FailoverConsumerOutstandingCommitTest { - - private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class); - private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; - private static final String MESSAGE_TEXT = "Test message "; - private String url = "tcp://localhost:61616"; - final int prefetch = 10; - BrokerService broker; - - public void startCleanBroker() throws Exception { - startBroker(true); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); - broker.start(); - } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = new BrokerService(); - broker.addConnector(url); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - - // optimizedDispatche and sync dispatch ensure that the dispatch happens - // before the commit reply that the consumer.clearDispatchList is waiting for. - defaultEntry.setOptimizedDispatch(true); + private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class); + private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; + private static final String MESSAGE_TEXT = "Test message "; + private static final String TRANSPORT_URI = "tcp://localhost:0"; + private String url; + final int prefetch = 10; + BrokerService broker; + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); + broker.start(); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + + // optimizedDispatche and sync dispatch ensure that the dispatch happens + // before the commit reply that the consumer.clearDispatchList is waiting for. + defaultEntry.setOptimizedDispatch(true); policyMap.setDefaultEntry(defaultEntry); broker.setDestinationPolicy(policyMap); - - return broker; - } - @Test - public void testFailoverConsumerDups() throws Exception { - doTestFailoverConsumerDups(true); - } - - public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { - + url = broker.getTransportConnectors().get(0).getConnectUri().toString(); + + return broker; + } + + @Test + public void testFailoverConsumerDups() throws Exception { + doTestFailoverConsumerDups(true); + } + + public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { + broker = createBroker(true); - + broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { @Override @@ -108,7 +111,7 @@ public class FailoverConsumerOutstandingCommitTest { TransactionId xid, boolean onePhase) throws Exception { // so commit will hang as if reply is lost context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping broker before commit..."); try { @@ -122,17 +125,17 @@ public class FailoverConsumerOutstandingCommitTest { } }); broker.start(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); cf.setDispatchAsync(false); - + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection.start(); - + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch); - + final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -144,9 +147,9 @@ public class FailoverConsumerOutstandingCommitTest { public void onMessage(Message message) { LOG.info("consume one and commit"); - + assertNotNull("got message", message); - + try { consumerSession.commit(); } catch (JMSException e) { @@ -157,7 +160,7 @@ public class FailoverConsumerOutstandingCommitTest { LOG.info("done commit"); } }); - + // may block if broker shutodwn happens quickly Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { @@ -171,15 +174,15 @@ public class FailoverConsumerOutstandingCommitTest { LOG.info("producer done"); } }); - + // will be stopped by the plugin broker.waitUntilStopped(); - broker = createBroker(false); + broker = createBroker(false, url); broker.start(); assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("another message was recieved after failover", messagesReceived.await(20, TimeUnit.SECONDS)); - + connection.close(); } @@ -187,12 +190,12 @@ public class FailoverConsumerOutstandingCommitTest { public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(false); } - + @Test public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(true); } - + public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception { final boolean watchTopicAdvisories = true; broker = createBroker(true); @@ -233,7 +236,7 @@ public class FailoverConsumerOutstandingCommitTest { final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch); - + final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal" + "?consumer.prefetchSize=" + prefetch); @@ -280,7 +283,7 @@ public class FailoverConsumerOutstandingCommitTest { // will be stopped by the plugin broker.waitUntilStopped(); - broker = createBroker(false); + broker = createBroker(false, url); broker.start(); assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); @@ -291,8 +294,8 @@ public class FailoverConsumerOutstandingCommitTest { assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText()); assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS)); assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText()); - - + + connection.close(); } @@ -312,28 +315,28 @@ public class FailoverConsumerOutstandingCommitTest { final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); final MessageConsumer testConsumer = consumerSession.createConsumer(destination); assertNull("no message yet", testConsumer.receiveNoWait()); - + produceMessage(producerSession, destination, 1); producerSession.close(); // consume then rollback after restart Message msg = testConsumer.receive(5000); assertNotNull(msg); - + // restart with outstanding delivered message broker.stop(); broker.waitUntilStopped(); - broker = createBroker(false); + broker = createBroker(false, url); broker.start(); - + consumerSession.rollback(); - + // receive again msg = testConsumer.receive(10000); assertNotNull("got message again after rollback", msg); consumerSession.commit(); - + // close before sweep consumerSession.close(); msg = receiveMessage(cf, destination); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java index de10509775..c743615761 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java @@ -51,51 +51,55 @@ import org.junit.Test; // see https://issues.apache.org/activemq/browse/AMQ-2573 public class FailoverConsumerUnconsumedTest { - + private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class); - private static final String QUEUE_NAME = "FailoverWithUnconsumed"; - private String url = "tcp://localhost:61616"; - final int prefetch = 10; - BrokerService broker; - - public void startCleanBroker() throws Exception { - startBroker(true); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); + private static final String QUEUE_NAME = "FailoverWithUnconsumed"; + private static final String TRANSPORT_URI = "tcp://localhost:0"; + private String url; + final int prefetch = 10; + BrokerService broker; + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); broker.start(); - } + } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = new BrokerService(); - broker.addConnector(url); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - return broker; - } + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); + } - @Test - public void testFailoverConsumerDups() throws Exception { - doTestFailoverConsumerDups(true); - } - - @Test + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + + this.url = broker.getTransportConnectors().get(0).getConnectUri().toString(); + + return broker; + } + + @Test + public void testFailoverConsumerDups() throws Exception { + doTestFailoverConsumerDups(true); + } + + @Test public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception { doTestFailoverConsumerDups(false); } - - public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { - - final int maxConsumers = 4; + + public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { + + final int maxConsumers = 4; broker = createBroker(true); - + broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { int consumerCount; @@ -106,7 +110,7 @@ public class FailoverConsumerUnconsumedTest { final ConsumerInfo info) throws Exception { if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1:0)) { context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping broker on consumer: " + info.getConsumerId()); try { @@ -122,13 +126,13 @@ public class FailoverConsumerUnconsumedTest { } }); broker.start(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); - + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection.start(); - + final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); @@ -136,9 +140,9 @@ public class FailoverConsumerUnconsumedTest { for (int i=0; i