ARTEMIS-1094 replica + group-name fix

When a replica attempts to connect to a live server using a group-name
and there are > 1 servers on the network using that group there is a
chance it will fail because it doesn't keep track of all of the
topology data it receives. This fix ensures that all the topology data
from the cluster tracked until it is used and fails at which point it
is discarded.
This commit is contained in:
Justin Bertram 2017-04-05 15:12:23 -05:00 committed by Clebert Suconic
parent f88311b04b
commit 4a57aecbbf
1 changed files with 11 additions and 6 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -38,7 +40,7 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition(); private final Condition condition = lock.newCondition();
private final String backupGroupName; private final String backupGroupName;
private Pair<TransportConfiguration, TransportConfiguration> liveConfiguration; private Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>();
private String nodeID; private String nodeID;
@ -56,12 +58,12 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
public void locateNode(long timeout) throws ActiveMQException { public void locateNode(long timeout) throws ActiveMQException {
try { try {
lock.lock(); lock.lock();
if (liveConfiguration == null) { if (liveConfigurations.size() == 0) {
try { try {
if (timeout != -1L) { if (timeout != -1L) {
ConcurrentUtil.await(condition, timeout); ConcurrentUtil.await(condition, timeout);
} else { } else {
while (liveConfiguration == null) { while (liveConfigurations.size() == 0) {
condition.await(); condition.await();
} }
} }
@ -79,7 +81,10 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
try { try {
lock.lock(); lock.lock();
if (backupGroupName.equals(topologyMember.getBackupGroupName()) && topologyMember.getLive() != null) { if (backupGroupName.equals(topologyMember.getBackupGroupName()) && topologyMember.getLive() != null) {
liveConfiguration = new Pair<>(topologyMember.getLive(), topologyMember.getBackup()); Pair<TransportConfiguration, TransportConfiguration> liveConfiguration = new Pair<>(topologyMember.getLive(), topologyMember.getBackup());
if (!liveConfigurations.contains(liveConfiguration)) {
liveConfigurations.add(liveConfiguration);
}
nodeID = topologyMember.getNodeId(); nodeID = topologyMember.getNodeId();
condition.signal(); condition.signal();
} }
@ -100,14 +105,14 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
@Override @Override
public Pair<TransportConfiguration, TransportConfiguration> getLiveConfiguration() { public Pair<TransportConfiguration, TransportConfiguration> getLiveConfiguration() {
return liveConfiguration; return liveConfigurations.peek();
} }
@Override @Override
public void notifyRegistrationFailed(boolean alreadyReplicating) { public void notifyRegistrationFailed(boolean alreadyReplicating) {
try { try {
lock.lock(); lock.lock();
liveConfiguration = null; liveConfigurations.poll();
super.notifyRegistrationFailed(alreadyReplicating); super.notifyRegistrationFailed(alreadyReplicating);
} finally { } finally {
lock.unlock(); lock.unlock();