diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 11af872e1a..d68c5bd7a7 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -19,8 +19,10 @@ package org.apache.activemq.advisory; import java.util.Comparator; import java.util.Iterator; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.activemq.broker.Broker; @@ -68,66 +70,9 @@ public class AdvisoryBroker extends BrokerFilter { private static final IdGenerator ID_GENERATOR = new IdGenerator(); protected final ConcurrentHashMap connections = new ConcurrentHashMap(); - class ConsumerIdKey { - private final ConsumerId delegate; - private final long creationTime; - - ConsumerIdKey(ConsumerId id) { - this.delegate = id; - this.creationTime = System.currentTimeMillis(); - } - - ConsumerIdKey(ConsumerId id, long creationTime) { - this.delegate = id; - this.creationTime = creationTime; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null || other.getClass() != ConsumerIdKey.class) { - return false; - } - - ConsumerIdKey key = (ConsumerIdKey) other; - - return delegate.equals(key.delegate); - } - - @Override - public int hashCode() { - return delegate.hashCode(); - } - - @Override - public String toString() { - return "ConsumerIdKey { " + delegate + " }"; - } - - public ConsumerId getConsumerId() { - return this.delegate; - } - - public long getCreationTime() { - return this.creationTime; - } - } - - // replay consumer advisory messages in the order in which they arrive - allows duplicate suppression in - // mesh networks with ttl>1 - protected final Map consumers = new ConcurrentSkipListMap( - new Comparator() { - @Override - public int compare(ConsumerIdKey o1, ConsumerIdKey o2) { - return (o1.creationTime < o2.creationTime ? -1 : o1.equals(o2) ? 0 : 1); - } - } - ); - - protected final Map consumerTracker = new ConcurrentHashMap(); + protected final Queue consumers = new ConcurrentLinkedQueue(); + protected final ConcurrentHashMap producers = new ConcurrentHashMap(); protected final ConcurrentHashMap destinations = new ConcurrentHashMap(); protected final ConcurrentHashMap networkBridges = new ConcurrentHashMap(); @@ -159,9 +104,7 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); - ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId()); - consumerTracker.put(key.getConsumerId(), key.getCreationTime()); - consumers.put(key, info); + consumers.offer(info); fireConsumerAdvisory(context, info.getDestination(), topic, info); } else { // We need to replay all the previously collected state objects @@ -206,7 +149,7 @@ public class AdvisoryBroker extends BrokerFilter { // Replay the consumers. if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { - for (Iterator iter = consumers.values().iterator(); iter.hasNext();) { + for (Iterator iter = consumers.iterator(); iter.hasNext();) { ConsumerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId()); @@ -324,17 +267,7 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQDestination dest = info.getDestination(); if (!AdvisorySupport.isAdvisoryTopic(dest)) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); - Object value = consumerTracker.remove(info.getConsumerId()); - if (value != null) { - Long creationTime = (Long) value; - ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId(), creationTime); - if (consumers.remove(key) == null) { - LOG.trace("Failed to remove:{} from the consumers map: {}", key, consumers); - } - } else { - LOG.trace("Failed to find consumer:{} in creation time tracking map: ", info.getConsumerId()); - } - + consumers.remove(info); if (!dest.isTemporary() || destinations.containsKey(dest)) { fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); } @@ -660,7 +593,7 @@ public class AdvisoryBroker extends BrokerFilter { return connections; } - public Map getAdvisoryConsumers() { + public Queue getAdvisoryConsumers() { return consumers; }