diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index fc88b4de27..b676dfbc34 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -29,7 +29,6 @@ import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.command.*; -import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -484,8 +483,8 @@ public class AdvisoryBroker extends BrokerFilter { advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); String url = getBrokerService().getVmConnectorURI().toString(); - if (getBrokerService().getDefaultSocketURI() != null) { - url = getBrokerService().getDefaultSocketURI().toString(); + if (getBrokerService().getDefaultSocketURIString() != null) { + url = getBrokerService().getDefaultSocketURIString(); } advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 19da912e87..0fb7b1e0b1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -152,7 +152,7 @@ public class BrokerService implements Service { private boolean deleteAllMessagesOnStartup; private boolean advisorySupport = true; private URI vmConnectorURI; - private URI defaultSocketURI; + private String defaultSocketURIString; private PolicyMap destinationPolicy; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -1315,24 +1315,24 @@ public class BrokerService implements Service { this.vmConnectorURI = vmConnectorURI; } - public URI getDefaultSocketURI() { + public String getDefaultSocketURIString() { if (started.get()) { - if (this.defaultSocketURI==null) { + if (this.defaultSocketURIString ==null) { for (TransportConnector tc:this.transportConnectors) { - URI result = null; + String result = null; try { - result = tc.getConnectUri(); + result = tc.getPublishableConnectString(); } catch (Exception e) { LOG.warn("Failed to get the ConnectURI for "+tc,e); } if (result != null) { - this.defaultSocketURI=result; + this.defaultSocketURIString =result; break; } } } - return this.defaultSocketURI; + return this.defaultSocketURIString; } return null; } @@ -2076,8 +2076,8 @@ public class BrokerService implements Service { connector.setLocalUri(uri); connector.setBrokerName(getBrokerName()); connector.setDurableDestinations(durableDestinations); - if (getDefaultSocketURI() != null) { - connector.setBrokerURL(getDefaultSocketURI().toString()); + if (getDefaultSocketURIString() != null) { + connector.setBrokerURL(getDefaultSocketURIString()); } connector.start(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index a97d49a8b4..c1a949fd07 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -584,7 +584,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { ConnectionId connectionId = info.getSessionId().getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); // Avoid replaying dup commands - if (!cs.getSessionIds().contains(info.getSessionId())) { + if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { broker.addSession(cs.getContext(), info); try { cs.addSession(info); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index 7fa76ba35b..ea99a205ad 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -254,7 +254,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { LOG.info("Connector " + getName() + " Started"); } - private String getPublishableConnectString() throws Exception { + public String getPublishableConnectString() throws Exception { URI theConnectURI = getConnectUri(); String publishableConnectString = theConnectURI.toString(); // strip off server side query parameters which may not be compatible to @@ -386,23 +386,26 @@ public class TransportConnector implements Connector, BrokerServiceAware { boolean rebalance = isRebalanceClusterClients(); String connectedBrokers = ""; String self = ""; - if (brokerService.getDefaultSocketURI() != null) { - self += brokerService.getDefaultSocketURI().toString(); - self += ","; - } - if (rebalance == false) { - connectedBrokers += self; - } - if (this.broker.getPeerBrokerInfos() != null) { - for (BrokerInfo info : this.broker.getPeerBrokerInfos()) { - if (isMatchesClusterFilter(info.getBrokerName())) { - connectedBrokers += info.getBrokerURL(); - connectedBrokers += ","; + + if (isUpdateClusterClients()) { + if (brokerService.getDefaultSocketURIString() != null) { + self += brokerService.getDefaultSocketURIString(); + self += ","; + } + if (rebalance == false) { + connectedBrokers += self; + } + if (this.broker.getPeerBrokerInfos() != null) { + for (BrokerInfo info : this.broker.getPeerBrokerInfos()) { + if (isMatchesClusterFilter(info.getBrokerName())) { + connectedBrokers += info.getBrokerURL(); + connectedBrokers += ","; + } } } - } - if (rebalance) { - connectedBrokers += self; + if (rebalance) { + connectedBrokers += self; + } } ConnectionControl control = new ConnectionControl(); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index 0707d9eba3..a866d7a70a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -58,6 +58,21 @@ private final Listconnections = new ArrayList 1); } + + public void testClusterURIOptionsStrip() throws Exception{ + createClients(); + if (brokerB == null) { + // add in server side only url param, should not be propagated + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS + "?transport.closeAsync=false"); + } + Thread.sleep(3000); + Set set = new HashSet(); + for (ActiveMQConnection c:connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue(set.size() > 1); + } + public void testClusterConnectedBeforeClients() throws Exception{ @@ -80,7 +95,7 @@ private final Listconnections = new ArrayListconnections = new ArrayListconnections = new ArrayList