Fixing the removal logic on virtual destination remove inside of
Advisory Broker to clean up virtual destination maps properly.  Added a
test to verify.  Also added new debug logging to help track down any
future issues.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-03-09 19:52:01 +00:00
parent 8b23e072ee
commit a2781e3966
3 changed files with 127 additions and 25 deletions

View File

@ -289,11 +289,10 @@ public class AdvisoryBroker extends BrokerFilter {
//in case of multiple matches
VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
ConsumerInfo i = brokerConsumerDests.get(key);
if (consumerInfo.equals(i)) {
if (brokerConsumerDests.remove(key) != null) {
fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
break;
}
if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) {
LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i);
fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
break;
}
}
}
@ -549,6 +548,7 @@ public class AdvisoryBroker extends BrokerFilter {
super.virtualDestinationAdded(context, virtualDestination);
if (virtualDestinations.add(virtualDestination)) {
LOG.debug("Virtual destination added: {}", virtualDestination);
try {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
@ -592,20 +592,25 @@ public class AdvisoryBroker extends BrokerFilter {
//if no consumer info, we need to create one - this is the case when an advisory is fired
//because of the existence of a destination matching a virtual destination
if (info == null) {
ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
info = new ConsumerInfo(consumerId);
//store the virtual destination and the activeMQDestination as a pair so that we can keep track
//of all matching forwarded destinations that caused demand
if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) {
info.setDestination(virtualDestination.getVirtualDestination());
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest);
if (brokerConsumerDests.get(pair) == null) {
ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
info = new ConsumerInfo(consumerId);
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
fireConsumerAdvisory(context, info.getDestination(), topic, info);
if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
info.setDestination(virtualDestination.getVirtualDestination());
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
fireConsumerAdvisory(context, info.getDestination(), topic, info);
}
}
}
//this is the case of a real consumer coming online
@ -615,6 +620,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
fireConsumerAdvisory(context, info.getDestination(), topic, info);
}
}
@ -626,6 +632,7 @@ public class AdvisoryBroker extends BrokerFilter {
super.virtualDestinationRemoved(context, virtualDestination);
if (virtualDestinations.remove(virtualDestination)) {
LOG.debug("Virtual destination removed: {}", virtualDestination);
try {
consumersLock.readLock().lock();
try {
@ -636,16 +643,17 @@ public class AdvisoryBroker extends BrokerFilter {
//find all consumers for this virtual destination
if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
fireVirtualDestinationRemoveAdvisory(context, info);
}
//check consumers created for the existence of a destination to see if they
//match the consumerinfo and clean up
for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
if (info.equals(i)) {
brokerConsumerDests.remove(activeMQDest);
//check consumers created for the existence of a destination to see if they
//match the consumerinfo and clean up
for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) {
LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i);
}
}
}
}
}
}
@ -663,6 +671,7 @@ public class AdvisoryBroker extends BrokerFilter {
VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
if (virtualDestination != null) {
LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
ActiveMQDestination dest = info.getDestination();
@ -898,6 +907,7 @@ public class AdvisoryBroker extends BrokerFilter {
this.virtualDestination = virtualDestination;
this.activeMQDestination = activeMQDestination;
}
@Override
public int hashCode() {
final int prime = 31;
@ -913,6 +923,7 @@ public class AdvisoryBroker extends BrokerFilter {
.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
@ -936,6 +947,13 @@ public class AdvisoryBroker extends BrokerFilter {
return false;
return true;
}
@Override
public String toString() {
return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination="
+ activeMQDestination + "]";
}
private AdvisoryBroker getOuterType() {
return AdvisoryBroker.this;
}

View File

@ -30,10 +30,10 @@ import org.apache.activemq.selector.SelectorParser;
*
* @org.apache.xbean.XBean
*
*
*
*/
public class FilteredDestination {
private ActiveMQDestination destination;
private String selector;
private BooleanExpression filter;
@ -91,4 +91,35 @@ public class FilteredDestination {
public void setTopic(String topic) {
setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE));
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((destination == null) ? 0 : destination.hashCode());
result = prime * result + ((selector == null) ? 0 : selector.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
FilteredDestination other = (FilteredDestination) obj;
if (destination == null) {
if (other.destination != null)
return false;
} else if (!destination.equals(other.destination))
return false;
if (selector == null) {
if (other.selector != null)
return false;
} else if (!selector.equals(other.selector))
return false;
return true;
}
}

View File

@ -544,6 +544,59 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
}
/**
* This tests that having 2 composite destinations (1 included for dynamic flow and 1 not)
* will allow messages to flow and that deleting 1 destination dosen't clear out the virtual
* consumer map except for what should be cleared.
*
*/
@Test(timeout = 60 * 1000)
public void testTwoCompositeTopicsRemove1() throws Exception {
Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
doSetUp(true, null);
//configure a virtual destination that forwards messages from topic testQueueName
//to queue "include.test.bar.bridge" and "include.test.bar.bridge2"
CompositeTopic compositeTopic1 = createCompositeTopic(testTopicName,
new ActiveMQQueue("include.test.bar.bridge"));
CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName + 2,
new ActiveMQQueue("include.test.bar.bridge2"));
runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1, compositeTopic2}, true);
MessageProducer includedProducer = localSession.createProducer(included);
Message test = localSession.createTextMessage("test");
Thread.sleep(1000);
final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination(
new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
waitForConsumerCount(destinationStatistics, 1);
includedProducer.send(test);
waitForDispatchFromLocalBroker(destinationStatistics, 1);
assertLocalBrokerStatistics(destinationStatistics, 1);
assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount());
//verify there are 2 virtual destinations but only 1 consumer and broker dest
assertAdvisoryBrokerCounts(2,1,1);
runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1}, true);
Thread.sleep(2000);
//verify there is is only 1 virtual dest after deletion
assertAdvisoryBrokerCounts(1,1,1);
includedProducer.send(test);
//make sure messages are still forwarded even after 1 composite topic was deleted
waitForDispatchFromLocalBroker(destinationStatistics, 2);
assertLocalBrokerStatistics(destinationStatistics, 2);
assertEquals("remote dest messages", 2, remoteDestStatistics.getMessages().getCount());
}
/**
* Test that demand is destroyed after removing both targets from the composite Topic
* @throws Exception
@ -1375,7 +1428,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
ActiveMQMessage message = null;
while ((message = (ActiveMQMessage) advisoryConsumer.receive(1000)) != null) {
available++;
LOG.debug("advisory data structure: {}", message.getDataStructure());
LOG.info("advisory data structure: {}", message.getDataStructure());
}
assertEquals(count, available);
}