ARTEMIS-1302 make quorum voting more transparent
This commit is contained in:
parent
f138bc5284
commit
59841b8872
|
@ -67,4 +67,20 @@ public class QuorumVoteMessage extends PacketImpl {
|
|||
public void decode(QuorumVoteHandler voteHandler) {
|
||||
vote = voteHandler.decode(voteBuffer);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getParentString() {
|
||||
StringBuffer buff = new StringBuffer(super.getParentString());
|
||||
buff.append(", vote=" + vote);
|
||||
buff.append(", handler=" + handler);
|
||||
return buff.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -346,6 +346,54 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@Message(id = 221059, value = "Deleting old data directory {0} as the max folders is set to 0", format = Message.Format.MESSAGE_FORMAT)
|
||||
void backupDeletingData(String oldPath);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221060, value = "Sending quorum vote request to {0}: {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void sendingQuorumVoteRequest(String remoteAddress, String vote);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221061, value = "Received quorum vote response from {0}: {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void receivedQuorumVoteResponse(String remoteAddress, String vote);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221062, value = "Received quorum vote request: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void receivedQuorumVoteRequest(String vote);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221063, value = "Sending quorum vote response: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void sendingQuorumVoteResponse(String vote);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221064, value = "Node {0} found in cluster topology", format = Message.Format.MESSAGE_FORMAT)
|
||||
void nodeFoundInClusterTopology(String nodeId);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221065, value = "Node {0} not found in cluster topology", format = Message.Format.MESSAGE_FORMAT)
|
||||
void nodeNotFoundInClusterTopology(String nodeId);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221066, value = "Initiating quorum vote: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void initiatingQuorumVote(SimpleString vote);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221067, value = "Waiting {0} {1} for quorum vote results.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void waitingForQuorumVoteResults(int timeout, String unit);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221068, value = "Received all quorum votes.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void receivedAllQuorumVotes();
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221069, value = "Timeout waiting for quorum vote responses.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void timeoutWaitingForQuorumVoteResponses();
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221070, value = "Restarting as backup based on quorum vote results.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void restartingAsBackupBasedOnQuorumVoteResults();
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221071, value = "Failing over based on quorum vote results.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void failingOverBasedOnQuorumVoteResults();
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
|
|
|
@ -159,10 +159,13 @@ public class ClusterControl implements AutoCloseable {
|
|||
|
||||
public Vote sendQuorumVote(SimpleString handler, Vote vote) {
|
||||
try {
|
||||
ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(getSessionFactory().getConnection().getRemoteAddress(), vote.toString());
|
||||
QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterChannel.sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY);
|
||||
QuorumVoteHandler voteHandler = server.getClusterManager().getQuorumManager().getVoteHandler(replyMessage.getHandler());
|
||||
replyMessage.decodeRest(voteHandler);
|
||||
return replyMessage.getVote();
|
||||
Vote voteResponse = replyMessage.getVote();
|
||||
ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(getSessionFactory().getConnection().getRemoteAddress(), voteResponse.toString());
|
||||
return voteResponse;
|
||||
} catch (ActiveMQException e) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -379,7 +379,9 @@ public class ClusterController implements ActiveMQComponent {
|
|||
QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
|
||||
QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler());
|
||||
quorumVoteMessage.decode(voteHandler);
|
||||
ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());
|
||||
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
||||
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
|
||||
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
||||
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
*/
|
||||
public class BooleanVote extends Vote<Boolean> {
|
||||
|
||||
private boolean vote;
|
||||
protected boolean vote;
|
||||
|
||||
public BooleanVote(boolean vote) {
|
||||
this.vote = vote;
|
||||
|
@ -56,4 +56,8 @@ public class BooleanVote extends Vote<Boolean> {
|
|||
vote = buff.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BooleanVote [vote=" + vote + "]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
|||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
|
||||
|
@ -181,6 +182,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
|||
if (!started)
|
||||
return;
|
||||
//send a vote to each node
|
||||
ActiveMQServerLogger.LOGGER.initiatingQuorumVote(quorumVote.getName());
|
||||
for (TopologyMemberImpl tm : clusterController.getDefaultClusterTopology().getMembers()) {
|
||||
//but not ourselves
|
||||
if (!tm.getNodeId().equals(clusterController.getNodeID().toString())) {
|
||||
|
|
|
@ -21,13 +21,14 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
||||
/**
|
||||
* A Qourum Vote for deciding if a replicated backup should become live.
|
||||
* A Quorum Vote for deciding if a replicated backup should become live.
|
||||
*/
|
||||
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {
|
||||
|
||||
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LIVE_FAILOVER_VOTE");
|
||||
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");
|
||||
private final CountDownLatch latch;
|
||||
private final String targetNodeId;
|
||||
|
||||
|
@ -123,6 +124,10 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
|
|||
}
|
||||
|
||||
public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
|
||||
latch.await(latchTimeout, unit);
|
||||
ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase());
|
||||
if (latch.await(latchTimeout, unit))
|
||||
ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
|
||||
else
|
||||
ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,4 +63,9 @@ public class ServerConnectVote extends BooleanVote {
|
|||
public String getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ServerConnectVote [nodeId=" + nodeId + ", vote=" + vote + "]";
|
||||
}
|
||||
}
|
|
@ -102,9 +102,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
|
|||
}
|
||||
if (!isLiveDown()) {
|
||||
//lost connection but don't know if live is down so restart as backup as we can't replicate any more
|
||||
ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
|
||||
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
|
||||
} else {
|
||||
// live is assumed to be down, backup fails-over
|
||||
ActiveMQServerLogger.LOGGER.failingOverBasedOnQuorumVoteResults();
|
||||
signal = BACKUP_ACTIVATION.FAIL_OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -278,5 +278,10 @@ public class ColocatedActivation extends LiveActivation {
|
|||
public Pair<String, Integer> getVote() {
|
||||
return new Pair<>(nodeID, backupsSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RequestBackupVote [backupsSize=" + backupsSize + ", nodeID=" + nodeID + ", backupAvailable=" + backupAvailable + "]";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
|
||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
|
||||
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
|
||||
|
@ -37,8 +38,10 @@ public class ServerConnectVoteHandler implements QuorumVoteHandler {
|
|||
String nodeid = serverConnectVote.getNodeId();
|
||||
TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
|
||||
if (member != null && member.getLive() != null) {
|
||||
ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
|
||||
return new ServerConnectVote(nodeid, false);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
|
||||
return new ServerConnectVote(nodeid, true);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue