This closes #1416 ARTEMIS-1302 make quorum voting more transparent
This commit is contained in:
commit
f749764880
|
@ -67,4 +67,20 @@ public class QuorumVoteMessage extends PacketImpl {
|
||||||
public void decode(QuorumVoteHandler voteHandler) {
|
public void decode(QuorumVoteHandler voteHandler) {
|
||||||
vote = voteHandler.decode(voteBuffer);
|
vote = voteHandler.decode(voteBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuffer buff = new StringBuffer(getParentString());
|
||||||
|
buff.append("]");
|
||||||
|
return buff.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getParentString() {
|
||||||
|
StringBuffer buff = new StringBuffer(super.getParentString());
|
||||||
|
buff.append(", vote=" + vote);
|
||||||
|
buff.append(", handler=" + handler);
|
||||||
|
return buff.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -346,6 +346,54 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
@Message(id = 221059, value = "Deleting old data directory {0} as the max folders is set to 0", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 221059, value = "Deleting old data directory {0} as the max folders is set to 0", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void backupDeletingData(String oldPath);
|
void backupDeletingData(String oldPath);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221060, value = "Sending quorum vote request to {0}: {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void sendingQuorumVoteRequest(String remoteAddress, String vote);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221061, value = "Received quorum vote response from {0}: {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void receivedQuorumVoteResponse(String remoteAddress, String vote);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221062, value = "Received quorum vote request: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void receivedQuorumVoteRequest(String vote);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221063, value = "Sending quorum vote response: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void sendingQuorumVoteResponse(String vote);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221064, value = "Node {0} found in cluster topology", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void nodeFoundInClusterTopology(String nodeId);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221065, value = "Node {0} not found in cluster topology", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void nodeNotFoundInClusterTopology(String nodeId);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221066, value = "Initiating quorum vote: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void initiatingQuorumVote(SimpleString vote);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221067, value = "Waiting {0} {1} for quorum vote results.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void waitingForQuorumVoteResults(int timeout, String unit);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221068, value = "Received all quorum votes.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void receivedAllQuorumVotes();
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221069, value = "Timeout waiting for quorum vote responses.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void timeoutWaitingForQuorumVoteResponses();
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221070, value = "Restarting as backup based on quorum vote results.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void restartingAsBackupBasedOnQuorumVoteResults();
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221071, value = "Failing over based on quorum vote results.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void failingOverBasedOnQuorumVoteResults();
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.WARN)
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
|
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
|
||||||
format = Message.Format.MESSAGE_FORMAT)
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
|
|
@ -159,10 +159,13 @@ public class ClusterControl implements AutoCloseable {
|
||||||
|
|
||||||
public Vote sendQuorumVote(SimpleString handler, Vote vote) {
|
public Vote sendQuorumVote(SimpleString handler, Vote vote) {
|
||||||
try {
|
try {
|
||||||
|
ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(getSessionFactory().getConnection().getRemoteAddress(), vote.toString());
|
||||||
QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterChannel.sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY);
|
QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterChannel.sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY);
|
||||||
QuorumVoteHandler voteHandler = server.getClusterManager().getQuorumManager().getVoteHandler(replyMessage.getHandler());
|
QuorumVoteHandler voteHandler = server.getClusterManager().getQuorumManager().getVoteHandler(replyMessage.getHandler());
|
||||||
replyMessage.decodeRest(voteHandler);
|
replyMessage.decodeRest(voteHandler);
|
||||||
return replyMessage.getVote();
|
Vote voteResponse = replyMessage.getVote();
|
||||||
|
ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(getSessionFactory().getConnection().getRemoteAddress(), voteResponse.toString());
|
||||||
|
return voteResponse;
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -379,7 +379,9 @@ public class ClusterController implements ActiveMQComponent {
|
||||||
QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
|
QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
|
||||||
QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler());
|
QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler());
|
||||||
quorumVoteMessage.decode(voteHandler);
|
quorumVoteMessage.decode(voteHandler);
|
||||||
|
ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());
|
||||||
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
||||||
|
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
|
||||||
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
||||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
||||||
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
*/
|
*/
|
||||||
public class BooleanVote extends Vote<Boolean> {
|
public class BooleanVote extends Vote<Boolean> {
|
||||||
|
|
||||||
private boolean vote;
|
protected boolean vote;
|
||||||
|
|
||||||
public BooleanVote(boolean vote) {
|
public BooleanVote(boolean vote) {
|
||||||
this.vote = vote;
|
this.vote = vote;
|
||||||
|
@ -56,4 +56,8 @@ public class BooleanVote extends Vote<Boolean> {
|
||||||
vote = buff.readBoolean();
|
vote = buff.readBoolean();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "BooleanVote [vote=" + vote + "]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
||||||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||||
|
|
||||||
|
@ -181,6 +182,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
if (!started)
|
if (!started)
|
||||||
return;
|
return;
|
||||||
//send a vote to each node
|
//send a vote to each node
|
||||||
|
ActiveMQServerLogger.LOGGER.initiatingQuorumVote(quorumVote.getName());
|
||||||
for (TopologyMemberImpl tm : clusterController.getDefaultClusterTopology().getMembers()) {
|
for (TopologyMemberImpl tm : clusterController.getDefaultClusterTopology().getMembers()) {
|
||||||
//but not ourselves
|
//but not ourselves
|
||||||
if (!tm.getNodeId().equals(clusterController.getNodeID().toString())) {
|
if (!tm.getNodeId().equals(clusterController.getNodeID().toString())) {
|
||||||
|
|
|
@ -21,13 +21,14 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Qourum Vote for deciding if a replicated backup should become live.
|
* A Quorum Vote for deciding if a replicated backup should become live.
|
||||||
*/
|
*/
|
||||||
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {
|
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {
|
||||||
|
|
||||||
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER_VOTE");
|
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");
|
||||||
private final CountDownLatch latch;
|
private final CountDownLatch latch;
|
||||||
private final String targetNodeId;
|
private final String targetNodeId;
|
||||||
|
|
||||||
|
@ -123,6 +124,10 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
||||||
}
|
}
|
||||||
|
|
||||||
public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
|
public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
|
||||||
latch.await(latchTimeout, unit);
|
ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase());
|
||||||
|
if (latch.await(latchTimeout, unit))
|
||||||
|
ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
|
||||||
|
else
|
||||||
|
ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,4 +63,9 @@ public class ServerConnectVote extends BooleanVote {
|
||||||
public String getNodeId() {
|
public String getNodeId() {
|
||||||
return nodeId;
|
return nodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ServerConnectVote [nodeId=" + nodeId + ", vote=" + vote + "]";
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -102,9 +102,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
|
||||||
}
|
}
|
||||||
if (!isLiveDown()) {
|
if (!isLiveDown()) {
|
||||||
//lost connection but don't know if live is down so restart as backup as we can't replicate any more
|
//lost connection but don't know if live is down so restart as backup as we can't replicate any more
|
||||||
|
ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
|
||||||
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
|
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
|
||||||
} else {
|
} else {
|
||||||
// live is assumed to be down, backup fails-over
|
// live is assumed to be down, backup fails-over
|
||||||
|
ActiveMQServerLogger.LOGGER.failingOverBasedOnQuorumVoteResults();
|
||||||
signal = BACKUP_ACTIVATION.FAIL_OVER;
|
signal = BACKUP_ACTIVATION.FAIL_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -278,5 +278,10 @@ public class ColocatedActivation extends LiveActivation {
|
||||||
public Pair<String, Integer> getVote() {
|
public Pair<String, Integer> getVote() {
|
||||||
return new Pair<>(nodeID, backupsSize);
|
return new Pair<>(nodeID, backupsSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "RequestBackupVote [backupsSize=" + backupsSize + ", nodeID=" + nodeID + ", backupAvailable=" + backupAvailable + "]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
|
||||||
|
@ -37,8 +38,10 @@ public class ServerConnectVoteHandler implements QuorumVoteHandler {
|
||||||
String nodeid = serverConnectVote.getNodeId();
|
String nodeid = serverConnectVote.getNodeId();
|
||||||
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
|
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
|
||||||
if (member != null && member.getLive() != null) {
|
if (member != null && member.getLive() != null) {
|
||||||
|
ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
|
||||||
return new ServerConnectVote(nodeid, false);
|
return new ServerConnectVote(nodeid, false);
|
||||||
}
|
}
|
||||||
|
ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
|
||||||
return new ServerConnectVote(nodeid, true);
|
return new ServerConnectVote(nodeid, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue