diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index ad2093ad8b..63a7bbd44a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -94,7 +94,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon public interface ReplicationEndpointEventListener { - void onRemoteBackupUpToDate(); + void onRemoteBackupUpToDate(String nodeId, long activationSequence); void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping message) throws ActiveMQException; @@ -419,9 +419,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } } - private synchronized void finishSynchronization(String liveID) throws Exception { + private synchronized void finishSynchronization(String liveID, long activationSequence) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID); + logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID + " activationSequence = " + activationSequence); } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); @@ -476,8 +476,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } journalsHolder = null; - eventListener.onLiveNodeId(liveID); - eventListener.onRemoteBackupUpToDate(); + eventListener.onRemoteBackupUpToDate(liveID, activationSequence); if (logger.isTraceEnabled()) { logger.trace("Backup is synchronized / BACKUP-SYNC-DONE"); @@ -560,12 +559,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon return replicationResponseMessage; if (packet.isSynchronizationFinished()) { + long activationSequence = 0; if (packet.getFileIds() != null && packet.getFileIds().length == 1) { // this is the version sequence of the data we are replicating // verified if we activate with this data - server.getNodeManager().writeNodeActivationSequence(packet.getFileIds()[0]); + activationSequence = packet.getFileIds()[0]; } - finishSynchronization(packet.getNodeID()); + finishSynchronization(packet.getNodeID(), activationSequence); replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); return replicationResponseMessage; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java index b6dd08f19c..fd3eec5f04 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java @@ -39,7 +39,8 @@ public abstract class NodeManager implements ActiveMQComponent { private UUID uuid; private boolean isStarted = false; private final Set lockListeners; - protected long nodeActivationSequence; // local version of a coordinated sequence, tracking state transitions of ownership + public static final long NULL_NODE_ACTIVATION_SEQUENCE = -1; + protected long nodeActivationSequence = NULL_NODE_ACTIVATION_SEQUENCE; // local version of a coordinated sequence, tracking state transitions of ownership public NodeManager(final boolean replicatedBackup) { this.replicatedBackup = replicatedBackup; @@ -98,9 +99,12 @@ public abstract class NodeManager implements ActiveMQComponent { } } - public void setNodeActivationSequence(long activationSequence) { + public void setNodeActivationSequence(long sequence) { + if (sequence != NULL_NODE_ACTIVATION_SEQUENCE && sequence < 0) { + throw new IllegalArgumentException("activation sequence must be >=0 or NULL_NODE_ACTIVATION_SEQUENCE"); + } synchronized (nodeIDGuard) { - nodeActivationSequence = activationSequence; + nodeActivationSequence = sequence; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java index 56b59252a8..6193913375 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java @@ -22,6 +22,9 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.Path; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; @@ -39,6 +42,7 @@ public abstract class FileBasedNodeManager extends NodeManager { public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence"; private static final String ACCESS_MODE = "rw"; private final File directory; + private final Path activationSequencePath; protected FileChannel channel; protected FileChannel activationSequenceChannel; @@ -48,13 +52,31 @@ public abstract class FileBasedNodeManager extends NodeManager { if (directory != null) { directory.mkdirs(); } + activationSequencePath = new File(directory, SERVER_ACTIVATION_SEQUENCE_NAME).toPath(); } - protected void useActivationSequenceChannel() throws IOException { - if (activationSequenceChannel != null) { - return; + /** + * If {@code createIfNotExists} and activation sequence file doesn't exist yet, it returns {@code null}, + * otherwise it opens it.
+ * if {@code !createIfNotExists} it just open to create it. + */ + private FileChannel useActivationSequenceChannel(final boolean createIfNotExists) throws IOException { + FileChannel channel = this.activationSequenceChannel; + if (channel != null) { + return channel; } - activationSequenceChannel = FileChannel.open(newFile(SERVER_ACTIVATION_SEQUENCE_NAME).toPath(), READ, WRITE, CREATE); + final OpenOption[] openOptions; + if (!createIfNotExists) { + if (!Files.exists(activationSequencePath)) { + return null; + } + openOptions = new OpenOption[]{READ, WRITE}; + } else { + openOptions = new OpenOption[]{READ, WRITE, CREATE}; + } + channel = FileChannel.open(activationSequencePath, openOptions); + activationSequenceChannel = channel; + return channel; } @Override @@ -63,30 +85,37 @@ public abstract class FileBasedNodeManager extends NodeManager { throw new NodeManagerException(new IllegalStateException("node manager must be started first")); } try { - useActivationSequenceChannel(); - ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); - if (activationSequenceChannel.read(tmpBuffer, 0) != Long.BYTES) { - return 0; + final FileChannel channel = useActivationSequenceChannel(false); + if (channel == null) { + setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE); + return NULL_NODE_ACTIVATION_SEQUENCE; + } + final ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); + if (channel.read(tmpBuffer, 0) != Long.BYTES) { + setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE); + return NULL_NODE_ACTIVATION_SEQUENCE; } tmpBuffer.flip(); - return tmpBuffer.getLong(0); + final long activationSequence = tmpBuffer.getLong(0); + setNodeActivationSequence(activationSequence); + return activationSequence; } catch (IOException ie) { throw new NodeManagerException(ie); } } @Override - public void writeNodeActivationSequence(long version) throws NodeManagerException { + public void writeNodeActivationSequence(long sequence) throws NodeManagerException { if (!isStarted()) { throw new NodeManagerException(new IllegalStateException("node manager must be started first")); } try { - useActivationSequenceChannel(); - ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); - tmpBuffer.putLong(0, version); - activationSequenceChannel.write(tmpBuffer, 0); - activationSequenceChannel.force(false); - setNodeActivationSequence(version); + final FileChannel channel = useActivationSequenceChannel(true); + final ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); + tmpBuffer.putLong(0, sequence); + channel.write(tmpBuffer, 0); + channel.force(false); + setNodeActivationSequence(sequence); } catch (IOException ie) { throw new NodeManagerException(ie); } @@ -190,6 +219,7 @@ public abstract class FileBasedNodeManager extends NodeManager { channelCopy.close(); } finally { try { + setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE); FileChannel dataVersionChannel = this.activationSequenceChannel; this.activationSequenceChannel = null; if (dataVersionChannel != null) { @@ -203,13 +233,19 @@ public abstract class FileBasedNodeManager extends NodeManager { @Override public void stopBackup() throws NodeManagerException { - if (replicatedBackup && getNodeId() != null) { - try { - setUpServerLockFile(); - } catch (IOException e) { - throw new NodeManagerException(e); + synchronized (nodeIDGuard) { + if (replicatedBackup && getNodeId() != null) { + try { + setUpServerLockFile(); + final long nodeActivationSequence = this.nodeActivationSequence; + if (nodeActivationSequence != NULL_NODE_ACTIVATION_SEQUENCE) { + writeNodeActivationSequence(nodeActivationSequence); + } + } catch (IOException e) { + throw new NodeManagerException(e); + } } + super.stopBackup(); } - super.stopBackup(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java index 3b3da1c8c4..67f469ce3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java @@ -100,6 +100,7 @@ public class FileLockNodeManager extends FileBasedNodeManager { } super.start(); + readNodeActivationSequence(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java index bbe4191a09..f4471e71b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java @@ -62,6 +62,15 @@ public final class InVMNodeManager extends FileBasedNodeManager { setUUID(UUIDGenerator.getInstance().generateUUID()); } + @Override + public synchronized void start() throws Exception { + if (isStarted()) { + return; + } + super.start(); + readNodeActivationSequence(); + } + @Override public void awaitLiveNode() throws InterruptedException { do { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java index efeb2dd312..d4e9a4c71c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager; import org.apache.activemq.artemis.quorum.UnavailableStateException; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.server.NodeManager.NULL_NODE_ACTIVATION_SEQUENCE; import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure; import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.ensureSequentialAccessToNodeData; import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.tryActivate; @@ -74,11 +75,17 @@ public final class ReplicationBackupActivation extends Activation implements Dis final ReplicationBackupPolicy policy) { this.activeMQServer = activeMQServer; if (policy.isTryFailback()) { - final SimpleString serverNodeID = activeMQServer.getNodeID(); - if (serverNodeID == null || serverNodeID.isEmpty()) { - throw new IllegalStateException("A failback activation must be biased around a specific NodeID"); + // patch expectedNodeID + final String coordinationId = policy.getLivePolicy().getCoordinationId(); + if (coordinationId != null) { + expectedNodeID = coordinationId; + } else { + final SimpleString serverNodeID = activeMQServer.getNodeID(); + if (serverNodeID == null || serverNodeID.isEmpty()) { + throw new IllegalStateException("A failback activation must be biased around a specific NodeID"); + } + this.expectedNodeID = serverNodeID.toString(); } - this.expectedNodeID = serverNodeID.toString(); } else { this.expectedNodeID = null; } @@ -142,8 +149,19 @@ public final class ReplicationBackupActivation extends Activation implements Dis } } try { + synchronized (activeMQServer) { + activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); + } + // restart of server due to unavailable quorum can cause NM restart losing the coordination-id + final String coordinationId = policy.getLivePolicy().getCoordinationId(); + if (coordinationId != null) { + final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); + if (!coordinationId.equals(nodeId)) { + ReplicationPrimaryActivation.applyCoordinationId(coordinationId, activeMQServer); + } + } distributedManager.start(); - final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence(); + final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence(); // only a backup with positive local activation sequence could contain valuable data if (nodeActivationSequence > 0) { final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); @@ -162,7 +180,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis if (!activeMQServer.initialisePart1(false)) { return; } - activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); startAsLive(liveLockWithInSyncReplica); return; } @@ -174,6 +191,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis // A primary need to preserve NodeID across runs activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), policy.isTryFailback()); activeMQServer.getNodeManager().start(); + // allow JMX to query Artemis state if (!activeMQServer.initialisePart1(false)) { return; } @@ -190,7 +208,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis clusterController.awaitConnectionToReplicationCluster(); activeMQServer.getBackupManager().start(); - activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); final DistributedLock liveLock = replicateAndFailover(clusterController); if (liveLock == null) { return; @@ -213,8 +230,14 @@ public final class ReplicationBackupActivation extends Activation implements Dis liveLock.close(); return; } + final NodeManager nodeManager = activeMQServer.getNodeManager(); try { - ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER); + // stopBackup is going to write the NodeID and activation sequence previously set on the NodeManager, + // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true. + nodeManager.stopBackup(); + final String nodeId = nodeManager.getNodeId().toString(); + final long nodeActivationSequence = nodeManager.getNodeActivationSequence(); + ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER); } catch (Throwable fatal) { LOGGER.warn(fatal); // policy is already live one, but there's no activation yet: we can just stop @@ -222,9 +245,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis throw new ActiveMQIllegalStateException("This server cannot ensure sequential access to broker data: activation is failed"); } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); - // stopBackup is going to write the NodeID previously set on the NodeManager, - // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true. - activeMQServer.getNodeManager().stopBackup(); activeMQServer.getStorageManager().start(); activeMQServer.getBackupManager().activated(); // IMPORTANT: @@ -280,6 +300,9 @@ public final class ReplicationBackupActivation extends Activation implements Dis return null; } } + if (expectedNodeID != null) { + LOGGER.infof("awaiting connecting to any live node with NodeID = %s", expectedNodeID); + } final ReplicationFailure failure = replicateLive(clusterController, nodeLocator, registrationFailureForwarder); if (failure == null) { Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster()); @@ -300,7 +323,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis } // no more interested into these events: handling it manually from here distributedManager.removeUnavailableManagerListener(this); - final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence(); + final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence(); final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); DistributedLock liveLockWithInSyncReplica = null; if (nodeActivationSequence > 0) { @@ -340,7 +363,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis if (activationSequence != 0) { final SimpleString syncNodeId = activeMQServer.getNodeManager().getNodeId(); try { - activeMQServer.getNodeManager().writeNodeActivationSequence(0); + activeMQServer.getNodeManager().setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE); } catch (Throwable fatal) { LOGGER.errorf(fatal, "Errored while resetting local activation sequence %d for NodeID = %s: stopping broker", activationSequence, syncNodeId); @@ -356,6 +379,10 @@ public final class ReplicationBackupActivation extends Activation implements Dis LOGGER.error("Stopping broker because of wrong node ID communication from live: maybe a misbehaving live?"); asyncRestartServer(activeMQServer, false); return null; + case WrongActivationSequence: + LOGGER.error("Stopping broker because of wrong activation sequence communication from live: maybe a misbehaving live?"); + asyncRestartServer(activeMQServer, false); + return null; default: throw new AssertionError("Unsupported failure " + failure); } @@ -560,7 +587,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis } private static boolean connectToReplicationEndpoint(final ClusterControl liveControl, - final ReplicationEndpoint replicationEndpoint) { + final ReplicationEndpoint replicationEndpoint) { final Channel replicationChannel = liveControl.createReplicationChannel(); replicationChannel.setHandler(replicationEndpoint); replicationEndpoint.setChannel(replicationChannel); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java index be2737c26c..c28a5d2e0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java @@ -41,7 +41,7 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu private static final Logger LOGGER = Logger.getLogger(ReplicationObserver.class); public enum ReplicationFailure { - VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId; + VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId, WrongActivationSequence } private final NodeManager nodeManager; @@ -244,7 +244,7 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu } @Override - public void onRemoteBackupUpToDate() { + public void onRemoteBackupUpToDate(String nodeId, long activationSequence) { if (backupUpToDate || closed || replicationFailure.isDone()) { return; } @@ -252,7 +252,27 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu if (backupUpToDate || closed || replicationFailure.isDone()) { return; } - assert liveID != null; + if (!validateNodeId(nodeId)) { + stopForcedFailoverAfterDelay(); + unlistenConnectionFailures(); + replicationFailure.complete(ReplicationFailure.WrongNodeId); + return; + } + if (liveID == null) { + liveID = nodeId; + } + if (activationSequence <= 0) { + // NOTE: + // activationSequence == 0 is still illegal, + // because live has to increase the sequence before replicating + stopForcedFailoverAfterDelay(); + unlistenConnectionFailures(); + LOGGER.errorf("Illegal activation sequence %d from NodeID = %s", activationSequence, nodeId); + replicationFailure.complete(ReplicationFailure.WrongActivationSequence); + return; + } + nodeManager.setNodeID(nodeId); + nodeManager.setNodeActivationSequence(activationSequence); backupManager.announceBackup(); backupUpToDate = true; } @@ -298,8 +318,9 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu unlistenConnectionFailures(); replicationFailure.complete(ReplicationFailure.WrongNodeId); } else if (liveID == null) { + // just store it locally: if is stored on the node manager + // it will be persisted on broker's stop while data is not yet in sync liveID = nodeId; - nodeManager.setNodeID(nodeId); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java index abf92226d7..d8c3d38a35 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server.impl; import javax.annotation.concurrent.GuardedBy; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -110,15 +111,11 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist try { // we have a common nodeId that we can share and coordinate with between peers if (policy.getCoordinationId() != null) { - LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", policy.getCoordinationId()); - // REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted! - activeMQServer.resetNodeManager(); - activeMQServer.getNodeManager().start(); - activeMQServer.getNodeManager().setNodeID(policy.getCoordinationId()); - activeMQServer.getNodeManager().stopBackup(); + applyCoordinationId(policy.getCoordinationId(), activeMQServer); } - final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence(); - final String nodeId = activeMQServer.getNodeManager().readNodeId().toString(); + final long sequence = activeMQServer.getNodeManager().getNodeActivationSequence(); + final long nodeActivationSequence = sequence == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE ? 0 : sequence; + final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); DistributedLock liveLock; while (true) { distributedManager.start(); @@ -136,7 +133,7 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist return; } - ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER); + ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER); activeMQServer.initialisePart1(false); @@ -166,6 +163,25 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist } } + public static void applyCoordinationId(final String coordinationId, + final ActiveMQServerImpl activeMQServer) throws Exception { + Objects.requireNonNull(coordinationId); + if (!activeMQServer.getNodeManager().isStarted()) { + throw new IllegalStateException("NodeManager should be started"); + } + final long activationSequence = activeMQServer.getNodeManager().getNodeActivationSequence(); + LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", coordinationId); + // REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted! + activeMQServer.resetNodeManager(); + final NodeManager nodeManager = activeMQServer.getNodeManager(); + nodeManager.start(); + nodeManager.setNodeID(coordinationId); + nodeManager.stopBackup(); + // despite NM is restarted as "replicatedBackup" we need the last written activation sequence value in-memory + final long freshActivationSequence = nodeManager.readNodeActivationSequence(); + assert freshActivationSequence == activationSequence; + } + @Override public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) { if (stoppingServer.get()) { @@ -366,8 +382,11 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist } // we increment only if we are staying alive if (!stoppingServer.get() && STARTED.equals(activeMQServer.getState())) { + final NodeManager nodeManager = activeMQServer.getNodeManager(); + final String nodeId = nodeManager.getNodeId().toString(); + final long nodeActivationSequence = nodeManager.getNodeActivationSequence(); try { - ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER); + ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER); } catch (Throwable fatal) { LOGGER.errorf(fatal, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", fatal.getLocalizedMessage()); asyncStopServer(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 3876185803..13421c8e8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -465,7 +465,8 @@ public final class SharedNothingBackupActivation extends Activation implements R } @Override - public void onRemoteBackupUpToDate() { + public void onRemoteBackupUpToDate(String nodeId, long ignoredActivationSequence) { + backupQuorum.liveIDSet(nodeId); activeMQServer.getBackupManager().announceBackup(); backupUpToDate = true; backupSyncLatch.countDown(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java index 96083634df..60358ffa17 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/quorum/ActivationSequenceStateMachine.java @@ -58,13 +58,17 @@ public final class ActivationSequenceStateMachine { *

*

* After successfully returning from this method ie not null return value, a broker should use - * {@link #ensureSequentialAccessToNodeData(ActiveMQServer, DistributedPrimitiveManager, Logger)} to complete + * {@link #ensureSequentialAccessToNodeData} to complete * the activation and guarantee the initial not-replicated ownership of data. */ public static DistributedLock tryActivate(final String nodeId, final long nodeActivationSequence, final DistributedPrimitiveManager distributedManager, final Logger logger) throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException { + Objects.requireNonNull(nodeId); + if (nodeActivationSequence < 0) { + throw new IllegalArgumentException("nodeActivationSequence must be > 0"); + } final DistributedLock activationLock = distributedManager.getDistributedLock(nodeId); try (MutableLong coordinatedNodeSequence = distributedManager.getMutableLong(nodeId)) { while (true) { @@ -231,19 +235,25 @@ public final class ActivationSequenceStateMachine { } /** - * This is going to increment the coordinated activation sequence while holding the live lock, failing with some exception otherwise.
+ * This is going to increment the coordinated activation sequence and the local activation sequence + * (using {@link NodeManager#writeNodeActivationSequence}) while holding the live lock, + * failing with some exception otherwise.
*

* The acceptable states are {@link ValidationResult#InSync} and {@link ValidationResult#SelfRepair}, throwing some exception otherwise. *

* This must be used while holding a live lock to ensure not-exclusive ownership of data ie can be both used * while loosing connectivity with a replica or after successfully {@link #tryActivate(String, long, DistributedPrimitiveManager, Logger)}. */ - public static void ensureSequentialAccessToNodeData(ActiveMQServer activeMQServer, - DistributedPrimitiveManager distributedPrimitiveManager, + public static void ensureSequentialAccessToNodeData(final String lockAndLongId, + final long nodeActivationSequence, + final ActiveMQServer activeMQServer, + final DistributedPrimitiveManager distributedPrimitiveManager, final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException { - final NodeManager nodeManager = activeMQServer.getNodeManager(); - final String lockAndLongId = nodeManager.getNodeId().toString(); + Objects.requireNonNull(lockAndLongId); + if (nodeActivationSequence < 0) { + throw new IllegalArgumentException("nodeActivationSequence must be >= 0"); + } final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId); if (!liveLock.isHeldByCaller()) { final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely changed", @@ -251,7 +261,6 @@ public final class ActivationSequenceStateMachine { logger.info(message); throw new UnavailableStateException(message); } - final long nodeActivationSequence = nodeManager.readNodeActivationSequence(); final MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId); final long currentCoordinatedActivationSequence = coordinatedNodeActivationSequence.get(); final long nextActivationSequence; @@ -275,6 +284,7 @@ public final class ActivationSequenceStateMachine { } nextActivationSequence = nodeActivationSequence + 1; } + final NodeManager nodeManager = activeMQServer.getNodeManager(); // UN_REPLICATED STATE ENTER: auto-repair doesn't need to claim and write locally if (nodeActivationSequence != nextActivationSequence) { // claim diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/PluggableQuorumSinglePairTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/PluggableQuorumSinglePairTest.java index ab6ee9237e..3f739b10c4 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/PluggableQuorumSinglePairTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/PluggableQuorumSinglePairTest.java @@ -137,7 +137,9 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase { protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException; - protected abstract void stopMajority() throws Exception; + protected abstract int[] stopMajority() throws Exception; + + protected abstract void restart(int[] nodes) throws Exception; @Before public void setup() throws Exception { @@ -150,14 +152,33 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase { super.after(); } + @Test + public void testCanQueryEmptyBackup() throws Exception { + final int timeout = (int) TimeUnit.SECONDS.toMillis(30); + LOGGER.info("starting primary"); + Process live = primary.startServer(this, timeout); + Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS)); + Wait.assertTrue(() -> !primary.isBackup().orElse(true), timeout); + LOGGER.info("killing primary"); + ServerUtil.killServer(live, forceKill); + LOGGER.info("starting backup"); + backup.startServer(this, 0); + Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout); + LOGGER.info("Stopping majority of consensus nodes"); + final int[] stopped = stopMajority(); + LOGGER.info("Waiting until isolated"); + Thread.sleep(2000); + LOGGER.info("Restarting majority of consensus nodes"); + restart(stopped); + Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout); + } + @Test public void testBackupFailoverAndPrimaryFailback() throws Exception { final int timeout = (int) TimeUnit.SECONDS.toMillis(30); LOGGER.info("starting primary"); Process primaryInstance = primary.startServer(this, timeout); Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS)); - Wait.assertTrue(() -> !primary.isBackup().orElse(true), timeout); - // primary UN REPLICATED Assert.assertEquals(1L, primary.getActivationSequence().get().longValue()); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java index 14fabd8292..6e622b3b33 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumPeerTest.java @@ -29,6 +29,8 @@ import org.junit.Assert; import org.junit.Test; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.containsExactNodeIds; +import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.decodeNetworkTopologyJson; +import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.liveOf; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.validateNetworkTopology; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withBackup; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withLive; @@ -56,6 +58,44 @@ public class ZookeeperPluggableQuorumPeerTest extends ZookeeperPluggableQuorumSi Wait.waitFor(this::ensembleHasLeader); } + @Test + public void testBackupCannotForgetPeerIdOnLostQuorum() throws Exception { + // see FileLockTest::testCorrelationId to get more info why this is not peer-journal-001 as in broker.xml + final String coordinationId = "peer.journal.001"; + final int timeout = (int) TimeUnit.SECONDS.toMillis(30); + LOGGER.info("starting peer a"); + final Process live = primary.startServer(this, 0); + LOGGER.info("waiting peer a to increase coordinated activation sequence to 1"); + Wait.assertEquals(1L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout); + Assert.assertEquals(coordinationId, primary.getNodeID().get()); + Wait.waitFor(() -> primary.listNetworkTopology().isPresent(), timeout); + final String urlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get())); + Assert.assertNotNull(urlPeerA); + LOGGER.infof("peer a acceptor: %s", urlPeerA); + LOGGER.info("killing peer a"); + ServerUtil.killServer(live, forceKill); + LOGGER.info("starting peer b"); + Process emptyBackup = backup.startServer(this, 0); + LOGGER.info("waiting until peer b act as empty backup"); + Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout); + LOGGER.info("Stop majority of quorum nodes"); + final int[] majority = stopMajority(); + LOGGER.info("Wait peer b to deactivate"); + Thread.sleep(2000); + LOGGER.info("Restart majority of quorum nodes"); + restart(majority); + LOGGER.info("Restart peer a as legit last live"); + final Process restartedLive = primary.startServer(this, 0); + LOGGER.info("waiting peer a to increase coordinated activation sequence to 2"); + Wait.assertEquals(2L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout); + Assert.assertEquals(coordinationId, primary.getNodeID().get()); + LOGGER.info("waiting peer b to be a replica"); + Wait.waitFor(() -> backup.isReplicaSync().get()); + Wait.assertEquals(2L, () -> backup.getActivationSequence().get().longValue()); + final String expectedUrlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get())); + Assert.assertEquals(urlPeerA, expectedUrlPeerA); + } + @Test public void testMultiPrimary_Peer() throws Exception { diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumSinglePairTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumSinglePairTest.java index b284541708..5d0de04424 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumSinglePairTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/quorum/ZookeeperPluggableQuorumSinglePairTest.java @@ -87,11 +87,22 @@ public class ZookeeperPluggableQuorumSinglePairTest extends PluggableQuorumSingl } @Override - protected void stopMajority() throws Exception { + protected int[] stopMajority() throws Exception { List followers = testingServer.getServers(); final int quorum = (nodes / 2) + 1; + final int[] stopped = new int[quorum]; for (int i = 0; i < quorum; i++) { followers.get(i).stop(); + stopped[i] = i; + } + return stopped; + } + + @Override + protected void restart(int[] nodes) throws Exception { + List servers = testingServer.getServers(); + for (int nodeIndex : nodes) { + servers.get(nodeIndex).restart(); } } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java index bfac547824..c3ea7aa485 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java @@ -17,13 +17,24 @@ package org.apache.activemq.artemis.tests.unit.core.server.impl; import java.io.File; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Set; import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.UUID; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static java.util.stream.Collectors.toSet; + public class FileLockTest extends ActiveMQTestBase { @Override @@ -53,6 +64,91 @@ public class FileLockTest extends ActiveMQTestBase { } } + @Test + public void testNodeManagerStartPersistence() throws Exception { + final File managerDirectory = getTestDirfile(); + FileLockNodeManager manager = new FileLockNodeManager(managerDirectory, false); + manager.start(); + Set files = Arrays.stream(managerDirectory.listFiles(pathname -> pathname.isFile())).collect(toSet()); + final Set expectedFileNames = Arrays.stream(new String[]{FileLockNodeManager.SERVER_LOCK_NAME, "serverlock.1", "serverlock.2"}) + .collect(toSet()); + Assert.assertEquals(expectedFileNames, files.stream().map(File::getName).collect(toSet())); + final File nodeIdFile = files.stream().filter(file -> file.getName().equals(FileLockNodeManager.SERVER_LOCK_NAME)).findFirst().get(); + final byte[] encodedNodeId = manager.getUUID().asBytes(); + try (FileChannel serverLock = FileChannel.open(nodeIdFile.toPath(), StandardOpenOption.READ)) { + Assert.assertEquals(16, encodedNodeId.length); + Assert.assertEquals(19, serverLock.size()); + final ByteBuffer readNodeId = ByteBuffer.allocate(16); + serverLock.read(readNodeId, 3); + readNodeId.flip(); + Assert.assertArrayEquals(encodedNodeId, readNodeId.array()); + } + Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.getNodeActivationSequence()); + Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.readNodeActivationSequence()); + Assert.assertEquals(3, managerDirectory.listFiles(pathname -> pathname.isFile()).length); + manager.stop(); + } + + @Test + public void testReplicateBackupNodeManagerStartPersistence() throws Exception { + final File managerDirectory = getTestDirfile(); + FileLockNodeManager manager = new FileLockNodeManager(managerDirectory, true); + manager.start(); + Set files = Arrays.stream(managerDirectory.listFiles(pathname -> pathname.isFile())).collect(toSet()); + Assert.assertTrue(files.isEmpty()); + Assert.assertNull(manager.getNodeId()); + Assert.assertNull(manager.getUUID()); + Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.getNodeActivationSequence()); + Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.readNodeActivationSequence()); + Assert.assertEquals(0, managerDirectory.listFiles(pathname -> pathname.isFile()).length); + manager.stop(); + } + + @Test + public void testReplicatedStopBackupPersistence() throws Exception { + final FileLockNodeManager manager = new FileLockNodeManager(getTestDirfile(), false); + manager.start(); + Assert.assertNotNull(manager.getUUID()); + manager.writeNodeActivationSequence(1); + final long nodeActivationSequence = manager.getNodeActivationSequence(); + Assert.assertEquals(1, nodeActivationSequence); + manager.stop(); + // replicated manager read activation sequence (if any) but ignore NodeId + final FileLockNodeManager replicatedManager = new FileLockNodeManager(getTestDirfile(), true); + replicatedManager.start(); + Assert.assertNull(replicatedManager.getUUID()); + Assert.assertEquals(1, replicatedManager.getNodeActivationSequence()); + UUID storedNodeId = UUIDGenerator.getInstance().generateUUID(); + replicatedManager.setNodeID(storedNodeId.toString()); + replicatedManager.setNodeActivationSequence(2); + replicatedManager.stopBackup(); + replicatedManager.setNodeID(UUIDGenerator.getInstance().generateStringUUID()); + replicatedManager.setNodeActivationSequence(3); + replicatedManager.stop(); + // start read whatever has been persisted by stopBackup + manager.start(); + Assert.assertEquals(storedNodeId, manager.getUUID()); + Assert.assertEquals(2, manager.getNodeActivationSequence()); + manager.stop(); + } + + @Test + public void testWriteNodeActivationSequence() throws Exception { + final FileLockNodeManager manager = new FileLockNodeManager(getTestDirfile(), false); + manager.start(); + UUID id = manager.getUUID(); + Assert.assertNotNull(manager.getUUID()); + manager.writeNodeActivationSequence(1); + final long nodeActivationSequence = manager.getNodeActivationSequence(); + Assert.assertEquals(1, nodeActivationSequence); + manager.stop(); + final FileLockNodeManager otherManager = new FileLockNodeManager(getTestDirfile(), false); + otherManager.start(); + Assert.assertEquals(id, otherManager.getUUID()); + Assert.assertEquals(1, otherManager.getNodeActivationSequence()); + otherManager.stop(); + } + @Test public void testNIOLock() throws Exception { doTestLock(new FileLockNodeManager(getTestDirfile(), false), new FileLockNodeManager(getTestDirfile(), false));