From 931ed7673d00bf890131ed284f9ce8f81b50af93 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 14 Apr 2008 16:02:12 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/activemq/browse/AMQ-1255 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@647872 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 25 ++++--- .../broker/region/AbstractRegion.java | 72 +++++++++++-------- .../activemq/broker/region/TopicRegion.java | 16 +++-- 3 files changed, 68 insertions(+), 45 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index e3c8103092..abd7dfc582 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -86,7 +86,7 @@ public class AdvisoryBroker extends BrokerFilter { if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); consumers.put(info.getConsumerId(), info); - fireConsumerAdvisory(context,info.getDestination(), topic, info); + fireConsumerAdvisory(context, info.getDestination(), topic, info); } else { // We need to replay all the previously collected state objects // for this newly added consumer. @@ -179,7 +179,7 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, info); try { next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1); - } catch (Exception expectedIfDestinationDidNotExistYet) { + } catch (Exception expectedIfDestinationDidNotExistYet) { } try { next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); @@ -190,7 +190,7 @@ public class AdvisoryBroker extends BrokerFilter { } public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { - next.removeDestinationInfo(context, destInfo); + next.removeDestinationInfo(context, destInfo); DestinationInfo info = destinations.remove(destInfo.getDestination()); if (info != null) { info.setDestination(destInfo.getDestination()); @@ -203,6 +203,7 @@ public class AdvisoryBroker extends BrokerFilter { } try { next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); + } catch (Exception expectedIfDestinationDidNotExistYet) { } } @@ -221,10 +222,13 @@ public class AdvisoryBroker extends BrokerFilter { next.removeConsumer(context, info); // Don't advise advisory topics. - if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { - ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); + ActiveMQDestination dest = info.getDestination(); + if (!AdvisorySupport.isAdvisoryTopic(dest)) { + ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); consumers.remove(info.getConsumerId()); - fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand()); + if (!dest.isTemporary() || destinations.contains(dest)) { + fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand()); + } } } @@ -232,10 +236,13 @@ public class AdvisoryBroker extends BrokerFilter { next.removeProducer(context, info); // Don't advise advisory topics. - if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { - ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); + ActiveMQDestination dest = info.getDestination(); + if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { + ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); producers.remove(info.getProducerId()); - fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand()); + if (!dest.isTemporary() || destinations.contains(dest)) { + fireProducerAdvisory(context, dest,topic, info.createRemoveCommand()); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 531094f00b..f2458461ae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -24,9 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import javax.jms.JMSException; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.DestinationAlreadyExistsException; @@ -96,18 +93,21 @@ public abstract class AbstractRegion implements Region { context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); context.getBroker().addDestination(context, dest); } - - for (Iterator i = destinations.values().iterator(); i.hasNext();) { - Destination dest = i.next(); - dest.start(); + synchronized (destinationsMutex) { + for (Iterator i = destinations.values().iterator(); i.hasNext();) { + Destination dest = i.next(); + dest.start(); + } } } public void stop() throws Exception { started = false; - for (Iterator i = destinations.values().iterator(); i.hasNext();) { - Destination dest = i.next(); - dest.stop(); + synchronized (destinationsMutex) { + for (Iterator i = destinations.values().iterator(); i.hasNext();) { + Destination dest = i.next(); + dest.stop(); + } } destinations.clear(); } @@ -169,10 +169,10 @@ public abstract class AbstractRegion implements Region { } LOG.debug("Removing destination: " + destination); + synchronized (destinationsMutex) { Destination dest = destinations.remove(destination); if (dest != null) { - // timeout<0 or we timed out, we now force any remaining // subscriptions to un-subscribe. for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { @@ -181,11 +181,10 @@ public abstract class AbstractRegion implements Region { dest.removeSubscription(context, sub); } } - destinationMap.removeAll(destination); dispose(context,dest); - } else { + } else { LOG.debug("Destination doesn't exist: " + dest); } } @@ -259,9 +258,12 @@ public abstract class AbstractRegion implements Region { // so everything after this point would be leaked. // Add the subscription to all the matching queues. - for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { - Destination dest = (Destination)iter.next(); - dest.addSubscription(context, sub); + + synchronized(destinationsMutex) { + for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { + Destination dest = (Destination)iter.next(); + dest.addSubscription(context, sub); + } } if (info.isBrowser()) { @@ -286,7 +288,9 @@ public abstract class AbstractRegion implements Region { */ protected Set getInactiveDestinations() { Set inactiveDests = destinationFactory.getDestinations(); - inactiveDests.removeAll(destinations.keySet()); + synchronized (destinationsMutex) { + inactiveDests.removeAll(destinations.keySet()); + } return inactiveDests; } @@ -298,10 +302,12 @@ public abstract class AbstractRegion implements Region { if (sub != null) { // remove the subscription from all the matching queues. - for (Iterator iter = destinationMap.get(info.getDestination()) - .iterator(); iter.hasNext();) { - Destination dest = (Destination) iter.next(); - dest.removeSubscription(context, sub); + synchronized (destinationsMutex) { + for (Iterator iter = destinationMap.get(info.getDestination()) + .iterator(); iter.hasNext();) { + Destination dest = (Destination) iter.next(); + dest.removeSubscription(context, sub); + } } destroySubscription(sub); @@ -396,9 +402,11 @@ public abstract class AbstractRegion implements Region { Subscription sub = iter.next(); sub.gc(); } - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Destination dest = iter.next(); - dest.gc(); + synchronized (destinationsMutex) { + for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { + Destination dest = iter.next(); + dest.gc(); + } } } @@ -417,9 +425,11 @@ public abstract class AbstractRegion implements Region { } public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ - for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { - Destination dest = (Destination)iter.next(); - dest.addProducer(context, info); + synchronized (destinationsMutex) { + for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { + Destination dest = (Destination) iter.next(); + dest.addProducer(context, info); + } } } @@ -429,9 +439,11 @@ public abstract class AbstractRegion implements Region { * @throws Exception TODO */ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ - for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { - Destination dest = (Destination)iter.next(); - dest.removeProducer(context, info); + synchronized (destinationsMutex) { + for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { + Destination dest = (Destination)iter.next(); + dest.removeProducer(context, info); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 3622f8e452..aa1e18e922 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -78,9 +78,11 @@ public class TopicRegion extends AbstractRegion { if (hasDurableSubChanged(info, sub.getConsumerInfo())) { // Remove the consumer first then add it. durableSubscriptions.remove(key); - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic)iter.next(); - topic.deleteSubscription(context, key); + synchronized (destinationsMutex) { + for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { + Topic topic = (Topic)iter.next(); + topic.deleteSubscription(context, key); + } } super.removeConsumer(context, sub.getConsumerInfo()); super.addConsumer(context, info); @@ -132,9 +134,11 @@ public class TopicRegion extends AbstractRegion { } durableSubscriptions.remove(key); - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic)iter.next(); - topic.deleteSubscription(context, key); + synchronized (destinationsMutex) { + for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { + Topic topic = (Topic)iter.next(); + topic.deleteSubscription(context, key); + } } super.removeConsumer(context, sub.getConsumerInfo()); }