https://issues.apache.org/activemq/browse/AMQ-1255


git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@647872 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-14 16:02:12 +00:00
parent bff39c565b
commit 931ed7673d
3 changed files with 68 additions and 45 deletions

View File

@ -86,7 +86,7 @@ public class AdvisoryBroker extends BrokerFilter {
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info); consumers.put(info.getConsumerId(), info);
fireConsumerAdvisory(context,info.getDestination(), topic, info); fireConsumerAdvisory(context, info.getDestination(), topic, info);
} else { } else {
// We need to replay all the previously collected state objects // We need to replay all the previously collected state objects
// for this newly added consumer. // for this newly added consumer.
@ -179,7 +179,7 @@ public class AdvisoryBroker extends BrokerFilter {
fireAdvisory(context, topic, info); fireAdvisory(context, topic, info);
try { try {
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1); next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) { } catch (Exception expectedIfDestinationDidNotExistYet) {
} }
try { try {
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
@ -190,7 +190,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
next.removeDestinationInfo(context, destInfo); next.removeDestinationInfo(context, destInfo);
DestinationInfo info = destinations.remove(destInfo.getDestination()); DestinationInfo info = destinations.remove(destInfo.getDestination());
if (info != null) { if (info != null) {
info.setDestination(destInfo.getDestination()); info.setDestination(destInfo.getDestination());
@ -203,6 +203,7 @@ public class AdvisoryBroker extends BrokerFilter {
} }
try { try {
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1); next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) { } catch (Exception expectedIfDestinationDidNotExistYet) {
} }
} }
@ -221,10 +222,13 @@ public class AdvisoryBroker extends BrokerFilter {
next.removeConsumer(context, info); next.removeConsumer(context, info);
// Don't advise advisory topics. // Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQDestination dest = info.getDestination();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); if (!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
consumers.remove(info.getConsumerId()); consumers.remove(info.getConsumerId());
fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand()); if (!dest.isTemporary() || destinations.contains(dest)) {
fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
}
} }
} }
@ -232,10 +236,13 @@ public class AdvisoryBroker extends BrokerFilter {
next.removeProducer(context, info); next.removeProducer(context, info);
// Don't advise advisory topics. // Don't advise advisory topics.
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQDestination dest = info.getDestination();
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
producers.remove(info.getProducerId()); producers.remove(info.getProducerId());
fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand()); if (!dest.isTemporary() || destinations.contains(dest)) {
fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
}
} }
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -24,9 +23,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException; import org.apache.activemq.broker.DestinationAlreadyExistsException;
@ -96,18 +93,21 @@ public abstract class AbstractRegion implements Region {
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.getBroker().addDestination(context, dest); context.getBroker().addDestination(context, dest);
} }
synchronized (destinationsMutex) {
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
Destination dest = i.next(); Destination dest = i.next();
dest.start(); dest.start();
}
} }
} }
public void stop() throws Exception { public void stop() throws Exception {
started = false; started = false;
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { synchronized (destinationsMutex) {
Destination dest = i.next(); for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
dest.stop(); Destination dest = i.next();
dest.stop();
}
} }
destinations.clear(); destinations.clear();
} }
@ -169,10 +169,10 @@ public abstract class AbstractRegion implements Region {
} }
LOG.debug("Removing destination: " + destination); LOG.debug("Removing destination: " + destination);
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
Destination dest = destinations.remove(destination); Destination dest = destinations.remove(destination);
if (dest != null) { if (dest != null) {
// timeout<0 or we timed out, we now force any remaining // timeout<0 or we timed out, we now force any remaining
// subscriptions to un-subscribe. // subscriptions to un-subscribe.
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
@ -181,11 +181,10 @@ public abstract class AbstractRegion implements Region {
dest.removeSubscription(context, sub); dest.removeSubscription(context, sub);
} }
} }
destinationMap.removeAll(destination); destinationMap.removeAll(destination);
dispose(context,dest); dispose(context,dest);
} else { } else {
LOG.debug("Destination doesn't exist: " + dest); LOG.debug("Destination doesn't exist: " + dest);
} }
} }
@ -259,9 +258,12 @@ public abstract class AbstractRegion implements Region {
// so everything after this point would be leaked. // so everything after this point would be leaked.
// Add the subscription to all the matching queues. // Add the subscription to all the matching queues.
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination)iter.next(); synchronized(destinationsMutex) {
dest.addSubscription(context, sub); for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination)iter.next();
dest.addSubscription(context, sub);
}
} }
if (info.isBrowser()) { if (info.isBrowser()) {
@ -286,7 +288,9 @@ public abstract class AbstractRegion implements Region {
*/ */
protected Set<ActiveMQDestination> getInactiveDestinations() { protected Set<ActiveMQDestination> getInactiveDestinations() {
Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations(); Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
inactiveDests.removeAll(destinations.keySet()); synchronized (destinationsMutex) {
inactiveDests.removeAll(destinations.keySet());
}
return inactiveDests; return inactiveDests;
} }
@ -298,10 +302,12 @@ public abstract class AbstractRegion implements Region {
if (sub != null) { if (sub != null) {
// remove the subscription from all the matching queues. // remove the subscription from all the matching queues.
for (Iterator iter = destinationMap.get(info.getDestination()) synchronized (destinationsMutex) {
.iterator(); iter.hasNext();) { for (Iterator iter = destinationMap.get(info.getDestination())
Destination dest = (Destination) iter.next(); .iterator(); iter.hasNext();) {
dest.removeSubscription(context, sub); Destination dest = (Destination) iter.next();
dest.removeSubscription(context, sub);
}
} }
destroySubscription(sub); destroySubscription(sub);
@ -396,9 +402,11 @@ public abstract class AbstractRegion implements Region {
Subscription sub = iter.next(); Subscription sub = iter.next();
sub.gc(); sub.gc();
} }
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { synchronized (destinationsMutex) {
Destination dest = iter.next(); for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
dest.gc(); Destination dest = iter.next();
dest.gc();
}
} }
} }
@ -417,9 +425,11 @@ public abstract class AbstractRegion implements Region {
} }
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { synchronized (destinationsMutex) {
Destination dest = (Destination)iter.next(); for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
dest.addProducer(context, info); Destination dest = (Destination) iter.next();
dest.addProducer(context, info);
}
} }
} }
@ -429,9 +439,11 @@ public abstract class AbstractRegion implements Region {
* @throws Exception TODO * @throws Exception TODO
*/ */
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { synchronized (destinationsMutex) {
Destination dest = (Destination)iter.next(); for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
dest.removeProducer(context, info); Destination dest = (Destination)iter.next();
dest.removeProducer(context, info);
}
} }
} }

View File

@ -78,9 +78,11 @@ public class TopicRegion extends AbstractRegion {
if (hasDurableSubChanged(info, sub.getConsumerInfo())) { if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
// Remove the consumer first then add it. // Remove the consumer first then add it.
durableSubscriptions.remove(key); durableSubscriptions.remove(key);
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { synchronized (destinationsMutex) {
Topic topic = (Topic)iter.next(); for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
topic.deleteSubscription(context, key); Topic topic = (Topic)iter.next();
topic.deleteSubscription(context, key);
}
} }
super.removeConsumer(context, sub.getConsumerInfo()); super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info); super.addConsumer(context, info);
@ -132,9 +134,11 @@ public class TopicRegion extends AbstractRegion {
} }
durableSubscriptions.remove(key); durableSubscriptions.remove(key);
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { synchronized (destinationsMutex) {
Topic topic = (Topic)iter.next(); for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
topic.deleteSubscription(context, key); Topic topic = (Topic)iter.next();
topic.deleteSubscription(context, key);
}
} }
super.removeConsumer(context, sub.getConsumerInfo()); super.removeConsumer(context, sub.getConsumerInfo());
} }