mirror of https://github.com/apache/activemq.git
fix for: https://issues.apache.org/jira/browse/AMQ-3707 fix for: https://issues.apache.org/jira/browse/AMQ-4024 Add a strategy class for use in constructing the Published connect string for a transport connector. Allows for setting whether the connector will send IP addres, hostname or FQDN along with configurable transport query options which allows control of client side transport and wireformat settings. User can override with their own version to further customize. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1423655 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
14e76891e2
commit
a4be10c146
|
@ -0,0 +1,200 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Locale;
|
||||
|
||||
import org.apache.activemq.util.InetAddressUtil;
|
||||
|
||||
/**
|
||||
* Policy object that controls how a TransportConnector publishes the connector's
|
||||
* address to the outside world. By default the connector will publish itself
|
||||
* using the resolved host name of the bound server socket.
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
*/
|
||||
public class PublishedAddressPolicy {
|
||||
|
||||
private String clusterClientUriQuery;
|
||||
private PublishedHostStrategy publishedHostStrategy = PublishedHostStrategy.DEFAULT;
|
||||
|
||||
/**
|
||||
* Defines the value of the published host value.
|
||||
*/
|
||||
public enum PublishedHostStrategy {
|
||||
DEFAULT,
|
||||
IPADDRESS,
|
||||
HOSTNAME,
|
||||
FQDN;
|
||||
|
||||
public static PublishedHostStrategy getValue(String value) {
|
||||
return valueOf(value.toUpperCase(Locale.ENGLISH));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using the supplied TransportConnector this method returns the String that will
|
||||
* be used to update clients with this connector's connect address.
|
||||
*
|
||||
* @param connector
|
||||
* The TransportConnector whose address is to be published.
|
||||
* @return a string URI address that a client can use to connect to this Transport.
|
||||
* @throws Exception
|
||||
*/
|
||||
public String getPublishableConnectString(TransportConnector connector) throws Exception {
|
||||
|
||||
URI connectorURI = connector.getConnectUri();
|
||||
|
||||
if (connectorURI == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String scheme = connectorURI.getScheme();
|
||||
String userInfo = getPublishedUserInfoValue(connectorURI.getUserInfo());
|
||||
String host = getPublishedHostValue(connectorURI.getHost());
|
||||
int port = connectorURI.getPort();
|
||||
String path = getPublishedPathValue(connectorURI.getPath());
|
||||
String fragment = getPublishedFragmentValue(connectorURI.getFragment());
|
||||
|
||||
URI publishedURI = new URI(scheme, userInfo, host, port, path, getClusterClientUriQuery(), fragment);
|
||||
|
||||
return publishedURI.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can override what host value is published by implementing alternate
|
||||
* logic for this method.
|
||||
*
|
||||
* @param uriHostEntry
|
||||
* @return
|
||||
* @throws UnknownHostException
|
||||
*/
|
||||
protected String getPublishedHostValue(String uriHostEntry) throws UnknownHostException {
|
||||
|
||||
// By default we just republish what was already present.
|
||||
String result = uriHostEntry;
|
||||
|
||||
if (this.publishedHostStrategy.equals(PublishedHostStrategy.IPADDRESS)) {
|
||||
InetAddress address = InetAddress.getByName(uriHostEntry);
|
||||
result = address.getHostAddress();
|
||||
} else if (this.publishedHostStrategy.equals(PublishedHostStrategy.HOSTNAME)) {
|
||||
InetAddress address = InetAddress.getByName(uriHostEntry);
|
||||
if (address.isAnyLocalAddress()) {
|
||||
// make it more human readable and useful, an alternative to 0.0.0.0
|
||||
result = InetAddressUtil.getLocalHostName();
|
||||
} else {
|
||||
result = address.getHostName();
|
||||
}
|
||||
} else if (this.publishedHostStrategy.equals(PublishedHostStrategy.FQDN)) {
|
||||
InetAddress address = InetAddress.getByName(uriHostEntry);
|
||||
if (address.isAnyLocalAddress()) {
|
||||
// make it more human readable and useful, an alternative to 0.0.0.0
|
||||
result = InetAddressUtil.getLocalHostName();
|
||||
} else {
|
||||
result = address.getCanonicalHostName();
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can override what path value is published by implementing alternate
|
||||
* logic for this method. By default this method simply returns what was already
|
||||
* set as the Path value in the original URI.
|
||||
*
|
||||
* @param uriPathEntry
|
||||
* The original value of the URI path.
|
||||
*
|
||||
* @return the desired value for the published URI's path.
|
||||
*/
|
||||
protected String getPublishedPathValue(String uriPathEntry) {
|
||||
return uriPathEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can override what host value is published by implementing alternate
|
||||
* logic for this method. By default this method simply returns what was already
|
||||
* set as the Fragment value in the original URI.
|
||||
*
|
||||
* @param uriFragmentEntry
|
||||
* The original value of the URI Fragment.
|
||||
*
|
||||
* @return the desired value for the published URI's Fragment.
|
||||
*/
|
||||
protected String getPublishedFragmentValue(String uriFragmentEntry) {
|
||||
return uriFragmentEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can override what user info value is published by implementing alternate
|
||||
* logic for this method. By default this method simply returns what was already
|
||||
* set as the UserInfo value in the original URI.
|
||||
*
|
||||
* @param uriUserInfoEntry
|
||||
* The original value of the URI user info.
|
||||
*
|
||||
* @return the desired value for the published URI's user info.
|
||||
*/
|
||||
protected String getPublishedUserInfoValue(String uriUserInfoEntry) {
|
||||
return uriUserInfoEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the URI query that's configured on the published URI that's sent to client's
|
||||
* when the cluster info is updated.
|
||||
*
|
||||
* @return the clusterClientUriQuery
|
||||
*/
|
||||
public String getClusterClientUriQuery() {
|
||||
return clusterClientUriQuery;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the URI query that's configured on the published URI that's sent to client's
|
||||
* when the cluster info is updated.
|
||||
*
|
||||
* @param clusterClientUriQuery the clusterClientUriQuery to set
|
||||
*/
|
||||
public void setClusterClientUriQuery(String clusterClientUriQuery) {
|
||||
this.clusterClientUriQuery = clusterClientUriQuery;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the publishedHostStrategy
|
||||
*/
|
||||
public PublishedHostStrategy getPublishedHostStrategy() {
|
||||
return publishedHostStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param publishedHostStrategy the publishedHostStrategy to set
|
||||
*/
|
||||
public void setPublishedHostStrategy(PublishedHostStrategy strategy) {
|
||||
this.publishedHostStrategy = strategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param publishedHostStrategy the publishedHostStrategy to set
|
||||
*/
|
||||
public void setPublishedHostStrategy(String strategy) {
|
||||
this.publishedHostStrategy = PublishedHostStrategy.getValue(strategy);
|
||||
}
|
||||
}
|
|
@ -33,7 +33,10 @@ import org.apache.activemq.command.BrokerInfo;
|
|||
import org.apache.activemq.command.ConnectionControl;
|
||||
import org.apache.activemq.security.MessageAuthorizationPolicy;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.*;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportAcceptListener;
|
||||
import org.apache.activemq.transport.TransportFactorySupport;
|
||||
import org.apache.activemq.transport.TransportServer;
|
||||
import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
||||
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
|
@ -70,6 +73,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
private boolean auditNetworkProducers = false;
|
||||
private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
|
||||
private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
|
||||
private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
|
||||
|
||||
LinkedList<String> peerBrokers = new LinkedList<String>();
|
||||
|
||||
|
@ -117,9 +121,11 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
rc.setAuditNetworkProducers(isAuditNetworkProducers());
|
||||
rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
|
||||
rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
|
||||
rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
|
||||
return rc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BrokerInfo getBrokerInfo() {
|
||||
return brokerInfo;
|
||||
}
|
||||
|
@ -172,6 +178,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
/**
|
||||
* @return the statistics for this connector
|
||||
*/
|
||||
@Override
|
||||
public ConnectorStatistics getStatistics() {
|
||||
return statistics;
|
||||
}
|
||||
|
@ -188,6 +195,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
broker = brokerService.getBroker();
|
||||
brokerInfo.setBrokerName(broker.getBrokerName());
|
||||
|
@ -196,9 +204,11 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
|
||||
brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
|
||||
getServer().setAcceptListener(new TransportAcceptListener() {
|
||||
@Override
|
||||
public void onAccept(final Transport transport) {
|
||||
try {
|
||||
brokerService.getTaskRunnerFactory().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Connection connection = createConnection(transport);
|
||||
|
@ -217,6 +227,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAcceptError(Exception error) {
|
||||
onAcceptError(error, null);
|
||||
}
|
||||
|
@ -244,25 +255,14 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
}
|
||||
|
||||
public String getPublishableConnectString() throws Exception {
|
||||
return getPublishableConnectString(getConnectUri());
|
||||
}
|
||||
|
||||
public String getPublishableConnectString(URI theConnectURI) throws Exception {
|
||||
String publishableConnectString = null;
|
||||
if (theConnectURI != null) {
|
||||
publishableConnectString = theConnectURI.toString();
|
||||
// strip off server side query parameters which may not be compatible to clients
|
||||
if (theConnectURI.getRawQuery() != null) {
|
||||
publishableConnectString = publishableConnectString.substring(0, publishableConnectString
|
||||
.indexOf(theConnectURI.getRawQuery()) - 1);
|
||||
}
|
||||
}
|
||||
String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
|
||||
LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + getConnectUri());
|
||||
}
|
||||
return publishableConnectString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
ServiceStopper ss = new ServiceStopper();
|
||||
if (discoveryAgent != null) {
|
||||
|
@ -425,6 +425,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateClientClusterInfo() {
|
||||
if (isRebalanceClusterClients() || isUpdateClusterClients()) {
|
||||
ConnectionControl control = getConnectionControl();
|
||||
|
@ -488,6 +489,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
/**
|
||||
* This is called by the BrokerService right before it starts the transport.
|
||||
*/
|
||||
@Override
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
this.brokerService = brokerService;
|
||||
}
|
||||
|
@ -503,6 +505,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
/**
|
||||
* @return the updateClusterClients
|
||||
*/
|
||||
@Override
|
||||
public boolean isUpdateClusterClients() {
|
||||
return this.updateClusterClients;
|
||||
}
|
||||
|
@ -518,6 +521,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
/**
|
||||
* @return the rebalanceClusterClients
|
||||
*/
|
||||
@Override
|
||||
public boolean isRebalanceClusterClients() {
|
||||
return this.rebalanceClusterClients;
|
||||
}
|
||||
|
@ -533,6 +537,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
/**
|
||||
* @return the updateClusterClientsOnRemove
|
||||
*/
|
||||
@Override
|
||||
public boolean isUpdateClusterClientsOnRemove() {
|
||||
return this.updateClusterClientsOnRemove;
|
||||
}
|
||||
|
@ -559,6 +564,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
this.updateClusterFilter = updateClusterFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int connectionCount() {
|
||||
return connections.size();
|
||||
}
|
||||
|
@ -591,4 +597,24 @@ public class TransportConnector implements Connector, BrokerServiceAware {
|
|||
public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
|
||||
this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the currently configured policy for creating the published connection address of this
|
||||
* TransportConnector.
|
||||
*
|
||||
* @return the publishedAddressPolicy
|
||||
*/
|
||||
public PublishedAddressPolicy getPublishedAddressPolicy() {
|
||||
return publishedAddressPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the configured policy for creating the published connection address of this
|
||||
* TransportConnector.
|
||||
*
|
||||
* @return the publishedAddressPolicy
|
||||
*/
|
||||
public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
|
||||
this.publishedAddressPolicy = publishedAddressPolicy;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue