https://issues.apache.org/jira/browse/AMQ-4842 - the store needed to be shared between master and slave

This commit is contained in:
gtully 2015-05-20 13:42:49 +01:00
parent 9ae5b4147b
commit c07d6c841d
2 changed files with 27 additions and 9 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -134,28 +135,38 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
assertTrue(!slave.get().isSlave());
LOG.info("Sending post failover message to VT");
final String text = "ForUWhenSlaveKicksIn";
producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
// dest must survive failover - consumer created after send
qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
javax.jms.Message message = qConsumer.receive(20000);
javax.jms.Message message = qConsumer.receive(10000);
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
}
public void testAdvisory() throws Exception {
MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
Message advisoryMessage = advConsumer.receive(5000);
LOG.info("received " + advisoryMessage);
assertNotNull("Didn't received advisory", advisoryMessage);
final MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
final Message[] advisoryMessage = new Message[1];
advisoryMessage[0] = advConsumer.receive(5000);
LOG.info("received " + advisoryMessage[0]);
assertNotNull("Didn't received advisory", advisoryMessage[0]);
master.stop();
assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
LOG.info("slave started");
advisoryMessage = advConsumer.receive(20000);
LOG.info("received " + advisoryMessage);
assertNotNull("Didn't received advisory", advisoryMessage);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
advisoryMessage[0] = advConsumer.receive(500);
return advisoryMessage[0] != null;
}
});
LOG.info("received " + advisoryMessage[0]);
assertNotNull("Didn't received advisory", advisoryMessage[0]);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.ft;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import javax.sql.DataSource;
@ -33,11 +34,13 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
protected DataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
File sharedDbDirFile;
@Override
protected void setUp() throws Exception {
// startup db
sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
sharedDbDirFile = new File(new File(IOHelper.getDefaultDataDirectory()), "sharedKahaDB");
super.setUp();
}
@ -47,6 +50,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
DataSourceServiceSupport.shutdownDefaultDataSource(((SyncCreateDataSource)sharedDs).getDelegate());
}
@Override
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
@ -55,6 +59,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setCreateTablesOnStartup(true);
leaseDatabaseLocker.setDataSource(getExistingDataSource());
@ -63,6 +68,7 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
configureLocker(kahaDBPersistenceAdapter);
configureBroker(master);
master.start();
master.waitUntilStarted();
}
protected void configureBroker(BrokerService brokerService) {
@ -86,14 +92,15 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
broker.setUseJmx(false);
broker.setPersistent(true);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(sharedDbDirFile);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setDataSource(getExistingDataSource());
leaseDatabaseLocker.setStatements(new Statements());
kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
configureLocker(kahaDBPersistenceAdapter);
configureBroker(broker);
broker.start();
slave.set(broker);
broker.start();
slaveStarted.countDown();
} catch (IllegalStateException expectedOnShutdown) {
} catch (Exception e) {