diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 3192609e7a..d39e47028c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -439,6 +439,8 @@ public interface ActiveMQServerControl { long getGlobalMaxSize(); // Operations ---------------------------------------------------- + @Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION) + boolean freezeReplication(); @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) String createAddress(@Parameter(name = "name", desc = "The name of the address") String name, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 069ebf0023..9419cbb9f5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -94,7 +94,9 @@ import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; import org.apache.activemq.artemis.core.server.group.GroupingHandler; +import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; @@ -567,6 +569,17 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override + public boolean freezeReplication() { + Activation activation = server.getActivation(); + if (activation instanceof SharedNothingLiveActivation) { + SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation; + liveActivation.freezeReplication(); + return true; + } + return false; + } + private enum AddressInfoTextFormatter { Long { @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java index cbc70e7073..5d615660c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/BooleanVote.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; /** * a simple yes.no vote */ -public final class BooleanVote extends Vote { +public class BooleanVote extends Vote { private boolean vote; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java index 582774a934..a189155dba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java @@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.Topology; -import org.apache.activemq.artemis.core.persistence.StorageManager; /** * A Qourum Vote for deciding if a replicated backup should become live. */ -public class QuorumVoteServerConnect extends QuorumVote { +public class QuorumVoteServerConnect extends QuorumVote { - private static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER)VOTE"); + public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER_VOTE"); private final CountDownLatch latch; + private final String targetNodeId; private int votesNeeded; @@ -47,8 +47,9 @@ public class QuorumVoteServerConnect extends QuorumVote { * 5 | 4 | 3.5 | 3 * 6 | 5 | 4 | 4 */ - public QuorumVoteServerConnect(int size, StorageManager storageManager) { + public QuorumVoteServerConnect(int size, String targetNodeId) { super(LIVE_FAILOVER_VOTE); + this.targetNodeId = targetNodeId; double majority; if (size <= 2) { majority = ((double) size) / 2; @@ -71,7 +72,7 @@ public class QuorumVoteServerConnect extends QuorumVote { */ @Override public Vote connected() { - return new BooleanVote(true); + return new ServerConnectVote(targetNodeId); } /** @@ -97,7 +98,7 @@ public class QuorumVoteServerConnect extends QuorumVote { * @param vote the vote to make. */ @Override - public synchronized void vote(BooleanVote vote) { + public synchronized void vote(ServerConnectVote vote) { if (decision) return; if (vote.getVote()) { @@ -111,7 +112,9 @@ public class QuorumVoteServerConnect extends QuorumVote { @Override public void allVotesCast(Topology voteTopology) { - latch.countDown(); + while (latch.getCount() > 0) { + latch.countDown(); + } } @Override @@ -119,11 +122,6 @@ public class QuorumVoteServerConnect extends QuorumVote { return decision; } - @Override - public SimpleString getName() { - return null; - } - public void await(int latchTimeout, TimeUnit unit) throws InterruptedException { latch.await(latchTimeout, unit); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java new file mode 100644 index 0000000000..a6e472fb67 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.cluster.qourum; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; + +import java.util.Map; + +public class ServerConnectVote extends BooleanVote { + + private String nodeId; + + public ServerConnectVote(String nodeId) { + super(false); + this.nodeId = nodeId; + } + + public ServerConnectVote() { + super(false); + } + + public ServerConnectVote(String nodeid, boolean isLive) { + super(isLive); + this.nodeId = nodeid; + } + + @Override + public boolean isRequestServerVote() { + return true; + } + + @Override + public Map getVoteMap() { + return null; + } + + @Override + public void encode(ActiveMQBuffer buff) { + super.encode(buff); + buff.writeString(nodeId); + } + + @Override + public void decode(ActiveMQBuffer buff) { + super.decode(buff); + nodeId = buff.readString(); + } + + public String getNodeId() { + return nodeId; + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java index 81b70ad312..d7bd27ebbd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java @@ -267,7 +267,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private boolean isLiveDown() { int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager); + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); quorumManager.vote(quorumVote); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java new file mode 100644 index 0000000000..84603573ed --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; +import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler; +import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect; +import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote; +import org.apache.activemq.artemis.core.server.cluster.qourum.Vote; + +public class ServerConnectVoteHandler implements QuorumVoteHandler { + private final ActiveMQServerImpl server; + + public ServerConnectVoteHandler(ActiveMQServerImpl server) { + this.server = server; + } + + @Override + public Vote vote(Vote vote) { + ServerConnectVote serverConnectVote = (ServerConnectVote) vote; + String nodeid = serverConnectVote.getNodeId(); + TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid); + if (member != null && member.getLive() != null) { + return new ServerConnectVote(nodeid, false); + } + return new ServerConnectVote(nodeid, true); + } + + @Override + public SimpleString getQuorumName() { + return QuorumVoteServerConnect.LIVE_FAILOVER_VOTE; + } + + @Override + public Vote decode(ActiveMQBuffer voteBuffer) { + ServerConnectVote vote = new ServerConnectVote(); + vote.decode(voteBuffer); + return vote; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index fcba00c8d9..06a3afba1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -133,6 +133,7 @@ public final class SharedNothingBackupActivation extends Activation { return; backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); + activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer)); } //use a Node Locator to connect to the cluster diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index ce67e21daa..355cefb73b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException; +import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; @@ -89,6 +90,10 @@ public class SharedNothingLiveActivation extends LiveActivation { } } + public void freezeReplication() { + replicationManager.getBackupTransportConnection().fail(new ActiveMQDisconnectedException()); + } + @Override public void run() { try { @@ -106,6 +111,8 @@ public class SharedNothingLiveActivation extends LiveActivation { activeMQServer.initialisePart1(false); + activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer)); + activeMQServer.initialisePart2(false); activeMQServer.completeActivation(); @@ -248,7 +255,7 @@ public class SharedNothingLiveActivation extends LiveActivation { QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager(); int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize(); - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getStorageManager()); + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString()); quorumManager.vote(quorumVote); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java index 6043f888ca..ece21a320e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation; import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay; import org.junit.Test; @@ -94,6 +95,49 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest { assertFalse("4 should have failed over ", servers[4].getHAPolicy().isBackup()); } + @Test + public void testQuorumVotingLiveNotDead() throws Exception { + int[] liveServerIDs = new int[]{0, 1, 2}; + setupCluster(); + startServers(0, 1, 2); + new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER); + startServers(3, 4, 5); + + for (int i : liveServerIDs) { + waitForTopology(servers[i], 3, 3); + } + + waitForFailoverTopology(3, 0, 1, 2); + waitForFailoverTopology(4, 0, 1, 2); + waitForFailoverTopology(5, 0, 1, 2); + + for (int i : liveServerIDs) { + setupSessionFactory(i, i + 3, isNetty(), false); + createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + addConsumer(i, i, QUEUE_NAME, null); + } + + waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1"); + + locators[0].addClusterTopologyListener(liveTopologyListener); + + assertTrue("we assume 3 is a backup", servers[3].getHAPolicy().isBackup()); + assertFalse("no shared storage", servers[3].getHAPolicy().isSharedStore()); + + SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) servers[0].getActivation(); + liveActivation.freezeReplication(); + assertFalse(servers[0].isReplicaSync()); + waitForRemoteBackupSynchronization(servers[0]); + assertTrue(servers[0].isReplicaSync()); + } + @Override protected boolean isSharedStorage() { return false; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java index 633ececffa..9c4b4f3a1f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java @@ -19,9 +19,8 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover; import java.util.Arrays; import java.util.Collection; -import org.apache.activemq.artemis.core.server.cluster.qourum.BooleanVote; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect; -import org.apache.activemq.artemis.tests.integration.server.FakeStorageManager; +import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,34 +45,34 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase { @Test public void testSuccessfulVote() { - QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); + QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes - 1; i++) { - quorum.vote(new BooleanVote(true)); + quorum.vote(new ServerConnectVote("foo", true)); } if (size > 1) { assertFalse(quorum.getDecision()); } - quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); + quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes; i++) { - quorum.vote(new BooleanVote(true)); + quorum.vote(new ServerConnectVote("foo", true)); } assertTrue(quorum.getDecision()); } @Test public void testUnSuccessfulVote() { - QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); + QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes - 1; i++) { - quorum.vote(new BooleanVote(true)); + quorum.vote(new ServerConnectVote("foo", true)); } if (size > 1) { assertFalse(quorum.getDecision()); } - quorum = new QuorumVoteServerConnect(size, new FakeStorageManager()); + quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes - 1; i++) { - quorum.vote(new BooleanVote(true)); + quorum.vote(new ServerConnectVote("foo", true)); } if (size == 1) { assertTrue(quorum.getDecision()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index cd8db34c24..cbe3ce5bc6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -595,6 +595,12 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return (Long) proxy.retrieveAttributeValue("GlobalMaxSize", Long.class); } + @Override + public boolean freezeReplication() { + + return false; + } + @Override public String createAddress(String name, String routingTypes) throws Exception { return (String) proxy.invokeOperation("createAddress", name, routingTypes);