This closes #2709
This commit is contained in:
commit
55e59febf3
|
@ -1998,4 +1998,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224097, value = "Failed to start server", format = Message.Format.MESSAGE_FORMAT)
|
||||
void failedToStartServer(@Cause Throwable t);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 224098, value = "Received a vote saying the backup is live with connector: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void qourumBackupIsLive(String liveConnector);
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
|||
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");
|
||||
private final CountDownLatch latch;
|
||||
private final String targetNodeId;
|
||||
private final String liveConnector;
|
||||
|
||||
private int votesNeeded;
|
||||
|
||||
|
@ -51,9 +52,10 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
|||
* 5 | 4 | 3.5 | 3
|
||||
* 6 | 5 | 4 | 4
|
||||
*/
|
||||
public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive) {
|
||||
public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive, String liveConnector) {
|
||||
super(LIVE_FAILOVER_VOTE);
|
||||
this.targetNodeId = targetNodeId;
|
||||
this.liveConnector = liveConnector;
|
||||
double majority;
|
||||
if (size <= 2) {
|
||||
majority = ((double) size) / 2;
|
||||
|
@ -71,7 +73,7 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
|||
}
|
||||
|
||||
public QuorumVoteServerConnect(int size, String targetNodeId) {
|
||||
this(size, targetNodeId, false);
|
||||
this(size, targetNodeId, false, null);
|
||||
}
|
||||
/**
|
||||
* if we can connect to a node
|
||||
|
@ -80,7 +82,7 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
|||
*/
|
||||
@Override
|
||||
public Vote connected() {
|
||||
return new ServerConnectVote(targetNodeId, requestToStayLive);
|
||||
return new ServerConnectVote(targetNodeId, requestToStayLive, null);
|
||||
}
|
||||
/**
|
||||
* if we cant connect to the node
|
||||
|
@ -108,9 +110,19 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
|||
public synchronized void vote(ServerConnectVote vote) {
|
||||
if (decision)
|
||||
return;
|
||||
if (vote.getVote()) {
|
||||
if (!requestToStayLive && vote.getVote()) {
|
||||
total++;
|
||||
latch.countDown();
|
||||
if (total >= votesNeeded) {
|
||||
decision = true;
|
||||
}//do the opposite, if it says there is a node connected it means the backup has come live
|
||||
} else if (requestToStayLive && vote.getVote()) {
|
||||
total++;
|
||||
latch.countDown();
|
||||
if (liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) {
|
||||
ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
|
||||
return;
|
||||
}
|
||||
if (total >= votesNeeded) {
|
||||
decision = true;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Map;
|
|||
public class ServerConnectVote extends BooleanVote {
|
||||
|
||||
private String nodeId;
|
||||
private String transportConfiguration;
|
||||
|
||||
public ServerConnectVote(String nodeId) {
|
||||
super(false);
|
||||
|
@ -33,9 +34,10 @@ public class ServerConnectVote extends BooleanVote {
|
|||
super(false);
|
||||
}
|
||||
|
||||
public ServerConnectVote(String nodeid, boolean isLive) {
|
||||
public ServerConnectVote(String nodeid, boolean isLive, String transportConfiguration) {
|
||||
super(isLive);
|
||||
this.nodeId = nodeid;
|
||||
this.transportConfiguration = transportConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,12 +54,18 @@ public class ServerConnectVote extends BooleanVote {
|
|||
public void encode(ActiveMQBuffer buff) {
|
||||
super.encode(buff);
|
||||
buff.writeString(nodeId);
|
||||
buff.writeNullableString(transportConfiguration);
|
||||
}
|
||||
|
||||
public String getTransportConfiguration() {
|
||||
return transportConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buff) {
|
||||
super.decode(buff);
|
||||
nodeId = buff.readString();
|
||||
transportConfiguration = buff.readNullableString();
|
||||
}
|
||||
|
||||
public String getNodeId() {
|
||||
|
|
|
@ -36,14 +36,18 @@ public class ServerConnectVoteHandler implements QuorumVoteHandler {
|
|||
public Vote vote(Vote vote) {
|
||||
ServerConnectVote serverConnectVote = (ServerConnectVote) vote;
|
||||
String nodeid = serverConnectVote.getNodeId();
|
||||
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
|
||||
try {
|
||||
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
|
||||
|
||||
if (member != null && member.getLive() != null) {
|
||||
ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
|
||||
return new ServerConnectVote(nodeid, (Boolean) vote.getVote());
|
||||
if (member != null && member.getLive() != null) {
|
||||
ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
|
||||
return new ServerConnectVote(nodeid, (Boolean) vote.getVote(), member.getLive().toString());
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
|
||||
return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()));
|
||||
return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -254,8 +254,15 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
if (failed && replicatedPolicy.isVoteOnReplicationFailure()) {
|
||||
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
|
||||
int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
|
||||
|
||||
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true);
|
||||
String liveConnector = null;
|
||||
List<ClusterConnectionConfiguration> clusterConfigurations = activeMQServer.getConfiguration().getClusterConfigurations();
|
||||
if (clusterConfigurations != null && clusterConfigurations.size() > 0) {
|
||||
ClusterConnectionConfiguration clusterConnectionConfiguration = clusterConfigurations.get(0);
|
||||
String connectorName = clusterConnectionConfiguration.getConnectorName();
|
||||
TransportConfiguration transportConfiguration = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
|
||||
liveConnector = transportConfiguration != null ? transportConfiguration.toString() : null;
|
||||
}
|
||||
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true, liveConnector);
|
||||
|
||||
quorumManager.vote(quorumVote);
|
||||
|
||||
|
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
||||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class QuorumFailOverLiveVotesTest extends StaticClusterWithBackupFailoverTest {
|
||||
@Override
|
||||
protected void setupServers() throws Exception {
|
||||
super.setupServers();
|
||||
//we need to know who is connected to who
|
||||
((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
|
||||
((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
|
||||
((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
|
||||
((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
|
||||
((ReplicaPolicyConfiguration) servers[4].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
|
||||
((ReplicaPolicyConfiguration) servers[5].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
|
||||
|
||||
//reduce the numbers so that the vote finishes faster
|
||||
((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setVoteRetries(5);
|
||||
((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setVoteRetryWait(500);
|
||||
((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
|
||||
((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
|
||||
((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testQuorumVotingLiveNotDead() throws Exception {
|
||||
int[] liveServerIDs = new int[]{0, 1, 2};
|
||||
setupCluster();
|
||||
startServers(0, 1, 2);
|
||||
new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
|
||||
startServers(3, 4, 5);
|
||||
|
||||
for (int i : liveServerIDs) {
|
||||
waitForTopology(servers[i], 3, 3);
|
||||
}
|
||||
|
||||
waitForFailoverTopology(3, 0, 1, 2);
|
||||
waitForFailoverTopology(4, 0, 1, 2);
|
||||
waitForFailoverTopology(5, 0, 1, 2);
|
||||
|
||||
for (int i : liveServerIDs) {
|
||||
setupSessionFactory(i, i + 3, isNetty(), false);
|
||||
createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
|
||||
addConsumer(i, i, QUEUE_NAME, null);
|
||||
}
|
||||
|
||||
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
|
||||
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
|
||||
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
|
||||
|
||||
send(0, QUEUES_TESTADDRESS, 10, false, null);
|
||||
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
|
||||
|
||||
final QuorumFailOverLiveVotesTest.TopologyListener liveTopologyListener = new QuorumFailOverLiveVotesTest.TopologyListener("LIVE-1");
|
||||
|
||||
locators[0].addClusterTopologyListener(liveTopologyListener);
|
||||
|
||||
assertTrue("we assume 3 is a backup", servers[3].getHAPolicy().isBackup());
|
||||
assertFalse("no shared storage", servers[3].getHAPolicy().isSharedStore());
|
||||
|
||||
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) servers[0].getActivation();
|
||||
// ;
|
||||
servers[0].getRemotingService().freeze(null, null);
|
||||
waitForFailoverTopology(4, 3, 1, 2);
|
||||
waitForFailoverTopology(5, 3, 1, 2);
|
||||
|
||||
assertTrue(servers[3].waitForActivation(2, TimeUnit.SECONDS));
|
||||
liveActivation.freezeReplication();
|
||||
waitForServerToStop(servers[0]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isSharedStorage() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private static class TopologyListener implements ClusterTopologyListener {
|
||||
|
||||
final String prefix;
|
||||
final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes = new ConcurrentHashMap<>();
|
||||
|
||||
private TopologyListener(String string) {
|
||||
prefix = string;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeUP(TopologyMember topologyMember, boolean last) {
|
||||
Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<>(topologyMember.getLive(), topologyMember.getBackup());
|
||||
nodes.put(topologyMember.getBackupGroupName(), connectorPair);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDown(long eventUID, String nodeID) {
|
||||
nodes.remove(nodeID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopologyListener(" + prefix + ", #=" + nodes.size() + ")";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -47,7 +47,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
|
|||
public void testSuccessfulVote() {
|
||||
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
|
||||
for (int i = 0; i < trueVotes - 1; i++) {
|
||||
quorum.vote(new ServerConnectVote("foo", true));
|
||||
quorum.vote(new ServerConnectVote("foo", true, null));
|
||||
}
|
||||
|
||||
if (size > 1) {
|
||||
|
@ -55,7 +55,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
|
|||
}
|
||||
quorum = new QuorumVoteServerConnect(size, "foo");
|
||||
for (int i = 0; i < trueVotes; i++) {
|
||||
quorum.vote(new ServerConnectVote("foo", true));
|
||||
quorum.vote(new ServerConnectVote("foo", true, null));
|
||||
}
|
||||
assertTrue(quorum.getDecision());
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
|
|||
public void testUnSuccessfulVote() {
|
||||
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
|
||||
for (int i = 0; i < trueVotes - 1; i++) {
|
||||
quorum.vote(new ServerConnectVote("foo", true));
|
||||
quorum.vote(new ServerConnectVote("foo", true, null));
|
||||
}
|
||||
|
||||
if (size > 1) {
|
||||
|
@ -72,7 +72,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
|
|||
}
|
||||
quorum = new QuorumVoteServerConnect(size, "foo");
|
||||
for (int i = 0; i < trueVotes - 1; i++) {
|
||||
quorum.vote(new ServerConnectVote("foo", true));
|
||||
quorum.vote(new ServerConnectVote("foo", true, null));
|
||||
}
|
||||
if (size == 1) {
|
||||
assertTrue(quorum.getDecision());
|
||||
|
|
Loading…
Reference in New Issue