From 43a92764846d47b32065704c88b925bcc8d72f70 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Mon, 6 Feb 2017 16:01:09 +0000 Subject: [PATCH] ARTEMIS-866 - replication improvements add functionality to allow live to vote for quorum on failure Also allow the quorum size to be configurable. https://issues.apache.org/jira/browse/ARTEMIS-866 --- .../config/ActiveMQDefaultConfiguration.java | 11 ++ .../core/config/ConfigurationUtils.java | 4 +- .../config/ha/ReplicaPolicyConfiguration.java | 20 +++ .../ha/ReplicatedPolicyConfiguration.java | 20 +++ .../impl/FileConfigurationParser.java | 8 + .../core/server/cluster/ha/ReplicaPolicy.java | 34 +++- .../server/cluster/ha/ReplicatedPolicy.java | 39 ++++- .../qourum/SharedNothingBackupQuorum.java | 8 +- .../impl/SharedNothingBackupActivation.java | 2 +- .../impl/SharedNothingLiveActivation.java | 47 ++++- .../schema/artemis-configuration.xsd | 30 ++++ docs/user-manual/en/network-isolation.md | 59 ++++++- .../ReplicatedVotingFailoverTest.java | 162 ++++++++++++++++++ 13 files changed, 427 insertions(+), 17 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 8fce7ead40..a26fd1df17 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -467,6 +467,10 @@ public final class ActiveMQDefaultConfiguration { public static final String DEFAULT_INTERNAL_NAMING_PREFIX = "$.artemis.internal."; + public static boolean DEFAULT_VOTE_ON_REPLICATION_FAILURE = false; + + public static int DEFAULT_QUORUM_SIZE = -1; + /** * 
If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. */ @@ -1260,4 +1264,11 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_NETWORK_CHECK_NIC; } + public static boolean getDefaultVoteOnReplicationFailure() { + return DEFAULT_VOTE_ON_REPLICATION_FAILURE; + } + + public static int getDefaultQuorumSize() { + return DEFAULT_QUORUM_SIZE; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java index beeb8dae47..eefd9b1bde 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java @@ -65,11 +65,11 @@ public final class ConfigurationUtils { } case REPLICATED: { ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; - return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck()); + return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize()); } case REPLICA: { ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; - return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck()); + return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), 
pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize()); } case SHARED_STORE_MASTER: { SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java index 17c83d44a2..0b50882378 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java @@ -39,6 +39,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); + private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure(); + + private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize(); + public ReplicaPolicyConfiguration() { } @@ -119,4 +123,20 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; return this; } + + public boolean getVoteOnReplicationFailure() { + return voteOnReplicationFailure; + } + + public void setVoteOnReplicationFailure(Boolean voteOnReplicationFailure) { + this.voteOnReplicationFailure = voteOnReplicationFailure; + } + + public int getQuorumSize() { + return quorumSize; + } + + public void setQuorumSize(int quorumSize) { + this.quorumSize = quorumSize; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java index 3b84bb7121..9072822c6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java @@ -29,6 +29,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); + private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure(); + + private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize(); + public ReplicatedPolicyConfiguration() { } @@ -71,4 +75,20 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) { this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; } + + public boolean getVoteOnReplicationFailure() { + return voteOnReplicationFailure; + } + + public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) { + this.voteOnReplicationFailure = voteOnReplicationFailure; + } + + public int getQuorumSize() { + return quorumSize; + } + + public void setQuorumSize(int quorumSize) { + this.quorumSize = quorumSize; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6211a1b5a8..2e40e6ade2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1146,6 +1146,10 @@ public final class 
FileConfigurationParser extends XMLConfigurationUtil { configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO)); + configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure())); + + configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); + return configuration; } @@ -1166,6 +1170,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); + configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure())); + + configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); + return configuration; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java index 68db06eab8..233961073d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java @@ -39,6 +39,16 @@ public class ReplicaPolicy extends BackupPolicy { private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); + /* + * what quorum size to use for voting + * */ + private int quorumSize; + + /* + * whether or not this live broker should vote to remain live + * */ + private boolean voteOnReplicationFailure; + private ReplicatedPolicy replicatedPolicy; private final NetworkHealthCheck networkHealthCheck; @@ -60,15 +70,19 @@ public class 
ReplicaPolicy extends BackupPolicy { boolean allowFailback, long initialReplicationSyncTimeout, ScaleDownPolicy scaleDownPolicy, - NetworkHealthCheck networkHealthCheck) { + NetworkHealthCheck networkHealthCheck, + boolean voteOnReplicationFailure, + int quorumSize) { this.clusterName = clusterName; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.groupName = groupName; this.restartBackup = restartBackup; this.allowFailback = allowFailback; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; + this.quorumSize = quorumSize; this.scaleDownPolicy = scaleDownPolicy; this.networkHealthCheck = networkHealthCheck; + this.voteOnReplicationFailure = voteOnReplicationFailure; } public ReplicaPolicy(String clusterName, @@ -101,7 +115,7 @@ public class ReplicaPolicy extends BackupPolicy { public ReplicatedPolicy getReplicatedPolicy() { if (replicatedPolicy == null) { - replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck); + replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize); } return replicatedPolicy; } @@ -180,4 +194,20 @@ public class ReplicaPolicy extends BackupPolicy { backupActivation.init(); return backupActivation; } + + public void setQuorumSize(int quorumSize) { + this.quorumSize = quorumSize; + } + + public int getQuorumSize() { + return quorumSize; + } + + public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) { + this.voteOnReplicationFailure = voteOnReplicationFailure; + } + + public boolean isVoteOnReplicationFailure() { + return voteOnReplicationFailure; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index 
82df79c526..f8892af357 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -40,6 +40,16 @@ public class ReplicatedPolicy implements HAPolicy { * */ private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); + /* + * whether or not this live broker should vote to remain live + * */ + private boolean voteOnReplicationFailure; + + /* + * what quorum size to use for voting + * */ + private int quorumSize; + /* * this are only used as the policy when the server is started as a live after a failover * */ @@ -56,15 +66,16 @@ public class ReplicatedPolicy implements HAPolicy { String groupName, String clusterName, long initialReplicationSyncTimeout, - NetworkHealthCheck networkHealthCheck) { + NetworkHealthCheck networkHealthCheck, + boolean voteOnReplicationFailure, + int quorumSize) { this.checkForLiveServer = checkForLiveServer; this.groupName = groupName; this.clusterName = clusterName; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.networkHealthCheck = networkHealthCheck; - /* - * we create this with sensible defaults in case we start after a failover - * */ + this.voteOnReplicationFailure = voteOnReplicationFailure; + this.quorumSize = quorumSize; } public ReplicatedPolicy(boolean checkForLiveServer, @@ -73,7 +84,9 @@ public class ReplicatedPolicy implements HAPolicy { String groupName, String clusterName, ReplicaPolicy replicaPolicy, - NetworkHealthCheck networkHealthCheck) { + NetworkHealthCheck networkHealthCheck, + boolean voteOnReplicationFailure, + int quorumSize) { this.checkForLiveServer = checkForLiveServer; this.clusterName = clusterName; this.groupName = groupName; @@ -81,6 +94,8 @@ public class ReplicatedPolicy implements HAPolicy { this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicaPolicy = 
replicaPolicy; this.networkHealthCheck = networkHealthCheck; + this.voteOnReplicationFailure = voteOnReplicationFailure; + this.quorumSize = quorumSize; } public boolean isCheckForLiveServer() { @@ -123,6 +138,8 @@ public class ReplicatedPolicy implements HAPolicy { public ReplicaPolicy getReplicaPolicy() { if (replicaPolicy == null) { replicaPolicy = new ReplicaPolicy(networkHealthCheck, this); + replicaPolicy.setQuorumSize(quorumSize); + replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure); if (clusterName != null && clusterName.length() > 0) { replicaPolicy.setClusterName(clusterName); } @@ -182,6 +199,10 @@ public class ReplicatedPolicy implements HAPolicy { this.allowAutoFailBack = allowAutoFailBack; } + public boolean isVoteOnReplicationFailure() { + return voteOnReplicationFailure; + } + @Override public LiveActivation createActivation(ActiveMQServerImpl server, boolean wasLive, @@ -189,4 +210,12 @@ public class ReplicatedPolicy implements HAPolicy { ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { return new SharedNothingLiveActivation(server, this); } + + public int getQuorumSize() { + return quorumSize; + } + + public void setQuorumSize(int quorumSize) { + this.quorumSize = quorumSize; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java index b3e9c32b30..000552a5f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java @@ -45,6 +45,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private final StorageManager storageManager; private final ScheduledExecutorService scheduledPool; + private final 
int quorumSize; private CountDownLatch latch; @@ -66,9 +67,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener public SharedNothingBackupQuorum(StorageManager storageManager, NodeManager nodeManager, ScheduledExecutorService scheduledPool, - NetworkHealthCheck networkHealthCheck) { + NetworkHealthCheck networkHealthCheck, + int quorumSize) { this.storageManager = storageManager; this.scheduledPool = scheduledPool; + this.quorumSize = quorumSize; this.latch = new CountDownLatch(1); this.nodeManager = nodeManager; this.networkHealthCheck = networkHealthCheck; @@ -257,8 +260,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener * @return the voting decision */ private boolean isLiveDown() { - // we use 1 less than the max cluste size as we arent bothered about the replicated live node - int size = quorumManager.getMaxClusterSize(); + int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index cb8c97158c..1f47f91768 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -121,7 +121,7 @@ public final class SharedNothingBackupActivation extends Activation { synchronized (this) { if (closed) return; - backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck); + backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), 
activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index c984ae2d6b..ce0563828e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -52,6 +52,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; +import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager; +import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.jboss.logging.Logger; @@ -215,7 +217,7 @@ public class SharedNothingLiveActivation extends LiveActivation { @Override public void connectionFailed(ActiveMQException exception, boolean failedOver) { - connectionClosed(); + handleClose(true); } @Override @@ -225,6 +227,10 @@ public class SharedNothingLiveActivation extends LiveActivation { @Override public void connectionClosed() { + handleClose(false); + } + + private void handleClose(boolean failed) { ExecutorService executorService = activeMQServer.getThreadPool(); if (executorService != null) { executorService.execute(new Runnable() { @@ -234,6 +240,45 @@ public class SharedNothingLiveActivation extends LiveActivation { if (replicationManager != null) { activeMQServer.getStorageManager().stopReplication(); replicationManager = null; + 
+ if (failed && replicatedPolicy.isVoteOnReplicationFailure()) { + QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager(); + int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize(); + + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getStorageManager()); + + quorumManager.vote(quorumVote); + + try { + quorumVote.await(5, TimeUnit.SECONDS); + } catch (InterruptedException interruption) { + // No-op. The best the quorum can do now is to return the latest number it has + } + + quorumManager.voteComplete(quorumVote); + + if (!quorumVote.getDecision()) { + try { + Thread startThread = new Thread(new Runnable() { + @Override + public void run() { + try { + if (logger.isTraceEnabled()) { + logger.trace("Calling activeMQServer.stop() to stop the server"); + } + activeMQServer.stop(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer); + } + } + }); + startThread.start(); + startThread.join(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } } } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index bc9363f252..66d2b2b349 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1983,6 +1983,20 @@ + + + + Whether or not this live broker should vote to remain as live if replication is lost. + + + + + + + The quorum size used for voting after replication loss, -1 means use the current cluster size + + + @@ -2055,6 +2069,22 @@ + + + + If we have to start as a replicated server decide whether or not this live broker should vote to remain + as live if replication is lost. 
+ + + + + + + If we have to start as a replicated server or we are a backup and lose connection to live, the quorum size + used for voting after replication loss, -1 means use the current cluster size + + + diff --git a/docs/user-manual/en/network-isolation.md b/docs/user-manual/en/network-isolation.md index 38b0d36a2f..1ab5e62da8 100644 --- a/docs/user-manual/en/network-isolation.md +++ b/docs/user-manual/en/network-isolation.md @@ -1,8 +1,61 @@ -# Network Isolation +# Network Isolation (Split Brain) -In case the server is isolated, say for a network failure, the server will be isolated for its peers on a network of brokers. If you are playing with replication the backup may think the backup failed and you may endup with two live nodes, what is called the split brain. +It is possible that if a replicated live or backup server becomes isolated in a network that failover will occur and you will end up +with 2 live servers serving messages in a cluster, this we call split brain. There are different configurations you can choose +from that will help mitigate this problem -# Pinging the network +## Quorum Voting + +Quorum voting is used by both the live and the backup to decide what to do if a replication connection is disconnected. +Basically the server will request each live server in the cluster to vote as to whether it thinks the server it is replicating +to or from is still alive. This being the case the minimum number of live/backup pairs needed is 3. If less than 3 pairs +are used then the only option is to use a Network Pinger which is explained later in this chapter or choose how you want each server to +react which the following details: + +### Backup Voting + +By default if a replica loses its replication connection to the live broker it makes a decision as to whether to start or not +with a quorum vote. This of course requires that there be at least 3 pairs of live/backup nodes in the cluster. 
For a 3 node +cluster it will start if it gets 2 votes back saying that its live server is no longer available, for 4 nodes this would be +3 votes and so on. + +It's also possible to statically set the quorum size that should be used for the case where the cluster size is known up front, +this is done on the Replica Policy like so: + +```xml +<ha-policy> + <replication> + <slave> + <quorum-size>2</quorum-size> + </slave> + </replication> +</ha-policy> +``` + +In this example the quorum size is set to 2 so if you were using a single pair and the backup lost connectivity it would +never start. + +### Live Voting + +By default, if the live server loses its replication connection then it will just carry on and wait for a backup to reconnect +and start replicating again. In the event of a possible split brain scenario this may mean that the live stays live even though +the backup has been activated. It is possible to configure the live server to vote for a quorum if this happens, in this way +if the live server does not receive a majority vote then it will shut down. This is done by setting the _vote-on-replication-failure_ +to true. + +```xml +<ha-policy> + <replication> + <master> + <vote-on-replication-failure>true</vote-on-replication-failure> + <quorum-size>2</quorum-size> + </master> + </replication> +</ha-policy> +``` +As in the backup policy it is also possible to statically configure the quorum size. + +## Pinging the network You may configure one more addresses on the broker.xml that are part of your network topology, that will be pinged through the life cycle of the server. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java new file mode 100644 index 0000000000..bf52835709 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +public class ReplicatedVotingFailoverTest extends FailoverTestBase { + + boolean testBackupFailsVoteFails = false; + @Rule + public TestRule watcher = new TestWatcher() { + @Override + protected void starting(Description description) { + testBackupFailsVoteFails = description.getMethodName().equals("testBackupFailsVoteFails"); + } + + }; + + protected void beforeWaitForRemoteBackupSynchronization() { + } + + @Override + protected TransportConfiguration 
getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + + @Test + public void testBackupFailsVoteSuccess() throws Exception { + try { + beforeWaitForRemoteBackupSynchronization(); + + waitForRemoteBackupSynchronization(backupServer.getServer()); + + backupServer.stop(); + + ServerLocator locator = createInVMLocator(0); + ClientSessionFactory sessionFactory = locator.createSessionFactory(); + ClientSession session = sessionFactory.createSession(); + addClientSession(session); + ClientProducer producer = session.createProducer("testAddress"); + producer.send(session.createMessage(true)); + assertTrue(liveServer.isActive()); + + + } finally { + try { + liveServer.getServer().stop(); + } catch (Throwable ignored) { + } + try { + backupServer.getServer().stop(); + } catch (Throwable ignored) { + } + } + } + + @Test + public void testBackupFailsVoteFails() throws Exception { + try { + beforeWaitForRemoteBackupSynchronization(); + + waitForRemoteBackupSynchronization(backupServer.getServer()); + + backupServer.stop(); + + try { + ServerLocator locator = createInVMLocator(0); + ClientSessionFactory sessionFactory = locator.createSessionFactory(); + ClientSession session = sessionFactory.createSession(); + addClientSession(session); + ClientProducer producer = session.createProducer("testAddress"); + producer.send(session.createMessage(true)); + } catch (Exception e) { + //expected + } + waitForServerToStop(liveServer.getServer()); + assertFalse(liveServer.isStarted()); + + + } finally { + try { + liveServer.getServer().stop(); + } catch (Throwable ignored) { + } + try { + backupServer.getServer().stop(); + } catch (Throwable ignored) { + } + } + } + + @Override + protected void createConfigs() throws Exception { + createReplicatedConfigs(); 
+ } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setVoteOnReplicationFailure(true); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false); + if (testBackupFailsVoteFails) { + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setQuorumSize(2); + } + } + + @Override + protected void crash(boolean waitFailure, ClientSession... sessions) throws Exception { + if (sessions.length > 0) { + for (ClientSession session : sessions) { + waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer()); + } + } else { + waitForRemoteBackup(null, 5, true, backupServer.getServer()); + } + super.crash(waitFailure, sessions); + } + + @Override + protected void crash(ClientSession... sessions) throws Exception { + if (sessions.length > 0) { + for (ClientSession session : sessions) { + waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer()); + } + } else { + waitForRemoteBackup(null, 5, true, backupServer.getServer()); + } + super.crash(sessions); + } +}