This closes #1175
This commit is contained in:
commit
7d9ae1a83e
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
@ -38,7 +40,7 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
|
|||
private final Lock lock = new ReentrantLock();
|
||||
private final Condition condition = lock.newCondition();
|
||||
private final String backupGroupName;
|
||||
private Pair<TransportConfiguration, TransportConfiguration> liveConfiguration;
|
||||
private Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>();
|
||||
|
||||
private String nodeID;
|
||||
|
||||
|
@ -56,12 +58,12 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
|
|||
public void locateNode(long timeout) throws ActiveMQException {
|
||||
try {
|
||||
lock.lock();
|
||||
if (liveConfiguration == null) {
|
||||
if (liveConfigurations.size() == 0) {
|
||||
try {
|
||||
if (timeout != -1L) {
|
||||
ConcurrentUtil.await(condition, timeout);
|
||||
} else {
|
||||
while (liveConfiguration == null) {
|
||||
while (liveConfigurations.size() == 0) {
|
||||
condition.await();
|
||||
}
|
||||
}
|
||||
|
@ -79,7 +81,10 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
|
|||
try {
|
||||
lock.lock();
|
||||
if (backupGroupName.equals(topologyMember.getBackupGroupName()) && topologyMember.getLive() != null) {
|
||||
liveConfiguration = new Pair<>(topologyMember.getLive(), topologyMember.getBackup());
|
||||
Pair<TransportConfiguration, TransportConfiguration> liveConfiguration = new Pair<>(topologyMember.getLive(), topologyMember.getBackup());
|
||||
if (!liveConfigurations.contains(liveConfiguration)) {
|
||||
liveConfigurations.add(liveConfiguration);
|
||||
}
|
||||
nodeID = topologyMember.getNodeId();
|
||||
condition.signal();
|
||||
}
|
||||
|
@ -100,14 +105,14 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
|
|||
|
||||
@Override
|
||||
public Pair<TransportConfiguration, TransportConfiguration> getLiveConfiguration() {
|
||||
return liveConfiguration;
|
||||
return liveConfigurations.peek();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyRegistrationFailed(boolean alreadyReplicating) {
|
||||
try {
|
||||
lock.lock();
|
||||
liveConfiguration = null;
|
||||
liveConfigurations.poll();
|
||||
super.notifyRegistrationFailed(alreadyReplicating);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
|
Loading…
Reference in New Issue