diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index c7aca05953..91b9b8c1f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -17,10 +17,16 @@ package org.apache.activemq.transport.failover; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.InterruptedIOException; import java.net.InetAddress; +import java.net.MalformedURLException; import java.net.URI; +import java.net.URL; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -31,7 +37,6 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; - import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; @@ -113,6 +118,8 @@ public class FailoverTransport implements CompositeTransport { private boolean reconnectSupported=true; // remember for reconnect thread private SslContext brokerSslContext; + private String updateURIsURL = null; + private boolean rebalanceUpdateURIs=true; public FailoverTransport() throws InterruptedIOException { brokerSslContext = SslContext.getCurrentSslContext(); @@ -257,12 +264,15 @@ public class FailoverTransport implements CompositeTransport { } } } - String connectedStr = control.getConnectedBrokers(); - if (connectedStr != null) { - connectedStr = connectedStr.trim(); - if (connectedStr.length() > 0 && isUpdateURIsSupported()) { + processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); + } + + private final void processNewTransports(boolean rebalance, String newTransports) { + if (newTransports != null) { + newTransports = newTransports.trim(); + if (newTransports.length() > 0 && isUpdateURIsSupported()) { List list = new ArrayList(); - StringTokenizer tokenizer = new StringTokenizer(connectedStr, ","); + StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); while (tokenizer.hasMoreTokens()) { String str = tokenizer.nextToken(); try { @@ -274,9 +284,9 @@ public class FailoverTransport implements CompositeTransport { } if (list.isEmpty() == false) { try { - updateURIs(control.isRebalanceConnection(), list.toArray(new URI[list.size()])); + updateURIs(rebalance, list.toArray(new URI[list.size()])); } catch (IOException e) { - LOG.error("Failed to update transport URI's from: " + connectedStr, e); + LOG.error("Failed to update transport URI's from: " + newTransports, e); } } @@ -752,6 +762,40 @@ public class FailoverTransport implements CompositeTransport { Exception failure = null; synchronized (reconnectMutex) { + // If updateURIsURL is specified, read the file and add any new + // transport URI's to this FailOverTransport. + // Note: Could track file timestamp to avoid unnecessary reading. + String fileURL = getUpdateURIsURL(); + if (fileURL != null) { + BufferedReader in = null; + String newUris = null; + StringBuffer buffer = new StringBuffer(); + + try { + in = new BufferedReader(getURLStream(fileURL)); + while (true) { + String line = in.readLine(); + if (line == null) { + break; + } + buffer.append(line); + } + newUris = buffer.toString(); + } catch (IOException ioe) { + LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException ioe) { + // ignore + } + } + } + + processNewTransports(isRebalanceUpdateURIs(), newUris); + } + if (disposed || connectionFailure != null) { reconnectMutex.notifyAll(); } @@ -1006,6 +1050,34 @@ public class FailoverTransport implements CompositeTransport { } } } + + /** + * @return the updateURIsURL + */ + public String getUpdateURIsURL() { + return this.updateURIsURL; + } + + /** + * @param updateURIsURL the updateURIsURL to set + */ + public void setUpdateURIsURL(String updateURIsURL) { + this.updateURIsURL = updateURIsURL; + } + + /** + * @return the rebalanceUpdateURIs + */ + public boolean isRebalanceUpdateURIs() { + return this.rebalanceUpdateURIs; + } + + /** + * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set + */ + public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { + this.rebalanceUpdateURIs = rebalanceUpdateURIs; + } public int getReceiveCounter() { Transport transport = connectedTransport.get(); @@ -1045,4 +1117,19 @@ public class FailoverTransport implements CompositeTransport { } return result; } + + private InputStreamReader getURLStream(String path) throws IOException { + InputStreamReader result = null; + URL url = null; + try { + url = new URL(path); + result = new InputStreamReader(url.openStream()); + } catch (MalformedURLException e) { + // ignore - it could be a path to a a local file + } + if (result == null) { + result = new FileReader(path); + } + return result; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java new file mode 100644 index 0000000000..e4153019f9 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + +public class FailoverUpdateURIsTest extends TestCase { + + private static final String QUEUE_NAME = "test.failoverupdateuris"; + + public void testUpdateURIs() throws Exception { + + long timeout = 1000; + URI firstTcpUri = new URI("tcp://localhost:61616"); + URI secondTcpUri = new URI("tcp://localhost:61626"); + String targetDir = "target/" + getName(); + new File(targetDir).mkdir(); + File updateFile = new File(targetDir + "/updateURIsFile.txt"); + System.out.println(updateFile); + System.out.println(updateFile.toURI()); + System.out.println(updateFile.getAbsoluteFile()); + System.out.println(updateFile.getAbsoluteFile().toURI()); + FileOutputStream out = new FileOutputStream(updateFile); + out.write(firstTcpUri.toString().getBytes()); + out.close(); + + BrokerService bs1 = new BrokerService(); + bs1.setUseJmx(false); + bs1.addConnector(firstTcpUri); + bs1.start(); + + // no failover uri's to start with, must be read from file... + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile()); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue theQueue = session.createQueue(QUEUE_NAME); + MessageProducer producer = session.createProducer(theQueue); + MessageConsumer consumer = session.createConsumer(theQueue); + Message message = session.createTextMessage("Test message"); + producer.send(message); + Message msg = consumer.receive(2000); + assertNotNull(msg); + + bs1.stop(); + bs1.waitUntilStopped(); + + BrokerService bs2 = new BrokerService(); + bs2.setUseJmx(false); + bs2.addConnector(secondTcpUri); + bs2.start(); + + // add the transport uri for broker number 2 + out = new FileOutputStream(updateFile, true); + out.write(",".getBytes()); + out.write(secondTcpUri.toString().getBytes()); + out.close(); + + producer.send(message); + msg = consumer.receive(2000); + assertNotNull(msg); + } + +}