diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java index 07f0fc283c..6e263ce3d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.server.cluster; +import java.util.Optional; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -32,14 +34,10 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResp import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler; -import org.apache.activemq.artemis.core.server.cluster.qourum.Vote; /** * handles the communication between a cluster node and the cluster, either the whole cluster or a specific node in the @@ -64,6 +62,10 @@ public class ClusterControl implements AutoCloseable { this.clusterPassword = server.getConfiguration().getClusterPassword(); } + public Optional getClusterChannel() { + return Optional.ofNullable(clusterChannel); + } + /** * authorise this cluster control so it can communicate with the cluster, it will set the cluster channel on a successful * authentication. @@ -157,20 +159,6 @@ public class ClusterControl implements AutoCloseable { sessionFactory.close(); } - public Vote sendQuorumVote(SimpleString handler, Vote vote) { - try { - ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(getSessionFactory().getConnection().getRemoteAddress(), vote.toString()); - QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterChannel.sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY); - QuorumVoteHandler voteHandler = server.getClusterManager().getQuorumManager().getVoteHandler(replyMessage.getHandler()); - replyMessage.decodeRest(voteHandler); - Vote voteResponse = replyMessage.getVote(); - ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(getSessionFactory().getConnection().getRemoteAddress(), voteResponse.toString()); - return voteResponse; - } catch (ActiveMQException e) { - return null; - } - } - public boolean requestReplicatedBackup(int backupSize, SimpleString nodeID) { BackupRequestMessage backupRequestMessage = new BackupRequestMessage(backupSize, nodeID); return requestBackup(backupRequestMessage); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index d6d948e88b..fa61c9f7e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -45,15 +45,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager; -import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler; -import org.apache.activemq.artemis.core.server.cluster.qourum.Vote; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.jboss.logging.Logger; @@ -396,17 +392,7 @@ public class ClusterController implements ActiveMQComponent { logger.debug("there is no acceptor used configured at the CoreProtocolManager " + this); } } else if (packet.getType() == PacketImpl.QUORUM_VOTE) { - QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet; - QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler()); - if (voteHandler == null) { - ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured(); - return; - } - quorumVoteMessage.decode(voteHandler); - ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString()); - Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote()); - ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString()); - clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote)); + quorumManager.handleQuorumVote(clusterChannel, packet); } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) { ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet; //we don't really need to check as it should always be true diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java index 9b8b64718a..7fc5a525b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java @@ -20,14 +20,23 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.ClusterControl; @@ -162,6 +171,48 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom } } + public boolean hasLive(String nodeID, int quorumSize, int voteTimeout, TimeUnit voteTimeoutUnit) { + Objects.requireNonNull(nodeID, "nodeID"); + if (!started) { + throw new IllegalStateException("QuorumManager must start first"); + } + int size = quorumSize == -1 ? maxClusterSize : quorumSize; + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, nodeID); + // A positive decision means that there is no live with nodeID + boolean noLive = awaitVoteComplete(quorumVote, voteTimeout, voteTimeoutUnit); + return !noLive; + } + + public boolean isStillLive(String nodeID, + TransportConfiguration liveConnector, + int quorumSize, + int voteTimeout, + TimeUnit voteTimeoutUnit) { + Objects.requireNonNull(nodeID, "nodeID"); + Objects.requireNonNull(nodeID, "liveConnector"); + if (!started) { + throw new IllegalStateException("QuorumManager must start first"); + } + int size = quorumSize == -1 ? maxClusterSize : quorumSize; + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, nodeID, true, liveConnector.toString()); + return awaitVoteComplete(quorumVote, voteTimeout, voteTimeoutUnit); + } + + private boolean awaitVoteComplete(QuorumVoteServerConnect quorumVote, int voteTimeout, TimeUnit voteTimeoutUnit) { + vote(quorumVote); + + try { + quorumVote.await(voteTimeout, voteTimeoutUnit); + } catch (InterruptedException interruption) { + // No-op. The best the quorum can do now is to return the latest number it has + ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted(); + } + + voteComplete(quorumVote); + + return quorumVote.getDecision(); + } + /** * returns the maximum size this cluster has been. * @@ -214,7 +265,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom * @param vote the vote * @return the updated vote */ - public Vote vote(SimpleString handler, Vote vote) { + private Vote vote(SimpleString handler, Vote vote) { QuorumVoteHandler quorumVoteHandler = handlers.get(handler); return quorumVoteHandler.vote(vote); } @@ -225,7 +276,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom * * @param quorumVote the vote */ - public void voteComplete(QuorumVoteServerConnect quorumVote) { + private void voteComplete(QuorumVoteServerConnect quorumVote) { VoteRunnableHolder holder = voteRunnables.remove(quorumVote); if (holder != null) { for (VoteRunnable runnable : holder.runnables) { @@ -248,26 +299,24 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom return QuorumManager.class.getSimpleName() + "(server=" + clusterController.getIdentity() + ")"; } - public QuorumVoteHandler getVoteHandler(SimpleString handler) { + private QuorumVoteHandler getVoteHandler(SimpleString handler) { return handlers.get(handler); } - public TransportConfiguration getLiveTransportConfiguration(String targetServerID) { - TopologyMemberImpl member = clusterController.getDefaultClusterTopology().getMember(targetServerID); - return member != null ? member.getLive() : null; - } - - public boolean checkLive(TransportConfiguration liveTransportConfiguration) { - try { - ClusterControl control = clusterController.connectToNode(liveTransportConfiguration); - control.close(); - return true; - } catch (Throwable t) { - return false; + public void handleQuorumVote(Channel clusterChannel, Packet packet) { + QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet; + QuorumVoteHandler voteHandler = getVoteHandler(quorumVoteMessage.getHandler()); + if (voteHandler == null) { + ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured(); + return; } + quorumVoteMessage.decode(voteHandler); + ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString()); + Vote vote = vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote()); + ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString()); + clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote)); } - private final class VoteRunnableHolder { private final QuorumVote quorumVote; @@ -289,6 +338,23 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom } } + private Vote sendQuorumVote(ClusterControl clusterControl, SimpleString handler, Vote vote) { + try { + final ClientSessionFactoryInternal sessionFactory = clusterControl.getSessionFactory(); + final String remoteAddress = sessionFactory.getConnection().getRemoteAddress(); + ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(remoteAddress, vote.toString()); + QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterControl.getClusterChannel().get() + .sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY); + QuorumVoteHandler voteHandler = getVoteHandler(replyMessage.getHandler()); + replyMessage.decodeRest(voteHandler); + Vote voteResponse = replyMessage.getVote(); + ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(remoteAddress, voteResponse.toString()); + return voteResponse; + } catch (ActiveMQException e) { + return null; + } + } + /** * this will connect to a node and then cast a vote. whether or not this vote is asked of the target node is dependent * on {@link org.apache.activemq.artemis.core.server.cluster.qourum.Vote#isRequestServerVote()} @@ -318,7 +384,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom vote = quorumVote.connected(); if (vote.isRequestServerVote()) { - vote = clusterControl.sendQuorumVote(quorumVote.getName(), vote); + vote = sendQuorumVote(clusterControl, quorumVote.getName(), vote); quorumVote.vote(vote); } else { quorumVote.vote(vote); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java index 77d38f6c4d..e6f884d719 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java @@ -332,21 +332,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener //nothing to do here } } - //the live is dead so lets vote for quorum - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); - - quorumManager.vote(quorumVote); - - try { - quorumVote.await(quorumVoteWait, TimeUnit.SECONDS); - } catch (InterruptedException interruption) { - // No-op. The best the quorum can do now is to return the latest number it has - ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted(); - } - - quorumManager.voteComplete(quorumVote); - - if (quorumVote.getDecision()) { + if (!quorumManager.hasLive(targetServerID, size, quorumVoteWait, TimeUnit.SECONDS)) { return true; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 200d167a89..660d442a1c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ConfigurationUtils; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; @@ -57,7 +58,6 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager; -import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.jboss.logging.Logger; @@ -223,6 +223,13 @@ public class SharedNothingLiveActivation extends LiveActivation { } } + private static TransportConfiguration getLiveConnector(Configuration configuration) { + String connectorName = configuration.getClusterConfigurations().get(0).getConnectorName(); + TransportConfiguration transportConfiguration = configuration.getConnectorConfigurations().get(connectorName); + assert transportConfiguration != null; + return transportConfiguration; + } + private final class ReplicationFailureListener implements FailureListener, CloseListener { @Override @@ -253,28 +260,11 @@ public class SharedNothingLiveActivation extends LiveActivation { if (failed && replicatedPolicy.isVoteOnReplicationFailure()) { QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager(); - int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize(); - String liveConnector = null; - List clusterConfigurations = activeMQServer.getConfiguration().getClusterConfigurations(); - if (clusterConfigurations != null && clusterConfigurations.size() > 0) { - ClusterConnectionConfiguration clusterConnectionConfiguration = clusterConfigurations.get(0); - String connectorName = clusterConnectionConfiguration.getConnectorName(); - TransportConfiguration transportConfiguration = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName); - liveConnector = transportConfiguration != null ? transportConfiguration.toString() : null; - } - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true, liveConnector); - - quorumManager.vote(quorumVote); - - try { - quorumVote.await(5, TimeUnit.SECONDS); - } catch (InterruptedException interruption) { - // No-op. The best the quorum can do now is to return the latest number it has - } - - quorumManager.voteComplete(quorumVote); - - if (!quorumVote.getDecision()) { + final boolean isStillLive = quorumManager.isStillLive(activeMQServer.getNodeID().toString(), + getLiveConnector(activeMQServer.getConfiguration()), + replicatedPolicy.getQuorumSize(), + 5, TimeUnit.SECONDS); + if (!isStillLive) { try { Thread startThread = new Thread(new Runnable() { @Override