Add a cluster update filter

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@921821 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-03-11 12:17:10 +00:00
parent fef398ac30
commit dc0f5b392b
1 changed files with 105 additions and 64 deletions

View File

@ -16,11 +16,14 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Iterator; import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector; import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.jmx.ManagementContext;
@ -28,7 +31,6 @@ import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
@ -41,23 +43,13 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.ObjectName;
/** /**
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* @version $Revision: 1.6 $ * @version $Revision: 1.6 $
*/ */
public class TransportConnector implements Connector, BrokerServiceAware { public class TransportConnector implements Connector, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(TransportConnector.class); final Log LOG = LogFactory.getLog(TransportConnector.class);
protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected TransportStatusDetector statusDector; protected TransportStatusDetector statusDector;
@ -75,9 +67,10 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private boolean disableAsyncDispatch; private boolean disableAsyncDispatch;
private boolean enableStatusMonitor = false; private boolean enableStatusMonitor = false;
private Broker broker; private Broker broker;
private boolean updateClusterClients=false; private boolean updateClusterClients = false;
private boolean rebalanceClusterClients; private boolean rebalanceClusterClients;
private String updateClusterFilter;
public TransportConnector() { public TransportConnector() {
} }
@ -93,7 +86,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
/** /**
* @return Returns the connections. * @return Returns the connections.
*/ */
@ -105,7 +97,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
* Factory method to create a JMX managed version of this transport * Factory method to create a JMX managed version of this transport
* connector * connector
*/ */
public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
throws IOException, URISyntaxException {
ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
rc.setBrokerInfo(getBrokerInfo()); rc.setBrokerInfo(getBrokerInfo());
rc.setConnectUri(getConnectUri()); rc.setConnectUri(getConnectUri());
@ -130,15 +123,16 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public void setBrokerInfo(BrokerInfo brokerInfo) { public void setBrokerInfo(BrokerInfo brokerInfo) {
this.brokerInfo = brokerInfo; this.brokerInfo = brokerInfo;
} }
/** /**
* *
* @deprecated use the {@link #setBrokerService(BrokerService)} method instead. * @deprecated use the {@link #setBrokerService(BrokerService)} method
* instead.
*/ */
@Deprecated @Deprecated
public void setBrokerName(String name) { public void setBrokerName(String name) {
if (this.brokerInfo==null) { if (this.brokerInfo == null) {
this.brokerInfo=new BrokerInfo(); this.brokerInfo = new BrokerInfo();
} }
this.brokerInfo.setBrokerName(name); this.brokerInfo.setBrokerName(name);
} }
@ -204,7 +198,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
public void start() throws Exception { public void start() throws Exception {
TransportServer server = getServer();
broker = brokerService.getBroker(); broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId()); brokerInfo.setBrokerId(broker.getBrokerId());
@ -214,7 +207,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
server.setAcceptListener(new TransportAcceptListener() { server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) { public void onAccept(final Transport transport) {
try { try {
getDefaultTaskRunnerFactory().execute(new Runnable(){ getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() { public void run() {
try { try {
Connection connection = createConnection(transport); Connection connection = createConnection(transport);
@ -237,13 +230,14 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
private void onAcceptError(Exception error, String remoteHost) { private void onAcceptError(Exception error, String remoteHost) {
LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error); LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
+ error);
LOG.debug("Reason: " + error, error); LOG.debug("Reason: " + error, error);
} }
}); });
server.setBrokerInfo(brokerInfo); server.setBrokerInfo(brokerInfo);
server.start(); server.start();
DiscoveryAgent da = getDiscoveryAgent(); DiscoveryAgent da = getDiscoveryAgent();
if (da != null) { if (da != null) {
da.registerService(getPublishableConnectString()); da.registerService(getPublishableConnectString());
@ -258,15 +252,16 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
private String getPublishableConnectString() throws Exception { private String getPublishableConnectString() throws Exception {
URI connectUri = getConnectUri(); URI theConnectURI = getConnectUri();
String publishableConnectString = connectUri.toString(); String publishableConnectString = theConnectURI.toString();
// strip off server side query parameters which may not be compatible to clients // strip off server side query parameters which may not be compatible to
if (connectUri.getRawQuery() != null) { // clients
publishableConnectString = if (theConnectURI.getRawQuery() != null) {
publishableConnectString.substring(0, publishableConnectString.indexOf(connectUri.getRawQuery()) -1); publishableConnectString = publishableConnectString.substring(0, publishableConnectString
.indexOf(theConnectURI.getRawQuery()) - 1);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + connectUri); LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
} }
return publishableConnectString; return publishableConnectString;
} }
@ -295,7 +290,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected Connection createConnection(Transport transport) throws IOException { protected Connection createConnection(Transport transport) throws IOException {
TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory); TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
: taskRunnerFactory);
boolean statEnabled = this.getStatistics().isEnabled(); boolean statEnabled = this.getStatistics().isEnabled();
answer.getStatistics().setEnabled(statEnabled); answer.getStatistics().setEnabled(statEnabled);
answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
@ -307,9 +303,10 @@ public class TransportConnector implements Connector, BrokerServiceAware {
throw new IllegalArgumentException("You must specify either a server or uri property"); throw new IllegalArgumentException("You must specify either a server or uri property");
} }
if (brokerService == null) { if (brokerService == null) {
throw new IllegalArgumentException("You must specify the brokerService property. Maybe this connector should be added to a broker?"); throw new IllegalArgumentException(
"You must specify the brokerService property. Maybe this connector should be added to a broker?");
} }
return TransportFactory.bind(brokerService, uri); return TransportFactory.bind(brokerService, uri);
} }
public DiscoveryAgent getDiscoveryAgent() throws IOException { public DiscoveryAgent getDiscoveryAgent() throws IOException {
@ -381,44 +378,70 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
return rc; return rc;
} }
protected ConnectionControl getConnectionControl() { protected ConnectionControl getConnectionControl() {
boolean rebalance = isRebalanceClusterClients(); boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = ""; String connectedBrokers = "";
String self = ""; String self = "";
if (brokerService.getDefaultSocketURI() != null) { if (brokerService.getDefaultSocketURI() != null) {
self += brokerService.getDefaultSocketURI().toString(); self += brokerService.getDefaultSocketURI().toString();
self += ","; self += ",";
} }
if (rebalance == false) { if (rebalance == false) {
connectedBrokers += self; connectedBrokers += self;
} }
if (this.broker.getPeerBrokerInfos() != null) { if (this.broker.getPeerBrokerInfos() != null) {
for (BrokerInfo info : this.broker.getPeerBrokerInfos()) { for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
connectedBrokers += info.getBrokerURL(); if (isMatchesClusterFilter(info.getBrokerName())) {
connectedBrokers += ","; connectedBrokers += info.getBrokerURL();
} connectedBrokers += ",";
} }
if (rebalance) {
connectedBrokers += self;
} }
}
if (rebalance) {
connectedBrokers += self;
}
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);
return control;
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);
return control;
} }
public void updateClientClusterInfo() { public void updateClientClusterInfo() {
if (isRebalanceClusterClients() || isUpdateClusterClients()) { if (isRebalanceClusterClients() || isUpdateClusterClients()) {
ConnectionControl control = getConnectionControl(); ConnectionControl control = getConnectionControl();
for (Connection c: this.connections) { for (Connection c : this.connections) {
c.updateClient(control); c.updateClient(control);
} }
} }
} }
private boolean isMatchesClusterFilter(String brokerName) {
boolean result = true;
String filter = getUpdateClusterFilter();
if (filter != null) {
filter = filter.trim();
if (filter.length() > 0) {
StringTokenizer tokenizer = new StringTokenizer(filter, ",");
while (result && tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
result = isMatchesClusterFilter(brokerName, token);
}
}
}
return result;
}
private boolean isMatchesClusterFilter(String brokerName, String match) {
boolean result = true;
if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
result = Pattern.matches(match, brokerName);
}
return result;
}
public boolean isDisableAsyncDispatch() { public boolean isDisableAsyncDispatch() {
return disableAsyncDispatch; return disableAsyncDispatch;
} }
@ -435,7 +458,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
/** /**
* @param enableStatusMonitor the enableStatusMonitor to set * @param enableStatusMonitor
* the enableStatusMonitor to set
*/ */
public void setEnableStatusMonitor(boolean enableStatusMonitor) { public void setEnableStatusMonitor(boolean enableStatusMonitor) {
this.enableStatusMonitor = enableStatusMonitor; this.enableStatusMonitor = enableStatusMonitor;
@ -452,9 +476,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return broker; return broker;
} }
public BrokerService getBrokerService() { public BrokerService getBrokerService() {
return brokerService; return brokerService;
} }
/** /**
* @return the updateClusterClients * @return the updateClusterClients
@ -464,7 +488,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
/** /**
* @param updateClusterClients the updateClusterClients to set * @param updateClusterClients
* the updateClusterClients to set
*/ */
public void setUpdateClusterClients(boolean updateClusterClients) { public void setUpdateClusterClients(boolean updateClusterClients) {
this.updateClusterClients = updateClusterClients; this.updateClusterClients = updateClusterClients;
@ -478,9 +503,25 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
/** /**
* @param rebalanceClusterClients the rebalanceClusterClients to set * @param rebalanceClusterClients
* the rebalanceClusterClients to set
*/ */
public void setRebalanceClusterClients(boolean rebalanceClusterClients) { public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
this.rebalanceClusterClients = rebalanceClusterClients; this.rebalanceClusterClients = rebalanceClusterClients;
} }
/**
* @return the updateClusterFilter
*/
public String getUpdateClusterFilter() {
return this.updateClusterFilter;
}
/**
* @param updateClusterFilter
* the updateClusterFilter to set
*/
public void setUpdateClusterFilter(String updateClusterFilter) {
this.updateClusterFilter = updateClusterFilter;
}
} }