diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index a2eb1ea76c..70ee3ec794 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1998,4 +1998,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224097, value = "Failed to start server", format = Message.Format.MESSAGE_FORMAT) void failedToStartServer(@Cause Throwable t); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224098, value = "Received a vote saying the backup is live with connector: {0}", format = Message.Format.MESSAGE_FORMAT) + void qourumBackupIsLive(String liveConnector); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java index dcc1892c47..32fea40ff8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java @@ -31,6 +31,7 @@ public class QuorumVoteServerConnect extends QuorumVote= votesNeeded) { + decision = true; + }//do the opposite, if it says there is a node connected it means the backup has come live + } else if (requestToStayLive && vote.getVote()) { + total++; + latch.countDown(); + if (liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) { + ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector); + return; + } if (total >= votesNeeded) { decision = true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java index 9f108e0a95..8a54fc3724 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java @@ -23,6 +23,7 @@ import java.util.Map; public class ServerConnectVote extends BooleanVote { private String nodeId; + private String transportConfiguration; public ServerConnectVote(String nodeId) { super(false); @@ -33,9 +34,10 @@ public class ServerConnectVote extends BooleanVote { super(false); } - public ServerConnectVote(String nodeid, boolean isLive) { + public ServerConnectVote(String nodeid, boolean isLive, String transportConfiguration) { super(isLive); this.nodeId = nodeid; + this.transportConfiguration = transportConfiguration; } @Override @@ -52,12 +54,18 @@ public class ServerConnectVote extends BooleanVote { public void encode(ActiveMQBuffer buff) { super.encode(buff); buff.writeString(nodeId); + buff.writeNullableString(transportConfiguration); + } + + public String getTransportConfiguration() { + return transportConfiguration; } @Override public void decode(ActiveMQBuffer buff) { super.decode(buff); nodeId = buff.readString(); + transportConfiguration = buff.readNullableString(); } public String getNodeId() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java index 3fdfaa5a44..7c573bb12a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java @@ -36,14 +36,18 @@ public class ServerConnectVoteHandler implements QuorumVoteHandler { public Vote vote(Vote vote) { ServerConnectVote serverConnectVote = (ServerConnectVote) vote; String nodeid = serverConnectVote.getNodeId(); - TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid); + try { + TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid); - if (member != null && member.getLive() != null) { - ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid); - return new ServerConnectVote(nodeid, (Boolean) vote.getVote()); + if (member != null && member.getLive() != null) { + ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid); + return new ServerConnectVote(nodeid, (Boolean) vote.getVote(), member.getLive().toString()); + } + ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid); + } catch (Exception e) { + e.printStackTrace(); } - ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid); - return new ServerConnectVote(nodeid, !((Boolean) vote.getVote())); + return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()), null); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index a4d8db23ac..200d167a89 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -254,8 +254,15 @@ public class SharedNothingLiveActivation extends LiveActivation { if (failed && replicatedPolicy.isVoteOnReplicationFailure()) { QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager(); int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize(); - - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true); + String liveConnector = null; + List clusterConfigurations = activeMQServer.getConfiguration().getClusterConfigurations(); + if (clusterConfigurations != null && clusterConfigurations.size() > 0) { + ClusterConnectionConfiguration clusterConnectionConfiguration = clusterConfigurations.get(0); + String connectorName = clusterConnectionConfiguration.getConnectorName(); + TransportConfiguration transportConfiguration = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName); + liveConnector = transportConfiguration != null ? transportConfiguration.toString() : null; + } + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true, liveConnector); quorumManager.vote(quorumVote); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverLiveVotesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverLiveVotesTest.java new file mode 100644 index 0000000000..ded54d2abb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverLiveVotesTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; +import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation; +import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +public class QuorumFailOverLiveVotesTest extends StaticClusterWithBackupFailoverTest { + @Override + protected void setupServers() throws Exception { + super.setupServers(); + //we need to know who is connected to who + ((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0"); + ((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + ((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2"); + ((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0"); + ((ReplicaPolicyConfiguration) servers[4].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1"); + ((ReplicaPolicyConfiguration) servers[5].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2"); + + //reduce the numbers so that the vote finishes faster + ((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setVoteRetries(5); + ((ReplicaPolicyConfiguration) servers[3].getConfiguration().getHAPolicyConfiguration()).setVoteRetryWait(500); + ((ReplicatedPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true); + ((ReplicatedPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true); + ((ReplicatedPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true); + + } + + + @Test + public void testQuorumVotingLiveNotDead() throws Exception { + int[] liveServerIDs = new int[]{0, 1, 2}; + setupCluster(); + startServers(0, 1, 2); + new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER); + startServers(3, 4, 5); + + for (int i : liveServerIDs) { + waitForTopology(servers[i], 3, 3); + } + + waitForFailoverTopology(3, 0, 1, 2); + waitForFailoverTopology(4, 0, 1, 2); + waitForFailoverTopology(5, 0, 1, 2); + + for (int i : liveServerIDs) { + setupSessionFactory(i, i + 3, isNetty(), false); + createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + addConsumer(i, i, QUEUE_NAME, null); + } + + waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + final QuorumFailOverLiveVotesTest.TopologyListener liveTopologyListener = new QuorumFailOverLiveVotesTest.TopologyListener("LIVE-1"); + + locators[0].addClusterTopologyListener(liveTopologyListener); + + assertTrue("we assume 3 is a backup", servers[3].getHAPolicy().isBackup()); + assertFalse("no shared storage", servers[3].getHAPolicy().isSharedStore()); + + SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) servers[0].getActivation(); + // ; + servers[0].getRemotingService().freeze(null, null); + waitForFailoverTopology(4, 3, 1, 2); + waitForFailoverTopology(5, 3, 1, 2); + + assertTrue(servers[3].waitForActivation(2, TimeUnit.SECONDS)); + liveActivation.freezeReplication(); + waitForServerToStop(servers[0]); + } + + @Override + protected boolean isSharedStorage() { + return false; + } + + private static class TopologyListener implements ClusterTopologyListener { + + final String prefix; + final Map> nodes = new ConcurrentHashMap<>(); + + private TopologyListener(String string) { + prefix = string; + } + + @Override + public void nodeUP(TopologyMember topologyMember, boolean last) { + Pair connectorPair = new Pair<>(topologyMember.getLive(), topologyMember.getBackup()); + nodes.put(topologyMember.getBackupGroupName(), connectorPair); + } + + @Override + public void nodeDown(long eventUID, String nodeID) { + nodes.remove(nodeID); + } + + @Override + public String toString() { + return "TopologyListener(" + prefix + ", #=" + nodes.size() + ")"; + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java index 9c4b4f3a1f..0d11bcf5ee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java @@ -47,7 +47,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase { public void testSuccessfulVote() { QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes - 1; i++) { - quorum.vote(new ServerConnectVote("foo", true)); + quorum.vote(new ServerConnectVote("foo", true, null)); } if (size > 1) { @@ -55,7 +55,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase { } quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes; i++) { - quorum.vote(new ServerConnectVote("foo", true)); + quorum.vote(new ServerConnectVote("foo", true, null)); } assertTrue(quorum.getDecision()); } @@ -64,7 +64,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase { public void testUnSuccessfulVote() { QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes - 1; i++) { - quorum.vote(new ServerConnectVote("foo", true)); + quorum.vote(new ServerConnectVote("foo", true, null)); } if (size > 1) { @@ -72,7 +72,7 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase { } quorum = new QuorumVoteServerConnect(size, "foo"); for (int i = 0; i < trueVotes - 1; i++) { - quorum.vote(new ServerConnectVote("foo", true)); + quorum.vote(new ServerConnectVote("foo", true, null)); } if (size == 1) { assertTrue(quorum.getDecision());