Partial fix, perform a proper URI comparison to avoid an unnecessary reconnect bounce.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478083 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-01 16:33:57 +00:00
parent d52e5910a5
commit fbac51c8f8
1 changed files with 63 additions and 28 deletions

View File

@ -122,7 +122,7 @@ public class FailoverTransport implements CompositeTransport {
private boolean connectedToPriority = false; private boolean connectedToPriority = false;
private boolean priorityBackup = false; private boolean priorityBackup = false;
private ArrayList<URI> priorityList = new ArrayList<URI>(); private final ArrayList<URI> priorityList = new ArrayList<URI>();
private boolean priorityBackupAvailable = false; private boolean priorityBackupAvailable = false;
public FailoverTransport() throws InterruptedIOException { public FailoverTransport() throws InterruptedIOException {
@ -132,6 +132,7 @@ public class FailoverTransport implements CompositeTransport {
reconnectTaskFactory = new TaskRunnerFactory(); reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init(); reconnectTaskFactory.init();
reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
@Override
public boolean iterate() { public boolean iterate() {
boolean result = false; boolean result = false;
if (!started) { if (!started) {
@ -178,6 +179,7 @@ public class FailoverTransport implements CompositeTransport {
TransportListener createTransportListener() { TransportListener createTransportListener() {
return new TransportListener() { return new TransportListener() {
@Override
public void onCommand(Object o) { public void onCommand(Object o) {
Command command = (Command) o; Command command = (Command) o;
if (command == null) { if (command == null) {
@ -204,6 +206,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public void onException(IOException error) { public void onException(IOException error) {
try { try {
handleTransportFailure(error); handleTransportFailure(error);
@ -213,12 +216,14 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public void transportInterupted() { public void transportInterupted() {
if (transportListener != null) { if (transportListener != null) {
transportListener.transportInterupted(); transportListener.transportInterupted();
} }
} }
@Override
public void transportResumed() { public void transportResumed() {
if (transportListener != null) { if (transportListener != null) {
transportListener.transportResumed(); transportListener.transportResumed();
@ -324,6 +329,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public void start() throws Exception { public void start() throws Exception {
synchronized (reconnectMutex) { synchronized (reconnectMutex) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -344,6 +350,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public void stop() throws Exception { public void stop() throws Exception {
Transport transportToStop = null; Transport transportToStop = null;
List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
@ -541,6 +548,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
Command command = (Command) o; Command command = (Command) o;
@ -692,18 +700,22 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
@Override
public Object request(Object command) throws IOException { public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
@Override
public Object request(Object command, int timeout) throws IOException { public Object request(Object command, int timeout) throws IOException {
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
} }
@Override
public void add(boolean rebalance, URI u[]) { public void add(boolean rebalance, URI u[]) {
boolean newURI = false; boolean newURI = false;
for (URI uri : u) { for (URI uri : u) {
@ -717,6 +729,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public void remove(boolean rebalance, URI u[]) { public void remove(boolean rebalance, URI u[]) {
for (URI uri : u) { for (URI uri : u) {
uris.remove(uri); uris.remove(uri);
@ -782,10 +795,12 @@ public class FailoverTransport implements CompositeTransport {
return l; return l;
} }
@Override
public TransportListener getTransportListener() { public TransportListener getTransportListener() {
return transportListener; return transportListener;
} }
@Override
public void setTransportListener(TransportListener commandListener) { public void setTransportListener(TransportListener commandListener) {
synchronized (listenerMutex) { synchronized (listenerMutex) {
this.transportListener = commandListener; this.transportListener = commandListener;
@ -793,6 +808,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
@Override
public <T> T narrow(Class<T> target) { public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) { if (target.isAssignableFrom(getClass())) {
@ -838,6 +854,7 @@ public class FailoverTransport implements CompositeTransport {
return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
} }
@Override
public String getRemoteAddress() { public String getRemoteAddress() {
Transport transport = connectedTransport.get(); Transport transport = connectedTransport.get();
if (transport != null) { if (transport != null) {
@ -846,6 +863,7 @@ public class FailoverTransport implements CompositeTransport {
return null; return null;
} }
@Override
public boolean isFaultTolerant() { public boolean isFaultTolerant() {
return true; return true;
} }
@ -904,7 +922,7 @@ public class FailoverTransport implements CompositeTransport {
failure = new IOException("No uris available to connect to."); failure = new IOException("No uris available to connect to.");
} else { } else {
if (doRebalance) { if (doRebalance) {
if (connectList.get(0).equals(connectedTransportURI)) { if (compareURIs(connectList.get(0), connectedTransportURI)) {
// already connected to first in the list, no need to rebalance // already connected to first in the list, no need to rebalance
doRebalance = false; doRebalance = false;
return false; return false;
@ -1189,18 +1207,22 @@ public class FailoverTransport implements CompositeTransport {
return uris.indexOf(uri) == 0; return uris.indexOf(uri) == 0;
} }
@Override
public boolean isDisposed() { public boolean isDisposed() {
return disposed; return disposed;
} }
@Override
public boolean isConnected() { public boolean isConnected() {
return connected; return connected;
} }
@Override
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
add(true, new URI[]{uri}); add(true, new URI[]{uri});
} }
@Override
public boolean isReconnectSupported() { public boolean isReconnectSupported() {
return this.reconnectSupported; return this.reconnectSupported;
} }
@ -1209,6 +1231,7 @@ public class FailoverTransport implements CompositeTransport {
this.reconnectSupported = value; this.reconnectSupported = value;
} }
@Override
public boolean isUpdateURIsSupported() { public boolean isUpdateURIsSupported() {
return this.updateURIsSupported; return this.updateURIsSupported;
} }
@ -1217,6 +1240,7 @@ public class FailoverTransport implements CompositeTransport {
this.updateURIsSupported = value; this.updateURIsSupported = value;
} }
@Override
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
if (isUpdateURIsSupported()) { if (isUpdateURIsSupported()) {
HashSet<URI> copy = new HashSet<URI>(this.updated); HashSet<URI> copy = new HashSet<URI>(this.updated);
@ -1265,6 +1289,7 @@ public class FailoverTransport implements CompositeTransport {
this.rebalanceUpdateURIs = rebalanceUpdateURIs; this.rebalanceUpdateURIs = rebalanceUpdateURIs;
} }
@Override
public int getReceiveCounter() { public int getReceiveCounter() {
Transport transport = connectedTransport.get(); Transport transport = connectedTransport.get();
if (transport == null) { if (transport == null) {
@ -1290,38 +1315,48 @@ public class FailoverTransport implements CompositeTransport {
private boolean contains(URI newURI) { private boolean contains(URI newURI) {
boolean result = false; boolean result = false;
for (URI uri : uris) { for (URI uri : uris) {
if (newURI.getPort() == uri.getPort()) { if (compareURIs(newURI, uri)) {
InetAddress newAddr = null;
InetAddress addr = null;
try {
newAddr = InetAddress.getByName(newURI.getHost());
addr = InetAddress.getByName(uri.getHost());
} catch(IOException e) {
if (newAddr == null) {
LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
} else {
LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
}
if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
result = true; result = true;
break; break;
} else {
continue;
}
}
if (addr.equals(newAddr)) {
result = true;
break;
}
} }
} }
return result; return result;
} }
private boolean compareURIs(final URI first, final URI second) {
if (first == null || second == null) {
return false;
}
if (first.getPort() == second.getPort()) {
InetAddress firstAddr = null;
InetAddress secondAddr = null;
try {
firstAddr = InetAddress.getByName(first.getHost());
secondAddr = InetAddress.getByName(second.getHost());
} catch(IOException e) {
if (firstAddr == null) {
LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e);
} else {
LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e);
}
if (first.getHost().equalsIgnoreCase(second.getHost())) {
return true;
}
}
if (firstAddr.equals(secondAddr)) {
return true;
}
}
return false;
}
private InputStreamReader getURLStream(String path) throws IOException { private InputStreamReader getURLStream(String path) throws IOException {
InputStreamReader result = null; InputStreamReader result = null;
URL url = null; URL url = null;