This commit is contained in:
Clebert Suconic 2020-04-08 12:53:03 -04:00
commit 67b1b01196
3 changed files with 216 additions and 33 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
@ -81,6 +82,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 223000, value = "Received Interrupt Exception whilst waiting for component to shutdown: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 223000, value = "Received Interrupt Exception whilst waiting for component to shutdown: {0}", format = Message.Format.MESSAGE_FORMAT)
void interruptWhilstStoppingComponent(String componentClassName); void interruptWhilstStoppingComponent(String componentClassName);
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 223001, value = "Ignored quorum vote due to quorum reached or vote casted: {0}", format = Message.Format.MESSAGE_FORMAT)
void ignoredQuorumVote(ServerConnectVote vote);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 221000, value = "{0} Message Broker is starting with configuration {1}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 221000, value = "{0} Message Broker is starting with configuration {1}", format = Message.Format.MESSAGE_FORMAT)
void serverStarting(String type, Configuration configuration); void serverStarting(String type, Configuration configuration);
@ -2041,7 +2046,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 224098, value = "Received a vote saying the backup is live with connector: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224098, value = "Received a vote saying the backup is live with connector: {0}", format = Message.Format.MESSAGE_FORMAT)
void qourumBackupIsLive(String liveConnector); void quorumBackupIsLive(String liveConnector);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 224099, value = "Message with ID {0} has a header too large. More information available on debug level for class {1}", @Message(id = 224099, value = "Message with ID {0} has a header too large. More information available on debug level for class {1}",

View File

@ -29,18 +29,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> { public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {
public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote"); public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");
private final CountDownLatch latch; // this flag mark the end of the vote
private final CountDownLatch voteCompleted;
private final String targetNodeId; private final String targetNodeId;
private final String liveConnector; private final String liveConnector;
private int votesNeeded; private int votesNeeded;
private int total = 0;
private boolean decision = false;
// Is this the live requesting to stay live, or a backup requesting to become live. // Is this the live requesting to stay live, or a backup requesting to become live.
private boolean requestToStayLive = false; private final boolean requestToStayLive;
/** /**
* live nodes | remaining nodes | majority | votes needed * live nodes | remaining nodes | majority | votes needed
@ -65,9 +61,9 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
} }
//votes needed could be say 2.5 so we add 1 in this case //votes needed could be say 2.5 so we add 1 in this case
votesNeeded = (int) majority; votesNeeded = (int) majority;
latch = new CountDownLatch(votesNeeded); voteCompleted = new CountDownLatch(1);
if (votesNeeded == 0) { if (votesNeeded == 0) {
decision = true; voteCompleted.countDown();
} }
this.requestToStayLive = requestToStayLive; this.requestToStayLive = requestToStayLive;
} }
@ -108,42 +104,46 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole
*/ */
@Override @Override
public synchronized void vote(ServerConnectVote vote) { public synchronized void vote(ServerConnectVote vote) {
if (decision) if (voteCompleted.getCount() == 0) {
return; ActiveMQServerLogger.LOGGER.ignoredQuorumVote(vote);
if (!requestToStayLive && vote.getVote()) {
total++;
latch.countDown();
if (total >= votesNeeded) {
decision = true;
}//do the opposite, if it says there is a node connected it means the backup has come live
} else if (requestToStayLive && vote.getVote()) {
total++;
latch.countDown();
if (liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) {
ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
return; return;
} }
if (total >= votesNeeded) { if (vote.getVote()) {
decision = true; if (!requestToStayLive) {
acceptPositiveVote();
} else if (liveConnector.equals(vote.getTransportConfiguration())) {
acceptPositiveVote();
} else {
ActiveMQServerLogger.LOGGER.quorumBackupIsLive(vote.getTransportConfiguration());
} }
} }
} }
@Override private synchronized void acceptPositiveVote() {
public void allVotesCast(Topology voteTopology) { if (voteCompleted.getCount() == 0) {
while (latch.getCount() > 0) { throw new IllegalStateException("Cannot accept any new positive vote if the vote is completed or the decision is already taken");
latch.countDown(); }
votesNeeded--;
if (votesNeeded == 0) {
voteCompleted.countDown();
} }
} }
@Override @Override
public Boolean getDecision() { public synchronized void allVotesCast(Topology voteTopology) {
return decision; if (voteCompleted.getCount() > 0) {
voteCompleted.countDown();
}
}
@Override
public synchronized Boolean getDecision() {
return votesNeeded == 0;
} }
public void await(int latchTimeout, TimeUnit unit) throws InterruptedException { public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase()); ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase());
if (latch.await(latchTimeout, unit)) if (voteCompleted.await(latchTimeout, unit))
ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes(); ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
else else
ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses(); ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();

View File

@ -18,10 +18,20 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote; import org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -43,6 +53,174 @@ public class QuorumVoteServerConnectTest extends ActiveMQTestBase {
this.trueVotes = trueVotes; this.trueVotes = trueVotes;
} }
@Test
public void testVoteOnRequestToStay() {
Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
final String liveConnector = "live";
final String backupConnector = "backup";
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo", true, liveConnector);
quorum.vote(new ServerConnectVote("foo", true, backupConnector));
Assert.assertFalse(quorum.getDecision());
for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new ServerConnectVote("foo", true, liveConnector));
Assert.assertFalse(quorum.getDecision());
}
quorum.vote(new ServerConnectVote("foo", true, liveConnector));
Assert.assertTrue(quorum.getDecision());
}
@Test
public void testAllVoteCastFreezeNotRequestToStayDecision() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
Assert.assertFalse(quorum.isRequestToStayLive());
final boolean decisionBeforeVoteCompleted = quorum.getDecision();
quorum.allVotesCast(null);
for (int i = 0; i < trueVotes; i++) {
quorum.vote(new ServerConnectVote("foo", true, null));
}
Assert.assertEquals(decisionBeforeVoteCompleted, quorum.getDecision());
}
@Test
public void testAllVoteCastFreezeRequestToStayDecision() {
final String liveConnector = "live";
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo", true, liveConnector);
Assert.assertTrue(quorum.isRequestToStayLive());
final boolean decisionBeforeVoteCompleted = quorum.getDecision();
quorum.allVotesCast(null);
for (int i = 0; i < trueVotes; i++) {
quorum.vote(new ServerConnectVote("foo", true, liveConnector));
}
Assert.assertEquals(decisionBeforeVoteCompleted, quorum.getDecision());
}
@Test
public void testAllVoteCastUnblockAwait() throws InterruptedException {
Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
Assert.assertFalse(quorum.getDecision());
CountDownLatch taskStarted = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final Future<InterruptedException> waitingTaskResult = executor.submit(() -> {
taskStarted.countDown();
try {
quorum.await(1, TimeUnit.DAYS);
return null;
} catch (InterruptedException e) {
return e;
}
});
// realistic expectation of the max time to start a Thread
Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
Assert.assertFalse(waitingTaskResult.isDone());
quorum.allVotesCast(null);
try {
Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
} catch (TimeoutException ex) {
Assert.fail("allVoteCast hasn't unblocked the waiting task");
} catch (ExecutionException ex) {
Assert.fail("This shouldn't really happen: the wait task shouldn't throw any exception: " + ex);
}
Assert.assertTrue(waitingTaskResult.isDone());
Assert.assertFalse(quorum.getDecision());
} finally {
executor.shutdownNow();
}
}
@Test
public void testRequestToStayQuorumUnblockAwait() throws InterruptedException {
Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
final String liveConnector = "live";
final String backupConnector = "backup";
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo", true, liveConnector);
Assert.assertFalse(quorum.getDecision());
CountDownLatch taskStarted = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final Future<InterruptedException> waitingTaskResult = executor.submit(() -> {
taskStarted.countDown();
try {
quorum.await(1, TimeUnit.DAYS);
return null;
} catch (InterruptedException e) {
return e;
}
});
// realistic expectation of the max time to start a Thread
Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
quorum.vote(new ServerConnectVote("foo", true, backupConnector));
Assert.assertFalse(waitingTaskResult.isDone());
Assert.assertFalse(quorum.getDecision());
for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new ServerConnectVote("foo", true, liveConnector));
Assert.assertFalse(waitingTaskResult.isDone());
Assert.assertFalse(quorum.getDecision());
}
quorum.vote(new ServerConnectVote("foo", true, liveConnector));
Assert.assertTrue(quorum.getDecision());
try {
Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
} catch (TimeoutException ex) {
Assert.fail("allVoteCast hasn't unblocked the waiting task");
} catch (ExecutionException ex) {
Assert.fail("This shouldn't really happen: the wait task shouldn't throw any exception: " + ex);
}
Assert.assertTrue(waitingTaskResult.isDone());
Assert.assertTrue(quorum.getDecision());
} finally {
executor.shutdownNow();
}
}
@Test
public void testNotRequestToStayQuorumUnblockAwait() throws InterruptedException {
Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");
Assert.assertFalse(quorum.getDecision());
CountDownLatch taskStarted = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final Future<InterruptedException> waitingTaskResult = executor.submit(() -> {
taskStarted.countDown();
try {
quorum.await(1, TimeUnit.DAYS);
return null;
} catch (InterruptedException e) {
return e;
}
});
// realistic expectation of the max time to start a Thread
Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
quorum.vote(new ServerConnectVote("foo", false, null));
Assert.assertFalse(waitingTaskResult.isDone());
Assert.assertFalse(quorum.getDecision());
for (int i = 0; i < trueVotes - 1; i++) {
quorum.vote(new ServerConnectVote("foo", true, null));
Assert.assertFalse(waitingTaskResult.isDone());
Assert.assertFalse(quorum.getDecision());
}
quorum.vote(new ServerConnectVote("foo", true, null));
Assert.assertTrue(quorum.getDecision());
try {
Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
} catch (TimeoutException ex) {
Assert.fail("allVoteCast hasn't unblocked the waiting task");
} catch (ExecutionException ex) {
Assert.fail("This shouldn't really happen: the wait task shouldn't throw any exception: " + ex);
}
Assert.assertTrue(waitingTaskResult.isDone());
Assert.assertTrue(quorum.getDecision());
} finally {
executor.shutdownNow();
}
}
@Test @Test
public void testSuccessfulVote() { public void testSuccessfulVote() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo"); QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, "foo");