From ce3b50c1a6f76c76f6d3887d1128c5591f2e4f37 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 19 Aug 2021 13:36:06 +0200 Subject: [PATCH] ARTEMIS-3430 Activation Sequence Auto-Repair --- .../server/impl/FileBasedNodeManager.java | 12 ++ .../impl/ReplicationBackupActivation.java | 25 +-- .../core/server/impl/ReplicationObserver.java | 2 + .../impl/ReplicationPrimaryActivation.java | 19 +- .../ActivationSequenceStateMachine.java | 166 +++++++++--------- .../PluggableQuorumReplicationTest.java | 23 +-- .../ZookeeperPluggableQuorumPeerTest.java | 2 +- 7 files changed, 125 insertions(+), 124 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java index 6193913375..9ac2b6957f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.READ; @@ -37,6 +38,7 @@ import static java.nio.file.StandardOpenOption.WRITE; public abstract class FileBasedNodeManager extends NodeManager { + private static final Logger LOGGER = Logger.getLogger(FileBasedNodeManager.class); protected static final byte FIRST_TIME_START = '0'; public static final String SERVER_LOCK_NAME = "server.lock"; public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence"; @@ -162,6 +164,16 @@ public abstract class FileBasedNodeManager extends NodeManager { } } + if (channel != null) { + try { + channel.close(); + } catch (IOException ignored) { + // can ignore it: going to open a new file and that's the I/O to care about + } finally { + channel = null; + } + } + @SuppressWarnings("resource") RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java index d4e9a4c71c..10212e0877 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java @@ -161,18 +161,21 @@ public final class ReplicationBackupActivation extends Activation implements Dis } } distributedManager.start(); - final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence(); + final NodeManager nodeManager = activeMQServer.getNodeManager(); // only a backup with positive local activation sequence could contain valuable data - if (nodeActivationSequence > 0) { - final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); + if (nodeManager.getNodeActivationSequence() > 0) { DistributedLock liveLockWithInSyncReplica; while (true) { distributedManager.start(); try { - liveLockWithInSyncReplica = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER); + liveLockWithInSyncReplica = tryActivate(activeMQServer.getNodeManager(), distributedManager, LOGGER); break; } catch (UnavailableStateException canRecoverEx) { distributedManager.stop(); + } catch (NodeManager.NodeManagerException fatalEx) { + LOGGER.warn("Failed while auto-repairing activation sequence: stop server now", fatalEx); + asyncRestartServer(activeMQServer, false); + return; } } if (liveLockWithInSyncReplica != null) { @@ -235,9 +238,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis // stopBackup is going to write the NodeID and activation sequence previously set on the NodeManager, // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true. nodeManager.stopBackup(); - final String nodeId = nodeManager.getNodeId().toString(); - final long nodeActivationSequence = nodeManager.getNodeActivationSequence(); - ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER); + ensureSequentialAccessToNodeData(activeMQServer.toString(), nodeManager, distributedManager, LOGGER); } catch (Throwable fatal) { LOGGER.warn(fatal); // policy is already live one, but there's no activation yet: we can just stop @@ -323,19 +324,19 @@ public final class ReplicationBackupActivation extends Activation implements Dis } // no more interested into these events: handling it manually from here distributedManager.removeUnavailableManagerListener(this); - final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence(); - final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); + final NodeManager nodeManager = activeMQServer.getNodeManager(); DistributedLock liveLockWithInSyncReplica = null; - if (nodeActivationSequence > 0) { + if (nodeManager.getNodeActivationSequence() > 0) { try { - liveLockWithInSyncReplica = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER); + liveLockWithInSyncReplica = tryActivate(nodeManager, distributedManager, LOGGER); } catch (Throwable error) { // no need to retry here, can just restart as backup that will handle a more resilient tryActivate LOGGER.warn("Errored while attempting failover", error); liveLockWithInSyncReplica = null; } } else { - LOGGER.warnf("We expect local activation sequence for NodeID = %s to be > 0 on a fail-over, while is %d", nodeId, nodeActivationSequence); + LOGGER.errorf("Expected positive local activation sequence for NodeID = %s during fail-over, but was %d: restarting as backup", + nodeManager.getNodeId(), nodeManager.getNodeActivationSequence()); } assert stopping.get(); if (liveLockWithInSyncReplica != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java index c28a5d2e0c..cb08b8dcdd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java @@ -273,6 +273,8 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu } nodeManager.setNodeID(nodeId); nodeManager.setNodeActivationSequence(activationSequence); + // persists nodeID and nodeActivationSequence + nodeManager.stopBackup(); backupManager.announceBackup(); backupUpToDate = true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java index d8c3d38a35..870832203e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java @@ -113,14 +113,16 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist if (policy.getCoordinationId() != null) { applyCoordinationId(policy.getCoordinationId(), activeMQServer); } - final long sequence = activeMQServer.getNodeManager().getNodeActivationSequence(); - final long nodeActivationSequence = sequence == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE ? 0 : sequence; - final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); + final NodeManager nodeManager = activeMQServer.getNodeManager(); + if (nodeManager.getNodeActivationSequence() == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE) { + // persists an initial activation sequence + nodeManager.writeNodeActivationSequence(0); + } DistributedLock liveLock; while (true) { distributedManager.start(); try { - liveLock = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER); + liveLock = tryActivate(nodeManager, distributedManager, LOGGER); break; } catch (UnavailableStateException canRecoverEx) { distributedManager.stop(); @@ -128,12 +130,12 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist } if (liveLock == null) { distributedManager.stop(); - LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeId); + LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeManager.getNodeId()); activeMQServer.setHAPolicy(policy.getBackupPolicy()); return; } - ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER); + ensureSequentialAccessToNodeData(activeMQServer.toString(), nodeManager, distributedManager, LOGGER); activeMQServer.initialisePart1(false); @@ -382,11 +384,8 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist } // we increment only if we are staying alive if (!stoppingServer.get() && STARTED.equals(activeMQServer.getState())) { - final NodeManager nodeManager = activeMQServer.getNodeManager(); - final String nodeId = nodeManager.getNodeId().toString(); - final long nodeActivationSequence = nodeManager.getNodeActivationSequence(); try { - ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER); + ensureSequentialAccessToNodeData(activeMQServer.toString(), activeMQServer.getNodeManager(), distributedManager, LOGGER); } catch (Throwable fatal) { LOGGER.errorf(fatal, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", fatal.getLocalizedMessage()); asyncStopServer(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java index 60358ffa17..bfe246d17b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.quorum.DistributedLock; import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager; @@ -40,7 +39,6 @@ import org.jboss.logging.Logger; public final class ActivationSequenceStateMachine { private static final long CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS = 200; - private static final long CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS = 2000; private static final long LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS = 2000; private ActivationSequenceStateMachine() { @@ -49,23 +47,27 @@ public final class ActivationSequenceStateMachine { /** * It loops if the data of the broker is still valuable, but cannot become live. - * It loops (temporarly) if data is in sync or can self-heal, but cannot yet acquire the live lock. + * It loops (temporarily) if data is in sync or can auto-repair, but cannot yet acquire the live lock. *

* It stops loop and return: *

*

* After successfully returning from this method ie not null return value, a broker should use * {@link #ensureSequentialAccessToNodeData} to complete * the activation and guarantee the initial not-replicated ownership of data. */ - public static DistributedLock tryActivate(final String nodeId, - final long nodeActivationSequence, + public static DistributedLock tryActivate(final NodeManager nodeManager, final DistributedPrimitiveManager distributedManager, final Logger logger) throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException { + Objects.requireNonNull(nodeManager); + Objects.requireNonNull(distributedManager); + Objects.requireNonNull(logger); + final String nodeId = nodeManager.getNodeId() == null ? null : nodeManager.getNodeId().toString(); Objects.requireNonNull(nodeId); + final long nodeActivationSequence = nodeManager.getNodeActivationSequence(); if (nodeActivationSequence < 0) { throw new IllegalArgumentException("nodeActivationSequence must be > 0"); } @@ -75,31 +77,26 @@ public final class ActivationSequenceStateMachine { // dirty read is sufficient to know if we are *not* an in sync replica // typically the lock owner will increment to signal our data is stale and we are happy without any // further coordination at this point + long timeout = 0; switch (validateActivationSequence(coordinatedNodeSequence, activationLock, nodeId, nodeActivationSequence, logger)) { case Stale: activationLock.close(); return null; - case SelfRepair: case InSync: + timeout = LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS; break; + case SelfRepair: case MaybeInSync: - if (activationLock.tryLock()) { - // BAD: where's the broker that should commit it? - activationLock.unlock(); - logger.warnf("Cannot assume live role for NodeID = %s: claimed activation sequence need to be repaired", - nodeId); - TimeUnit.MILLISECONDS.sleep(CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS); - continue; - } - // quick path while data is still valuable: wait until something change (commit/repair) - TimeUnit.MILLISECONDS.sleep(CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS); - continue; + break; } - // SelfRepair, InSync - if (!activationLock.tryLock(LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + // SelfRepair, MaybeInSync, InSync + if (!activationLock.tryLock(timeout, TimeUnit.MILLISECONDS)) { logger.debugf("Candidate for Node ID = %s, with local activation sequence: %d, cannot acquire live lock within %dms; retrying", - nodeId, nodeActivationSequence, LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS); + nodeId, nodeActivationSequence, timeout); + if (timeout == 0) { + Thread.sleep(CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS); + } continue; } switch (validateActivationSequence(coordinatedNodeSequence, activationLock, nodeId, nodeActivationSequence, logger)) { @@ -107,25 +104,46 @@ public final class ActivationSequenceStateMachine { case Stale: activationLock.close(); return null; - case SelfRepair: - // Self-repair sequence ie we were the only one with the most up to date data. - // NOTE: We cannot move the sequence now, let's delay it on ensureSequentialAccessToNodeData - logger.infof("Assuming live role for NodeID = %s: local activation sequence %d matches claimed coordinated activation sequence %d. Repairing sequence", nodeId, nodeActivationSequence, nodeActivationSequence); - return activationLock; case InSync: // we are an in_sync_replica, good to go live as UNREPLICATED logger.infof("Assuming live role for NodeID = %s, local activation sequence %d matches current coordinated activation sequence %d", nodeId, nodeActivationSequence, nodeActivationSequence); return activationLock; + case SelfRepair: + // Self-repair sequence + logger.infof("Assuming live role for NodeID = %s: local activation sequence %d matches claimed coordinated activation sequence %d. Repairing sequence", nodeId, nodeActivationSequence, nodeActivationSequence); + try { + repairActivationSequence(nodeManager, coordinatedNodeSequence, nodeActivationSequence, true); + return activationLock; + } catch (NodeManager.NodeManagerException | UnavailableStateException ex) { + activationLock.close(); + throw ex; + } case MaybeInSync: - activationLock.unlock(); - logger.warnf("Cannot assume live role for NodeID = %s: claimed activation sequence need to be repaired", nodeId); - TimeUnit.MILLISECONDS.sleep(CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS); - continue; + // Auto-repair sequence + logger.warnf("Assuming live role for NodeID = %s: repairing claimed activation sequence", nodeId); + try { + repairActivationSequence(nodeManager, coordinatedNodeSequence, nodeActivationSequence, false); + return activationLock; + } catch (NodeManager.NodeManagerException | UnavailableStateException ex) { + activationLock.close(); + throw ex; + } } } } } + private static void repairActivationSequence(final NodeManager nodeManager, + final MutableLong coordinatedNodeSequence, + final long nodeActivationSequence, + final boolean selfHeal) throws UnavailableStateException { + final long coordinatedNodeActivationSequence = selfHeal ? nodeActivationSequence : nodeActivationSequence + 1; + if (!selfHeal) { + nodeManager.writeNodeActivationSequence(coordinatedNodeActivationSequence); + } + coordinatedNodeSequence.set(coordinatedNodeActivationSequence); + } + private enum ValidationResult { /** * coordinated activation sequence (claimed/committed) is far beyond the local one: data is not valuable anymore @@ -239,84 +257,60 @@ public final class ActivationSequenceStateMachine { * (using {@link NodeManager#writeNodeActivationSequence}) while holding the live lock, * failing with some exception otherwise.
*

- * The acceptable states are {@link ValidationResult#InSync} and {@link ValidationResult#SelfRepair}, throwing some exception otherwise. - *

* This must be used while holding a live lock to ensure not-exclusive ownership of data ie can be both used - * while loosing connectivity with a replica or after successfully {@link #tryActivate(String, long, DistributedPrimitiveManager, Logger)}. + * while loosing connectivity with a replica or after successfully {@link #tryActivate}. */ - public static void ensureSequentialAccessToNodeData(final String lockAndLongId, - final long nodeActivationSequence, - final ActiveMQServer activeMQServer, - final DistributedPrimitiveManager distributedPrimitiveManager, + public static void ensureSequentialAccessToNodeData(final String serverDescription, + final NodeManager nodeManager, + final DistributedPrimitiveManager distributedManager, final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException { + Objects.requireNonNull(serverDescription); + Objects.requireNonNull(nodeManager); + Objects.requireNonNull(distributedManager); + Objects.requireNonNull(logger); + final String lockAndLongId = nodeManager.getNodeId() == null ? null : nodeManager.getNodeId().toString(); Objects.requireNonNull(lockAndLongId); + final long nodeActivationSequence = nodeManager.getNodeActivationSequence(); if (nodeActivationSequence < 0) { throw new IllegalArgumentException("nodeActivationSequence must be >= 0"); } - final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId); + final DistributedLock liveLock = distributedManager.getDistributedLock(lockAndLongId); if (!liveLock.isHeldByCaller()) { final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely changed", - activeMQServer, lockAndLongId); + serverDescription, lockAndLongId); logger.info(message); throw new UnavailableStateException(message); } - final MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId); - final long currentCoordinatedActivationSequence = coordinatedNodeActivationSequence.get(); - final long nextActivationSequence; - if (currentCoordinatedActivationSequence < 0) { - // Check Self-Repair - if (nodeActivationSequence != -currentCoordinatedActivationSequence) { - final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current claimed coordinated sequence %d: need repair", - activeMQServer, lockAndLongId, nodeActivationSequence, -currentCoordinatedActivationSequence); - logger.info(message); - throw new ActiveMQException(message); - } - // auto-repair: this is the same server that failed to commit its claimed sequence - nextActivationSequence = nodeActivationSequence; - } else { - // Check InSync - if (nodeActivationSequence != currentCoordinatedActivationSequence) { - final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current coordinated sequence %d", - activeMQServer, lockAndLongId, nodeActivationSequence, currentCoordinatedActivationSequence); - logger.info(message); - throw new ActiveMQException(message); - } - nextActivationSequence = nodeActivationSequence + 1; - } - final NodeManager nodeManager = activeMQServer.getNodeManager(); + final MutableLong coordinatedNodeActivationSequence = distributedManager.getMutableLong(lockAndLongId); + final long nextActivationSequence = nodeActivationSequence + 1; // UN_REPLICATED STATE ENTER: auto-repair doesn't need to claim and write locally - if (nodeActivationSequence != nextActivationSequence) { - // claim - if (!coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, -nextActivationSequence)) { - final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence claim failed, local activation sequence %d no longer matches current coordinated sequence %d", - activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get()); - logger.infof(message); - throw new ActiveMQException(message); - } - // claim success: write locally - try { - nodeManager.writeNodeActivationSequence(nextActivationSequence); - } catch (NodeManager.NodeManagerException fatal) { - logger.errorf("Server [%s] failed to set local activation sequence to: %d for NodeId =%s. Cannot continue committing coordinated activation sequence: REQUIRES ADMIN INTERVENTION", - activeMQServer, nextActivationSequence, lockAndLongId); - throw new UnavailableStateException(fatal); - } - logger.infof("Server [%s], incremented local activation sequence to: %d for NodeId = %s", - activeMQServer, nextActivationSequence, lockAndLongId); - } else { - // self-heal need to update the in-memory sequence, because no writes will do it - nodeManager.setNodeActivationSequence(nextActivationSequence); + // claim + if (!coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, -nextActivationSequence)) { + final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence claim failed, local activation sequence %d no longer matches current coordinated sequence %d", + serverDescription, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get()); + logger.infof(message); + throw new ActiveMQException(message); } + // claim success: write locally + try { + nodeManager.writeNodeActivationSequence(nextActivationSequence); + } catch (NodeManager.NodeManagerException fatal) { + logger.errorf("Server [%s] failed to set local activation sequence to: %d for NodeId =%s. Cannot continue committing coordinated activation sequence: REQUIRES ADMIN INTERVENTION", + serverDescription, nextActivationSequence, lockAndLongId); + throw new UnavailableStateException(fatal); + } + logger.infof("Server [%s], incremented local activation sequence to: %d for NodeId = %s", + serverDescription, nextActivationSequence, lockAndLongId); // commit if (!coordinatedNodeActivationSequence.compareAndSet(-nextActivationSequence, nextActivationSequence)) { final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence commit failed, local activation sequence %d no longer matches current coordinated sequence %d", - activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get()); + serverDescription, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get()); logger.infof(message); throw new ActiveMQException(message); } logger.infof("Server [%s], incremented coordinated activation sequence to: %d for NodeId = %s", - activeMQServer, nextActivationSequence, lockAndLongId); + serverDescription, nextActivationSequence, lockAndLongId); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java index 69eadfd539..74c61396f6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java @@ -428,8 +428,8 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest } liveServer.start(); Wait.waitFor(liveServer::isStarted); - Assert.assertEquals(2, liveServer.getNodeManager().getNodeActivationSequence()); - Assert.assertEquals(2, distributedPrimitiveManager.getMutableLong(coordinatedId).get()); + Assert.assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence()); + Assert.assertEquals(3, distributedPrimitiveManager.getMutableLong(coordinatedId).get()); distributedPrimitiveManager.stop(); @@ -438,7 +438,7 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest backupServer.setIdentity("BACKUP"); backupServer.start(); Wait.waitFor(backupServer::isReplicaSync); - Assert.assertEquals(2, backupServer.getNodeManager().getNodeActivationSequence()); + Assert.assertEquals(3, backupServer.getNodeManager().getNodeActivationSequence()); backupServer.stop(); } @@ -575,20 +575,15 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest final MutableLong activationSequence = distributedPrimitiveManager.getMutableLong(coordinatedId); Assert.assertTrue(activationSequence.compareAndSet(2, -2)); - // case: 1, the fail to write locally 2 but the write actually succeeding - // should delay pending resolution of the uncommitted claim - backupServer.start(); - // live server should activate after self healing its outstanding claim liveServer.start(); Wait.waitFor(liveServer::isStarted); - - assertTrue(Wait.waitFor(backupServer::isReplicaSync)); - assertTrue(liveServer.isReplicaSync()); + Assert.assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence()); + Assert.assertEquals(3, activationSequence.get()); } @Test - public void testUnavailableAdminIntervention() throws Exception { + public void testUnavailableAutoRepair() throws Exception { // start backup Configuration backupConfiguration = createBackupConfiguration(); ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration)); @@ -610,7 +605,6 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest final String coordinatedId = liveServer.getNodeID().toString(); - System.err.println("coodr id: " + coordinatedId); backupServer.stop(); TimeUnit.MILLISECONDS.sleep(500); liveServer.stop(); @@ -651,15 +645,14 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest TimeUnit.MILLISECONDS.sleep(500); // both are candidates and one of them failed to commit the claim // let them compete on retry - Assert.assertTrue(coordinatedActivationSequence.compareAndSet(-2, 1)); // one of the two can activate Wait.waitFor(() -> liveServer.isStarted() || backupServer.isStarted()); assertTrue(Wait.waitFor(backupServer::isReplicaSync)); assertTrue(liveServer.isReplicaSync()); - assertEquals(2, backupServer.getNodeManager().getNodeActivationSequence()); - assertEquals(2, liveServer.getNodeManager().getNodeActivationSequence()); + assertEquals(3, backupServer.getNodeManager().getNodeActivationSequence()); + assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence()); } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java index 6e622b3b33..ca2bc13f7c 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java @@ -90,7 +90,7 @@ public class ZookeeperPluggableQuorumPeerTest extends ZookeeperPluggableQuorumSi Wait.assertEquals(2L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout); Assert.assertEquals(coordinationId, primary.getNodeID().get()); LOGGER.info("waiting peer b to be a replica"); - Wait.waitFor(() -> backup.isReplicaSync().get()); + Wait.waitFor(() -> backup.isReplicaSync().orElse(false)); Wait.assertEquals(2L, () -> backup.getActivationSequence().get().longValue()); final String expectedUrlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get())); Assert.assertEquals(urlPeerA, expectedUrlPeerA);