diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index dcb0867095..134b325529 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -40,13 +40,12 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; -import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; - import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.state.Tracked; import org.apache.activemq.thread.Task; @@ -270,7 +269,7 @@ public class FailoverTransport implements CompositeTransport { if (canReconnect()) { reconnectOk = true; } - LOG.warn("Transport (" + connectedTransportURI + ") failed" + LOG.warn("Transport (" + connectedTransportURI + ") failed" + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e); failedConnectTransportURI = connectedTransportURI; @@ -840,7 +839,6 @@ public class FailoverTransport implements CompositeTransport { @Override public T narrow(Class target) { - if (target.isAssignableFrom(getClass())) { return target.cast(this); } @@ -849,7 +847,6 @@ public class FailoverTransport implements CompositeTransport { return transport.narrow(target); } return null; - } protected void restoreTransport(Transport t) throws Exception, IOException { @@ -1167,7 +1164,7 @@ public class FailoverTransport implements CompositeTransport { } /* - * called with reconnectMutex held + * called with reconnectMutex held */ private void propagateFailureToExceptionListener(Exception exception) { if (transportListener != null) { @@ -1259,9 +1256,17 @@ public class FailoverTransport implements CompositeTransport { } if (!priorityList.isEmpty()) { - return priorityList.contains(uri); + for (URI priorityURI : priorityList) { + if (compareURIs(priorityURI, uri)) { + return true; + } + } + + } else if (!uris.isEmpty()) { + return compareURIs(uris.get(0), uri); } - return uris.indexOf(uri) == 0; + + return false; } @Override @@ -1370,6 +1375,10 @@ public class FailoverTransport implements CompositeTransport { return stateTracker; } + public boolean isConnectedToPriority() { + return connectedToPriority; + } + private boolean contains(URI newURI) { boolean result = false; for (URI uri : uris) { @@ -1468,5 +1477,4 @@ public class FailoverTransport implements CompositeTransport { public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; } - } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5336Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5336Test.java new file mode 100644 index 0000000000..7886c6e275 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5336Test.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.failover.FailoverTransport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for priority backup URI handling. + */ +public class AMQ5336Test { + + private BrokerService brokerService; + private String connectionUri; + + @Before + public void before() throws Exception { + brokerService = new BrokerService(); + TransportConnector connector = brokerService.addConnector("tcp://localhost:0"); + connectionUri = connector.getPublishableConnectString(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void after() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + @Test + public void test() throws Exception { + String uri = "failover:(" + connectionUri + ")" + + "?randomize=false&" + + "nested.socket.tcpNoDelay=true&" + + "priorityBackup=true&" + + "priorityURIs=" + connectionUri + "&" + + "initialReconnectDelay=1000&" + + "useExponentialBackOff=false"; + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(uri); + ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + + FailoverTransport failover = connection.getTransport().narrow(FailoverTransport.class); + assertNotNull(failover); + assertTrue(failover.isConnectedToPriority()); + + connection.close(); + } +}