ARTEMIS-866 - replication improvements

add functionality to allow live to vote for quorum on failure

Also allow the quorum size to be configurable.

https://issues.apache.org/jira/browse/ARTEMIS-866
This commit is contained in:
Andy Taylor 2017-02-06 16:01:09 +00:00 committed by Martyn Taylor
parent 7e5ada897e
commit 43a9276484
13 changed files with 427 additions and 17 deletions

View File

@ -467,6 +467,10 @@ public final class ActiveMQDefaultConfiguration {
public static final String DEFAULT_INTERNAL_NAMING_PREFIX = "$.artemis.internal."; public static final String DEFAULT_INTERNAL_NAMING_PREFIX = "$.artemis.internal.";
public static boolean DEFAULT_VOTE_ON_REPLICATION_FAILURE = false;
public static int DEFAULT_QUORUM_SIZE = -1;
/** /**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/ */
@ -1260,4 +1264,11 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_NETWORK_CHECK_NIC; return DEFAULT_NETWORK_CHECK_NIC;
} }
public static boolean getDefaultVoteOnReplicationFailure() {
return DEFAULT_VOTE_ON_REPLICATION_FAILURE;
}
public static int getDefaultQuorumSize() {
return DEFAULT_QUORUM_SIZE;
}
} }

View File

@ -65,11 +65,11 @@ public final class ConfigurationUtils {
} }
case REPLICATED: { case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck()); return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize());
} }
case REPLICA: { case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck()); return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize());
} }
case SHARED_STORE_MASTER: { case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;

View File

@ -39,6 +39,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
public ReplicaPolicyConfiguration() { public ReplicaPolicyConfiguration() {
} }
@ -119,4 +123,20 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
return this; return this;
} }
public boolean getVoteOnReplicationFailure() {
return voteOnReplicationFailure;
}
public void setVoteOnReplicationFailure(Boolean voteOnReplicationFailure) {
this.voteOnReplicationFailure = voteOnReplicationFailure;
}
public int getQuorumSize() {
return quorumSize;
}
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
} }

View File

@ -29,6 +29,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
public ReplicatedPolicyConfiguration() { public ReplicatedPolicyConfiguration() {
} }
@ -71,4 +75,20 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) { public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
} }
public boolean getVoteOnReplicationFailure() {
return voteOnReplicationFailure;
}
public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) {
this.voteOnReplicationFailure = voteOnReplicationFailure;
}
public int getQuorumSize() {
return quorumSize;
}
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
} }

View File

@ -1146,6 +1146,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO)); configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration; return configuration;
} }
@ -1166,6 +1170,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));
configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration; return configuration;
} }

View File

@ -39,6 +39,16 @@ public class ReplicaPolicy extends BackupPolicy {
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
/*
* what quorum size to use for voting
* */
private int quorumSize;
/*
* whether or not this live broker should vote to remain live
* */
private boolean voteOnReplicationFailure;
private ReplicatedPolicy replicatedPolicy; private ReplicatedPolicy replicatedPolicy;
private final NetworkHealthCheck networkHealthCheck; private final NetworkHealthCheck networkHealthCheck;
@ -60,15 +70,19 @@ public class ReplicaPolicy extends BackupPolicy {
boolean allowFailback, boolean allowFailback,
long initialReplicationSyncTimeout, long initialReplicationSyncTimeout,
ScaleDownPolicy scaleDownPolicy, ScaleDownPolicy scaleDownPolicy,
NetworkHealthCheck networkHealthCheck) { NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
int quorumSize) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
this.restartBackup = restartBackup; this.restartBackup = restartBackup;
this.allowFailback = allowFailback; this.allowFailback = allowFailback;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.quorumSize = quorumSize;
this.scaleDownPolicy = scaleDownPolicy; this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
} }
public ReplicaPolicy(String clusterName, public ReplicaPolicy(String clusterName,
@ -101,7 +115,7 @@ public class ReplicaPolicy extends BackupPolicy {
public ReplicatedPolicy getReplicatedPolicy() { public ReplicatedPolicy getReplicatedPolicy() {
if (replicatedPolicy == null) { if (replicatedPolicy == null) {
replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck); replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize);
} }
return replicatedPolicy; return replicatedPolicy;
} }
@ -180,4 +194,20 @@ public class ReplicaPolicy extends BackupPolicy {
backupActivation.init(); backupActivation.init();
return backupActivation; return backupActivation;
} }
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
public int getQuorumSize() {
return quorumSize;
}
public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) {
this.voteOnReplicationFailure = voteOnReplicationFailure;
}
public boolean isVoteOnReplicationFailure() {
return voteOnReplicationFailure;
}
} }

View File

@ -40,6 +40,16 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
* */ * */
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
/*
* whether or not this live broker should vote to remain live
* */
private boolean voteOnReplicationFailure;
/*
* what quorum size to use for voting
* */
private int quorumSize;
/* /*
* this are only used as the policy when the server is started as a live after a failover * this are only used as the policy when the server is started as a live after a failover
* */ * */
@ -56,15 +66,16 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
String groupName, String groupName,
String clusterName, String clusterName,
long initialReplicationSyncTimeout, long initialReplicationSyncTimeout,
NetworkHealthCheck networkHealthCheck) { NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
int quorumSize) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName; this.groupName = groupName;
this.clusterName = clusterName; this.clusterName = clusterName;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
/* this.voteOnReplicationFailure = voteOnReplicationFailure;
* we create this with sensible defaults in case we start after a failover this.quorumSize = quorumSize;
* */
} }
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
@ -73,7 +84,9 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
String groupName, String groupName,
String clusterName, String clusterName,
ReplicaPolicy replicaPolicy, ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck) { NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
int quorumSize) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.clusterName = clusterName; this.clusterName = clusterName;
this.groupName = groupName; this.groupName = groupName;
@ -81,6 +94,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicaPolicy = replicaPolicy; this.replicaPolicy = replicaPolicy;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumSize = quorumSize;
} }
public boolean isCheckForLiveServer() { public boolean isCheckForLiveServer() {
@ -123,6 +138,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public ReplicaPolicy getReplicaPolicy() { public ReplicaPolicy getReplicaPolicy() {
if (replicaPolicy == null) { if (replicaPolicy == null) {
replicaPolicy = new ReplicaPolicy(networkHealthCheck, this); replicaPolicy = new ReplicaPolicy(networkHealthCheck, this);
replicaPolicy.setQuorumSize(quorumSize);
replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
if (clusterName != null && clusterName.length() > 0) { if (clusterName != null && clusterName.length() > 0) {
replicaPolicy.setClusterName(clusterName); replicaPolicy.setClusterName(clusterName);
} }
@ -182,6 +199,10 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.allowAutoFailBack = allowAutoFailBack; this.allowAutoFailBack = allowAutoFailBack;
} }
public boolean isVoteOnReplicationFailure() {
return voteOnReplicationFailure;
}
@Override @Override
public LiveActivation createActivation(ActiveMQServerImpl server, public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
@ -189,4 +210,12 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
return new SharedNothingLiveActivation(server, this); return new SharedNothingLiveActivation(server, this);
} }
public int getQuorumSize() {
return quorumSize;
}
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
} }

View File

@ -45,6 +45,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private final StorageManager storageManager; private final StorageManager storageManager;
private final ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
private final int quorumSize;
private CountDownLatch latch; private CountDownLatch latch;
@ -66,9 +67,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
public SharedNothingBackupQuorum(StorageManager storageManager, public SharedNothingBackupQuorum(StorageManager storageManager,
NodeManager nodeManager, NodeManager nodeManager,
ScheduledExecutorService scheduledPool, ScheduledExecutorService scheduledPool,
NetworkHealthCheck networkHealthCheck) { NetworkHealthCheck networkHealthCheck,
int quorumSize) {
this.storageManager = storageManager; this.storageManager = storageManager;
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.quorumSize = quorumSize;
this.latch = new CountDownLatch(1); this.latch = new CountDownLatch(1);
this.nodeManager = nodeManager; this.nodeManager = nodeManager;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
@ -257,8 +260,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
* @return the voting decision * @return the voting decision
*/ */
private boolean isLiveDown() { private boolean isLiveDown() {
// we use 1 less than the max cluste size as we arent bothered about the replicated live node int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
int size = quorumManager.getMaxClusterSize();
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager); QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager);

View File

@ -121,7 +121,7 @@ public final class SharedNothingBackupActivation extends Activation {
synchronized (this) { synchronized (this) {
if (closed) if (closed)
return; return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck); backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
} }

View File

@ -52,6 +52,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -215,7 +217,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
@Override @Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) { public void connectionFailed(ActiveMQException exception, boolean failedOver) {
connectionClosed(); handleClose(true);
} }
@Override @Override
@ -225,6 +227,10 @@ public class SharedNothingLiveActivation extends LiveActivation {
@Override @Override
public void connectionClosed() { public void connectionClosed() {
handleClose(false);
}
private void handleClose(boolean failed) {
ExecutorService executorService = activeMQServer.getThreadPool(); ExecutorService executorService = activeMQServer.getThreadPool();
if (executorService != null) { if (executorService != null) {
executorService.execute(new Runnable() { executorService.execute(new Runnable() {
@ -234,6 +240,45 @@ public class SharedNothingLiveActivation extends LiveActivation {
if (replicationManager != null) { if (replicationManager != null) {
activeMQServer.getStorageManager().stopReplication(); activeMQServer.getStorageManager().stopReplication();
replicationManager = null; replicationManager = null;
if (failed && replicatedPolicy.isVoteOnReplicationFailure()) {
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getStorageManager());
quorumManager.vote(quorumVote);
try {
quorumVote.await(5, TimeUnit.SECONDS);
} catch (InterruptedException interruption) {
// No-op. The best the quorum can do now is to return the latest number it has
}
quorumManager.voteComplete(quorumVote);
if (!quorumVote.getDecision()) {
try {
Thread startThread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (logger.isTraceEnabled()) {
logger.trace("Calling activeMQServer.stop() to stop the server");
}
activeMQServer.stop();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
}
}
});
startThread.start();
startThread.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
} }
} }
} }

View File

@ -1983,6 +1983,20 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="vote-on-replication-failure" type="xsd:boolean" default="false" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
Whether or not this live broker should vote to remain as live if replication is lost.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="quorum-size" type="xsd:integer" default="-1" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The quorum size used for voting after replication loss, -1 means use the current cluster size
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="replicaPolicyType"> <xsd:complexType name="replicaPolicyType">
@ -2055,6 +2069,22 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="vote-on-replication-failure" type="xsd:boolean" default="false" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
If we have to start as a replicated server decide whether or not this live broker should vote to remain
as live if replication is lost.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="quorum-size" type="xsd:integer" default="-1" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
If we have to start as a replicated server or we are a backup and lose connection to live, the quorum size
used for voting after replication loss, -1 means use the current cluster size
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="colocatedReplicaPolicyType"> <xsd:complexType name="colocatedReplicaPolicyType">

View File

@ -1,8 +1,61 @@
# Network Isolation # Network Isolation (Split Brain)
In case the server is isolated, say for a network failure, the server will be isolated for its peers on a network of brokers. If you are playing with replication the backup may think the backup failed and you may endup with two live nodes, what is called the split brain. It is possible that if a replicated live or backup server becomes isolated in a network that failover will occur and you will end up
with 2 live servers serving messages in a cluster, this we call split brain. There are different configurations you can choose
from that will help mitigate this problem
# Pinging the network ## Quorum Voting
Quorum voting is used by both the live and the backup to decide what to do if a replication connection is disconnected.
Basically the server will request each live server in the cluster to vote as to whether it thinks the server it is replicating
to or from is still alive. This being the case the minimum number of live/backup pairs needed is 3. If less than 3 pairs
are used then the only option is to use a Network Pinger which is explained later in this chapter or choose how you want each server to
react which the following details:
### Backup Voting
By default if a replica loses its replication connection to the live broker it makes a decision as to whether to start or not
with a quorum vote. This of course requires that there be at least 3 pairs of live/backup nodes in the cluster. For a 3 node
cluster it will start if it gets 2 votes back saying that its live server is no longer available, for 4 nodes this would be
3 votes and so on.
It's also possible to statically set the quorum size that should be used fotr the case where the cluster size is known up front,
this is done on the Replica Policy like so:
```xml
<ha-policy>
<replication>
<slave>
<quorum-size>2</quorum-size>
</slave>
</replication>
</ha-policy>
```
In this example the quorum size is set to 2 so if you were using a single pair and the backup lost connectivity it would
never start.
### Live Voting
By default, if the live server loses its replication connection then it will just carry on and wait for a backup to reconnect
and start replicating again. In the event of a possible split brain scenario this may mean that the live stays live even though
the backup has been activated. It is possible to configure the live server to vote for a quorum if this happens, in this way
if the live server doesn't not receive a majority vote then it will shutdown. This is done by setting the _vote-on-replication-failure_
to true.
```xml
<ha-policy>
<replication>
<master>
<vote-on-replication-failure>true</vote-on-replication-failure>
<quorum-size>2</quorum-size>
</master>
</replication>
</ha-policy>
```
As in the backup policy it is also possible to statically configure the quorum size.
## Pinging the network
You may configure one more addresses on the broker.xml that are part of your network topology, that will be pinged through the life cycle of the server. You may configure one more addresses on the broker.xml that are part of your network topology, that will be pinged through the life cycle of the server.

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
public class ReplicatedVotingFailoverTest extends FailoverTestBase {
boolean testBackupFailsVoteFails = false;
@Rule
public TestRule watcher = new TestWatcher() {
@Override
protected void starting(Description description) {
testBackupFailsVoteFails = description.getMethodName().equals("testBackupFailsVoteFails");
}
};
protected void beforeWaitForRemoteBackupSynchronization() {
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMConnector(live);
}
@Test
public void testBackupFailsVoteSuccess() throws Exception {
try {
beforeWaitForRemoteBackupSynchronization();
waitForRemoteBackupSynchronization(backupServer.getServer());
backupServer.stop();
ServerLocator locator = createInVMLocator(0);
ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = sessionFactory.createSession();
addClientSession(session);
ClientProducer producer = session.createProducer("testAddress");
producer.send(session.createMessage(true));
assertTrue(liveServer.isActive());
} finally {
try {
liveServer.getServer().stop();
} catch (Throwable ignored) {
}
try {
backupServer.getServer().stop();
} catch (Throwable ignored) {
}
}
}
@Test
public void testBackupFailsVoteFails() throws Exception {
try {
beforeWaitForRemoteBackupSynchronization();
waitForRemoteBackupSynchronization(backupServer.getServer());
backupServer.stop();
try {
ServerLocator locator = createInVMLocator(0);
ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = sessionFactory.createSession();
addClientSession(session);
ClientProducer producer = session.createProducer("testAddress");
producer.send(session.createMessage(true));
} catch (Exception e) {
//expected
}
waitForServerToStop(liveServer.getServer());
assertFalse(liveServer.isStarted());
} finally {
try {
liveServer.getServer().stop();
} catch (Throwable ignored) {
}
try {
backupServer.getServer().stop();
} catch (Throwable ignored) {
}
}
}
@Override
protected void createConfigs() throws Exception {
createReplicatedConfigs();
}
@Override
protected void setupHAPolicyConfiguration() {
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
if (testBackupFailsVoteFails) {
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setQuorumSize(2);
}
}
@Override
protected void crash(boolean waitFailure, ClientSession... sessions) throws Exception {
if (sessions.length > 0) {
for (ClientSession session : sessions) {
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
}
} else {
waitForRemoteBackup(null, 5, true, backupServer.getServer());
}
super.crash(waitFailure, sessions);
}
@Override
protected void crash(ClientSession... sessions) throws Exception {
if (sessions.length > 0) {
for (ClientSession session : sessions) {
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
}
} else {
waitForRemoteBackup(null, 5, true, backupServer.getServer());
}
super.crash(sessions);
}
}