https://issues.apache.org/jira/browse/AMQ-3135 - replay network bridge advisories for new consumers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1146717 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-07-14 13:55:49 +00:00
parent 48fa20a7bd
commit 7880521a1d
3 changed files with 67 additions and 0 deletions

View File

@ -52,6 +52,7 @@ public class AdvisoryBroker extends BrokerFilter {
protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
protected final ProducerId advisoryProducerId = new ProducerId();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@ -124,6 +125,15 @@ public class AdvisoryBroker extends BrokerFilter {
fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
}
}
// Replay network bridges
if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
BrokerInfo key = iter.next();
ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
fireAdvisory(context, topic, key, null, networkBridges.get(key));
}
}
}
return answer;
}
@ -399,6 +409,7 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setBooleanProperty("started", true);
advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
advisoryMessage.setStringProperty("remoteIp", remoteIp);
networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
@ -418,6 +429,7 @@ public class AdvisoryBroker extends BrokerFilter {
if (brokerInfo != null) {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setBooleanProperty("started", false);
networkBridges.remove(brokerInfo);
ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();

View File

@ -452,6 +452,24 @@ public final class AdvisorySupport {
}
}
public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
}
public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
for (int i = 0; i < compositeDestinations.length; i++) {
if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
return true;
}
}
return false;
} else {
return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
}
}
/**
* Returns the agent topic which is used to send commands to the broker
*/

View File

@ -61,6 +61,43 @@ public class AdvisoryNetworkBridgeTest extends TestCase {
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertFalse(advisory.getBooleanProperty("started"));
conn.close();
}
public void testAddConsumerLater() throws Exception {
createBroker1();
createBroker2();
Thread.sleep(1000);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
Connection conn = factory.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertTrue(advisory.getBooleanProperty("started"));
assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex"));
broker2.stop();
broker2.waitUntilStopped();
advisory = (ActiveMQMessage)consumer.receive(2000);
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertFalse(advisory.getBooleanProperty("started"));
consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
advisory = (ActiveMQMessage)consumer.receive(1000);
assertNull(advisory);
conn.close();
}
public void assertCreatedByDuplex(boolean createdByDuplex) {