From 21420459535335bef22f8d1ced6d3ce5b92d9628 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 20 Sep 2013 16:50:20 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4731 Initial fix for this issue. Uses a separate collection to track the creation time of the consumers, might want to test just using a ConcurrentLinkedQueue since that will stay in creation or naturally although the remove operations could cost more.. --- .../activemq/advisory/AdvisoryBroker.java | 84 ++++++++++++++++--- .../advisory/TempQueueMemoryTest.java | 30 +++---- 2 files changed, 88 insertions(+), 26 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 657169999d..e12c0ce19c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -34,7 +34,21 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -55,33 +69,65 @@ public class AdvisoryBroker extends BrokerFilter { protected final ConcurrentHashMap connections = new ConcurrentHashMap(); class ConsumerIdKey { - final ConsumerId delegate; - final long creationTime = System.currentTimeMillis(); + private final ConsumerId delegate; + private final long creationTime; + ConsumerIdKey(ConsumerId id) { - delegate = id; + this.delegate = id; + this.creationTime = System.currentTimeMillis(); + } + + ConsumerIdKey(ConsumerId id, long creationTime) { + this.delegate = id; + this.creationTime = creationTime; } @Override public boolean equals(Object other) { - return delegate.equals(other); + if (this == other) { + return true; + } + if (other == null || other.getClass() != ConsumerIdKey.class) { + return false; + } + + ConsumerIdKey key = (ConsumerIdKey) other; + + return delegate.equals(key.delegate); } @Override public int hashCode() { return delegate.hashCode(); } + + @Override + public String toString() { + return "ConsumerIdKey { " + delegate + " }"; + } + + public ConsumerId getConsumerId() { + return this.delegate; + } + + public long getCreationTime() { + return this.creationTime; + } } + // replay consumer advisory messages in the order in which they arrive - allows duplicate suppression in // mesh networks with ttl>1 protected final Map consumers = new ConcurrentSkipListMap( - new Comparator() { - @Override - public int compare(ConsumerIdKey o1, ConsumerIdKey o2) { - return (o1.creationTime < o2.creationTime ? -1 : (o1.delegate==o2.delegate ? 0 : 1)); - } + new Comparator() { + @Override + public int compare(ConsumerIdKey o1, ConsumerIdKey o2) { + return (o1.creationTime < o2.creationTime ? -1 : o1.equals(o2) ? 0 : 1); } + } ); + protected final Map consumerTracker = new ConcurrentHashMap(); + protected final ConcurrentHashMap producers = new ConcurrentHashMap(); protected final ConcurrentHashMap destinations = new ConcurrentHashMap(); protected final ConcurrentHashMap networkBridges = new ConcurrentHashMap(); @@ -113,7 +159,10 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); - consumers.put(new ConsumerIdKey(info.getConsumerId()), info); + ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId()); + consumerTracker.put(key.getConsumerId(), key.getCreationTime()); + consumers.put(key, info); + LOG.info("Added {} to the map:", key); fireConsumerAdvisory(context, info.getDestination(), topic, info); } else { // We need to replay all the previously collected state objects @@ -276,7 +325,18 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQDestination dest = info.getDestination(); if (!AdvisorySupport.isAdvisoryTopic(dest)) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); - consumers.remove(new ConsumerIdKey(info.getConsumerId())); + + Object value = consumerTracker.remove(info.getConsumerId()); + if (value != null) { + Long creationTime = (Long) value; + ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId(), creationTime); + if (consumers.remove(key) == null) { + LOG.info("Failed to remove:{} from the consumers map: {}", key, consumers); + } + } else { + LOG.info("Failed to find consumer:{} in creation time tracking map: ", info.getConsumerId()); + } + if (!dest.isTemporary() || destinations.containsKey(dest)) { fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java index 82686158b2..9bf8ed1426 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java @@ -34,12 +34,13 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { + protected Connection serverConnection; protected Session serverSession; protected Connection clientConnection; protected Session clientSession; protected Destination serverDestination; - protected int messagesToSend = 2000; + protected int messagesToSend = 10; protected boolean deleteTempQueue = true; protected boolean serverTransactional = false; protected boolean clientTransactional = false; @@ -52,7 +53,7 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { } public void testLoadRequestReply() throws Exception { - for (int i=0; i< numConsumers; i++) { + for (int i = 0; i < numConsumers; i++) { serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { @@ -73,17 +74,19 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { class Producer extends Thread { private final int numToSend; + public Producer(int numToSend) { this.numToSend = numToSend; } + @Override public void run() { try { - Session session = clientConnection.createSession(clientTransactional, - clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + Session session = clientConnection.createSession(clientTransactional, clientTransactional ? + Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(serverDestination); - for (int i =0; i< numToSend; i++) { + for (int i = 0; i < numToSend; i++) { TemporaryQueue replyTo = session.createTemporaryQueue(); MessageConsumer consumer = session.createConsumer(replyTo); Message msg = session.createMessage(); @@ -109,8 +112,8 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { } } Vector threads = new Vector(numProducers); - for (int i=0; i threads) throws Exception { - for (Thread thread: threads) { + for (Thread thread : threads) { thread.start(); } - for (Thread thread: threads) { + for (Thread thread : threads) { thread.join(); } }