ARTEMIS-1543 Fix Quorum Vote with remain live setting

This commit is contained in:
Martyn Taylor 2017-12-08 12:47:03 +00:00 committed by Andy Taylor
parent 27f448f061
commit 0e3468cefb
8 changed files with 497 additions and 333 deletions

View File

@ -299,6 +299,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
clusterControl.authorize();
//if we are successful get the vote and check whether we need to send it to the target server,
//just connecting may be enough
vote = quorumVote.connected();
if (vote.isRequestServerVote()) {
vote = clusterControl.sendQuorumVote(quorumVote.getName(), vote);

View File

@ -38,6 +38,9 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
private boolean decision = false;
// Is this the live requesting to stay live, or a backup requesting to become live.
private boolean requestToStayLive = false;
/**
* live nodes | remaining nodes | majority | votes needed
* 1 | 0 | 0 | 0
@ -48,7 +51,7 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
* 5 | 4 | 3.5 | 3
* 6 | 5 | 4 | 4
*/
public QuorumVoteServerConnect(int size, String targetNodeId) {
public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive) {
super(LIVE_FAILOVER_VOTE);
this.targetNodeId = targetNodeId;
double majority;
@ -64,8 +67,12 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
if (votesNeeded == 0) {
decision = true;
}
this.requestToStayLive = requestToStayLive;
}
public QuorumVoteServerConnect(int size, String targetNodeId) {
this(size, targetNodeId, false);
}
/**
* if we can connect to a node
*
@ -73,9 +80,8 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
*/
@Override
public Vote connected() {
return new ServerConnectVote(targetNodeId);
return new ServerConnectVote(targetNodeId, requestToStayLive);
}
/**
* if we cant connect to the node
*
@ -130,4 +136,8 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
else
ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
}
public boolean isRequestToStayLive() {
return requestToStayLive;
}
}

View File

@ -37,12 +37,13 @@ public class ServerConnectVoteHandler implements QuorumVoteHandler {
ServerConnectVote serverConnectVote = (ServerConnectVote) vote;
String nodeid = serverConnectVote.getNodeId();
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
if (member != null && member.getLive() != null) {
ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
return new ServerConnectVote(nodeid, false);
return new ServerConnectVote(nodeid, (Boolean) vote.getVote());
}
ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
return new ServerConnectVote(nodeid, true);
return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()));
}
@Override

View File

@ -255,7 +255,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString());
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true);
quorumManager.vote(quorumVote);

View File

@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.junit.Test;
public abstract class ClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase {
@Test
public void testFailLiveNodes() throws Throwable {
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
//startServers(0, 1, 2, 3, 4, 5);
for (int i = 0; i < 3; i++) {
waitForTopology(servers[i], 3, 3);
}
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
setupSessionFactory(2, 5, isNetty(), false);
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(1, 1, QUEUE_NAME, null);
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(2, 2, QUEUE_NAME, null);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
Thread.sleep(1000);
log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]);
log.info("######### Crashing it........., sfs[0] = " + sfs[0]);
failNode(0);
waitForFailoverTopology(4, 3, 1, 2);
waitForFailoverTopology(5, 3, 1, 2);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
ClusterWithBackupFailoverTestBase.log.info("** now sending");
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(1);
waitForFailoverTopology(5, 3, 4, 2);
Thread.sleep(1000);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false);
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(2);
Thread.sleep(1000);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(5, QUEUES_TESTADDRESS, 2, 2, false);
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
removeConsumer(0);
removeConsumer(1);
removeConsumer(2);
}
@Test
public void testFailBackupNodes() throws Exception {
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
for (int i = 0; i < 3; i++) {
waitForTopology(servers[i], 3, 3);
}
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
setupSessionFactory(2, 5, isNetty(), false);
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
addConsumer(1, 1, QUEUE_NAME, null);
addConsumer(2, 2, QUEUE_NAME, null);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(3);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(4);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(5);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
removeConsumer(0);
removeConsumer(1);
removeConsumer(2);
}
@Test
public void testFailAllNodes() throws Exception {
setupCluster();
startServers(0, 1, 2, 3, 4, 5);
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
setupSessionFactory(2, 5, isNetty(), false);
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
addConsumer(1, 1, QUEUE_NAME, null);
addConsumer(2, 2, QUEUE_NAME, null);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(0);
waitForFailoverTopology(4, 3, 1, 2);
waitForFailoverTopology(5, 3, 1, 2);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
ClusterWithBackupFailoverTestBase.log.info("** now sending");
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
removeConsumer(0);
failNode(3);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, false);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
failNode(1);
waitForFailoverTopology(5, 2, 4);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false);
// activated backup nodes
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, false);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
removeConsumer(1);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false);
failNode(4, 1);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
failNode(2);
// live nodes
waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(5, QUEUES_TESTADDRESS, 0, 0, false);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
removeConsumer(2);
failNode(5);
}
}

View File

@ -26,13 +26,12 @@ import org.apache.activemq.artemis.tests.integration.cluster.distribution.Cluste
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.junit.Before;
import org.junit.Test;
public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase {
protected static final String QUEUE_NAME = "queue0";
protected static final String QUEUES_TESTADDRESS = "queues.testaddress";
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected abstract void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception;
@ -49,131 +48,7 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
return false;
}
@Test
public void testFailLiveNodes() throws Throwable {
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
//startServers(0, 1, 2, 3, 4, 5);
for (int i = 0; i < 3; i++) {
waitForTopology(servers[i], 3, 3);
}
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
setupSessionFactory(2, 5, isNetty(), false);
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(1, 1, QUEUE_NAME, null);
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
addConsumer(2, 2, QUEUE_NAME, null);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
Thread.sleep(1000);
log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]);
log.info("######### Crashing it........., sfs[0] = " + sfs[0]);
failNode(0);
waitForFailoverTopology(4, 3, 1, 2);
waitForFailoverTopology(5, 3, 1, 2);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
ClusterWithBackupFailoverTestBase.log.info("** now sending");
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(1);
waitForFailoverTopology(5, 3, 4, 2);
Thread.sleep(1000);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false);
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(2);
Thread.sleep(1000);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(5, QUEUES_TESTADDRESS, 2, 2, false);
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
removeConsumer(0);
removeConsumer(1);
removeConsumer(2);
}
private void waitForBindings() throws Exception {
protected void waitForBindings() throws Exception {
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
@ -183,83 +58,6 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
}
@Test
public void testFailBackupNodes() throws Exception {
setupCluster();
startServers(3, 4, 5, 0, 1, 2);
for (int i = 0; i < 3; i++) {
waitForTopology(servers[i], 3, 3);
}
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
setupSessionFactory(2, 5, isNetty(), false);
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
addConsumer(1, 1, QUEUE_NAME, null);
addConsumer(2, 2, QUEUE_NAME, null);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(3);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(4);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(5);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
removeConsumer(0);
removeConsumer(1);
removeConsumer(2);
}
protected void setupCluster() throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);
}
@ -297,126 +95,4 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
ClientSession[] sessionsArray = sessions.toArray(new ClientSession[sessions.size()]);
return sessionsArray;
}
@Test
public void testFailAllNodes() throws Exception {
setupCluster();
startServers(0, 1, 2, 3, 4, 5);
setupSessionFactory(0, 3, isNetty(), false);
setupSessionFactory(1, 4, isNetty(), false);
setupSessionFactory(2, 5, isNetty(), false);
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(0, 0, QUEUE_NAME, null);
addConsumer(1, 1, QUEUE_NAME, null);
addConsumer(2, 2, QUEUE_NAME, null);
waitForBindings();
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
failNode(0);
waitForFailoverTopology(4, 3, 1, 2);
waitForFailoverTopology(5, 3, 1, 2);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false);
waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false);
// activated backup nodes
waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false);
ClusterWithBackupFailoverTestBase.log.info("** now sending");
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
removeConsumer(0);
failNode(3);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, false);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
failNode(1);
waitForFailoverTopology(5, 2, 4);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// activated backup nodes
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false);
// activated backup nodes
waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, false);
send(1, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);
removeConsumer(1);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false);
failNode(4, 1);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
failNode(2);
// live nodes
waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true);
// live nodes
waitForBindings(5, QUEUES_TESTADDRESS, 0, 0, false);
send(2, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 2);
removeConsumer(2);
failNode(5);
}
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase {
public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest {
protected final String groupAddress = getUDPDiscoveryAddress();

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.junit.Test;
public class LiveVoteOnBackupFailureClusterTest extends ClusterWithBackupFailoverTestBase {
@Override
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});
setupClusterConnectionWithBackups("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, new int[]{0, 2});
setupClusterConnectionWithBackups("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, new int[]{0, 1});
setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 3, new int[]{1, 2});
setupClusterConnectionWithBackups("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 4, new int[]{0, 2});
setupClusterConnectionWithBackups("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 5, new int[]{0, 1});
}
@Override
protected void setupServers() throws Exception {
// The backups
setupBackupServer(3, 0, isFileStorage(), isSharedStorage(), isNetty());
setupBackupServer(4, 1, isFileStorage(), isSharedStorage(), isNetty());
setupBackupServer(5, 2, isFileStorage(), isSharedStorage(), isNetty());
// The lives
setupLiveServer(0, isFileStorage(), isSharedStorage(), isNetty(), false);
setupLiveServer(1, isFileStorage(), isSharedStorage(), isNetty(), false);
setupLiveServer(2, isFileStorage(), isSharedStorage(), isNetty(), false);
//we need to know who is connected to who
((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
((ReplicaPolicyConfiguration) servers[4].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
((ReplicaPolicyConfiguration) servers[5].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
// Configure to vote to stay live, when backup dies
((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
}
protected boolean isSharedStorage() {
return false;
}
@Test
public void testLiveVoteSucceedsAfterBackupFailure() throws Exception {
startCluster();
// Wait for servers to start
for (int i = 0; i < servers.length; i++) {
waitForServerToStart(servers[i]);
}
// Wait for backup to sync replication
for (int i = 3; i < servers.length; i++) {
Wait.waitFor(() -> servers[3].isReplicaSync());
}
// Register failure listener to detect when live recognises the backup has died.
final CountDownLatch latch = new CountDownLatch(1);
servers[0].getReplicationManager().getBackupTransportConnection().addFailureListener(new FailureListener() {
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
latch.countDown();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
latch.countDown();
}
});
servers[3].stop();
// Wait for live to notice backup is down.
latch.await(30, TimeUnit.SECONDS);
// The quorum vote time out is hardcoded 5s. Wait for double the time then check server is live
Thread.sleep(10000);
assertTrue(servers[0].isStarted());
}
private void startCluster() throws Exception {
int[] liveServerIDs = new int[]{0, 1, 2};
setupCluster();
startServers(0, 1, 2);
new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
startServers(3, 4, 5);
for (int i : liveServerIDs) {
waitForTopology(servers[i], 3, 3);
}
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
}
}