ARTEMIS-1565 - replica should retry quorum vote

https://issues.apache.org/jira/browse/ARTEMIS-1565
Andy Taylor 2017-12-19 11:14:50 +00:00
parent d1c9bc0f2d
commit 6067a285bd
13 changed files with 216 additions and 20 deletions

View File

@@ -485,6 +485,12 @@ public final class ActiveMQDefaultConfiguration {
public static boolean DEFAULT_VOTE_ON_REPLICATION_FAILURE = false;
//how many times we retry a vote before restarting as a backup
private static int DEFAULT_VOTE_RETRIES = 12;
//how long we wait between votes, 5 secs
private static long DEFAULT_VOTE_RETRY_WAIT = 5000;
public static int DEFAULT_QUORUM_SIZE = -1;
public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
@@ -1334,4 +1340,11 @@ public final class ActiveMQDefaultConfiguration {
}
public static int getDefaultVoteRetries() {
return DEFAULT_VOTE_RETRIES;
}
public static long getDefaultVoteRetryWait() {
return DEFAULT_VOTE_RETRY_WAIT;
}
}
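Taken together, the two defaults above bound how long a backup will spend voting before it gives up and restarts: 12 attempts, 5 seconds apart. A minimal sketch of the worst-case arithmetic, assuming a hypothetical per-vote await of LATCH_TIMEOUT seconds (that constant lives in SharedNothingBackupQuorum and its value is not shown in this diff):

```java
public final class VoteTimingSketch {

   // Worst case per attempt: wait up to the vote latch timeout for results,
   // then sleep vote-retry-wait before the next attempt.
   static long worstCaseVotingMillis(int voteRetries, long voteRetryWaitMillis, long latchTimeoutSeconds) {
      return voteRetries * (latchTimeoutSeconds * 1000L + voteRetryWaitMillis);
   }

   public static void main(String[] args) {
      // Defaults above (12 retries, 5000 ms between votes) and an assumed 5 s latch timeout:
      System.out.println(worstCaseVotingMillis(12, 5000L, 5L)); // 120000 ms, i.e. about 2 minutes
   }
}
```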

View File

@@ -72,11 +72,11 @@ public final class ConfigurationUtils {
}
case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
-return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize());
+return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait());
}
case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
-return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize());
+return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait());
}
case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;

View File

@@ -43,6 +43,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
private int voteRetries = ActiveMQDefaultConfiguration.getDefaultVoteRetries();
private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait();
public ReplicaPolicyConfiguration() {
}
@@ -139,4 +143,20 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
public int getVoteRetries() {
return voteRetries;
}
public void setVoteRetries(int voteRetries) {
this.voteRetries = voteRetries;
}
public void setVoteRetryWait(long voteRetryWait) {
this.voteRetryWait = voteRetryWait;
}
public long getVoteRetryWait() {
return voteRetryWait;
}
}

View File

@@ -33,6 +33,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
private int voteRetries = ActiveMQDefaultConfiguration.getDefaultVoteRetries();
private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait();
public ReplicatedPolicyConfiguration() {
}
@@ -91,4 +95,21 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
public int getVoteRetries() {
return voteRetries;
}
public void setVoteRetries(int voteRetries) {
this.voteRetries = voteRetries;
}
public void setVoteRetryWait(long voteRetryWait) {
this.voteRetryWait = voteRetryWait;
}
public long getVoteRetryWait() {
return voteRetryWait;
}
}

View File

@@ -1284,6 +1284,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));
configuration.setVoteRetries(getInteger(policyNode, "vote-retries", configuration.getVoteRetries(), Validators.MINUS_ONE_OR_GE_ZERO));
configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration;
@@ -1308,6 +1312,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));
configuration.setVoteRetries(getInteger(policyNode, "vote-retries", configuration.getVoteRetries(), Validators.MINUS_ONE_OR_GE_ZERO));
configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration;
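The two validators above encode the accepted ranges: vote-retries may be -1 or any non-negative count, while vote-retry-wait must be strictly positive. A self-contained sketch of those checks (the real Validators class has its own API; these methods only mirror the intent implied by the validator names):

```java
final class VoteConfigChecks {

   // Mirrors Validators.MINUS_ONE_OR_GE_ZERO: -1 or any value >= 0 passes.
   static int checkVoteRetries(int v) {
      if (v != -1 && v < 0)
         throw new IllegalArgumentException("vote-retries must be -1 or >= 0, got " + v);
      return v;
   }

   // Mirrors Validators.GT_ZERO: only strictly positive waits pass.
   static long checkVoteRetryWait(long v) {
      if (v <= 0)
         throw new IllegalArgumentException("vote-retry-wait must be > 0, got " + v);
      return v;
   }
}
```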

View File

@@ -53,6 +53,10 @@ public class ReplicaPolicy extends BackupPolicy {
private final NetworkHealthCheck networkHealthCheck;
private int voteRetries;
private long voteRetryWait;
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck) {
this.networkHealthCheck = networkHealthCheck;
}
@@ -72,7 +76,9 @@ public class ReplicaPolicy extends BackupPolicy {
ScaleDownPolicy scaleDownPolicy,
NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
-int quorumSize) {
+int quorumSize,
+int voteRetries,
+long voteRetryWait) {
this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName;
@@ -80,6 +86,8 @@ public class ReplicaPolicy extends BackupPolicy {
this.allowFailback = allowFailback;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.quorumSize = quorumSize;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
@@ -115,7 +123,7 @@ public class ReplicaPolicy extends BackupPolicy {
public ReplicatedPolicy getReplicatedPolicy() {
if (replicatedPolicy == null) {
-replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize);
+replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize, voteRetries, voteRetryWait);
}
return replicatedPolicy;
}
@@ -210,4 +218,20 @@ public class ReplicaPolicy extends BackupPolicy {
public boolean isVoteOnReplicationFailure() {
return voteOnReplicationFailure;
}
public void setVoteRetries(int voteRetries) {
this.voteRetries = voteRetries;
}
public void setVoteRetryWait(long voteRetryWait) {
this.voteRetryWait = voteRetryWait;
}
public int getVoteRetries() {
return voteRetries;
}
public long getVoteRetryWait() {
return voteRetryWait;
}
}

View File

@@ -50,6 +50,10 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
* */
private int quorumSize;
private int voteRetries;
private long voteRetryWait;
/*
* these are only used as the policy when the server is started as a live after a failover
* */
@@ -68,7 +72,9 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
long initialReplicationSyncTimeout,
NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
-int quorumSize) {
+int quorumSize,
+int voteRetries,
+long voteRetryWait) {
this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName;
this.clusterName = clusterName;
@@ -76,6 +82,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumSize = quorumSize;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
}
public ReplicatedPolicy(boolean checkForLiveServer,
@@ -86,7 +94,9 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
-int quorumSize) {
+int quorumSize,
+int voteRetries,
+long voteRetryWait) {
this.checkForLiveServer = checkForLiveServer;
this.clusterName = clusterName;
this.groupName = groupName;
@@ -140,6 +150,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
replicaPolicy = new ReplicaPolicy(networkHealthCheck, this);
replicaPolicy.setQuorumSize(quorumSize);
replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
replicaPolicy.setVoteRetries(voteRetries);
replicaPolicy.setVoteRetryWait(voteRetryWait);
if (clusterName != null && clusterName.length() > 0) {
replicaPolicy.setClusterName(clusterName);
}

View File

@@ -778,7 +778,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
protected void fail(final boolean permanently) {
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
serverLocator.notifyNodeDown(System.currentTimeMillis(), targetNodeID);
if (queue != null) {
try {
if (logger.isTraceEnabled()) {

View File

@@ -252,6 +252,22 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
return handlers.get(handler);
}
public TransportConfiguration getLiveTransportConfiguration(String targetServerID) {
TopologyMemberImpl member = clusterController.getDefaultClusterTopology().getMember(targetServerID);
return member != null ? member.getLive() : null;
}
public boolean checkLive(TransportConfiguration liveTransportConfiguration) {
try {
ClusterControl control = clusterController.connectToNode(liveTransportConfiguration);
control.close();
return true;
} catch (Throwable t) {
return false;
}
}
private final class VoteRunnableHolder {
private final QuorumVote quorumVote;
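checkLive above is a cheap pre-vote probe: try to open a control connection to the live's last known connector, close it straight away, and treat any failure as "not live". A self-contained analogue of that connect-and-close shape over a raw socket (purely illustrative; Artemis itself goes through clusterController.connectToNode, not sockets directly):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class ConnectProbe {

   // Same shape as QuorumManager.checkLive: attempt a connection, close it
   // immediately, and report any failure as "not live".
   static boolean checkLive(String host, int port, int timeoutMillis) {
      try (Socket socket = new Socket()) {
         socket.connect(new InetSocketAddress(host, port), timeoutMillis);
         return true;
      } catch (IOException e) {
         return false;
      }
   }
}
```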

View File

@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.Topology;
@@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.server.NodeManager;
public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {
private TransportConfiguration liveTransportConfiguration;
public enum BACKUP_ACTIVATION {
FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
}
@@ -47,6 +50,12 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private final ScheduledExecutorService scheduledPool;
private final int quorumSize;
private final int voteRetries;
private final long voteRetryWait;
private final Object voteGuard = new Object();
private CountDownLatch latch;
private ClientSessionFactoryInternal sessionFactory;
@@ -68,13 +77,17 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
NodeManager nodeManager,
ScheduledExecutorService scheduledPool,
NetworkHealthCheck networkHealthCheck,
-int quorumSize) {
+int quorumSize,
+int voteRetries,
+long voteRetryWait) {
this.storageManager = storageManager;
this.scheduledPool = scheduledPool;
this.quorumSize = quorumSize;
this.latch = new CountDownLatch(1);
this.nodeManager = nodeManager;
this.networkHealthCheck = networkHealthCheck;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
}
private volatile BACKUP_ACTIVATION signal;
@@ -129,6 +142,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
public void liveIDSet(String liveID) {
targetServerID = liveID;
nodeManager.setNodeID(liveID);
liveTransportConfiguration = quorumManager.getLiveTransportConfiguration(targetServerID);
//now we are replicating we can start waiting for disconnect notifications so we can fail over
// sessionFactory.addFailureListener(this);
}
@@ -267,20 +281,44 @@
 * @return the voting decision
 */
private boolean isLiveDown() {
+   //lets assume live is not down
+   Boolean decision = false;
+   int voteAttempts = 0;
   int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
-   QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
-   quorumManager.vote(quorumVote);
-   try {
-      quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
-   } catch (InterruptedException interruption) {
-      // No-op. The best the quorum can do now is to return the latest number it has
-   }
-   quorumManager.voteComplete(quorumVote);
-   return quorumVote.getDecision();
+   synchronized (voteGuard) {
+      while (!decision && voteAttempts++ < voteRetries) {
+         // a quick check to see if the live actually is dead
+         if (quorumManager.checkLive(liveTransportConfiguration)) {
+            //the live is still alive so we best not failover
+            return false;
+         }
+         //the live is dead so lets vote for quorum
+         QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
+         quorumManager.vote(quorumVote);
+         try {
+            quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
+         } catch (InterruptedException interruption) {
+            // No-op. The best the quorum can do now is to return the latest number it has
+         }
+         quorumManager.voteComplete(quorumVote);
+         decision = quorumVote.getDecision();
+         if (decision) {
+            return decision;
+         }
+         try {
+            voteGuard.wait(voteRetryWait);
+         } catch (InterruptedException e) {
+            //nothing to do here
+         }
+      }
+   }
+   return decision;
}
}
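The loop above is the heart of the change: probe the live broker, and only if the probe fails, run a quorum vote; if the vote comes back negative, wait vote-retry-wait and go around again, up to vote-retries times. A compact, self-contained restatement of that control flow (the VoteRound interface below is hypothetical, standing in for the QuorumManager/QuorumVoteServerConnect machinery):

```java
final class RetryingQuorumVote {

   // Hypothetical stand-in for one probe/vote round against the cluster.
   interface VoteRound {
      boolean liveStillReachable();                          // cf. quorumManager.checkLive(...)
      boolean voteSucceeded() throws InterruptedException;   // cf. vote + await + getDecision
   }

   private final Object voteGuard = new Object();

   // Returns true if the cluster agreed the live is down; false if the live
   // answered a probe or every retry was exhausted without a winning vote.
   boolean isLiveDown(VoteRound round, int voteRetries, long voteRetryWaitMillis) {
      boolean decision = false;
      int voteAttempts = 0;
      synchronized (voteGuard) {
         while (!decision && voteAttempts++ < voteRetries) {
            if (round.liveStillReachable()) {
               return false; // the live is back, so do not fail over
            }
            try {
               decision = round.voteSucceeded();
               if (!decision) {
                  voteGuard.wait(voteRetryWaitMillis); // back off before the next round
               }
            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return decision;
            }
         }
      }
      return decision;
   }
}
```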

View File

@@ -131,7 +131,7 @@ public final class SharedNothingBackupActivation extends Activation {
logger.trace("Entered a synchronized");
if (closed)
return;
-backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
+backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
}

View File

@@ -2171,6 +2171,22 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="vote-retries" type="xsd:integer" default="12" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
If we start as a replica and lose connection to the master, how many times should we attempt to vote
for quorum before restarting
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="vote-retries" type="xsd:long" default="5000" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
How long to wait (in milliseconds) between each vote
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
<xsd:complexType name="replicaPolicyType">
@@ -2259,6 +2275,20 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="vote-retries" type="xsd:integer" default="12" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
If we lose connection to the master, how many times should we attempt to vote for quorum before restarting
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="vote-retries" type="xsd:long" default="5000" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
How long to wait (in milliseconds) between each vote
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
<xsd:complexType name="colocatedReplicaPolicyType">

View File

@@ -17,9 +17,22 @@ react with the following details:
By default, if a replica loses its replication connection to the live broker, it makes a decision as to whether to start or not
with a quorum vote. This of course requires that there be at least 3 pairs of live/backup nodes in the cluster. For a 3 node
cluster it will start if it gets 2 votes back saying that its live server is no longer available; for 4 nodes this would be
3 votes, and so on. When a backup loses connection to the master it will keep voting for a quorum until it either receives a vote
allowing it to start or it detects that the master is still live. In the latter case it will restart as a backup. How many times
the backup should vote, and how long it should wait between votes, is configured like so:
```xml
<ha-policy>
<replication>
<slave>
<vote-retries>12</vote-retries>
<vote-retry-wait>5000</vote-retry-wait>
</slave>
</replication>
</ha-policy>
```
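The vote counts quoted earlier follow simple majority arithmetic over the cluster size. A one-method sketch (the size / 2 + 1 formula is inferred from the 3-node/2-vote and 4-node/3-vote examples above, not quoted from the Artemis source):

```java
final class QuorumMath {

   // Votes needed for a strict majority of clusterSize voters.
   static int votesNeeded(int clusterSize) {
      return clusterSize / 2 + 1;
   }

   public static void main(String[] args) {
      System.out.println(votesNeeded(3)); // 2, as in the text above
      System.out.println(votesNeeded(4)); // 3
   }
}
```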
It's also possible to statically set the quorum size that should be used for the case where the cluster size is known up front;
this is done on the Replica Policy like so:
```xml