ARTEMIS-1296 - fix quorum vote

Add a check on top of just being able to connect to a live broker

https://issues.apache.org/jira/browse/ARTEMIS-1296
This commit is contained in:
Andy Taylor 2017-07-18 07:57:52 +01:00
parent 3f924777fa
commit 6f140897c6
12 changed files with 217 additions and 25 deletions

View File

@ -439,6 +439,8 @@ public interface ActiveMQServerControl {
long getGlobalMaxSize(); long getGlobalMaxSize();
// Operations ---------------------------------------------------- // Operations ----------------------------------------------------
@Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
boolean freezeReplication();
@Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
String createAddress(@Parameter(name = "name", desc = "The name of the address") String name, String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,

View File

@ -94,7 +94,9 @@ import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@ -567,6 +569,17 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
} }
} }
@Override
public boolean freezeReplication() {
Activation activation = server.getActivation();
if (activation instanceof SharedNothingLiveActivation) {
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
liveActivation.freezeReplication();
return true;
}
return false;
}
private enum AddressInfoTextFormatter { private enum AddressInfoTextFormatter {
Long { Long {
@Override @Override

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
/** /**
* a simple yes.no vote * a simple yes.no vote
*/ */
public final class BooleanVote extends Vote<Boolean> { public class BooleanVote extends Vote<Boolean> {
private boolean vote; private boolean vote;

View File

@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.persistence.StorageManager;
/** /**
* A Qourum Vote for deciding if a replicated backup should become live. * A Qourum Vote for deciding if a replicated backup should become live.
*/ */
public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> { public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {
private static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER)VOTE"); public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER_VOTE");
private final CountDownLatch latch; private final CountDownLatch latch;
private final String targetNodeId;
private int votesNeeded; private int votesNeeded;
@ -47,8 +47,9 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
* 5 | 4 | 3.5 | 3 * 5 | 4 | 3.5 | 3
* 6 | 5 | 4 | 4 * 6 | 5 | 4 | 4
*/ */
public QuorumVoteServerConnect(int size, StorageManager storageManager) { public QuorumVoteServerConnect(int size, String targetNodeId) {
super(LIVE_FAILOVER_VOTE); super(LIVE_FAILOVER_VOTE);
this.targetNodeId = targetNodeId;
double majority; double majority;
if (size <= 2) { if (size <= 2) {
majority = ((double) size) / 2; majority = ((double) size) / 2;
@ -71,7 +72,7 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
*/ */
@Override @Override
public Vote connected() { public Vote connected() {
return new BooleanVote(true); return new ServerConnectVote(targetNodeId);
} }
/** /**
@ -97,7 +98,7 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
* @param vote the vote to make. * @param vote the vote to make.
*/ */
@Override @Override
public synchronized void vote(BooleanVote vote) { public synchronized void vote(ServerConnectVote vote) {
if (decision) if (decision)
return; return;
if (vote.getVote()) { if (vote.getVote()) {
@ -111,19 +112,16 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
@Override @Override
public void allVotesCast(Topology voteTopology) { public void allVotesCast(Topology voteTopology) {
while (latch.getCount() > 0) {
latch.countDown(); latch.countDown();
} }
}
@Override @Override
public Boolean getDecision() { public Boolean getDecision() {
return decision; return decision;
} }
@Override
public SimpleString getName() {
return null;
}
public void await(int latchTimeout, TimeUnit unit) throws InterruptedException { public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
latch.await(latchTimeout, unit); latch.await(latchTimeout, unit);
} }

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.qourum;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import java.util.Map;
public class ServerConnectVote extends BooleanVote {
private String nodeId;
public ServerConnectVote(String nodeId) {
super(false);
this.nodeId = nodeId;
}
public ServerConnectVote() {
super(false);
}
public ServerConnectVote(String nodeid, boolean isLive) {
super(isLive);
this.nodeId = nodeid;
}
@Override
public boolean isRequestServerVote() {
return true;
}
@Override
public Map<String, Object> getVoteMap() {
return null;
}
@Override
public void encode(ActiveMQBuffer buff) {
super.encode(buff);
buff.writeString(nodeId);
}
@Override
public void decode(ActiveMQBuffer buff) {
super.decode(buff);
nodeId = buff.readString();
}
public String getNodeId() {
return nodeId;
}
}

View File

@ -267,7 +267,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private boolean isLiveDown() { private boolean isLiveDown() {
int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager); QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
quorumManager.vote(quorumVote); quorumManager.vote(quorumVote);

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
public class ServerConnectVoteHandler implements QuorumVoteHandler {
private final ActiveMQServerImpl server;
public ServerConnectVoteHandler(ActiveMQServerImpl server) {
this.server = server;
}
@Override
public Vote vote(Vote vote) {
ServerConnectVote serverConnectVote = (ServerConnectVote) vote;
String nodeid = serverConnectVote.getNodeId();
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
if (member != null && member.getLive() != null) {
return new ServerConnectVote(nodeid, false);
}
return new ServerConnectVote(nodeid, true);
}
@Override
public SimpleString getQuorumName() {
return QuorumVoteServerConnect.LIVE_FAILOVER_VOTE;
}
@Override
public Vote decode(ActiveMQBuffer voteBuffer) {
ServerConnectVote vote = new ServerConnectVote();
vote.decode(voteBuffer);
return vote;
}
}

View File

@ -133,6 +133,7 @@ public final class SharedNothingBackupActivation extends Activation {
return; return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
} }
//use a Node Locator to connect to the cluster //use a Node Locator to connect to the cluster

View File

@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException; import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
@ -89,6 +90,10 @@ public class SharedNothingLiveActivation extends LiveActivation {
} }
} }
public void freezeReplication() {
replicationManager.getBackupTransportConnection().fail(new ActiveMQDisconnectedException());
}
@Override @Override
public void run() { public void run() {
try { try {
@ -106,6 +111,8 @@ public class SharedNothingLiveActivation extends LiveActivation {
activeMQServer.initialisePart1(false); activeMQServer.initialisePart1(false);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
activeMQServer.initialisePart2(false); activeMQServer.initialisePart2(false);
activeMQServer.completeActivation(); activeMQServer.completeActivation();
@ -248,7 +255,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager(); QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize(); int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getStorageManager()); QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString());
quorumManager.vote(quorumVote); quorumManager.vote(quorumVote);

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay; import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.junit.Test; import org.junit.Test;
@ -94,6 +95,49 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest {
assertFalse("4 should have failed over ", servers[4].getHAPolicy().isBackup()); assertFalse("4 should have failed over ", servers[4].getHAPolicy().isBackup());
} }
@Test
public void testQuorumVotingLiveNotDead() throws Exception {
int[] liveServerIDs = new int[]{0, 1, 2};
setupCluster();
startServers(0, 1, 2);
new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
startServers(3, 4, 5);
for (int i : liveServerIDs) {
waitForTopology(servers[i], 3, 3);
}
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
for (int i : liveServerIDs) {
setupSessionFactory(i, i + 3, isNetty(), false);
createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(i, i, QUEUE_NAME, null);
}
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
locators[0].addClusterTopologyListener(liveTopologyListener);
assertTrue("we assume 3 is a backup", servers[3].getHAPolicy().isBackup());
assertFalse("no shared storage", servers[3].getHAPolicy().isSharedStore());
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) servers[0].getActivation();
liveActivation.freezeReplication();
assertFalse(servers[0].isReplicaSync());
waitForRemoteBackupSynchronization(servers[0]);
assertTrue(servers[0].isReplicaSync());
}
@Override @Override
protected boolean isSharedStorage() { protected boolean isSharedStorage() {
return false; return false;

View File

@ -19,9 +19,8 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import org.apache.activemq.artemis.core.server.cluster.qourum.BooleanVote;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager; import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -46,34 +45,34 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
@Test @Test
public void testSuccessfulVote() { public void testSuccessfulVote() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes - 1; i++) { for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new BooleanVote(true)); quorum.vote(new ServerConnectVote("foo", true));
} }
if (size > 1) { if (size > 1) {
assertFalse(quorum.getDecision()); assertFalse(quorum.getDecision());
} }
quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes; i++) { for (int i = 0; i < trueVotes; i++) {
quorum.vote(new BooleanVote(true)); quorum.vote(new ServerConnectVote("foo", true));
} }
assertTrue(quorum.getDecision()); assertTrue(quorum.getDecision());
} }
@Test @Test
public void testUnSuccessfulVote() { public void testUnSuccessfulVote() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes - 1; i++) { for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new BooleanVote(true)); quorum.vote(new ServerConnectVote("foo", true));
} }
if (size > 1) { if (size > 1) {
assertFalse(quorum.getDecision()); assertFalse(quorum.getDecision());
} }
quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes - 1; i++) { for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new BooleanVote(true)); quorum.vote(new ServerConnectVote("foo", true));
} }
if (size == 1) { if (size == 1) {
assertTrue(quorum.getDecision()); assertTrue(quorum.getDecision());

View File

@ -595,6 +595,12 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return (Long) proxy.retrieveAttributeValue("GlobalMaxSize", Long.class); return (Long) proxy.retrieveAttributeValue("GlobalMaxSize", Long.class);
} }
@Override
public boolean freezeReplication() {
return false;
}
@Override @Override
public String createAddress(String name, String routingTypes) throws Exception { public String createAddress(String name, String routingTypes) throws Exception {
return (String) proxy.invokeOperation("createAddress", name, routingTypes); return (String) proxy.invokeOperation("createAddress", name, routingTypes);