ARTEMIS-3430 Activation Sequence Auto-Repair

franz1981 2021-08-19 13:36:06 +02:00
parent c31db95f6b
commit ce3b50c1a6
7 changed files with 125 additions and 124 deletions

View File

@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.READ;
@@ -37,6 +38,7 @@ import static java.nio.file.StandardOpenOption.WRITE;
 
 public abstract class FileBasedNodeManager extends NodeManager {
 
+   private static final Logger LOGGER = Logger.getLogger(FileBasedNodeManager.class);
    protected static final byte FIRST_TIME_START = '0';
    public static final String SERVER_LOCK_NAME = "server.lock";
    public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence";
@@ -162,6 +164,16 @@ public abstract class FileBasedNodeManager extends NodeManager {
          }
       }
+      if (channel != null) {
+         try {
+            channel.close();
+         } catch (IOException ignored) {
+            // can ignore it: going to open a new file and that's the I/O to care about
+         } finally {
+            channel = null;
+         }
+      }
       @SuppressWarnings("resource")
       RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);
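For context on the last hunk: before reopening the server lock file, the manager now closes any channel left over from a previous open, ignoring only the close-time IOException. A minimal, self-contained sketch of the same close-then-reopen pattern (the LockFileReopener class and its reopen method are illustrative names, not part of the commit):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

final class LockFileReopener {

   private FileChannel channel;

   // Close a possibly stale channel, then open a fresh one on the same lock file.
   FileChannel reopen(String lockFilePath, String accessMode) throws IOException {
      if (channel != null) {
         try {
            channel.close();
         } catch (IOException ignored) {
            // ignore: the reopen below is the I/O that matters
         } finally {
            channel = null;
         }
      }
      @SuppressWarnings("resource")
      RandomAccessFile raFile = new RandomAccessFile(lockFilePath, accessMode);
      channel = raFile.getChannel();
      return channel;
   }
}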

View File

@@ -161,18 +161,21 @@ public final class ReplicationBackupActivation extends Activation implements Dis
            }
         }
         distributedManager.start();
-        final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence();
+        final NodeManager nodeManager = activeMQServer.getNodeManager();
         // only a backup with positive local activation sequence could contain valuable data
-        if (nodeActivationSequence > 0) {
-           final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
+        if (nodeManager.getNodeActivationSequence() > 0) {
            DistributedLock liveLockWithInSyncReplica;
            while (true) {
               distributedManager.start();
               try {
-                 liveLockWithInSyncReplica = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER);
+                 liveLockWithInSyncReplica = tryActivate(activeMQServer.getNodeManager(), distributedManager, LOGGER);
                  break;
               } catch (UnavailableStateException canRecoverEx) {
                  distributedManager.stop();
+              } catch (NodeManager.NodeManagerException fatalEx) {
+                 LOGGER.warn("Failed while auto-repairing activation sequence: stop server now", fatalEx);
+                 asyncRestartServer(activeMQServer, false);
+                 return;
               }
            }
            if (liveLockWithInSyncReplica != null) {
@@ -235,9 +238,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
            // stopBackup is going to write the NodeID and activation sequence previously set on the NodeManager,
            // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
            nodeManager.stopBackup();
-           final String nodeId = nodeManager.getNodeId().toString();
-           final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
-           ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER);
+           ensureSequentialAccessToNodeData(activeMQServer.toString(), nodeManager, distributedManager, LOGGER);
         } catch (Throwable fatal) {
            LOGGER.warn(fatal);
            // policy is already live one, but there's no activation yet: we can just stop
@@ -323,19 +324,19 @@ public final class ReplicationBackupActivation extends Activation implements Dis
         }
         // no more interested into these events: handling it manually from here
         distributedManager.removeUnavailableManagerListener(this);
-        final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence();
-        final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
+        final NodeManager nodeManager = activeMQServer.getNodeManager();
         DistributedLock liveLockWithInSyncReplica = null;
-        if (nodeActivationSequence > 0) {
+        if (nodeManager.getNodeActivationSequence() > 0) {
           try {
-              liveLockWithInSyncReplica = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER);
+              liveLockWithInSyncReplica = tryActivate(nodeManager, distributedManager, LOGGER);
           } catch (Throwable error) {
              // no need to retry here, can just restart as backup that will handle a more resilient tryActivate
              LOGGER.warn("Errored while attempting failover", error);
              liveLockWithInSyncReplica = null;
           }
        } else {
-          LOGGER.warnf("We expect local activation sequence for NodeID = %s to be > 0 on a fail-over, while is %d", nodeId, nodeActivationSequence);
+          LOGGER.errorf("Expected positive local activation sequence for NodeID = %s during fail-over, but was %d: restarting as backup",
+                        nodeManager.getNodeId(), nodeManager.getNodeActivationSequence());
        }
        assert stopping.get();
        if (liveLockWithInSyncReplica != null) {
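Taken together, the hunks above split the failure handling around tryActivate in two: an UnavailableStateException is still treated as transient (stop the distributed manager and retry), while the new NodeManager.NodeManagerException, thrown when the local write performed by auto-repair fails, aborts the attempt so the broker can be restarted. A condensed, self-contained sketch of that retry shape (the class, method, and lambda are illustrative; the tryActivate call is passed in as a lambda so the sketch does not depend on ActivationSequenceStateMachine's package, and UnavailableStateException is assumed to live in org.apache.activemq.artemis.quorum beside DistributedLock):

import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.UnavailableStateException;
import org.jboss.logging.Logger;

final class BackupActivationSketch {

   interface ActivationAttempt {
      DistributedLock tryActivate() throws Exception;
   }

   // Returns the acquired live lock, or null when the caller should restart the broker.
   static DistributedLock activateWithRetry(ActivationAttempt attempt,
                                            DistributedPrimitiveManager distributedManager,
                                            Logger logger) throws Exception {
      while (true) {
         distributedManager.start();
         try {
            return attempt.tryActivate();
         } catch (UnavailableStateException canRecoverEx) {
            // transient coordination failure: reset and try again
            distributedManager.stop();
         } catch (NodeManager.NodeManagerException fatalEx) {
            // auto-repair could not persist the activation sequence locally:
            // give up, mirroring asyncRestartServer(...) in the diff above
            logger.warn("Failed while auto-repairing activation sequence: stop server now", fatalEx);
            return null;
         }
      }
   }
}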

View File

@@ -273,6 +273,8 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
      }
      nodeManager.setNodeID(nodeId);
      nodeManager.setNodeActivationSequence(activationSequence);
+     // persists nodeID and nodeActivationSequence
+     nodeManager.stopBackup();
      backupManager.announceBackup();
      backupUpToDate = true;
   }

View File

@@ -113,14 +113,16 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
      if (policy.getCoordinationId() != null) {
         applyCoordinationId(policy.getCoordinationId(), activeMQServer);
      }
-     final long sequence = activeMQServer.getNodeManager().getNodeActivationSequence();
-     final long nodeActivationSequence = sequence == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE ? 0 : sequence;
-     final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
+     final NodeManager nodeManager = activeMQServer.getNodeManager();
+     if (nodeManager.getNodeActivationSequence() == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE) {
+        // persists an initial activation sequence
+        nodeManager.writeNodeActivationSequence(0);
+     }
      DistributedLock liveLock;
      while (true) {
         distributedManager.start();
         try {
-           liveLock = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER);
+           liveLock = tryActivate(nodeManager, distributedManager, LOGGER);
            break;
         } catch (UnavailableStateException canRecoverEx) {
            distributedManager.stop();
@@ -128,12 +130,12 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
         }
      }
      if (liveLock == null) {
         distributedManager.stop();
-        LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeId);
+        LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeManager.getNodeId());
         activeMQServer.setHAPolicy(policy.getBackupPolicy());
         return;
      }
-     ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER);
+     ensureSequentialAccessToNodeData(activeMQServer.toString(), nodeManager, distributedManager, LOGGER);
      activeMQServer.initialisePart1(false);
@@ -382,11 +384,8 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
      }
      // we increment only if we are staying alive
      if (!stoppingServer.get() && STARTED.equals(activeMQServer.getState())) {
-        final NodeManager nodeManager = activeMQServer.getNodeManager();
-        final String nodeId = nodeManager.getNodeId().toString();
-        final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
         try {
-           ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER);
+           ensureSequentialAccessToNodeData(activeMQServer.toString(), activeMQServer.getNodeManager(), distributedManager, LOGGER);
         } catch (Throwable fatal) {
            LOGGER.errorf(fatal, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", fatal.getLocalizedMessage());
            asyncStopServer();
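The first hunk above replaces the old in-memory default (treat NULL_NODE_ACTIVATION_SEQUENCE as 0) with an explicit write, so a broker that has never activated starts from a persisted 0 before competing for the live lock. The same bootstrap step in isolation, as a hypothetical helper (the wrapper class and method name are illustrative only):

import org.apache.activemq.artemis.core.server.NodeManager;

final class InitialSequenceSketch {

   // Persist an explicit initial activation sequence the first time a broker activates.
   static void ensureInitialSequence(NodeManager nodeManager) throws Exception {
      if (nodeManager.getNodeActivationSequence() == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE) {
         // persists an initial activation sequence
         nodeManager.writeNodeActivationSequence(0);
      }
   }
}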

View File

@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.quorum.DistributedLock;
 import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
@@ -40,7 +39,6 @@ import org.jboss.logging.Logger;
 public final class ActivationSequenceStateMachine {
 
    private static final long CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS = 200;
-   private static final long CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS = 2000;
    private static final long LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS = 2000;
 
    private ActivationSequenceStateMachine() {
@@ -49,23 +47,27 @@ public final class ActivationSequenceStateMachine {
    /**
     * It loops if the data of the broker is still valuable, but cannot become live.
-    * It loops (temporarly) if data is in sync or can self-heal, but cannot yet acquire the live lock.
+    * It loops (temporarily) if data is in sync or can auto-repair, but cannot yet acquire the live lock.
     * <p>
     * It stops loop and return:
     * <p><ul>
     * <li>{@code null}: if data is stale (and there are no rights to become live)
-    * <li>{@code !=null}: if data is in sync and the {@link DistributedLock} is correctly acquired
+    * <li>{@code !=null}: if data is in sync/repaired and the {@link DistributedLock} is correctly acquired
     * </ul><p>
     * <p>
     * After successfully returning from this method ie not null return value, a broker should use
     * {@link #ensureSequentialAccessToNodeData} to complete
     * the activation and guarantee the initial not-replicated ownership of data.
     */
-   public static DistributedLock tryActivate(final String nodeId,
-                                             final long nodeActivationSequence,
+   public static DistributedLock tryActivate(final NodeManager nodeManager,
                                              final DistributedPrimitiveManager distributedManager,
                                              final Logger logger) throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException {
+      Objects.requireNonNull(nodeManager);
+      Objects.requireNonNull(distributedManager);
+      Objects.requireNonNull(logger);
+      final String nodeId = nodeManager.getNodeId() == null ? null : nodeManager.getNodeId().toString();
       Objects.requireNonNull(nodeId);
+      final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
      if (nodeActivationSequence < 0) {
         throw new IllegalArgumentException("nodeActivationSequence must be > 0");
      }
@@ -75,31 +77,26 @@ public final class ActivationSequenceStateMachine {
           // dirty read is sufficient to know if we are *not* an in sync replica
           // typically the lock owner will increment to signal our data is stale and we are happy without any
           // further coordination at this point
+          long timeout = 0;
           switch (validateActivationSequence(coordinatedNodeSequence, activationLock, nodeId, nodeActivationSequence, logger)) {
              case Stale:
                 activationLock.close();
                 return null;
-             case SelfRepair:
              case InSync:
+                timeout = LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS;
                break;
+             case SelfRepair:
             case MaybeInSync:
-                if (activationLock.tryLock()) {
-                   // BAD: where's the broker that should commit it?
-                   activationLock.unlock();
-                   logger.warnf("Cannot assume live role for NodeID = %s: claimed activation sequence need to be repaired",
-                                nodeId);
-                   TimeUnit.MILLISECONDS.sleep(CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS);
-                   continue;
-                }
-                // quick path while data is still valuable: wait until something change (commit/repair)
-                TimeUnit.MILLISECONDS.sleep(CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS);
-                continue;
+                break;
          }
-         // SelfRepair, InSync
-         if (!activationLock.tryLock(LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+         // SelfRepair, MaybeInSync, InSync
+         if (!activationLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
            logger.debugf("Candidate for Node ID = %s, with local activation sequence: %d, cannot acquire live lock within %dms; retrying",
-                         nodeId, nodeActivationSequence, LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS);
+                         nodeId, nodeActivationSequence, timeout);
+           if (timeout == 0) {
+              Thread.sleep(CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS);
+           }
           continue;
        }
        switch (validateActivationSequence(coordinatedNodeSequence, activationLock, nodeId, nodeActivationSequence, logger)) {
@@ -107,25 +104,46 @@ public final class ActivationSequenceStateMachine {
           case Stale:
              activationLock.close();
             return null;
-          case SelfRepair:
-             // Self-repair sequence ie we were the only one with the most up to date data.
-             // NOTE: We cannot move the sequence now, let's delay it on ensureSequentialAccessToNodeData
-             logger.infof("Assuming live role for NodeID = %s: local activation sequence %d matches claimed coordinated activation sequence %d. Repairing sequence", nodeId, nodeActivationSequence, nodeActivationSequence);
-             return activationLock;
           case InSync:
             // we are an in_sync_replica, good to go live as UNREPLICATED
            logger.infof("Assuming live role for NodeID = %s, local activation sequence %d matches current coordinated activation sequence %d", nodeId, nodeActivationSequence, nodeActivationSequence);
            return activationLock;
+          case SelfRepair:
+             // Self-repair sequence
+             logger.infof("Assuming live role for NodeID = %s: local activation sequence %d matches claimed coordinated activation sequence %d. Repairing sequence", nodeId, nodeActivationSequence, nodeActivationSequence);
+             try {
+                repairActivationSequence(nodeManager, coordinatedNodeSequence, nodeActivationSequence, true);
+                return activationLock;
+             } catch (NodeManager.NodeManagerException | UnavailableStateException ex) {
+                activationLock.close();
+                throw ex;
+             }
          case MaybeInSync:
-             activationLock.unlock();
-             logger.warnf("Cannot assume live role for NodeID = %s: claimed activation sequence need to be repaired", nodeId);
-             TimeUnit.MILLISECONDS.sleep(CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS);
-             continue;
+             // Auto-repair sequence
+             logger.warnf("Assuming live role for NodeID = %s: repairing claimed activation sequence", nodeId);
+             try {
+                repairActivationSequence(nodeManager, coordinatedNodeSequence, nodeActivationSequence, false);
+                return activationLock;
+             } catch (NodeManager.NodeManagerException | UnavailableStateException ex) {
+                activationLock.close();
+                throw ex;
+             }
        }
      }
   }
 }
 
+   private static void repairActivationSequence(final NodeManager nodeManager,
+                                                final MutableLong coordinatedNodeSequence,
+                                                final long nodeActivationSequence,
+                                                final boolean selfHeal) throws UnavailableStateException {
+      final long coordinatedNodeActivationSequence = selfHeal ? nodeActivationSequence : nodeActivationSequence + 1;
+      if (!selfHeal) {
+         nodeManager.writeNodeActivationSequence(coordinatedNodeActivationSequence);
+      }
+      coordinatedNodeSequence.set(coordinatedNodeActivationSequence);
+   }
+
    private enum ValidationResult {
      /**
       * coordinated activation sequence (claimed/committed) is far beyond the local one: data is not valuable anymore
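The two branches of repairActivationSequence differ only in whether the local sequence has to move before the coordinated value is re-published: SelfRepair re-commits a value the broker already holds on disk, while the MaybeInSync auto-repair persists nodeActivationSequence + 1 locally first. A toy, self-contained trace of both branches, with a plain AtomicLong standing in for the coordinated MutableLong; the starting value used for the MaybeInSync case is an assumption, since validateActivationSequence is not part of this diff:

import java.util.concurrent.atomic.AtomicLong;

public final class RepairSketch {

   public static void main(String[] args) {
      final long local = 2;                        // broker's persisted activation sequence

      // SelfRepair: the broker's own claim (-2) was never committed and the local data
      // already matches it, so repair only re-publishes the positive value (no local write).
      AtomicLong coordinated = new AtomicLong(-local);
      coordinated.set(local);
      System.out.println("self-repair -> coordinated = " + coordinated.get());   // 2

      // MaybeInSync auto-repair: a claim was left dangling (assumed here to be -(local + 1));
      // persist local + 1 first (writeNodeActivationSequence in the real code), then publish it.
      coordinated.set(-(local + 1));
      final long repaired = local + 1;
      coordinated.set(repaired);
      System.out.println("auto-repair -> local = " + repaired + ", coordinated = " + coordinated.get());   // 3, 3
   }
}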
@@ -239,84 +257,60 @@
    * (using {@link NodeManager#writeNodeActivationSequence}) while holding the live lock,
    * failing with some exception otherwise.<br>
    * <p>
-   * The acceptable states are {@link ValidationResult#InSync} and {@link ValidationResult#SelfRepair}, throwing some exception otherwise.
-   * <p>
    * This must be used while holding a live lock to ensure not-exclusive ownership of data ie can be both used
-   * while loosing connectivity with a replica or after successfully {@link #tryActivate(String, long, DistributedPrimitiveManager, Logger)}.
+   * while loosing connectivity with a replica or after successfully {@link #tryActivate}.
    */
-  public static void ensureSequentialAccessToNodeData(final String lockAndLongId,
-                                                      final long nodeActivationSequence,
-                                                      final ActiveMQServer activeMQServer,
-                                                      final DistributedPrimitiveManager distributedPrimitiveManager,
+  public static void ensureSequentialAccessToNodeData(final String serverDescription,
+                                                      final NodeManager nodeManager,
+                                                      final DistributedPrimitiveManager distributedManager,
                                                       final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException {
+     Objects.requireNonNull(serverDescription);
+     Objects.requireNonNull(nodeManager);
+     Objects.requireNonNull(distributedManager);
+     Objects.requireNonNull(logger);
+     final String lockAndLongId = nodeManager.getNodeId() == null ? null : nodeManager.getNodeId().toString();
      Objects.requireNonNull(lockAndLongId);
+     final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
     if (nodeActivationSequence < 0) {
        throw new IllegalArgumentException("nodeActivationSequence must be >= 0");
     }
-    final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId);
+    final DistributedLock liveLock = distributedManager.getDistributedLock(lockAndLongId);
     if (!liveLock.isHeldByCaller()) {
        final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely changed",
-                                            activeMQServer, lockAndLongId);
+                                            serverDescription, lockAndLongId);
        logger.info(message);
        throw new UnavailableStateException(message);
     }
-    final MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId);
-    final long currentCoordinatedActivationSequence = coordinatedNodeActivationSequence.get();
-    final long nextActivationSequence;
-    if (currentCoordinatedActivationSequence < 0) {
-       // Check Self-Repair
-       if (nodeActivationSequence != -currentCoordinatedActivationSequence) {
-          final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current claimed coordinated sequence %d: need repair",
-                                               activeMQServer, lockAndLongId, nodeActivationSequence, -currentCoordinatedActivationSequence);
-          logger.info(message);
-          throw new ActiveMQException(message);
-       }
-       // auto-repair: this is the same server that failed to commit its claimed sequence
-       nextActivationSequence = nodeActivationSequence;
-    } else {
-       // Check InSync
-       if (nodeActivationSequence != currentCoordinatedActivationSequence) {
-          final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current coordinated sequence %d",
-                                               activeMQServer, lockAndLongId, nodeActivationSequence, currentCoordinatedActivationSequence);
-          logger.info(message);
-          throw new ActiveMQException(message);
-       }
-       nextActivationSequence = nodeActivationSequence + 1;
-    }
-    final NodeManager nodeManager = activeMQServer.getNodeManager();
+    final MutableLong coordinatedNodeActivationSequence = distributedManager.getMutableLong(lockAndLongId);
+    final long nextActivationSequence = nodeActivationSequence + 1;
     // UN_REPLICATED STATE ENTER: auto-repair doesn't need to claim and write locally
-    if (nodeActivationSequence != nextActivationSequence) {
-       // claim
-       if (!coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, -nextActivationSequence)) {
-          final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence claim failed, local activation sequence %d no longer matches current coordinated sequence %d",
-                                               activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
-          logger.infof(message);
-          throw new ActiveMQException(message);
-       }
-       // claim success: write locally
-       try {
-          nodeManager.writeNodeActivationSequence(nextActivationSequence);
-       } catch (NodeManager.NodeManagerException fatal) {
-          logger.errorf("Server [%s] failed to set local activation sequence to: %d for NodeId =%s. Cannot continue committing coordinated activation sequence: REQUIRES ADMIN INTERVENTION",
-                        activeMQServer, nextActivationSequence, lockAndLongId);
-          throw new UnavailableStateException(fatal);
-       }
-       logger.infof("Server [%s], incremented local activation sequence to: %d for NodeId = %s",
-                    activeMQServer, nextActivationSequence, lockAndLongId);
-    } else {
-       // self-heal need to update the in-memory sequence, because no writes will do it
-       nodeManager.setNodeActivationSequence(nextActivationSequence);
+    // claim
+    if (!coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, -nextActivationSequence)) {
+       final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence claim failed, local activation sequence %d no longer matches current coordinated sequence %d",
+                                            serverDescription, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
+       logger.infof(message);
+       throw new ActiveMQException(message);
     }
+    // claim success: write locally
+    try {
+       nodeManager.writeNodeActivationSequence(nextActivationSequence);
+    } catch (NodeManager.NodeManagerException fatal) {
+       logger.errorf("Server [%s] failed to set local activation sequence to: %d for NodeId =%s. Cannot continue committing coordinated activation sequence: REQUIRES ADMIN INTERVENTION",
+                     serverDescription, nextActivationSequence, lockAndLongId);
+       throw new UnavailableStateException(fatal);
+    }
+    logger.infof("Server [%s], incremented local activation sequence to: %d for NodeId = %s",
+                 serverDescription, nextActivationSequence, lockAndLongId);
     // commit
     if (!coordinatedNodeActivationSequence.compareAndSet(-nextActivationSequence, nextActivationSequence)) {
        final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence commit failed, local activation sequence %d no longer matches current coordinated sequence %d",
-                                            activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
+                                            serverDescription, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
        logger.infof(message);
        throw new ActiveMQException(message);
     }
     logger.infof("Server [%s], incremented coordinated activation sequence to: %d for NodeId = %s",
-                 activeMQServer, nextActivationSequence, lockAndLongId);
+                 serverDescription, nextActivationSequence, lockAndLongId);
  }
 }
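With the validation moved into tryActivate, the rewritten body above always performs the same three steps while the live lock is held: claim, by CAS-ing the committed coordinated value to the negated next sequence; persist the next sequence locally; then commit, by CAS-ing the negation back to the positive value. A crash between claim and commit is exactly what leaves the negative value that tryActivate later classifies as repairable. A self-contained toy run of that arithmetic, again using an AtomicLong in place of the coordinated MutableLong (single process, no real coordination):

import java.util.concurrent.atomic.AtomicLong;

public final class ClaimCommitSketch {

   public static void main(String[] args) {
      long localSequence = 2;                        // value the NodeManager holds on disk
      AtomicLong coordinated = new AtomicLong(2);    // committed coordinated sequence

      final long next = localSequence + 1;
      // claim: flip the committed value to the negated next sequence
      if (!coordinated.compareAndSet(localSequence, -next)) {
         throw new IllegalStateException("another broker moved the sequence first");
      }
      // write locally (nodeManager.writeNodeActivationSequence(next) in the real code)
      localSequence = next;
      // commit: publish the positive value
      if (!coordinated.compareAndSet(-next, next)) {
         throw new IllegalStateException("the claim was lost before the commit");
      }
      System.out.println("local = " + localSequence + ", coordinated = " + coordinated.get());   // 3, 3
   }
}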

View File

@@ -428,8 +428,8 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
      }
      liveServer.start();
      Wait.waitFor(liveServer::isStarted);
-     Assert.assertEquals(2, liveServer.getNodeManager().getNodeActivationSequence());
-     Assert.assertEquals(2, distributedPrimitiveManager.getMutableLong(coordinatedId).get());
+     Assert.assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence());
+     Assert.assertEquals(3, distributedPrimitiveManager.getMutableLong(coordinatedId).get());
      distributedPrimitiveManager.stop();
@@ -438,7 +438,7 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
      backupServer.setIdentity("BACKUP");
      backupServer.start();
      Wait.waitFor(backupServer::isReplicaSync);
-     Assert.assertEquals(2, backupServer.getNodeManager().getNodeActivationSequence());
+     Assert.assertEquals(3, backupServer.getNodeManager().getNodeActivationSequence());
      backupServer.stop();
   }
@@ -575,20 +575,15 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
      final MutableLong activationSequence = distributedPrimitiveManager.getMutableLong(coordinatedId);
      Assert.assertTrue(activationSequence.compareAndSet(2, -2));
-     // case: 1, the fail to write locally 2 but the write actually succeeding
-     // should delay pending resolution of the uncommitted claim
-     backupServer.start();
      // live server should activate after self healing its outstanding claim
      liveServer.start();
      Wait.waitFor(liveServer::isStarted);
-     assertTrue(Wait.waitFor(backupServer::isReplicaSync));
-     assertTrue(liveServer.isReplicaSync());
+     Assert.assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence());
+     Assert.assertEquals(3, activationSequence.get());
   }
 
   @Test
-  public void testUnavailableAdminIntervention() throws Exception {
+  public void testUnavailableAutoRepair() throws Exception {
      // start backup
      Configuration backupConfiguration = createBackupConfiguration();
      ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
@@ -610,7 +605,6 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
      final String coordinatedId = liveServer.getNodeID().toString();
-     System.err.println("coodr id: " + coordinatedId);
      backupServer.stop();
      TimeUnit.MILLISECONDS.sleep(500);
      liveServer.stop();
@@ -651,15 +645,14 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
      TimeUnit.MILLISECONDS.sleep(500);
      // both are candidates and one of them failed to commit the claim
      // let them compete on retry
-     Assert.assertTrue(coordinatedActivationSequence.compareAndSet(-2, 1));
      // one of the two can activate
      Wait.waitFor(() -> liveServer.isStarted() || backupServer.isStarted());
      assertTrue(Wait.waitFor(backupServer::isReplicaSync));
      assertTrue(liveServer.isReplicaSync());
-     assertEquals(2, backupServer.getNodeManager().getNodeActivationSequence());
-     assertEquals(2, liveServer.getNodeManager().getNodeActivationSequence());
+     assertEquals(3, backupServer.getNodeManager().getNodeActivationSequence());
+     assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence());
   }
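Worked through with the numbers used above: the test flips the committed coordinated value into a dangling claim (compareAndSet(2, -2)) while the live broker's local sequence is still 2. On restart the broker now self-repairs the claim back to 2 inside tryActivate and then runs the normal claim/write/commit cycle in ensureSequentialAccessToNodeData, ending at 3 both locally and in the coordinated MutableLong; the old flow's self-heal path reused 2 without incrementing, which is why these assertions move from 2 to 3. The 2 to 3 updates in the earlier hunks of this file appear to track the same extra increment.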

View File

@@ -90,7 +90,7 @@ public class ZookeeperPluggableQuorumPeerTest extends ZookeeperPluggableQuorumSi
      Wait.assertEquals(2L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout);
      Assert.assertEquals(coordinationId, primary.getNodeID().get());
      LOGGER.info("waiting peer b to be a replica");
-     Wait.waitFor(() -> backup.isReplicaSync().get());
+     Wait.waitFor(() -> backup.isReplicaSync().orElse(false));
      Wait.assertEquals(2L, () -> backup.getActivationSequence().get().longValue());
      final String expectedUrlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get()));
      Assert.assertEquals(urlPeerA, expectedUrlPeerA);