ARTEMIS-1866 Make Quorum vote result wait time configurable.

Quorum voting is used by both the live and the backup to decide what to do if a replication connection is disconnected.
Basically, the server will request each live server in the cluster to vote as to whether it thinks the server it is replicating to or from is still alive.
You can also configure the time for which the quorum manager will wait for the quorum vote response.
Currently, the value is hardcoded as 30 sec. We should change this 30-second wait to be configurable.
This commit is contained in:
saurabhrai 2018-05-14 17:59:48 +05:30 committed by Clebert Suconic
parent a6c54a7926
commit d11eed2f9e
24 changed files with 214 additions and 45 deletions

View File

@ -497,6 +497,9 @@ public final class ActiveMQDefaultConfiguration {
//how long we wait between votes, 5 secs //how long we wait between votes, 5 secs
private static long DEFAULT_VOTE_RETRY_WAIT = 5000; private static long DEFAULT_VOTE_RETRY_WAIT = 5000;
//how long we wait for vote result, 30 secs
private static int DEFAULT_QUORUM_VOTE_WAIT = 30;
public static int DEFAULT_QUORUM_SIZE = -1; public static int DEFAULT_QUORUM_SIZE = -1;
public static final boolean DEFAULT_ANALYZE_CRITICAL = true; public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
@ -1365,4 +1368,8 @@ public final class ActiveMQDefaultConfiguration {
public static long getDefaultVoteRetryWait() { public static long getDefaultVoteRetryWait() {
return DEFAULT_VOTE_RETRY_WAIT; return DEFAULT_VOTE_RETRY_WAIT;
} }
public static int getDefaultQuorumVoteWait() {
return DEFAULT_QUORUM_VOTE_WAIT;
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.ha.ColocatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ColocatedPolicyConfiguration;
@ -72,11 +73,11 @@ public final class ConfigurationUtils {
} }
case REPLICATED: { case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait()); return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait());
} }
case REPLICA: { case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait()); return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait());
} }
case SHARED_STORE_MASTER: { case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
@ -93,7 +94,7 @@ public final class ConfigurationUtils {
HAPolicy livePolicy; HAPolicy livePolicy;
//if null default to colocated //if null default to colocated
if (liveConf == null) { if (liveConf == null) {
livePolicy = new ReplicatedPolicy(server.getNetworkHealthCheck()); livePolicy = new ReplicatedPolicy(server.getNetworkHealthCheck(),ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait());
} else { } else {
livePolicy = getHAPolicy(liveConf, server); livePolicy = getHAPolicy(liveConf, server);
} }
@ -101,7 +102,7 @@ public final class ConfigurationUtils {
BackupPolicy backupPolicy; BackupPolicy backupPolicy;
if (backupConf == null) { if (backupConf == null) {
if (livePolicy instanceof ReplicatedPolicy) { if (livePolicy instanceof ReplicatedPolicy) {
backupPolicy = new ReplicaPolicy(server.getNetworkHealthCheck()); backupPolicy = new ReplicaPolicy(server.getNetworkHealthCheck(),ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait());
} else if (livePolicy instanceof SharedStoreMasterPolicy) { } else if (livePolicy instanceof SharedStoreMasterPolicy) {
backupPolicy = new SharedStoreSlavePolicy(); backupPolicy = new SharedStoreSlavePolicy();
} else { } else {

View File

@ -47,7 +47,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait(); private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait();
public ReplicaPolicyConfiguration() { private final int quorumVoteWait;
public ReplicaPolicyConfiguration(int quorumVoteWait) {
this.quorumVoteWait = quorumVoteWait;
} }
@Override @Override
@ -159,4 +162,8 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
public long getVoteRetryWait() { public long getVoteRetryWait() {
return voteRetryWait; return voteRetryWait;
} }
public int getQuorumVoteWait() {
return quorumVoteWait;
}
} }

View File

@ -37,7 +37,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait(); private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait();
public ReplicatedPolicyConfiguration() { private final int quorumVoteWait;
public ReplicatedPolicyConfiguration(int quorumVoteWait) {
this.quorumVoteWait = quorumVoteWait;
} }
@Override @Override
@ -112,4 +115,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
public long getVoteRetryWait() { public long getVoteRetryWait() {
return voteRetryWait; return voteRetryWait;
} }
public int getQuorumVoteWait() {
return quorumVoteWait;
}
} }

View File

@ -1303,7 +1303,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
} }
private ReplicatedPolicyConfiguration createReplicatedHaPolicy(Element policyNode) { private ReplicatedPolicyConfiguration createReplicatedHaPolicy(Element policyNode) {
ReplicatedPolicyConfiguration configuration = new ReplicatedPolicyConfiguration(); ReplicatedPolicyConfiguration configuration = new ReplicatedPolicyConfiguration(getInteger(policyNode, "quorum-vote-wait", ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait(), Validators.GT_ZERO));
configuration.setCheckForLiveServer(getBoolean(policyNode, "check-for-live-server", configuration.isCheckForLiveServer())); configuration.setCheckForLiveServer(getBoolean(policyNode, "check-for-live-server", configuration.isCheckForLiveServer()));
@ -1325,7 +1325,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
} }
private ReplicaPolicyConfiguration createReplicaHaPolicy(Element policyNode) { private ReplicaPolicyConfiguration createReplicaHaPolicy(Element policyNode) {
ReplicaPolicyConfiguration configuration = new ReplicaPolicyConfiguration(); ReplicaPolicyConfiguration configuration = new ReplicaPolicyConfiguration(getInteger(policyNode, "quorum-vote-wait", ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait(), Validators.GT_ZERO));
configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup())); configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup()));

View File

@ -1942,4 +1942,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224093, value = "Reference to message is null", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224093, value = "Reference to message is null", format = Message.Format.MESSAGE_FORMAT)
void nullRefMessage(); void nullRefMessage();
@LogMessage(level = Logger.Level.TRACE)
@Message(id = 224094, value = "Quorum vote result await is interrupted", format = Message.Format.MESSAGE_FORMAT)
void quorumVoteAwaitInterrupted();
} }

View File

@ -57,14 +57,19 @@ public class ReplicaPolicy extends BackupPolicy {
private long voteRetryWait; private long voteRetryWait;
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck) { private final int quorumVoteWait;
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, int quorumVoteWait) {
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.quorumVoteWait = quorumVoteWait;
} }
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck,
ReplicatedPolicy replicatedPolicy) { ReplicatedPolicy replicatedPolicy,
int quorumVoteWait) {
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.replicatedPolicy = replicatedPolicy; this.replicatedPolicy = replicatedPolicy;
this.quorumVoteWait = quorumVoteWait;
} }
public ReplicaPolicy(String clusterName, public ReplicaPolicy(String clusterName,
@ -78,7 +83,8 @@ public class ReplicaPolicy extends BackupPolicy {
boolean voteOnReplicationFailure, boolean voteOnReplicationFailure,
int quorumSize, int quorumSize,
int voteRetries, int voteRetries,
long voteRetryWait) { long voteRetryWait,
int quorumVoteWait) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
@ -91,18 +97,21 @@ public class ReplicaPolicy extends BackupPolicy {
this.scaleDownPolicy = scaleDownPolicy; this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure; this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumVoteWait = quorumVoteWait;
} }
public ReplicaPolicy(String clusterName, public ReplicaPolicy(String clusterName,
int maxSavedReplicatedJournalsSize, int maxSavedReplicatedJournalsSize,
String groupName, String groupName,
ReplicatedPolicy replicatedPolicy, ReplicatedPolicy replicatedPolicy,
NetworkHealthCheck networkHealthCheck) { NetworkHealthCheck networkHealthCheck,
int quorumVoteWait) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
this.replicatedPolicy = replicatedPolicy; this.replicatedPolicy = replicatedPolicy;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.quorumVoteWait = quorumVoteWait;
} }
public String getClusterName() { public String getClusterName() {
@ -123,7 +132,7 @@ public class ReplicaPolicy extends BackupPolicy {
public ReplicatedPolicy getReplicatedPolicy() { public ReplicatedPolicy getReplicatedPolicy() {
if (replicatedPolicy == null) { if (replicatedPolicy == null) {
replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize, voteRetries, voteRetryWait); replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize, voteRetries, voteRetryWait, quorumVoteWait);
} }
return replicatedPolicy; return replicatedPolicy;
} }
@ -234,4 +243,8 @@ public class ReplicaPolicy extends BackupPolicy {
public long getVoteRetryWait() { public long getVoteRetryWait() {
return voteRetryWait; return voteRetryWait;
} }
public int getQuorumVoteWait() {
return quorumVoteWait;
}
} }

View File

@ -61,9 +61,12 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
private final NetworkHealthCheck networkHealthCheck; private final NetworkHealthCheck networkHealthCheck;
public ReplicatedPolicy(NetworkHealthCheck networkHealthCheck) { private final int quorumVoteWait;
replicaPolicy = new ReplicaPolicy(networkHealthCheck, this);
public ReplicatedPolicy(NetworkHealthCheck networkHealthCheck, int quorumVoteWait) {
replicaPolicy = new ReplicaPolicy(networkHealthCheck, this, quorumVoteWait);
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.quorumVoteWait = quorumVoteWait;
} }
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
@ -74,7 +77,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
boolean voteOnReplicationFailure, boolean voteOnReplicationFailure,
int quorumSize, int quorumSize,
int voteRetries, int voteRetries,
long voteRetryWait) { long voteRetryWait,
int quorumVoteWait) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName; this.groupName = groupName;
this.clusterName = clusterName; this.clusterName = clusterName;
@ -84,6 +88,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.quorumSize = quorumSize; this.quorumSize = quorumSize;
this.voteRetries = voteRetries; this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait; this.voteRetryWait = voteRetryWait;
this.quorumVoteWait = quorumVoteWait;
} }
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
@ -96,7 +101,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
boolean voteOnReplicationFailure, boolean voteOnReplicationFailure,
int quorumSize, int quorumSize,
int voteRetries, int voteRetries,
long voteRetryWait) { long voteRetryWait,
int quorumVoteWait) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.clusterName = clusterName; this.clusterName = clusterName;
this.groupName = groupName; this.groupName = groupName;
@ -106,6 +112,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure; this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumSize = quorumSize; this.quorumSize = quorumSize;
this.quorumVoteWait = quorumVoteWait;
} }
public boolean isCheckForLiveServer() { public boolean isCheckForLiveServer() {
@ -147,7 +154,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public ReplicaPolicy getReplicaPolicy() { public ReplicaPolicy getReplicaPolicy() {
if (replicaPolicy == null) { if (replicaPolicy == null) {
replicaPolicy = new ReplicaPolicy(networkHealthCheck, this); replicaPolicy = new ReplicaPolicy(networkHealthCheck, this, quorumVoteWait);
replicaPolicy.setQuorumSize(quorumSize); replicaPolicy.setQuorumSize(quorumSize);
replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure); replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
replicaPolicy.setVoteRetries(voteRetries); replicaPolicy.setVoteRetries(voteRetries);
@ -230,4 +237,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public void setQuorumSize(int quorumSize) { public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize; this.quorumSize = quorumSize;
} }
public int getQuorumVoteWait() {
return quorumVoteWait;
}
} }

View File

@ -66,6 +66,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private volatile boolean stopped = false; private volatile boolean stopped = false;
private final int quorumVoteWait;
/** /**
* This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage} * This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage}
* with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with * with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with
@ -81,7 +82,8 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
NetworkHealthCheck networkHealthCheck, NetworkHealthCheck networkHealthCheck,
int quorumSize, int quorumSize,
int voteRetries, int voteRetries,
long voteRetryWait) { long voteRetryWait,
int quorumVoteWait) {
this.storageManager = storageManager; this.storageManager = storageManager;
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.quorumSize = quorumSize; this.quorumSize = quorumSize;
@ -90,6 +92,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteRetries = voteRetries; this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait; this.voteRetryWait = voteRetryWait;
this.quorumVoteWait = quorumVoteWait;
} }
private volatile BACKUP_ACTIVATION signal; private volatile BACKUP_ACTIVATION signal;
@ -297,9 +300,10 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
quorumManager.vote(quorumVote); quorumManager.vote(quorumVote);
try { try {
quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS); quorumVote.await(quorumVoteWait, TimeUnit.SECONDS);
} catch (InterruptedException interruption) { } catch (InterruptedException interruption) {
// No-op. The best the quorum can do now is to return the latest number it has // No-op. The best the quorum can do now is to return the latest number it has
ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted();
} }
quorumManager.voteComplete(quorumVote); quorumManager.voteComplete(quorumVote);

View File

@ -131,7 +131,7 @@ public final class SharedNothingBackupActivation extends Activation {
logger.trace("Entered a synchronized"); logger.trace("Entered a synchronized");
if (closed) if (closed)
return; return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait()); backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer)); activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
} }

View File

@ -2260,6 +2260,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="quorum-vote-wait" type="xsd:integer" default="30" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
How long to wait (in seconds) for vote results
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attributeGroup ref="xml:specialAttrs"/> <xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType> </xsd:complexType>
@ -2363,6 +2370,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="quorum-vote-wait" type="xsd:integer" default="30" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
How long to wait (in seconds) for vote results
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attributeGroup ref="xml:specialAttrs"/> <xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType> </xsd:complexType>

View File

@ -8,7 +8,20 @@ from that will help mitigate this problem
Quorum voting is used by both the live and the backup to decide what to do if a replication connection is disconnected. Quorum voting is used by both the live and the backup to decide what to do if a replication connection is disconnected.
Basically the server will request each live server in the cluster to vote as to whether it thinks the server it is replicating Basically the server will request each live server in the cluster to vote as to whether it thinks the server it is replicating
to or from is still alive. This being the case the minimum number of live/backup pairs needed is 3. If less than 3 pairs to or from is still alive. You can also configure the time for which the quorum manager will wait for the quorum vote response.
The default time is 30 sec you can configure like so for master and also for the slave:
```xml
<ha-policy>
<replication>
<master>
<quorum-vote-wait>12</quorum-vote-wait>
</master>
</replication>
</ha-policy>
```
This being the case the minimum number of live/backup pairs needed is 3. If less than 3 pairs
are used then the only option is to use a Network Pinger which is explained later in this chapter or choose how you want each server to are used then the only option is to use a Network Pinger which is explained later in this chapter or choose how you want each server to
react which the following details: react which the following details:

View File

@ -65,6 +65,7 @@ import org.junit.Before;
public abstract class ClusteredBridgeTestBase extends ActiveMQTestBase { public abstract class ClusteredBridgeTestBase extends ActiveMQTestBase {
private static int index = 0; private static int index = 0;
private static final int QUORUM_VOTE_WAIT_TIME_SEC = 30;
protected Map<String, ServerGroup> groups = new HashMap<>(); protected Map<String, ServerGroup> groups = new HashMap<>();
@ -146,7 +147,7 @@ public abstract class ClusteredBridgeTestBase extends ActiveMQTestBase {
backupConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params, "in-vm-backup"); backupConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params, "in-vm-backup");
//live //live
Configuration conf0 = createBasicConfig().setJournalDirectory(getJournalDir(id, false)).setBindingsDirectory(getBindingsDir(id, false)).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params0)).addConnectorConfiguration(liveConnector.getName(), liveConnector).setHAPolicyConfiguration(new ReplicatedPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())); Configuration conf0 = createBasicConfig().setJournalDirectory(getJournalDir(id, false)).setBindingsDirectory(getBindingsDir(id, false)).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params0)).addConnectorConfiguration(liveConnector.getName(), liveConnector).setHAPolicyConfiguration(new ReplicatedPolicyConfiguration(QUORUM_VOTE_WAIT_TIME_SEC)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()));
ActiveMQServer server0 = addServer(ActiveMQServers.newActiveMQServer(conf0, true)); ActiveMQServer server0 = addServer(ActiveMQServers.newActiveMQServer(conf0, true));
@ -155,7 +156,7 @@ public abstract class ClusteredBridgeTestBase extends ActiveMQTestBase {
liveNode.setRegistry(new JndiBindingRegistry(liveContext)); liveNode.setRegistry(new JndiBindingRegistry(liveContext));
//backup //backup
Configuration config = createBasicConfig().setJournalDirectory(getJournalDir(id, true)).setBindingsDirectory(getBindingsDir(id, true)).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params)).addConnectorConfiguration(backupConnector.getName(), backupConnector).addConnectorConfiguration(liveConnector.getName(), liveConnector).setHAPolicyConfiguration(new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); Configuration config = createBasicConfig().setJournalDirectory(getJournalDir(id, true)).setBindingsDirectory(getBindingsDir(id, true)).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params)).addConnectorConfiguration(backupConnector.getName(), backupConnector).addConnectorConfiguration(liveConnector.getName(), liveConnector).setHAPolicyConfiguration(new ReplicaPolicyConfiguration(QUORUM_VOTE_WAIT_TIME_SEC)).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(config, true)); ActiveMQServer backup = addServer(ActiveMQServers.newActiveMQServer(config, true));

View File

@ -84,6 +84,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -1483,7 +1484,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
if (sharedStorage) if (sharedStorage)
haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration(); haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration();
else else
haPolicyConfiguration = new ReplicatedPolicyConfiguration(); haPolicyConfiguration = new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
} }
Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols()); Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols());
@ -1538,7 +1539,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty)); TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty)); TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(acceptorConfig).addConnectorConfiguration(liveConfig.getName(), liveConfig).addConnectorConfiguration(backupConfig.getName(), backupConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()); Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(acceptorConfig).addConnectorConfiguration(liveConfig.getName(), liveConfig).addConnectorConfiguration(backupConfig.getName(), backupConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
ActiveMQServer server; ActiveMQServer server;
@ -1575,7 +1576,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(1000).setDiscoveryInitialWaitTimeout(1000).setBroadcastEndpointFactory(endpoint); DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(1000).setDiscoveryInitialWaitTimeout(1000).setBroadcastEndpointFactory(endpoint);
Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, params)).addConnectorConfiguration(connector.getName(), connector).addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()); Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, params)).addConnectorConfiguration(connector.getName(), connector).addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
ActiveMQServer server; ActiveMQServer server;
if (fileStorage) { if (fileStorage) {
@ -1620,7 +1621,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(endpoint); DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(endpoint);
Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, params)).addConnectorConfiguration(connector.getName(), connector).addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreSlavePolicyConfiguration() : new ReplicatedPolicyConfiguration()); Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, params)).addConnectorConfiguration(connector.getName(), connector).addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreSlavePolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
ActiveMQServer server; ActiveMQServer server;
if (sharedStorage) { if (sharedStorage) {

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfigur
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -311,8 +312,8 @@ public class AutomaticColocatedQuorumVoteTest extends ActiveMQTestBase {
sssc.setScaleDownConfiguration(new ScaleDownConfiguration()); sssc.setScaleDownConfiguration(new ScaleDownConfiguration());
} }
} else { } else {
ReplicatedPolicyConfiguration rpc = new ReplicatedPolicyConfiguration(); ReplicatedPolicyConfiguration rpc = new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
ReplicaPolicyConfiguration rpc2 = new ReplicaPolicyConfiguration(); ReplicaPolicyConfiguration rpc2 = new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
haPolicy.setLiveConfig(rpc); haPolicy.setLiveConfig(rpc);
haPolicy.setBackupConfig(rpc2); haPolicy.setBackupConfig(rpc2);
if (scaleDown) { if (scaleDown) {

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.junit.Test; import org.junit.Test;
/** /**
@ -125,7 +126,7 @@ public class MultipleLivesMultipleBackupsFailoverTest extends MultipleBackupsFai
boolean createClusterConnections, boolean createClusterConnections,
int[] otherBackupNodes, int[] otherBackupNodes,
int... otherClusterNodes) throws Exception { int... otherClusterNodes) throws Exception {
Configuration config1 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode); Configuration config1 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC)).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode);
for (int node : otherBackupNodes) { for (int node : otherBackupNodes) {
TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty())); TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
@ -149,7 +150,7 @@ public class MultipleLivesMultipleBackupsFailoverTest extends MultipleBackupsFai
protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes) throws Exception { protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes) throws Exception {
TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty())); TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode).addConnectorConfiguration(liveConnector.getName(), liveConnector); Configuration config0 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC)).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode).addConnectorConfiguration(liveConnector.getName(), liveConnector);
String[] pairs = new String[otherLiveNodes.length]; String[] pairs = new String[otherLiveNodes.length];
for (int i = 0; i < otherLiveNodes.length; i++) { for (int i = 0; i < otherLiveNodes.length; i++) {

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Before; import org.junit.Before;
@ -86,7 +87,7 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
if (isSharedStore()) { if (isSharedStore()) {
haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration(); haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration();
} else { } else {
haPolicyConfiguration = new ReplicatedPolicyConfiguration(); haPolicyConfiguration = new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
if (getNodeGroupName() != null) { if (getNodeGroupName() != null) {
((ReplicatedPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i); ((ReplicatedPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i);
} }
@ -129,7 +130,7 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
if (isSharedStore()) { if (isSharedStore()) {
haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration(); haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration();
} else { } else {
haPolicyConfiguration = new ReplicaPolicyConfiguration(); haPolicyConfiguration = new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
if (getNodeGroupName() != null) { if (getNodeGroupName() != null) {
((ReplicaPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i); ((ReplicaPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i);
} }

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.junit.Test;
public class QuorumResultWaitTest extends StaticClusterWithBackupFailoverTest {
public static final int QUORUM_VOTE_WAIT_CONFIGURED_TIME_SEC = 12;
@Override
protected void setupServers() throws Exception {
super.setupServers();
//we need to know who is connected to who
((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
((ReplicaPolicyConfiguration) servers[4].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
((ReplicaPolicyConfiguration) servers[5].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
ReplicatedPolicyConfiguration replicatedPolicyConf = new ReplicatedPolicyConfiguration(QUORUM_VOTE_WAIT_CONFIGURED_TIME_SEC);
replicatedPolicyConf.setGroupName("group0");
replicatedPolicyConf.setVoteRetries(5);
replicatedPolicyConf.setVoteRetryWait(100);
servers[3].getConfiguration().setHAPolicyConfiguration(replicatedPolicyConf);
}
@Test
public void testQuorumVotingResultWait() throws Exception {
setupCluster();
startServers(0, 1, 2);
startServers(3, 4, 5);
//Assert if the default time 30 sec is used
assertEquals(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC, ((ReplicatedPolicy)(servers[0].getHAPolicy())).getQuorumVoteWait());
//Assert if the configured time is used.
assertEquals(QUORUM_VOTE_WAIT_CONFIGURED_TIME_SEC, ((ReplicatedPolicy)(servers[3].getHAPolicy())).getQuorumVoteWait());
}
@Override
protected boolean isSharedStorage() {
return false;
}
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.junit.Test; import org.junit.Test;
/** /**
@ -125,7 +126,7 @@ public class SingleLiveMultipleBackupsFailoverTest extends MultipleBackupsFailov
protected void createBackupConfig(int liveNode, int nodeid, int... nodes) throws Exception { protected void createBackupConfig(int liveNode, int nodeid, int... nodes) throws Exception {
TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty())); TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
Configuration config1 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicatedPolicyConfiguration()).addConnectorConfiguration(backupConnector.getName(), backupConnector).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode); Configuration config1 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC)).addConnectorConfiguration(backupConnector.getName(), backupConnector).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode);
String[] staticConnectors = new String[nodes.length]; String[] staticConnectors = new String[nodes.length];
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
@ -141,7 +142,7 @@ public class SingleLiveMultipleBackupsFailoverTest extends MultipleBackupsFailov
protected void createLiveConfig(int liveNode) throws Exception { protected void createLiveConfig(int liveNode) throws Exception {
TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty())); TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode); Configuration config0 = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty()))).setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).setBindingsDirectory(getBindingsDir() + "_" + liveNode).setJournalDirectory(getJournalDir() + "_" + liveNode).setPagingDirectory(getPageDir() + "_" + liveNode).setLargeMessagesDirectory(getLargeMessagesDir() + "_" + liveNode);
SameProcessActiveMQServer server = new SameProcessActiveMQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)); SameProcessActiveMQServer server = new SameProcessActiveMQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode));
addActiveMQComponent(server); addActiveMQComponent(server);

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.junit.Before; import org.junit.Before;
public class MultiServerTestBase extends ActiveMQTestBase { public class MultiServerTestBase extends ActiveMQTestBase {
@ -156,7 +157,7 @@ public class MultiServerTestBase extends ActiveMQTestBase {
nodeManager = new InVMNodeManager(false); nodeManager = new InVMNodeManager(false);
} }
Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(serverConfigAcceptor).addConnectorConfiguration("thisConnector", thisConnector).setHAPolicyConfiguration(sharedStorage ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()); Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(serverConfigAcceptor).addConnectorConfiguration("thisConnector", thisConnector).setHAPolicyConfiguration(sharedStorage ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
List<String> targetServersOnConnection = new ArrayList<>(); List<String> targetServersOnConnection = new ArrayList<>();
@ -202,7 +203,7 @@ public class MultiServerTestBase extends ActiveMQTestBase {
TransportConfiguration serverConfigAcceptor = createTransportConfiguration(useNetty(), true, generateParams(node, useNetty())); TransportConfiguration serverConfigAcceptor = createTransportConfiguration(useNetty(), true, generateParams(node, useNetty()));
TransportConfiguration thisConnector = createTransportConfiguration(useNetty(), false, generateParams(node, useNetty())); TransportConfiguration thisConnector = createTransportConfiguration(useNetty(), false, generateParams(node, useNetty()));
Configuration configuration = createBasicConfig(useSharedStorage() ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(serverConfigAcceptor).addConnectorConfiguration("thisConnector", thisConnector).setHAPolicyConfiguration(useSharedStorage() ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()); Configuration configuration = createBasicConfig(useSharedStorage() ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(serverConfigAcceptor).addConnectorConfiguration("thisConnector", thisConnector).setHAPolicyConfiguration(useSharedStorage() ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
List<String> targetServersOnConnection = new ArrayList<>(); List<String> targetServersOnConnection = new ArrayList<>();

View File

@ -62,6 +62,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext; import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer; import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert; import org.junit.Assert;
@ -465,7 +466,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1); backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName())); backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC)).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName()));
backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager)); backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager));
@ -477,7 +478,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
log.info("Starting backup"); log.info("Starting backup");
backupJMSServer.start(); backupJMSServer.start();
liveConf = createBasicConfig().setJournalDirectory(getJournalDir()).setBindingsDirectory(getBindingsDir()).setSecurityEnabled(false).addAcceptorConfiguration(liveAcceptortc).setJournalType(getDefaultJournalType()).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).addConnectorConfiguration(livetc.getName(), livetc).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(livetc.getName())); liveConf = createBasicConfig().setJournalDirectory(getJournalDir()).setBindingsDirectory(getBindingsDir()).setSecurityEnabled(false).addAcceptorConfiguration(liveAcceptortc).setJournalType(getDefaultJournalType()).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).addConnectorConfiguration(livetc.getName(), livetc).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreMasterPolicyConfiguration() : new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC)).addClusterConfiguration(basicClusterConnectionConfig(livetc.getName()));
liveServer = addServer(new InVMNodeManagerServer(liveConf, nodeManager)); liveServer = addServer(new InVMNodeManagerServer(liveConf, nodeManager));

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.HAConfigUtils;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -247,7 +248,7 @@ public class SharedNothingReplicationTest {
conf.setClusterUser("mycluster"); conf.setClusterUser("mycluster");
conf.setClusterPassword("mypassword"); conf.setClusterPassword("mypassword");
ReplicatedPolicyConfiguration haPolicy = new ReplicatedPolicyConfiguration(); ReplicatedPolicyConfiguration haPolicy = new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
haPolicy.setVoteOnReplicationFailure(false); haPolicy.setVoteOnReplicationFailure(false);
haPolicy.setCheckForLiveServer(false); haPolicy.setCheckForLiveServer(false);
conf.setHAPolicyConfiguration(haPolicy); conf.setHAPolicyConfiguration(haPolicy);
@ -270,7 +271,7 @@ public class SharedNothingReplicationTest {
File backupDir = brokersFolder.newFolder("backup"); File backupDir = brokersFolder.newFolder("backup");
conf.setBrokerInstance(backupDir); conf.setBrokerInstance(backupDir);
ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration(); ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC);
haPolicy.setClusterName("cluster"); haPolicy.setClusterName("cluster");
conf.setHAPolicyConfiguration(haPolicy); conf.setHAPolicyConfiguration(haPolicy);

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.util;
public final class HAConfigUtils {
public static final int QUORUM_VOTE_WAIT_TIME_SEC = 30;
}

View File

@ -44,8 +44,8 @@ public final class ReplicatedBackupUtils {
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor); liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor);
} }
backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME)).setHAPolicyConfiguration(new ReplicaPolicyConfiguration()); backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME)).setHAPolicyConfiguration(new ReplicaPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
liveConfig.setName(LIVE_NODE_NAME).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).setSecurityEnabled(false).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(LIVE_NODE_NAME, BACKUP_NODE_NAME)).setHAPolicyConfiguration(new ReplicatedPolicyConfiguration()); liveConfig.setName(LIVE_NODE_NAME).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).setSecurityEnabled(false).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(LIVE_NODE_NAME, BACKUP_NODE_NAME)).setHAPolicyConfiguration(new ReplicatedPolicyConfiguration(HAConfigUtils.QUORUM_VOTE_WAIT_TIME_SEC));
} }
} }