This commit is contained in:
Justin Bertram 2017-07-18 08:44:32 -05:00
commit 8f500986a0
12 changed files with 217 additions and 25 deletions

View File

@ -439,6 +439,8 @@ public interface ActiveMQServerControl {
long getGlobalMaxSize();
// Operations ----------------------------------------------------
@Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
boolean freezeReplication();
@Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,

View File

@ -94,7 +94,9 @@ import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@ -567,6 +569,17 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public boolean freezeReplication() {
Activation activation = server.getActivation();
if (activation instanceof SharedNothingLiveActivation) {
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
liveActivation.freezeReplication();
return true;
}
return false;
}
private enum AddressInfoTextFormatter {
Long {
@Override

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
/**
* a simple yes.no vote
*/
public final class BooleanVote extends Vote<Boolean> {
public class BooleanVote extends Vote<Boolean> {
private boolean vote;

View File

@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.persistence.StorageManager;
/**
* A Qourum Vote for deciding if a replicated backup should become live.
*/
public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {
private static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER)VOTE");
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER_VOTE");
private final CountDownLatch latch;
private final String targetNodeId;
private int votesNeeded;
@ -47,8 +47,9 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
* 5 | 4 | 3.5 | 3
* 6 | 5 | 4 | 4
*/
public QuorumVoteServerConnect(int size, StorageManager storageManager) {
public QuorumVoteServerConnect(int size, String targetNodeId) {
super(LIVE_FAILOVER_VOTE);
this.targetNodeId = targetNodeId;
double majority;
if (size <= 2) {
majority = ((double) size) / 2;
@ -71,7 +72,7 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
*/
@Override
public Vote connected() {
return new BooleanVote(true);
return new ServerConnectVote(targetNodeId);
}
/**
@ -97,7 +98,7 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
* @param vote the vote to make.
*/
@Override
public synchronized void vote(BooleanVote vote) {
public synchronized void vote(ServerConnectVote vote) {
if (decision)
return;
if (vote.getVote()) {
@ -111,7 +112,9 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
@Override
public void allVotesCast(Topology voteTopology) {
latch.countDown();
while (latch.getCount() > 0) {
latch.countDown();
}
}
@Override
@ -119,11 +122,6 @@ public class QuorumVoteServerConnect extends QuorumVote<BooleanVote, Boolean> {
return decision;
}
@Override
public SimpleString getName() {
return null;
}
public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
latch.await(latchTimeout, unit);
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.qourum;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import java.util.Map;
public class ServerConnectVote extends BooleanVote {
private String nodeId;
public ServerConnectVote(String nodeId) {
super(false);
this.nodeId = nodeId;
}
public ServerConnectVote() {
super(false);
}
public ServerConnectVote(String nodeid, boolean isLive) {
super(isLive);
this.nodeId = nodeid;
}
@Override
public boolean isRequestServerVote() {
return true;
}
@Override
public Map<String, Object> getVoteMap() {
return null;
}
@Override
public void encode(ActiveMQBuffer buff) {
super.encode(buff);
buff.writeString(nodeId);
}
@Override
public void decode(ActiveMQBuffer buff) {
super.decode(buff);
nodeId = buff.readString();
}
public String getNodeId() {
return nodeId;
}
}

View File

@ -267,7 +267,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private boolean isLiveDown() {
int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager);
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
quorumManager.vote(quorumVote);

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
public class ServerConnectVoteHandler implements QuorumVoteHandler {
private final ActiveMQServerImpl server;
public ServerConnectVoteHandler(ActiveMQServerImpl server) {
this.server = server;
}
@Override
public Vote vote(Vote vote) {
ServerConnectVote serverConnectVote = (ServerConnectVote) vote;
String nodeid = serverConnectVote.getNodeId();
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
if (member != null && member.getLive() != null) {
return new ServerConnectVote(nodeid, false);
}
return new ServerConnectVote(nodeid, true);
}
@Override
public SimpleString getQuorumName() {
return QuorumVoteServerConnect.LIVE_FAILOVER_VOTE;
}
@Override
public Vote decode(ActiveMQBuffer voteBuffer) {
ServerConnectVote vote = new ServerConnectVote();
vote.decode(voteBuffer);
return vote;
}
}

View File

@ -133,6 +133,7 @@ public final class SharedNothingBackupActivation extends Activation {
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
}
//use a Node Locator to connect to the cluster

View File

@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
@ -89,6 +90,10 @@ public class SharedNothingLiveActivation extends LiveActivation {
}
}
public void freezeReplication() {
replicationManager.getBackupTransportConnection().fail(new ActiveMQDisconnectedException());
}
@Override
public void run() {
try {
@ -106,6 +111,8 @@ public class SharedNothingLiveActivation extends LiveActivation {
activeMQServer.initialisePart1(false);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
activeMQServer.initialisePart2(false);
activeMQServer.completeActivation();
@ -248,7 +255,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getStorageManager());
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString());
quorumManager.vote(quorumVote);

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.junit.Test;
@ -94,6 +95,49 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest {
assertFalse("4 should have failed over ", servers[4].getHAPolicy().isBackup());
}
@Test
public void testQuorumVotingLiveNotDead() throws Exception {
int[] liveServerIDs = new int[]{0, 1, 2};
setupCluster();
startServers(0, 1, 2);
new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
startServers(3, 4, 5);
for (int i : liveServerIDs) {
waitForTopology(servers[i], 3, 3);
}
waitForFailoverTopology(3, 0, 1, 2);
waitForFailoverTopology(4, 0, 1, 2);
waitForFailoverTopology(5, 0, 1, 2);
for (int i : liveServerIDs) {
setupSessionFactory(i, i + 3, isNetty(), false);
createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
addConsumer(i, i, QUEUE_NAME, null);
}
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
send(0, QUEUES_TESTADDRESS, 10, false, null);
verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
locators[0].addClusterTopologyListener(liveTopologyListener);
assertTrue("we assume 3 is a backup", servers[3].getHAPolicy().isBackup());
assertFalse("no shared storage", servers[3].getHAPolicy().isSharedStore());
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) servers[0].getActivation();
liveActivation.freezeReplication();
assertFalse(servers[0].isReplicaSync());
waitForRemoteBackupSynchronization(servers[0]);
assertTrue(servers[0].isReplicaSync());
}
@Override
protected boolean isSharedStorage() {
return false;

View File

@ -19,9 +19,8 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.core.server.cluster.qourum.BooleanVote;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager;
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -46,34 +45,34 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
@Test
public void testSuccessfulVote() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, new FakeStorageManager());
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new BooleanVote(true));
quorum.vote(new ServerConnectVote("foo", true));
}
if (size > 1) {
assertFalse(quorum.getDecision());
}
quorum = new QuorumVoteServerConnect(size, new FakeStorageManager());
quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes; i++) {
quorum.vote(new BooleanVote(true));
quorum.vote(new ServerConnectVote("foo", true));
}
assertTrue(quorum.getDecision());
}
@Test
public void testUnSuccessfulVote() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, new FakeStorageManager());
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new BooleanVote(true));
quorum.vote(new ServerConnectVote("foo", true));
}
if (size > 1) {
assertFalse(quorum.getDecision());
}
quorum = new QuorumVoteServerConnect(size, new FakeStorageManager());
quorum = new QuorumVoteServerConnect(size, "foo");
for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new BooleanVote(true));
quorum.vote(new ServerConnectVote("foo", true));
}
if (size == 1) {
assertTrue(quorum.getDecision());

View File

@ -595,6 +595,12 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return (Long) proxy.retrieveAttributeValue("GlobalMaxSize", Long.class);
}
@Override
public boolean freezeReplication() {
return false;
}
@Override
public String createAddress(String name, String routingTypes) throws Exception {
return (String) proxy.invokeOperation("createAddress", name, routingTypes);