From def8c77c076a6c984b555c0f91b47c82a634ec3c Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 20 Feb 2013 13:33:36 +0000 Subject: [PATCH] Revert "Fix for https://issues.apache.org/jira/browse/AMQ-4000 Durable subscription not getting unregistered on networked broker, thanks torsten for the unit test!" added DurableSubInBrokerNetworkTest to broken test profile till we get this resolved. This reverts commit b7c32d924af5ada1a2068c77f3bf8e44267edea4. Conflicts: activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1448161 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 43 ++++++------------- .../DemandForwardingBridgeSupport.java | 32 +++++++++++--- activemq-unit-tests/pom.xml | 2 + 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index d2ceffb133..09c7c85a68 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -29,13 +29,25 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; -import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +65,6 @@ public class AdvisoryBroker extends BrokerFilter { protected final ConcurrentHashMap producers = new ConcurrentHashMap(); protected final ConcurrentHashMap destinations = new ConcurrentHashMap(); protected final ConcurrentHashMap networkBridges = new ConcurrentHashMap(); - protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap(); protected final ProducerId advisoryProducerId = new ProducerId(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); @@ -81,12 +92,6 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { - if (info.getDestination().isTopic() && info.isDurable()) { - SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); - if (!this.durableSubscriptions.contains(key)) { - this.durableSubscriptions.put(key, (ActiveMQTopic)info.getDestination()); - } - } ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); consumers.put(info.getConsumerId(), info); fireConsumerAdvisory(context, info.getDestination(), topic, info); @@ -258,26 +263,6 @@ public class AdvisoryBroker extends BrokerFilter { } } - @Override - public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { - super.removeSubscription(context, info); - - SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); - - ActiveMQTopic dest = durableSubscriptions.get(key); - if (dest == null) { - LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub"); - } - - // Don't advise advisory topics. - if (!AdvisorySupport.isAdvisoryTopic(dest)) { - ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); - durableSubscriptions.remove(key); - fireConsumerAdvisory(context,dest, topic, info); - } - - } - @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.removeProducer(context, info); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index ebfeeba96c..fb7083eaa8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -50,7 +50,32 @@ import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.NetworkBridgeFilter; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.security.SecurityContext; @@ -791,11 +816,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); removeDemandSubscription(id); - } else if (data.getClass() == RemoveSubscriptionInfo.class) { - RemoveSubscriptionInfo durableSub = (RemoveSubscriptionInfo)data; - LOG.debug("Removing durable subscription: clientId: " + durableSub.getClientId() - + ", durableName: " + durableSub.getSubcriptionName()); - localBroker.oneway(data); } } diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index a419eb6f6c..d400ab9b4d 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -570,6 +570,8 @@ **/StoreQueueCursorJournalNoDuplicateTest.* **/org.apache.activemq.usecases.ThreeBrokerVirtualTopicNetworkAMQPATest.* **/LevelDBXARecoveryBrokerTest.* + + **/DurableSubInBrokerNetworkTest.*