https://issues.apache.org/jira/browse/AMQ-4000 - initial implementation of keeping track of durable subscribers over network and unregister them appropriately

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1483827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-05-17 14:27:48 +00:00
parent 1558cbbb55
commit 620523a597
7 changed files with 85 additions and 36 deletions

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.slf4j.Logger;
@ -84,7 +85,11 @@ public class ConduitBridge extends DemandForwardingBridge {
}
// add the interest in the subscription
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
ds.add(info.getConsumerId());
if (!info.isDurable()) {
ds.add(info.getConsumerId());
} else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
}
matched = true;
// continue - we want interest to any existing DemandSubscriptions

View File

@ -19,10 +19,7 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -50,32 +47,7 @@ import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.*;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
@ -147,7 +119,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
@ -818,6 +790,28 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = (DemandSubscription) i.next();
boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
if (removed) {
if (ds.getDurableRemoteSubs().isEmpty()) {
// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
localBroker.oneway(removeInfo);
// remove subscriber
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
sending.setClientId(localClientId);
sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending);
}
}
}
}
}
@ -1180,6 +1174,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (duplicateSuppressionIsRequired(sub)) {
undoMapRegistration(sub);
} else {
if (consumerInfo.isDurable()) {
sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
}
addSubscription(sub);
consumerAdded = true;
}
@ -1274,7 +1271,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return found;
}
private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
Region region;
Collection<Subscription> subs;
@ -1370,8 +1367,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// may need to change if we ever subscribe to a remote broker.
sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
} else {
// need to ack this message if it is ignored as it is durable so
// we check before we send. see: suppressMessageDispatch()
sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +40,8 @@ public class DemandSubscription {
private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
private final AtomicInteger dispatched = new AtomicInteger(0);
private final AtomicBoolean activeWaiter = new AtomicBoolean();
private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
private SubscriptionInfo localDurableSubscriber;
private NetworkBridgeFilter networkBridgeFilter;
@ -69,6 +72,10 @@ public class DemandSubscription {
return remoteSubsIds.remove(id);
}
public Set<SubscriptionInfo> getDurableRemoteSubs() {
return durableRemoteSubs;
}
/**
* @return true if there are no interested consumers
*/
@ -138,4 +145,12 @@ public class DemandSubscription {
public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
this.networkBridgeFilter = networkBridgeFilter;
}
public SubscriptionInfo getLocalDurableSubscriber() {
return localDurableSubscriber;
}
public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) {
this.localDurableSubscriber = localDurableSubscriber;
}
}

View File

@ -214,6 +214,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.info = new ConsumerInfo(consumerId);
this.info.setExclusive(this.session.connection.isExclusiveConsumer());
this.info.setClientId(this.session.connection.getClientID());
this.info.setSubscriptionName(name);
this.info.setPrefetchSize(prefetch);
this.info.setCurrentPrefetchSize(prefetch);

View File

@ -42,6 +42,7 @@ public class ConsumerInfo extends BaseCommand {
protected boolean browser;
protected boolean dispatchAsync;
protected String selector;
protected String clientId;
protected String subscriptionName;
protected boolean noLocal;
protected boolean exclusive;
@ -93,6 +94,7 @@ public class ConsumerInfo extends BaseCommand {
info.browser = browser;
info.dispatchAsync = dispatchAsync;
info.selector = selector;
info.clientId = clientId;
info.subscriptionName = subscriptionName;
info.noLocal = noLocal;
info.exclusive = exclusive;
@ -216,6 +218,19 @@ public class ConsumerInfo extends BaseCommand {
this.selector = selector;
}
/**
* Used to identify the id of a client connection.
*
* @openwire:property version=1
*/
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
* Used to identify the name of a durable subscription.
*

View File

@ -34,6 +34,13 @@ public class SubscriptionInfo implements DataStructure {
protected String subscriptionName;
protected String selector;
public SubscriptionInfo() {}
public SubscriptionInfo(String clientId, String subscriptionName) {
this.clientId = clientId;
this.subscriptionName = subscriptionName;
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}

View File

@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
*/
public class DurableSubInBrokerNetworkTest extends NetworkTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
private static final Logger LOG = LoggerFactory.getLogger(DurableSubInBrokerNetworkTest.class);
// protected BrokerService localBroker;
private final String subName = "Subscriber1";
private final String subName2 = "Subscriber2";
@ -152,6 +152,16 @@ public class DurableSubInBrokerNetworkTest extends NetworkTestSupport {
assertTrue("Durable subscription should still be on remote broker",
foundSubInRemoteBrokerByTopicName(topicName));
sub2.close();
session.unsubscribe(subName2);
Thread.sleep(100);
assertFalse(foundSubInLocalBroker(subName2));
assertFalse("Durable subscription not unregistered on remote broker",
foundSubInRemoteBrokerByTopicName(topicName));
}
private boolean foundSubInRemoteBrokerByTopicName(String topicName) throws Exception {