https://issues.apache.org/jira/browse/AMQ-4108 - master broker advisory topic needs to be retroactive

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1399302 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-10-17 15:16:01 +00:00
parent be8e58be14
commit b5e46ef9c5
2 changed files with 20 additions and 1 deletions

View File

@ -27,10 +27,12 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
@ -84,7 +86,12 @@ public class Topic extends BaseDestination implements Task {
super(brokerService, store, destination, parentStats);
this.topicStore = store;
// set default subscription recovery policy
subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
setAlwaysRetroactive(true);
} else {
subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
}
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}

View File

@ -21,11 +21,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -132,4 +134,14 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
}
public void testAdvisory() throws Exception {
MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
master.stop();
assertTrue("slave started", slaveStarted.await(15, TimeUnit.SECONDS));
Message advisoryMessage = advConsumer.receive(5000);
assertNotNull("Didn't received advisory", advisoryMessage);
}
}