https://issues.apache.org/jira/browse/AMQ-5164 https://issues.apache.org/jira/browse/AMQ-4842 - master slave advisory needed to wait for failover reconnect - exponential backoff was sometimes giving to long a delay. Retroactive override needs to applied after policies. Tests reinstated

This commit is contained in:
gtully 2015-05-20 09:56:38 +01:00
parent baa10ed33e
commit 9ad65c62ed
4 changed files with 22 additions and 27 deletions

View File

@ -87,19 +87,18 @@ public class Topic extends BaseDestination implements Task {
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.topicStore = store;
// set default subscription recovery policy
if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
setAlwaysRetroactive(true);
} else {
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
}
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
@Override
public void initialize() throws Exception {
super.initialize();
// set non default subscription recovery policy (override policyEntries)
if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
setAlwaysRetroactive(true);
}
if (store != null) {
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.

View File

@ -28,7 +28,7 @@ import org.junit.Ignore;
public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTestSupport {
private final String brokerUrl = "tcp://localhost:62001";
private final String singleUriString = "failover://(" + brokerUrl +")?randomize=false";
private final String singleUriString = "failover://(" + brokerUrl +")?randomize=false&useExponentialBackOff=false";
@Override
protected void setUp() throws Exception {
@ -81,15 +81,4 @@ public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTestSupport {
}).start();
}
// The @Ignore is just here for documentation, since this is a JUnit3 test
// I added the sleep because without it the two other test cases fail. I haven't looked into it, but
// my guess whatever setUp does isn't really finished when the teardown runs.
@Ignore("See https://issues.apache.org/jira/browse/AMQ-5164")
@Override
public void testAdvisory() throws Exception {
Thread.sleep(5 * 1000);
//super.testAdvisory();
}
}

View File

@ -44,13 +44,15 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
protected BrokerService master;
protected AtomicReference<BrokerService> slave = new AtomicReference<BrokerService>();
protected CountDownLatch slaveStarted = new CountDownLatch(1);
protected CountDownLatch slaveStarted;
protected int inflightMessageCount;
protected int failureCount = 50;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";
@Override
protected void setUp() throws Exception {
slaveStarted = new CountDownLatch(1);
slave.set(null);
setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
setAutoFail(true);
if (System.getProperty("basedir") == null) {
@ -137,18 +139,21 @@ abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWit
qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
javax.jms.Message message = qConsumer.receive(4000);
javax.jms.Message message = qConsumer.receive(20000);
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
}
public void testAdvisory() throws Exception {
MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
Message advisoryMessage = advConsumer.receive(5000);
LOG.info("received " + advisoryMessage);
assertNotNull("Didn't received advisory", advisoryMessage);
master.stop();
assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
LOG.info("slave started");
Message advisoryMessage = advConsumer.receive(5000);
advisoryMessage = advConsumer.receive(20000);
LOG.info("received " + advisoryMessage);
assertNotNull("Didn't received advisory", advisoryMessage);

View File

@ -34,12 +34,19 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
@Override
protected void setUp() throws Exception {
// startup db
sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
DataSourceServiceSupport.shutdownDefaultDataSource(((SyncCreateDataSource)sharedDs).getDelegate());
}
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
@ -102,11 +109,6 @@ public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSup
kahaDBPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
}
@Override
public void testVirtualTopicFailover() throws Exception {
// Ignoring for now, see AMQ-4842
}
protected DataSource getExistingDataSource() throws Exception {
return sharedDs;
}