This commit is contained in:
Rob Davies 2013-09-11 09:21:06 +01:00
parent 0ff359341c
commit 16c1627ca0
4 changed files with 32 additions and 2 deletions

View File

@ -332,4 +332,8 @@ public class ConnectionContext {
public XATransactionId getXid() { public XATransactionId getXid() {
return xid; return xid;
} }
public boolean isAllowLinkStealing(){
return connector != null && connector.isAllowLinkStealing();
}
} }

View File

@ -60,4 +60,10 @@ public interface Connector extends Service {
public boolean isUpdateClusterClientsOnRemove(); public boolean isUpdateClusterClientsOnRemove();
int connectionCount(); int connectionCount();
/**
* If enabled, older connections with the same clientID are stopped
* @return true/false if link stealing is enabled
*/
boolean isAllowLinkStealing();
} }

View File

@ -74,6 +74,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE; private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE; private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy(); private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
private boolean allowLinkStealing;
LinkedList<String> peerBrokers = new LinkedList<String>(); LinkedList<String> peerBrokers = new LinkedList<String>();
@ -573,6 +574,15 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return connections.size(); return connections.size();
} }
@Override
public boolean isAllowLinkStealing() {
return allowLinkStealing;
}
public void setAllowLinkStealing (boolean allowLinkStealing) {
this.allowLinkStealing=allowLinkStealing;
}
public boolean isAuditNetworkProducers() { public boolean isAuditNetworkProducers() {
return auditNetworkProducers; return auditNetworkProducers;
} }

View File

@ -231,8 +231,18 @@ public class RegionBroker extends EmptyBroker {
synchronized (clientIdSet) { synchronized (clientIdSet) {
ConnectionContext oldContext = clientIdSet.get(clientId); ConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) { if (oldContext != null) {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " if (context.isAllowLinkStealing()){
+ oldContext.getConnection().getRemoteAddress()); clientIdSet.remove(clientId);
if (oldContext.getConnection() != null) {
LOG.warn("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection());
oldContext.getConnection().stop();
}else{
LOG.error("Not Connection for " + oldContext);
}
}else{
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress());
}
} else { } else {
clientIdSet.put(clientId, context); clientIdSet.put(clientId, context);
} }