Revert "Fix for https://issues.apache.org/jira/browse/AMQ-4000 Durable subscription not getting unregistered on networked broker, thanks torsten for the unit test!"

added DurableSubInBrokerNetworkTest to broken test profile till we get this resolved.

This reverts commit b7c32d924a.

Conflicts:
	activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1448161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-02-20 13:33:36 +00:00
parent 5b5a818f4d
commit def8c77c07
3 changed files with 42 additions and 35 deletions

View File

@ -29,13 +29,25 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.security.SecurityContext; import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,7 +65,6 @@ public class AdvisoryBroker extends BrokerFilter {
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
protected final ConcurrentHashMap<SubscriptionKey, ActiveMQTopic> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, ActiveMQTopic>();
protected final ProducerId advisoryProducerId = new ProducerId(); protected final ProducerId advisoryProducerId = new ProducerId();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@ -81,12 +92,6 @@ public class AdvisoryBroker extends BrokerFilter {
// Don't advise advisory topics. // Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
if (info.getDestination().isTopic() && info.isDurable()) {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
if (!this.durableSubscriptions.contains(key)) {
this.durableSubscriptions.put(key, (ActiveMQTopic)info.getDestination());
}
}
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info); consumers.put(info.getConsumerId(), info);
fireConsumerAdvisory(context, info.getDestination(), topic, info); fireConsumerAdvisory(context, info.getDestination(), topic, info);
@ -258,26 +263,6 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
super.removeSubscription(context, info);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
ActiveMQTopic dest = durableSubscriptions.get(key);
if (dest == null) {
LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
}
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
durableSubscriptions.remove(key);
fireConsumerAdvisory(context,dest, topic, info);
}
}
@Override @Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info); super.removeProducer(context, info);

View File

@ -50,7 +50,32 @@ import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.SecurityContext; import org.apache.activemq.security.SecurityContext;
@ -791,11 +816,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (data.getClass() == RemoveInfo.class) { } else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id); removeDemandSubscription(id);
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo durableSub = (RemoveSubscriptionInfo)data;
LOG.debug("Removing durable subscription: clientId: " + durableSub.getClientId()
+ ", durableName: " + durableSub.getSubcriptionName());
localBroker.oneway(data);
} }
} }

View File

@ -570,6 +570,8 @@
<exclude>**/StoreQueueCursorJournalNoDuplicateTest.*</exclude> <exclude>**/StoreQueueCursorJournalNoDuplicateTest.*</exclude>
<exclude>**/org.apache.activemq.usecases.ThreeBrokerVirtualTopicNetworkAMQPATest.*</exclude> <exclude>**/org.apache.activemq.usecases.ThreeBrokerVirtualTopicNetworkAMQPATest.*</exclude>
<exclude>**/LevelDBXARecoveryBrokerTest.*</exclude> <exclude>**/LevelDBXARecoveryBrokerTest.*</exclude>
<!-- https://issues.apache.org/jira/browse/AMQ-4000 -->
<exclude>**/DurableSubInBrokerNetworkTest.*</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>