diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java new file mode 100644 index 0000000000..01f22d3409 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/PublishedAddressPolicy.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Locale; + +import org.apache.activemq.util.InetAddressUtil; + +/** + * Policy object that controls how a TransportConnector publishes the connector's + * address to the outside world. By default the connector will publish itself + * using the resolved host name of the bound server socket. + * + * @org.apache.xbean.XBean + */ +public class PublishedAddressPolicy { + + private String clusterClientUriQuery; + private PublishedHostStrategy publishedHostStrategy = PublishedHostStrategy.DEFAULT; + + /** + * Defines the value of the published host value. + */ + public enum PublishedHostStrategy { + DEFAULT, + IPADDRESS, + HOSTNAME, + FQDN; + + public static PublishedHostStrategy getValue(String value) { + return valueOf(value.toUpperCase(Locale.ENGLISH)); + } + } + + /** + * Using the supplied TransportConnector this method returns the String that will + * be used to update clients with this connector's connect address. + * + * @param connector + * The TransportConnector whose address is to be published. + * @return a string URI address that a client can use to connect to this Transport. + * @throws Exception + */ + public String getPublishableConnectString(TransportConnector connector) throws Exception { + + URI connectorURI = connector.getConnectUri(); + + if (connectorURI == null) { + return null; + } + + String scheme = connectorURI.getScheme(); + String userInfo = getPublishedUserInfoValue(connectorURI.getUserInfo()); + String host = getPublishedHostValue(connectorURI.getHost()); + int port = connectorURI.getPort(); + String path = getPublishedPathValue(connectorURI.getPath()); + String fragment = getPublishedFragmentValue(connectorURI.getFragment()); + + URI publishedURI = new URI(scheme, userInfo, host, port, path, getClusterClientUriQuery(), fragment); + + return publishedURI.toString(); + } + + /** + * Subclasses can override what host value is published by implementing alternate + * logic for this method. + * + * @param uriHostEntry + * @return + * @throws UnknownHostException + */ + protected String getPublishedHostValue(String uriHostEntry) throws UnknownHostException { + + // By default we just republish what was already present. + String result = uriHostEntry; + + if (this.publishedHostStrategy.equals(PublishedHostStrategy.IPADDRESS)) { + InetAddress address = InetAddress.getByName(uriHostEntry); + result = address.getHostAddress(); + } else if (this.publishedHostStrategy.equals(PublishedHostStrategy.HOSTNAME)) { + InetAddress address = InetAddress.getByName(uriHostEntry); + if (address.isAnyLocalAddress()) { + // make it more human readable and useful, an alternative to 0.0.0.0 + result = InetAddressUtil.getLocalHostName(); + } else { + result = address.getHostName(); + } + } else if (this.publishedHostStrategy.equals(PublishedHostStrategy.FQDN)) { + InetAddress address = InetAddress.getByName(uriHostEntry); + if (address.isAnyLocalAddress()) { + // make it more human readable and useful, an alternative to 0.0.0.0 + result = InetAddressUtil.getLocalHostName(); + } else { + result = address.getCanonicalHostName(); + } + } + + return result; + } + + /** + * Subclasses can override what path value is published by implementing alternate + * logic for this method. By default this method simply returns what was already + * set as the Path value in the original URI. + * + * @param uriPathEntry + * The original value of the URI path. + * + * @return the desired value for the published URI's path. + */ + protected String getPublishedPathValue(String uriPathEntry) { + return uriPathEntry; + } + + /** + * Subclasses can override what host value is published by implementing alternate + * logic for this method. By default this method simply returns what was already + * set as the Fragment value in the original URI. + * + * @param uriFragmentEntry + * The original value of the URI Fragment. + * + * @return the desired value for the published URI's Fragment. + */ + protected String getPublishedFragmentValue(String uriFragmentEntry) { + return uriFragmentEntry; + } + + /** + * Subclasses can override what user info value is published by implementing alternate + * logic for this method. By default this method simply returns what was already + * set as the UserInfo value in the original URI. + * + * @param uriUserInfoEntry + * The original value of the URI user info. + * + * @return the desired value for the published URI's user info. + */ + protected String getPublishedUserInfoValue(String uriUserInfoEntry) { + return uriUserInfoEntry; + } + + /** + * Gets the URI query that's configured on the published URI that's sent to client's + * when the cluster info is updated. + * + * @return the clusterClientUriQuery + */ + public String getClusterClientUriQuery() { + return clusterClientUriQuery; + } + + /** + * Sets the URI query that's configured on the published URI that's sent to client's + * when the cluster info is updated. + * + * @param clusterClientUriQuery the clusterClientUriQuery to set + */ + public void setClusterClientUriQuery(String clusterClientUriQuery) { + this.clusterClientUriQuery = clusterClientUriQuery; + } + + /** + * @return the publishedHostStrategy + */ + public PublishedHostStrategy getPublishedHostStrategy() { + return publishedHostStrategy; + } + + /** + * @param publishedHostStrategy the publishedHostStrategy to set + */ + public void setPublishedHostStrategy(PublishedHostStrategy strategy) { + this.publishedHostStrategy = strategy; + } + + /** + * @param publishedHostStrategy the publishedHostStrategy to set + */ + public void setPublishedHostStrategy(String strategy) { + this.publishedHostStrategy = PublishedHostStrategy.getValue(strategy); + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java index 2cfb7e0df9..7e3b218d30 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -33,7 +33,10 @@ import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.*; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactorySupport; +import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; import org.apache.activemq.util.ServiceStopper; @@ -70,6 +73,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { private boolean auditNetworkProducers = false; private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE; private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE; + private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy(); LinkedList peerBrokers = new LinkedList(); @@ -117,9 +121,11 @@ public class TransportConnector implements Connector, BrokerServiceAware { rc.setAuditNetworkProducers(isAuditNetworkProducers()); rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection()); rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection()); + rc.setPublishedAddressPolicy(getPublishedAddressPolicy()); return rc; } + @Override public BrokerInfo getBrokerInfo() { return brokerInfo; } @@ -172,6 +178,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { /** * @return the statistics for this connector */ + @Override public ConnectorStatistics getStatistics() { return statistics; } @@ -188,6 +195,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { this.messageAuthorizationPolicy = messageAuthorizationPolicy; } + @Override public void start() throws Exception { broker = brokerService.getBroker(); brokerInfo.setBrokerName(broker.getBrokerName()); @@ -196,9 +204,11 @@ public class TransportConnector implements Connector, BrokerServiceAware { brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString()); getServer().setAcceptListener(new TransportAcceptListener() { + @Override public void onAccept(final Transport transport) { try { brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override public void run() { try { Connection connection = createConnection(transport); @@ -217,6 +227,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { } } + @Override public void onAcceptError(Exception error) { onAcceptError(error, null); } @@ -244,25 +255,14 @@ public class TransportConnector implements Connector, BrokerServiceAware { } public String getPublishableConnectString() throws Exception { - return getPublishableConnectString(getConnectUri()); - } - - public String getPublishableConnectString(URI theConnectURI) throws Exception { - String publishableConnectString = null; - if (theConnectURI != null) { - publishableConnectString = theConnectURI.toString(); - // strip off server side query parameters which may not be compatible to clients - if (theConnectURI.getRawQuery() != null) { - publishableConnectString = publishableConnectString.substring(0, publishableConnectString - .indexOf(theConnectURI.getRawQuery()) - 1); - } - } + String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this); if (LOG.isDebugEnabled()) { - LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI); + LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + getConnectUri()); } return publishableConnectString; } + @Override public void stop() throws Exception { ServiceStopper ss = new ServiceStopper(); if (discoveryAgent != null) { @@ -425,6 +425,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { } } + @Override public void updateClientClusterInfo() { if (isRebalanceClusterClients() || isUpdateClusterClients()) { ConnectionControl control = getConnectionControl(); @@ -488,6 +489,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { /** * This is called by the BrokerService right before it starts the transport. */ + @Override public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } @@ -503,6 +505,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { /** * @return the updateClusterClients */ + @Override public boolean isUpdateClusterClients() { return this.updateClusterClients; } @@ -518,6 +521,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { /** * @return the rebalanceClusterClients */ + @Override public boolean isRebalanceClusterClients() { return this.rebalanceClusterClients; } @@ -533,6 +537,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { /** * @return the updateClusterClientsOnRemove */ + @Override public boolean isUpdateClusterClientsOnRemove() { return this.updateClusterClientsOnRemove; } @@ -559,6 +564,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { this.updateClusterFilter = updateClusterFilter; } + @Override public int connectionCount() { return connections.size(); } @@ -591,4 +597,24 @@ public class TransportConnector implements Connector, BrokerServiceAware { public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) { this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection; } + + /** + * Gets the currently configured policy for creating the published connection address of this + * TransportConnector. + * + * @return the publishedAddressPolicy + */ + public PublishedAddressPolicy getPublishedAddressPolicy() { + return publishedAddressPolicy; + } + + /** + * Sets the configured policy for creating the published connection address of this + * TransportConnector. + * + * @return the publishedAddressPolicy + */ + public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) { + this.publishedAddressPolicy = publishedAddressPolicy; + } }