diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 49db5e2c11..64d407d537 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -1156,9 +1156,13 @@ public class FailoverTransport implements CompositeTransport { return maxReconnectValue; } + private boolean shouldBuildBackups() { + return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority)); + } + final boolean buildBackups() { synchronized (backupMutex) { - if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) { + if (!disposed && shouldBuildBackups()) { ArrayList backupList = new ArrayList(priorityList); List connectList = getConnectList(); for (URI uri: connectList) { @@ -1175,7 +1179,7 @@ public class FailoverTransport implements CompositeTransport { } backups.removeAll(disposedList); disposedList.clear(); - for (Iterator iter = backupList.iterator(); !disposed && iter.hasNext() && backups.size() < backupPoolSize; ) { + for (Iterator iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) { URI uri = iter.next(); if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { try { @@ -1190,6 +1194,17 @@ public class FailoverTransport implements CompositeTransport { if (priorityBackup && isPriority(uri)) { priorityBackupAvailable = true; backups.add(0, bt); + // if this priority backup overflows the pool + // remove the backup with the lowest priority + if (backups.size() > backupPoolSize) { + BackupTransport disposeTransport = backups.remove(backups.size() - 1); + disposeTransport.setDisposed(true); + Transport transport = disposeTransport.getTransport(); + if (transport != null) { + transport.setTransportListener(disposedListener); + disposeTransport(transport); + } + } } else { backups.add(bt); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index 0176a8fad3..bed51830d0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -115,7 +115,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { getBroker(BROKER_C_NAME).waitUntilStarted(); Thread.sleep(1000); - setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false&backupPoolSize=2"); + setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); createClients(5);