diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index a409ffbbfb..9b5d75d322 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -485,6 +485,12 @@ public final class ActiveMQDefaultConfiguration { public static boolean DEFAULT_VOTE_ON_REPLICATION_FAILURE = false; + //how many times we retry a vote before restarting as a backup + private static int DEFAULT_VOTE_RETRIES = 12; + + //how long we wait between votes, 5 secs + private static long DEFAULT_VOTE_RETRY_WAIT = 5000; + public static int DEFAULT_QUORUM_SIZE = -1; public static final boolean DEFAULT_ANALYZE_CRITICAL = true; @@ -1334,4 +1340,11 @@ public final class ActiveMQDefaultConfiguration { } + public static int getDefaultVoteRetries() { + return DEFAULT_VOTE_RETRIES; + } + + public static long getDefaultVoteRetryWait() { + return DEFAULT_VOTE_RETRY_WAIT; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java index 95f524f326..7ea26ae0a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java @@ -72,11 +72,11 @@ public final class ConfigurationUtils { } case REPLICATED: { ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; - return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize()); + return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait()); } case REPLICA: { ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; - return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize()); + return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait()); } case SHARED_STORE_MASTER: { SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java index 0b50882378..0e6c82d64d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java @@ -43,6 +43,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize(); + private int voteRetries = ActiveMQDefaultConfiguration.getDefaultVoteRetries(); + + private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait(); + public ReplicaPolicyConfiguration() { } @@ -139,4 +143,20 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { public void setQuorumSize(int quorumSize) { this.quorumSize = quorumSize; } + + public int getVoteRetries() { + return voteRetries; + } + + public void setVoteRetries(int voteRetries) { + this.voteRetries = voteRetries; + } + + public void setVoteRetryWait(long voteRetryWait) { + this.voteRetryWait = voteRetryWait; + } + + public long getVoteRetryWait() { + return voteRetryWait; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java index 9072822c6e..68d69bb618 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java @@ -33,6 +33,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize(); + private int voteRetries = ActiveMQDefaultConfiguration.getDefaultVoteRetries(); + + private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait(); + public ReplicatedPolicyConfiguration() { } @@ -91,4 +95,21 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { public void setQuorumSize(int quorumSize) { this.quorumSize = quorumSize; } + + + public int getVoteRetries() { + return voteRetries; + } + + public void setVoteRetries(int voteRetries) { + this.voteRetries = voteRetries; + } + + public void setVoteRetryWait(long voteRetryWait) { + this.voteRetryWait = voteRetryWait; + } + + public long getVoteRetryWait() { + return voteRetryWait; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6c64a3bd87..7f71c86c37 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1284,6 +1284,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure())); + configuration.setVoteRetries(getInteger(policyNode, "vote-retries", configuration.getVoteRetries(), Validators.MINUS_ONE_OR_GE_ZERO)); + + configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); + configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); return configuration; @@ -1308,6 +1312,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure())); + configuration.setVoteRetries(getInteger(policyNode, "vote-retries", configuration.getVoteRetries(), Validators.MINUS_ONE_OR_GE_ZERO)); + + configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); + configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); return configuration; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java index 233961073d..40559cf132 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java @@ -53,6 +53,10 @@ public class ReplicaPolicy extends BackupPolicy { private final NetworkHealthCheck networkHealthCheck; + private int voteRetries; + + private long voteRetryWait; + public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck) { this.networkHealthCheck = networkHealthCheck; } @@ -72,7 +76,9 @@ public class ReplicaPolicy extends BackupPolicy { ScaleDownPolicy scaleDownPolicy, NetworkHealthCheck networkHealthCheck, boolean voteOnReplicationFailure, - int quorumSize) { + int quorumSize, + int voteRetries, + long voteRetryWait) { this.clusterName = clusterName; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.groupName = groupName; @@ -80,6 +86,8 @@ public class ReplicaPolicy extends BackupPolicy { this.allowFailback = allowFailback; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.quorumSize = quorumSize; + this.voteRetries = voteRetries; + this.voteRetryWait = voteRetryWait; this.scaleDownPolicy = scaleDownPolicy; this.networkHealthCheck = networkHealthCheck; this.voteOnReplicationFailure = voteOnReplicationFailure; @@ -115,7 +123,7 @@ public class ReplicaPolicy extends BackupPolicy { public ReplicatedPolicy getReplicatedPolicy() { if (replicatedPolicy == null) { - replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize); + replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize, voteRetries, voteRetryWait); } return replicatedPolicy; } @@ -210,4 +218,20 @@ public class ReplicaPolicy extends BackupPolicy { public boolean isVoteOnReplicationFailure() { return voteOnReplicationFailure; } + + public void setVoteRetries(int voteRetries) { + this.voteRetries = voteRetries; + } + + public void setVoteRetryWait(long voteRetryWait) { + this.voteRetryWait = voteRetryWait; + } + + public int getVoteRetries() { + return voteRetries; + } + + public long getVoteRetryWait() { + return voteRetryWait; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index f8892af357..135e8d0f39 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -50,6 +50,10 @@ public class ReplicatedPolicy implements HAPolicy { * */ private int quorumSize; + private int voteRetries; + + private long voteRetryWait; + /* * this are only used as the policy when the server is started as a live after a failover * */ @@ -68,7 +72,9 @@ public class ReplicatedPolicy implements HAPolicy { long initialReplicationSyncTimeout, NetworkHealthCheck networkHealthCheck, boolean voteOnReplicationFailure, - int quorumSize) { + int quorumSize, + int voteRetries, + long voteRetryWait) { this.checkForLiveServer = checkForLiveServer; this.groupName = groupName; this.clusterName = clusterName; @@ -76,6 +82,8 @@ public class ReplicatedPolicy implements HAPolicy { this.networkHealthCheck = networkHealthCheck; this.voteOnReplicationFailure = voteOnReplicationFailure; this.quorumSize = quorumSize; + this.voteRetries = voteRetries; + this.voteRetryWait = voteRetryWait; } public ReplicatedPolicy(boolean checkForLiveServer, @@ -86,7 +94,9 @@ public class ReplicatedPolicy implements HAPolicy { ReplicaPolicy replicaPolicy, NetworkHealthCheck networkHealthCheck, boolean voteOnReplicationFailure, - int quorumSize) { + int quorumSize, + int voteRetries, + long voteRetryWait) { this.checkForLiveServer = checkForLiveServer; this.clusterName = clusterName; this.groupName = groupName; @@ -140,6 +150,8 @@ public class ReplicatedPolicy implements HAPolicy { replicaPolicy = new ReplicaPolicy(networkHealthCheck, this); replicaPolicy.setQuorumSize(quorumSize); replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure); + replicaPolicy.setVoteRetries(voteRetries); + replicaPolicy.setVoteRetryWait(voteRetryWait); if (clusterName != null && clusterName.length() > 0) { replicaPolicy.setClusterName(clusterName); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 3aa82a1bf8..4790fdae0e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -778,7 +778,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled protected void fail(final boolean permanently) { logger.debug(this + "\n\t::fail being called, permanently=" + permanently); - + //we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly + serverLocator.notifyNodeDown(System.currentTimeMillis(), targetNodeID); if (queue != null) { try { if (logger.isTraceEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java index 77a7d18c67..9b8b64718a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java @@ -252,6 +252,22 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom return handlers.get(handler); } + public TransportConfiguration getLiveTransportConfiguration(String targetServerID) { + TopologyMemberImpl member = clusterController.getDefaultClusterTopology().getMember(targetServerID); + return member != null ? member.getLive() : null; + } + + public boolean checkLive(TransportConfiguration liveTransportConfiguration) { + try { + ClusterControl control = clusterController.connectToNode(liveTransportConfiguration); + control.close(); + return true; + } catch (Throwable t) { + return false; + } + } + + private final class VoteRunnableHolder { private final QuorumVote quorumVote; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java index 330b53af80..029767b30c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java @@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.Topology; @@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.server.NodeManager; public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener { + private TransportConfiguration liveTransportConfiguration; + public enum BACKUP_ACTIVATION { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP; } @@ -47,6 +50,12 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private final ScheduledExecutorService scheduledPool; private final int quorumSize; + private final int voteRetries; + + private final long voteRetryWait; + + private final Object voteGuard = new Object(); + private CountDownLatch latch; private ClientSessionFactoryInternal sessionFactory; @@ -68,13 +77,17 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener NodeManager nodeManager, ScheduledExecutorService scheduledPool, NetworkHealthCheck networkHealthCheck, - int quorumSize) { + int quorumSize, + int voteRetries, + long voteRetryWait) { this.storageManager = storageManager; this.scheduledPool = scheduledPool; this.quorumSize = quorumSize; this.latch = new CountDownLatch(1); this.nodeManager = nodeManager; this.networkHealthCheck = networkHealthCheck; + this.voteRetries = voteRetries; + this.voteRetryWait = voteRetryWait; } private volatile BACKUP_ACTIVATION signal; @@ -129,6 +142,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener public void liveIDSet(String liveID) { targetServerID = liveID; nodeManager.setNodeID(liveID); + liveTransportConfiguration = quorumManager.getLiveTransportConfiguration(targetServerID); //now we are replicating we can start waiting for disconnect notifications so we can fail over // sessionFactory.addFailureListener(this); } @@ -267,20 +281,44 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener * @return the voting decision */ private boolean isLiveDown() { + //lets assume live is not down + Boolean decision = false; + int voteAttempts = 0; int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); + synchronized (voteGuard) { + while (!decision && voteAttempts++ < voteRetries) { + // a quick check to see if the live actually is dead + if (quorumManager.checkLive(liveTransportConfiguration)) { + //the live is still alive so we best not failover + return false; + } + //the live is dead so lets vote for quorum + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); - quorumManager.vote(quorumVote); + quorumManager.vote(quorumVote); - try { - quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS); - } catch (InterruptedException interruption) { - // No-op. The best the quorum can do now is to return the latest number it has + try { + quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS); + } catch (InterruptedException interruption) { + // No-op. The best the quorum can do now is to return the latest number it has + } + + quorumManager.voteComplete(quorumVote); + + decision = quorumVote.getDecision(); + + if (decision) { + return decision; + } + try { + voteGuard.wait(voteRetryWait); + } catch (InterruptedException e) { + //nothing to do here + } + } } - quorumManager.voteComplete(quorumVote); - - return quorumVote.getDecision(); + return decision; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 06a3afba1b..58742de97f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -131,7 +131,7 @@ public final class SharedNothingBackupActivation extends Activation { logger.trace("Entered a synchronized"); if (closed) return; - backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); + backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait()); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer)); } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index a6155544ba..e76478cb47 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2171,6 +2171,22 @@ + + + + + If we start as a replica and lose connection to the master, how many times should we attempt to vote + for quorum before restarting + + + + + + + How long to wait (in milliseconds) between each vote + + + @@ -2259,6 +2275,20 @@ + + + + If we lose connection to the master, how many times should we attempt to vote for quorum before restarting + + + + + + + How long to wait (in milliseconds) between each vote + + + diff --git a/docs/user-manual/en/network-isolation.md b/docs/user-manual/en/network-isolation.md index 06eda3167f..ded89b7ab4 100644 --- a/docs/user-manual/en/network-isolation.md +++ b/docs/user-manual/en/network-isolation.md @@ -17,9 +17,22 @@ react which the following details: By default if a replica loses its replication connection to the live broker it makes a decision as to whether to start or not with a quorum vote. This of course requires that there be at least 3 pairs of live/backup nodes in the cluster. For a 3 node cluster it will start if it gets 2 votes back saying that its live server is no longer available, for 4 nodes this would be -3 votes and so on. +3 votes and so on. When a backup loses connection to the master it will keep voting for a quorum until it either receives a vote +allowing it to start or it detects that the master is still live. for the latter it will then restart as a backup. How many votes +and how long between each vote the backup should wait is configured like so: -It's also possible to statically set the quorum size that should be used fotr the case where the cluster size is known up front, +```xml + + + + 12 + 5000 + + + +``` + +It's also possible to statically set the quorum size that should be used for the case where the cluster size is known up front, this is done on the Replica Policy like so: ```xml