diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 564321f9ed..5ac201e97f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -289,11 +289,10 @@ public class AdvisoryBroker extends BrokerFilter { //in case of multiple matches VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); ConsumerInfo i = brokerConsumerDests.get(key); - if (consumerInfo.equals(i)) { - if (brokerConsumerDests.remove(key) != null) { - fireVirtualDestinationRemoveAdvisory(context, consumerInfo); - break; - } + if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) { + LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i); + fireVirtualDestinationRemoveAdvisory(context, consumerInfo); + break; } } } @@ -549,6 +548,7 @@ public class AdvisoryBroker extends BrokerFilter { super.virtualDestinationAdded(context, virtualDestination); if (virtualDestinations.add(virtualDestination)) { + LOG.debug("Virtual destination added: {}", virtualDestination); try { // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { @@ -592,20 +592,25 @@ public class AdvisoryBroker extends BrokerFilter { //if no consumer info, we need to create one - this is the case when an advisory is fired //because of the existence of a destination matching a virtual destination if (info == null) { - ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); - SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); - ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); - - info = new ConsumerInfo(consumerId); //store the virtual destination and the activeMQDestination as a pair so that we can keep track //of all matching forwarded destinations that caused demand - if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) { - info.setDestination(virtualDestination.getVirtualDestination()); - ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); + VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest); + if (brokerConsumerDests.get(pair) == null) { + ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); + SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); + ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + info = new ConsumerInfo(consumerId); - if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { - fireConsumerAdvisory(context, info.getDestination(), topic, info); + if(brokerConsumerDests.putIfAbsent(pair, info) == null) { + LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info); + info.setDestination(virtualDestination.getVirtualDestination()); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); + + if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { + LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); + fireConsumerAdvisory(context, info.getDestination(), topic, info); + } } } //this is the case of a real consumer coming online @@ -615,6 +620,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { + LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); fireConsumerAdvisory(context, info.getDestination(), topic, info); } } @@ -626,6 +632,7 @@ public class AdvisoryBroker extends BrokerFilter { super.virtualDestinationRemoved(context, virtualDestination); if (virtualDestinations.remove(virtualDestination)) { + LOG.debug("Virtual destination removed: {}", virtualDestination); try { consumersLock.readLock().lock(); try { @@ -636,16 +643,17 @@ public class AdvisoryBroker extends BrokerFilter { //find all consumers for this virtual destination if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { fireVirtualDestinationRemoveAdvisory(context, info); - } - //check consumers created for the existence of a destination to see if they - //match the consumerinfo and clean up - for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { - ConsumerInfo i = brokerConsumerDests.get(activeMQDest); - if (info.equals(i)) { - brokerConsumerDests.remove(activeMQDest); + //check consumers created for the existence of a destination to see if they + //match the consumerinfo and clean up + for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { + ConsumerInfo i = brokerConsumerDests.get(activeMQDest); + if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) { + LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i); + } } } + } } } @@ -663,6 +671,7 @@ public class AdvisoryBroker extends BrokerFilter { VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); if (virtualDestination != null) { + LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); ActiveMQDestination dest = info.getDestination(); @@ -898,6 +907,7 @@ public class AdvisoryBroker extends BrokerFilter { this.virtualDestination = virtualDestination; this.activeMQDestination = activeMQDestination; } + @Override public int hashCode() { final int prime = 31; @@ -913,6 +923,7 @@ public class AdvisoryBroker extends BrokerFilter { .hashCode()); return result; } + @Override public boolean equals(Object obj) { if (this == obj) @@ -936,6 +947,13 @@ public class AdvisoryBroker extends BrokerFilter { return false; return true; } + + @Override + public String toString() { + return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination=" + + activeMQDestination + "]"; + } + private AdvisoryBroker getOuterType() { return AdvisoryBroker.this; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java index dc81a0e043..73cb548138 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java @@ -30,10 +30,10 @@ import org.apache.activemq.selector.SelectorParser; * * @org.apache.xbean.XBean * - * + * */ public class FilteredDestination { - + private ActiveMQDestination destination; private String selector; private BooleanExpression filter; @@ -91,4 +91,35 @@ public class FilteredDestination { public void setTopic(String topic) { setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE)); } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((destination == null) ? 0 : destination.hashCode()); + result = prime * result + ((selector == null) ? 0 : selector.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + FilteredDestination other = (FilteredDestination) obj; + if (destination == null) { + if (other.destination != null) + return false; + } else if (!destination.equals(other.destination)) + return false; + if (selector == null) { + if (other.selector != null) + return false; + } else if (!selector.equals(other.selector)) + return false; + return true; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index bff069b461..3b46f8cb9c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -544,6 +544,59 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport { } + /** + * This tests that having 2 composite destinations (1 included for dynamic flow and 1 not) + * will allow messages to flow and that deleting 1 destination dosen't clear out the virtual + * consumer map except for what should be cleared. + * + */ + @Test(timeout = 60 * 1000) + public void testTwoCompositeTopicsRemove1() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" and "include.test.bar.bridge2" + CompositeTopic compositeTopic1 = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName + 2, + new ActiveMQQueue("include.test.bar.bridge2")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1, compositeTopic2}, true); + + MessageProducer includedProducer = localSession.createProducer(included); + Message test = localSession.createTextMessage("test"); + Thread.sleep(1000); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + + //verify there are 2 virtual destinations but only 1 consumer and broker dest + assertAdvisoryBrokerCounts(2,1,1); + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic1}, true); + Thread.sleep(2000); + //verify there is is only 1 virtual dest after deletion + assertAdvisoryBrokerCounts(1,1,1); + + includedProducer.send(test); + + //make sure messages are still forwarded even after 1 composite topic was deleted + waitForDispatchFromLocalBroker(destinationStatistics, 2); + assertLocalBrokerStatistics(destinationStatistics, 2); + assertEquals("remote dest messages", 2, remoteDestStatistics.getMessages().getCount()); + + } + /** * Test that demand is destroyed after removing both targets from the composite Topic * @throws Exception @@ -1375,7 +1428,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport { ActiveMQMessage message = null; while ((message = (ActiveMQMessage) advisoryConsumer.receive(1000)) != null) { available++; - LOG.debug("advisory data structure: {}", message.getDataStructure()); + LOG.info("advisory data structure: {}", message.getDataStructure()); } assertEquals(count, available); }