mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4461 - priority backup should not be restricted by the pool size
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1486869 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c70d7735c0
commit
9490793f2c
|
@ -1156,9 +1156,13 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
return maxReconnectValue;
|
return maxReconnectValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean shouldBuildBackups() {
|
||||||
|
return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority));
|
||||||
|
}
|
||||||
|
|
||||||
final boolean buildBackups() {
|
final boolean buildBackups() {
|
||||||
synchronized (backupMutex) {
|
synchronized (backupMutex) {
|
||||||
if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
|
if (!disposed && shouldBuildBackups()) {
|
||||||
ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
|
ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
|
||||||
List<URI> connectList = getConnectList();
|
List<URI> connectList = getConnectList();
|
||||||
for (URI uri: connectList) {
|
for (URI uri: connectList) {
|
||||||
|
@ -1175,7 +1179,7 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
}
|
}
|
||||||
backups.removeAll(disposedList);
|
backups.removeAll(disposedList);
|
||||||
disposedList.clear();
|
disposedList.clear();
|
||||||
for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && backups.size() < backupPoolSize; ) {
|
for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) {
|
||||||
URI uri = iter.next();
|
URI uri = iter.next();
|
||||||
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
|
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
|
||||||
try {
|
try {
|
||||||
|
@ -1190,6 +1194,17 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
if (priorityBackup && isPriority(uri)) {
|
if (priorityBackup && isPriority(uri)) {
|
||||||
priorityBackupAvailable = true;
|
priorityBackupAvailable = true;
|
||||||
backups.add(0, bt);
|
backups.add(0, bt);
|
||||||
|
// if this priority backup overflows the pool
|
||||||
|
// remove the backup with the lowest priority
|
||||||
|
if (backups.size() > backupPoolSize) {
|
||||||
|
BackupTransport disposeTransport = backups.remove(backups.size() - 1);
|
||||||
|
disposeTransport.setDisposed(true);
|
||||||
|
Transport transport = disposeTransport.getTransport();
|
||||||
|
if (transport != null) {
|
||||||
|
transport.setTransportListener(disposedListener);
|
||||||
|
disposeTransport(transport);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
backups.add(bt);
|
backups.add(bt);
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
||||||
getBroker(BROKER_C_NAME).waitUntilStarted();
|
getBroker(BROKER_C_NAME).waitUntilStarted();
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false&backupPoolSize=2");
|
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
|
||||||
|
|
||||||
createClients(5);
|
createClients(5);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue