diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java index a7a525e575..570b5f1310 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java @@ -31,6 +31,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; import org.apache.activemq.xbean.BrokerFactoryBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,28 +135,38 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS)); assertTrue(!slave.get().isSlave()); + LOG.info("Sending post failover message to VT"); + final String text = "ForUWhenSlaveKicksIn"; producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text)); + // dest must survive failover - consumer created after send qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1")); - javax.jms.Message message = qConsumer.receive(20000); + javax.jms.Message message = qConsumer.receive(10000); assertNotNull("Get message after failover", message); assertEquals("correct message", text, ((TextMessage)message).getText()); } public void testAdvisory() throws Exception { - MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic()); - Message advisoryMessage = advConsumer.receive(5000); - LOG.info("received " + advisoryMessage); - assertNotNull("Didn't received advisory", advisoryMessage); + final MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic()); + final Message[] advisoryMessage = new Message[1]; + advisoryMessage[0] = advConsumer.receive(5000); + LOG.info("received " + advisoryMessage[0]); + assertNotNull("Didn't received advisory", advisoryMessage[0]); master.stop(); assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS)); LOG.info("slave started"); - advisoryMessage = advConsumer.receive(20000); - LOG.info("received " + advisoryMessage); - assertNotNull("Didn't received advisory", advisoryMessage); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + advisoryMessage[0] = advConsumer.receive(500); + return advisoryMessage[0] != null; + } + }); + LOG.info("received " + advisoryMessage[0]); + assertNotNull("Didn't received advisory", advisoryMessage[0]); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java index 4cfb5e3f3e..c14506f53f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.ft; +import java.io.File; import java.io.IOException; import java.net.URI; import javax.sql.DataSource; @@ -33,11 +34,13 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup protected DataSource sharedDs; protected String MASTER_URL = "tcp://localhost:62001"; protected String SLAVE_URL = "tcp://localhost:62002"; + File sharedDbDirFile; @Override protected void setUp() throws Exception { // startup db sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory())); + sharedDbDirFile = new File(new File(IOHelper.getDefaultDataDirectory()), "sharedKahaDB"); super.setUp(); } @@ -47,6 +50,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup DataSourceServiceSupport.shutdownDefaultDataSource(((SyncCreateDataSource)sharedDs).getDelegate()); } + @Override protected void createMaster() throws Exception { master = new BrokerService(); master.setBrokerName("master"); @@ -55,6 +59,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup master.setPersistent(true); master.setDeleteAllMessagesOnStartup(true); KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); leaseDatabaseLocker.setCreateTablesOnStartup(true); leaseDatabaseLocker.setDataSource(getExistingDataSource()); @@ -63,6 +68,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup configureLocker(kahaDBPersistenceAdapter); configureBroker(master); master.start(); + master.waitUntilStarted(); } protected void configureBroker(BrokerService brokerService) { @@ -86,14 +92,15 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup broker.setUseJmx(false); broker.setPersistent(true); KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); leaseDatabaseLocker.setDataSource(getExistingDataSource()); leaseDatabaseLocker.setStatements(new Statements()); kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker); configureLocker(kahaDBPersistenceAdapter); configureBroker(broker); - broker.start(); slave.set(broker); + broker.start(); slaveStarted.countDown(); } catch (IllegalStateException expectedOnShutdown) { } catch (Exception e) {