ARTEMIS-3429 Backup forget coordination-id after quorum loss

This commit is contained in:
franz1981 2021-08-09 13:04:57 +02:00
parent 276f822a0e
commit 44dd84d704
14 changed files with 368 additions and 72 deletions

View File

@ -94,7 +94,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
public interface ReplicationEndpointEventListener { public interface ReplicationEndpointEventListener {
void onRemoteBackupUpToDate(); void onRemoteBackupUpToDate(String nodeId, long activationSequence);
void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping message) throws ActiveMQException; void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping message) throws ActiveMQException;
@ -419,9 +419,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
} }
} }
private synchronized void finishSynchronization(String liveID) throws Exception { private synchronized void finishSynchronization(String liveID, long activationSequence) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID); logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID + " activationSequence = " + activationSequence);
} }
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc); Journal journal = journalsHolder.remove(jc);
@ -476,8 +476,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
} }
journalsHolder = null; journalsHolder = null;
eventListener.onLiveNodeId(liveID); eventListener.onRemoteBackupUpToDate(liveID, activationSequence);
eventListener.onRemoteBackupUpToDate();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Backup is synchronized / BACKUP-SYNC-DONE"); logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
@ -560,12 +559,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
return replicationResponseMessage; return replicationResponseMessage;
if (packet.isSynchronizationFinished()) { if (packet.isSynchronizationFinished()) {
long activationSequence = 0;
if (packet.getFileIds() != null && packet.getFileIds().length == 1) { if (packet.getFileIds() != null && packet.getFileIds().length == 1) {
// this is the version sequence of the data we are replicating // this is the version sequence of the data we are replicating
// verified if we activate with this data // verified if we activate with this data
server.getNodeManager().writeNodeActivationSequence(packet.getFileIds()[0]); activationSequence = packet.getFileIds()[0];
} }
finishSynchronization(packet.getNodeID()); finishSynchronization(packet.getNodeID(), activationSequence);
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
return replicationResponseMessage; return replicationResponseMessage;
} }

View File

@ -39,7 +39,8 @@ public abstract class NodeManager implements ActiveMQComponent {
private UUID uuid; private UUID uuid;
private boolean isStarted = false; private boolean isStarted = false;
private final Set<FileLockNodeManager.LockListener> lockListeners; private final Set<FileLockNodeManager.LockListener> lockListeners;
protected long nodeActivationSequence; // local version of a coordinated sequence, tracking state transitions of ownership public static final long NULL_NODE_ACTIVATION_SEQUENCE = -1;
protected long nodeActivationSequence = NULL_NODE_ACTIVATION_SEQUENCE; // local version of a coordinated sequence, tracking state transitions of ownership
public NodeManager(final boolean replicatedBackup) { public NodeManager(final boolean replicatedBackup) {
this.replicatedBackup = replicatedBackup; this.replicatedBackup = replicatedBackup;
@ -98,9 +99,12 @@ public abstract class NodeManager implements ActiveMQComponent {
} }
} }
public void setNodeActivationSequence(long activationSequence) { public void setNodeActivationSequence(long sequence) {
if (sequence != NULL_NODE_ACTIVATION_SEQUENCE && sequence < 0) {
throw new IllegalArgumentException("activation sequence must be >=0 or NULL_NODE_ACTIVATION_SEQUENCE");
}
synchronized (nodeIDGuard) { synchronized (nodeIDGuard) {
nodeActivationSequence = activationSequence; nodeActivationSequence = sequence;
} }
} }

View File

@ -22,6 +22,9 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
@ -39,6 +42,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence"; public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence";
private static final String ACCESS_MODE = "rw"; private static final String ACCESS_MODE = "rw";
private final File directory; private final File directory;
private final Path activationSequencePath;
protected FileChannel channel; protected FileChannel channel;
protected FileChannel activationSequenceChannel; protected FileChannel activationSequenceChannel;
@ -48,13 +52,31 @@ public abstract class FileBasedNodeManager extends NodeManager {
if (directory != null) { if (directory != null) {
directory.mkdirs(); directory.mkdirs();
} }
activationSequencePath = new File(directory, SERVER_ACTIVATION_SEQUENCE_NAME).toPath();
} }
protected void useActivationSequenceChannel() throws IOException { /**
if (activationSequenceChannel != null) { * If {@code createIfNotExists} and activation sequence file doesn't exist yet, it returns {@code null},
return; * otherwise it opens it.<br>
* if {@code !createIfNotExists} it just open to create it.
*/
private FileChannel useActivationSequenceChannel(final boolean createIfNotExists) throws IOException {
FileChannel channel = this.activationSequenceChannel;
if (channel != null) {
return channel;
} }
activationSequenceChannel = FileChannel.open(newFile(SERVER_ACTIVATION_SEQUENCE_NAME).toPath(), READ, WRITE, CREATE); final OpenOption[] openOptions;
if (!createIfNotExists) {
if (!Files.exists(activationSequencePath)) {
return null;
}
openOptions = new OpenOption[]{READ, WRITE};
} else {
openOptions = new OpenOption[]{READ, WRITE, CREATE};
}
channel = FileChannel.open(activationSequencePath, openOptions);
activationSequenceChannel = channel;
return channel;
} }
@Override @Override
@ -63,30 +85,37 @@ public abstract class FileBasedNodeManager extends NodeManager {
throw new NodeManagerException(new IllegalStateException("node manager must be started first")); throw new NodeManagerException(new IllegalStateException("node manager must be started first"));
} }
try { try {
useActivationSequenceChannel(); final FileChannel channel = useActivationSequenceChannel(false);
ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); if (channel == null) {
if (activationSequenceChannel.read(tmpBuffer, 0) != Long.BYTES) { setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
return 0; return NULL_NODE_ACTIVATION_SEQUENCE;
}
final ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
if (channel.read(tmpBuffer, 0) != Long.BYTES) {
setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
return NULL_NODE_ACTIVATION_SEQUENCE;
} }
tmpBuffer.flip(); tmpBuffer.flip();
return tmpBuffer.getLong(0); final long activationSequence = tmpBuffer.getLong(0);
setNodeActivationSequence(activationSequence);
return activationSequence;
} catch (IOException ie) { } catch (IOException ie) {
throw new NodeManagerException(ie); throw new NodeManagerException(ie);
} }
} }
@Override @Override
public void writeNodeActivationSequence(long version) throws NodeManagerException { public void writeNodeActivationSequence(long sequence) throws NodeManagerException {
if (!isStarted()) { if (!isStarted()) {
throw new NodeManagerException(new IllegalStateException("node manager must be started first")); throw new NodeManagerException(new IllegalStateException("node manager must be started first"));
} }
try { try {
useActivationSequenceChannel(); final FileChannel channel = useActivationSequenceChannel(true);
ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN); final ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
tmpBuffer.putLong(0, version); tmpBuffer.putLong(0, sequence);
activationSequenceChannel.write(tmpBuffer, 0); channel.write(tmpBuffer, 0);
activationSequenceChannel.force(false); channel.force(false);
setNodeActivationSequence(version); setNodeActivationSequence(sequence);
} catch (IOException ie) { } catch (IOException ie) {
throw new NodeManagerException(ie); throw new NodeManagerException(ie);
} }
@ -190,6 +219,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
channelCopy.close(); channelCopy.close();
} finally { } finally {
try { try {
setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
FileChannel dataVersionChannel = this.activationSequenceChannel; FileChannel dataVersionChannel = this.activationSequenceChannel;
this.activationSequenceChannel = null; this.activationSequenceChannel = null;
if (dataVersionChannel != null) { if (dataVersionChannel != null) {
@ -203,13 +233,19 @@ public abstract class FileBasedNodeManager extends NodeManager {
@Override @Override
public void stopBackup() throws NodeManagerException { public void stopBackup() throws NodeManagerException {
synchronized (nodeIDGuard) {
if (replicatedBackup && getNodeId() != null) { if (replicatedBackup && getNodeId() != null) {
try { try {
setUpServerLockFile(); setUpServerLockFile();
final long nodeActivationSequence = this.nodeActivationSequence;
if (nodeActivationSequence != NULL_NODE_ACTIVATION_SEQUENCE) {
writeNodeActivationSequence(nodeActivationSequence);
}
} catch (IOException e) { } catch (IOException e) {
throw new NodeManagerException(e); throw new NodeManagerException(e);
} }
} }
super.stopBackup(); super.stopBackup();
} }
}
} }

View File

@ -100,6 +100,7 @@ public class FileLockNodeManager extends FileBasedNodeManager {
} }
super.start(); super.start();
readNodeActivationSequence();
} }
@Override @Override

View File

@ -62,6 +62,15 @@ public final class InVMNodeManager extends FileBasedNodeManager {
setUUID(UUIDGenerator.getInstance().generateUUID()); setUUID(UUIDGenerator.getInstance().generateUUID());
} }
@Override
public synchronized void start() throws Exception {
if (isStarted()) {
return;
}
super.start();
readNodeActivationSequence();
}
@Override @Override
public void awaitLiveNode() throws InterruptedException { public void awaitLiveNode() throws InterruptedException {
do { do {

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.UnavailableStateException; import org.apache.activemq.artemis.quorum.UnavailableStateException;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.server.NodeManager.NULL_NODE_ACTIVATION_SEQUENCE;
import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure; import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.ensureSequentialAccessToNodeData; import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.ensureSequentialAccessToNodeData;
import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.tryActivate; import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.tryActivate;
@ -74,11 +75,17 @@ public final class ReplicationBackupActivation extends Activation implements Dis
final ReplicationBackupPolicy policy) { final ReplicationBackupPolicy policy) {
this.activeMQServer = activeMQServer; this.activeMQServer = activeMQServer;
if (policy.isTryFailback()) { if (policy.isTryFailback()) {
// patch expectedNodeID
final String coordinationId = policy.getLivePolicy().getCoordinationId();
if (coordinationId != null) {
expectedNodeID = coordinationId;
} else {
final SimpleString serverNodeID = activeMQServer.getNodeID(); final SimpleString serverNodeID = activeMQServer.getNodeID();
if (serverNodeID == null || serverNodeID.isEmpty()) { if (serverNodeID == null || serverNodeID.isEmpty()) {
throw new IllegalStateException("A failback activation must be biased around a specific NodeID"); throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
} }
this.expectedNodeID = serverNodeID.toString(); this.expectedNodeID = serverNodeID.toString();
}
} else { } else {
this.expectedNodeID = null; this.expectedNodeID = null;
} }
@ -142,8 +149,19 @@ public final class ReplicationBackupActivation extends Activation implements Dis
} }
} }
try { try {
synchronized (activeMQServer) {
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
}
// restart of server due to unavailable quorum can cause NM restart losing the coordination-id
final String coordinationId = policy.getLivePolicy().getCoordinationId();
if (coordinationId != null) {
final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
if (!coordinationId.equals(nodeId)) {
ReplicationPrimaryActivation.applyCoordinationId(coordinationId, activeMQServer);
}
}
distributedManager.start(); distributedManager.start();
final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence(); final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence();
// only a backup with positive local activation sequence could contain valuable data // only a backup with positive local activation sequence could contain valuable data
if (nodeActivationSequence > 0) { if (nodeActivationSequence > 0) {
final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
@ -162,7 +180,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis
if (!activeMQServer.initialisePart1(false)) { if (!activeMQServer.initialisePart1(false)) {
return; return;
} }
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
startAsLive(liveLockWithInSyncReplica); startAsLive(liveLockWithInSyncReplica);
return; return;
} }
@ -174,6 +191,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
// A primary need to preserve NodeID across runs // A primary need to preserve NodeID across runs
activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), policy.isTryFailback()); activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), policy.isTryFailback());
activeMQServer.getNodeManager().start(); activeMQServer.getNodeManager().start();
// allow JMX to query Artemis state
if (!activeMQServer.initialisePart1(false)) { if (!activeMQServer.initialisePart1(false)) {
return; return;
} }
@ -190,7 +208,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis
clusterController.awaitConnectionToReplicationCluster(); clusterController.awaitConnectionToReplicationCluster();
activeMQServer.getBackupManager().start(); activeMQServer.getBackupManager().start();
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
final DistributedLock liveLock = replicateAndFailover(clusterController); final DistributedLock liveLock = replicateAndFailover(clusterController);
if (liveLock == null) { if (liveLock == null) {
return; return;
@ -213,8 +230,14 @@ public final class ReplicationBackupActivation extends Activation implements Dis
liveLock.close(); liveLock.close();
return; return;
} }
final NodeManager nodeManager = activeMQServer.getNodeManager();
try { try {
ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER); // stopBackup is going to write the NodeID and activation sequence previously set on the NodeManager,
// because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
nodeManager.stopBackup();
final String nodeId = nodeManager.getNodeId().toString();
final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER);
} catch (Throwable fatal) { } catch (Throwable fatal) {
LOGGER.warn(fatal); LOGGER.warn(fatal);
// policy is already live one, but there's no activation yet: we can just stop // policy is already live one, but there's no activation yet: we can just stop
@ -222,9 +245,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis
throw new ActiveMQIllegalStateException("This server cannot ensure sequential access to broker data: activation is failed"); throw new ActiveMQIllegalStateException("This server cannot ensure sequential access to broker data: activation is failed");
} }
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
// stopBackup is going to write the NodeID previously set on the NodeManager,
// because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
activeMQServer.getNodeManager().stopBackup();
activeMQServer.getStorageManager().start(); activeMQServer.getStorageManager().start();
activeMQServer.getBackupManager().activated(); activeMQServer.getBackupManager().activated();
// IMPORTANT: // IMPORTANT:
@ -280,6 +300,9 @@ public final class ReplicationBackupActivation extends Activation implements Dis
return null; return null;
} }
} }
if (expectedNodeID != null) {
LOGGER.infof("awaiting connecting to any live node with NodeID = %s", expectedNodeID);
}
final ReplicationFailure failure = replicateLive(clusterController, nodeLocator, registrationFailureForwarder); final ReplicationFailure failure = replicateLive(clusterController, nodeLocator, registrationFailureForwarder);
if (failure == null) { if (failure == null) {
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster()); Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
@ -300,7 +323,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
} }
// no more interested into these events: handling it manually from here // no more interested into these events: handling it manually from here
distributedManager.removeUnavailableManagerListener(this); distributedManager.removeUnavailableManagerListener(this);
final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence(); final long nodeActivationSequence = activeMQServer.getNodeManager().getNodeActivationSequence();
final String nodeId = activeMQServer.getNodeManager().getNodeId().toString(); final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
DistributedLock liveLockWithInSyncReplica = null; DistributedLock liveLockWithInSyncReplica = null;
if (nodeActivationSequence > 0) { if (nodeActivationSequence > 0) {
@ -340,7 +363,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
if (activationSequence != 0) { if (activationSequence != 0) {
final SimpleString syncNodeId = activeMQServer.getNodeManager().getNodeId(); final SimpleString syncNodeId = activeMQServer.getNodeManager().getNodeId();
try { try {
activeMQServer.getNodeManager().writeNodeActivationSequence(0); activeMQServer.getNodeManager().setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
} catch (Throwable fatal) { } catch (Throwable fatal) {
LOGGER.errorf(fatal, "Errored while resetting local activation sequence %d for NodeID = %s: stopping broker", LOGGER.errorf(fatal, "Errored while resetting local activation sequence %d for NodeID = %s: stopping broker",
activationSequence, syncNodeId); activationSequence, syncNodeId);
@ -356,6 +379,10 @@ public final class ReplicationBackupActivation extends Activation implements Dis
LOGGER.error("Stopping broker because of wrong node ID communication from live: maybe a misbehaving live?"); LOGGER.error("Stopping broker because of wrong node ID communication from live: maybe a misbehaving live?");
asyncRestartServer(activeMQServer, false); asyncRestartServer(activeMQServer, false);
return null; return null;
case WrongActivationSequence:
LOGGER.error("Stopping broker because of wrong activation sequence communication from live: maybe a misbehaving live?");
asyncRestartServer(activeMQServer, false);
return null;
default: default:
throw new AssertionError("Unsupported failure " + failure); throw new AssertionError("Unsupported failure " + failure);
} }

View File

@ -41,7 +41,7 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
private static final Logger LOGGER = Logger.getLogger(ReplicationObserver.class); private static final Logger LOGGER = Logger.getLogger(ReplicationObserver.class);
public enum ReplicationFailure { public enum ReplicationFailure {
VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId; VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId, WrongActivationSequence
} }
private final NodeManager nodeManager; private final NodeManager nodeManager;
@ -244,7 +244,7 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
} }
@Override @Override
public void onRemoteBackupUpToDate() { public void onRemoteBackupUpToDate(String nodeId, long activationSequence) {
if (backupUpToDate || closed || replicationFailure.isDone()) { if (backupUpToDate || closed || replicationFailure.isDone()) {
return; return;
} }
@ -252,7 +252,27 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
if (backupUpToDate || closed || replicationFailure.isDone()) { if (backupUpToDate || closed || replicationFailure.isDone()) {
return; return;
} }
assert liveID != null; if (!validateNodeId(nodeId)) {
stopForcedFailoverAfterDelay();
unlistenConnectionFailures();
replicationFailure.complete(ReplicationFailure.WrongNodeId);
return;
}
if (liveID == null) {
liveID = nodeId;
}
if (activationSequence <= 0) {
// NOTE:
// activationSequence == 0 is still illegal,
// because live has to increase the sequence before replicating
stopForcedFailoverAfterDelay();
unlistenConnectionFailures();
LOGGER.errorf("Illegal activation sequence %d from NodeID = %s", activationSequence, nodeId);
replicationFailure.complete(ReplicationFailure.WrongActivationSequence);
return;
}
nodeManager.setNodeID(nodeId);
nodeManager.setNodeActivationSequence(activationSequence);
backupManager.announceBackup(); backupManager.announceBackup();
backupUpToDate = true; backupUpToDate = true;
} }
@ -298,8 +318,9 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
unlistenConnectionFailures(); unlistenConnectionFailures();
replicationFailure.complete(ReplicationFailure.WrongNodeId); replicationFailure.complete(ReplicationFailure.WrongNodeId);
} else if (liveID == null) { } else if (liveID == null) {
// just store it locally: if is stored on the node manager
// it will be persisted on broker's stop while data is not yet in sync
liveID = nodeId; liveID = nodeId;
nodeManager.setNodeID(nodeId);
} }
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import java.util.Objects;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -110,15 +111,11 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
try { try {
// we have a common nodeId that we can share and coordinate with between peers // we have a common nodeId that we can share and coordinate with between peers
if (policy.getCoordinationId() != null) { if (policy.getCoordinationId() != null) {
LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", policy.getCoordinationId()); applyCoordinationId(policy.getCoordinationId(), activeMQServer);
// REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted!
activeMQServer.resetNodeManager();
activeMQServer.getNodeManager().start();
activeMQServer.getNodeManager().setNodeID(policy.getCoordinationId());
activeMQServer.getNodeManager().stopBackup();
} }
final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence(); final long sequence = activeMQServer.getNodeManager().getNodeActivationSequence();
final String nodeId = activeMQServer.getNodeManager().readNodeId().toString(); final long nodeActivationSequence = sequence == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE ? 0 : sequence;
final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
DistributedLock liveLock; DistributedLock liveLock;
while (true) { while (true) {
distributedManager.start(); distributedManager.start();
@ -136,7 +133,7 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
return; return;
} }
ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER); ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER);
activeMQServer.initialisePart1(false); activeMQServer.initialisePart1(false);
@ -166,6 +163,25 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
} }
} }
public static void applyCoordinationId(final String coordinationId,
final ActiveMQServerImpl activeMQServer) throws Exception {
Objects.requireNonNull(coordinationId);
if (!activeMQServer.getNodeManager().isStarted()) {
throw new IllegalStateException("NodeManager should be started");
}
final long activationSequence = activeMQServer.getNodeManager().getNodeActivationSequence();
LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", coordinationId);
// REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted!
activeMQServer.resetNodeManager();
final NodeManager nodeManager = activeMQServer.getNodeManager();
nodeManager.start();
nodeManager.setNodeID(coordinationId);
nodeManager.stopBackup();
// despite NM is restarted as "replicatedBackup" we need the last written activation sequence value in-memory
final long freshActivationSequence = nodeManager.readNodeActivationSequence();
assert freshActivationSequence == activationSequence;
}
@Override @Override
public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) { public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) {
if (stoppingServer.get()) { if (stoppingServer.get()) {
@ -366,8 +382,11 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
} }
// we increment only if we are staying alive // we increment only if we are staying alive
if (!stoppingServer.get() && STARTED.equals(activeMQServer.getState())) { if (!stoppingServer.get() && STARTED.equals(activeMQServer.getState())) {
final NodeManager nodeManager = activeMQServer.getNodeManager();
final String nodeId = nodeManager.getNodeId().toString();
final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
try { try {
ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER); ensureSequentialAccessToNodeData(nodeId, nodeActivationSequence, activeMQServer, distributedManager, LOGGER);
} catch (Throwable fatal) { } catch (Throwable fatal) {
LOGGER.errorf(fatal, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", fatal.getLocalizedMessage()); LOGGER.errorf(fatal, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", fatal.getLocalizedMessage());
asyncStopServer(); asyncStopServer();

View File

@ -465,7 +465,8 @@ public final class SharedNothingBackupActivation extends Activation implements R
} }
@Override @Override
public void onRemoteBackupUpToDate() { public void onRemoteBackupUpToDate(String nodeId, long ignoredActivationSequence) {
backupQuorum.liveIDSet(nodeId);
activeMQServer.getBackupManager().announceBackup(); activeMQServer.getBackupManager().announceBackup();
backupUpToDate = true; backupUpToDate = true;
backupSyncLatch.countDown(); backupSyncLatch.countDown();

View File

@ -58,13 +58,17 @@ public final class ActivationSequenceStateMachine {
* </ul><p> * </ul><p>
* <p> * <p>
* After successfully returning from this method ie not null return value, a broker should use * After successfully returning from this method ie not null return value, a broker should use
* {@link #ensureSequentialAccessToNodeData(ActiveMQServer, DistributedPrimitiveManager, Logger)} to complete * {@link #ensureSequentialAccessToNodeData} to complete
* the activation and guarantee the initial not-replicated ownership of data. * the activation and guarantee the initial not-replicated ownership of data.
*/ */
public static DistributedLock tryActivate(final String nodeId, public static DistributedLock tryActivate(final String nodeId,
final long nodeActivationSequence, final long nodeActivationSequence,
final DistributedPrimitiveManager distributedManager, final DistributedPrimitiveManager distributedManager,
final Logger logger) throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException { final Logger logger) throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException {
Objects.requireNonNull(nodeId);
if (nodeActivationSequence < 0) {
throw new IllegalArgumentException("nodeActivationSequence must be > 0");
}
final DistributedLock activationLock = distributedManager.getDistributedLock(nodeId); final DistributedLock activationLock = distributedManager.getDistributedLock(nodeId);
try (MutableLong coordinatedNodeSequence = distributedManager.getMutableLong(nodeId)) { try (MutableLong coordinatedNodeSequence = distributedManager.getMutableLong(nodeId)) {
while (true) { while (true) {
@ -231,19 +235,25 @@ public final class ActivationSequenceStateMachine {
} }
/** /**
* This is going to increment the coordinated activation sequence while holding the live lock, failing with some exception otherwise.<br> * This is going to increment the coordinated activation sequence and the local activation sequence
* (using {@link NodeManager#writeNodeActivationSequence}) while holding the live lock,
* failing with some exception otherwise.<br>
* <p> * <p>
* The acceptable states are {@link ValidationResult#InSync} and {@link ValidationResult#SelfRepair}, throwing some exception otherwise. * The acceptable states are {@link ValidationResult#InSync} and {@link ValidationResult#SelfRepair}, throwing some exception otherwise.
* <p> * <p>
* This must be used while holding a live lock to ensure not-exclusive ownership of data ie can be both used * This must be used while holding a live lock to ensure not-exclusive ownership of data ie can be both used
* while loosing connectivity with a replica or after successfully {@link #tryActivate(String, long, DistributedPrimitiveManager, Logger)}. * while loosing connectivity with a replica or after successfully {@link #tryActivate(String, long, DistributedPrimitiveManager, Logger)}.
*/ */
public static void ensureSequentialAccessToNodeData(ActiveMQServer activeMQServer, public static void ensureSequentialAccessToNodeData(final String lockAndLongId,
DistributedPrimitiveManager distributedPrimitiveManager, final long nodeActivationSequence,
final ActiveMQServer activeMQServer,
final DistributedPrimitiveManager distributedPrimitiveManager,
final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException { final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException {
final NodeManager nodeManager = activeMQServer.getNodeManager(); Objects.requireNonNull(lockAndLongId);
final String lockAndLongId = nodeManager.getNodeId().toString(); if (nodeActivationSequence < 0) {
throw new IllegalArgumentException("nodeActivationSequence must be >= 0");
}
final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId); final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId);
if (!liveLock.isHeldByCaller()) { if (!liveLock.isHeldByCaller()) {
final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely changed", final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely changed",
@ -251,7 +261,6 @@ public final class ActivationSequenceStateMachine {
logger.info(message); logger.info(message);
throw new UnavailableStateException(message); throw new UnavailableStateException(message);
} }
final long nodeActivationSequence = nodeManager.readNodeActivationSequence();
final MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId); final MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId);
final long currentCoordinatedActivationSequence = coordinatedNodeActivationSequence.get(); final long currentCoordinatedActivationSequence = coordinatedNodeActivationSequence.get();
final long nextActivationSequence; final long nextActivationSequence;
@ -275,6 +284,7 @@ public final class ActivationSequenceStateMachine {
} }
nextActivationSequence = nodeActivationSequence + 1; nextActivationSequence = nodeActivationSequence + 1;
} }
final NodeManager nodeManager = activeMQServer.getNodeManager();
// UN_REPLICATED STATE ENTER: auto-repair doesn't need to claim and write locally // UN_REPLICATED STATE ENTER: auto-repair doesn't need to claim and write locally
if (nodeActivationSequence != nextActivationSequence) { if (nodeActivationSequence != nextActivationSequence) {
// claim // claim

View File

@ -137,7 +137,9 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase {
protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException; protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException;
protected abstract void stopMajority() throws Exception; protected abstract int[] stopMajority() throws Exception;
protected abstract void restart(int[] nodes) throws Exception;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
@ -150,14 +152,33 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase {
super.after(); super.after();
} }
@Test
public void testCanQueryEmptyBackup() throws Exception {
final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
LOGGER.info("starting primary");
Process live = primary.startServer(this, timeout);
Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS));
Wait.assertTrue(() -> !primary.isBackup().orElse(true), timeout);
LOGGER.info("killing primary");
ServerUtil.killServer(live, forceKill);
LOGGER.info("starting backup");
backup.startServer(this, 0);
Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout);
LOGGER.info("Stopping majority of consensus nodes");
final int[] stopped = stopMajority();
LOGGER.info("Waiting until isolated");
Thread.sleep(2000);
LOGGER.info("Restarting majority of consensus nodes");
restart(stopped);
Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout);
}
@Test @Test
public void testBackupFailoverAndPrimaryFailback() throws Exception { public void testBackupFailoverAndPrimaryFailback() throws Exception {
final int timeout = (int) TimeUnit.SECONDS.toMillis(30); final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
LOGGER.info("starting primary"); LOGGER.info("starting primary");
Process primaryInstance = primary.startServer(this, timeout); Process primaryInstance = primary.startServer(this, timeout);
Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS)); Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS));
Wait.assertTrue(() -> !primary.isBackup().orElse(true), timeout);
// primary UN REPLICATED // primary UN REPLICATED
Assert.assertEquals(1L, primary.getActivationSequence().get().longValue()); Assert.assertEquals(1L, primary.getActivationSequence().get().longValue());

View File

@ -29,6 +29,8 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.containsExactNodeIds; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.containsExactNodeIds;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.decodeNetworkTopologyJson;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.liveOf;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.validateNetworkTopology; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.validateNetworkTopology;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withBackup; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withBackup;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withLive; import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withLive;
@ -56,6 +58,44 @@ public class ZookeeperPluggableQuorumPeerTest extends ZookeeperPluggableQuorumSi
Wait.waitFor(this::ensembleHasLeader); Wait.waitFor(this::ensembleHasLeader);
} }
@Test
public void testBackupCannotForgetPeerIdOnLostQuorum() throws Exception {
// see FileLockTest::testCorrelationId to get more info why this is not peer-journal-001 as in broker.xml
final String coordinationId = "peer.journal.001";
final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
LOGGER.info("starting peer a");
final Process live = primary.startServer(this, 0);
LOGGER.info("waiting peer a to increase coordinated activation sequence to 1");
Wait.assertEquals(1L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout);
Assert.assertEquals(coordinationId, primary.getNodeID().get());
Wait.waitFor(() -> primary.listNetworkTopology().isPresent(), timeout);
final String urlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get()));
Assert.assertNotNull(urlPeerA);
LOGGER.infof("peer a acceptor: %s", urlPeerA);
LOGGER.info("killing peer a");
ServerUtil.killServer(live, forceKill);
LOGGER.info("starting peer b");
Process emptyBackup = backup.startServer(this, 0);
LOGGER.info("waiting until peer b act as empty backup");
Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout);
LOGGER.info("Stop majority of quorum nodes");
final int[] majority = stopMajority();
LOGGER.info("Wait peer b to deactivate");
Thread.sleep(2000);
LOGGER.info("Restart majority of quorum nodes");
restart(majority);
LOGGER.info("Restart peer a as legit last live");
final Process restartedLive = primary.startServer(this, 0);
LOGGER.info("waiting peer a to increase coordinated activation sequence to 2");
Wait.assertEquals(2L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout);
Assert.assertEquals(coordinationId, primary.getNodeID().get());
LOGGER.info("waiting peer b to be a replica");
Wait.waitFor(() -> backup.isReplicaSync().get());
Wait.assertEquals(2L, () -> backup.getActivationSequence().get().longValue());
final String expectedUrlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get()));
Assert.assertEquals(urlPeerA, expectedUrlPeerA);
}
@Test @Test
public void testMultiPrimary_Peer() throws Exception { public void testMultiPrimary_Peer() throws Exception {

View File

@ -87,11 +87,22 @@ public class ZookeeperPluggableQuorumSinglePairTest extends PluggableQuorumSingl
} }
@Override @Override
protected void stopMajority() throws Exception { protected int[] stopMajority() throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers(); List<TestingZooKeeperServer> followers = testingServer.getServers();
final int quorum = (nodes / 2) + 1; final int quorum = (nodes / 2) + 1;
final int[] stopped = new int[quorum];
for (int i = 0; i < quorum; i++) { for (int i = 0; i < quorum; i++) {
followers.get(i).stop(); followers.get(i).stop();
stopped[i] = i;
}
return stopped;
}
@Override
protected void restart(int[] nodes) throws Exception {
List<TestingZooKeeperServer> servers = testingServer.getServers();
for (int nodeIndex : nodes) {
servers.get(nodeIndex).restart();
} }
} }
} }

View File

@ -17,13 +17,24 @@
package org.apache.activemq.artemis.tests.unit.core.server.impl; package org.apache.activemq.artemis.tests.unit.core.server.impl;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Set;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static java.util.stream.Collectors.toSet;
public class FileLockTest extends ActiveMQTestBase { public class FileLockTest extends ActiveMQTestBase {
@Override @Override
@ -53,6 +64,91 @@ public class FileLockTest extends ActiveMQTestBase {
} }
} }
@Test
public void testNodeManagerStartPersistence() throws Exception {
final File managerDirectory = getTestDirfile();
FileLockNodeManager manager = new FileLockNodeManager(managerDirectory, false);
manager.start();
Set<File> files = Arrays.stream(managerDirectory.listFiles(pathname -> pathname.isFile())).collect(toSet());
final Set<String> expectedFileNames = Arrays.stream(new String[]{FileLockNodeManager.SERVER_LOCK_NAME, "serverlock.1", "serverlock.2"})
.collect(toSet());
Assert.assertEquals(expectedFileNames, files.stream().map(File::getName).collect(toSet()));
final File nodeIdFile = files.stream().filter(file -> file.getName().equals(FileLockNodeManager.SERVER_LOCK_NAME)).findFirst().get();
final byte[] encodedNodeId = manager.getUUID().asBytes();
try (FileChannel serverLock = FileChannel.open(nodeIdFile.toPath(), StandardOpenOption.READ)) {
Assert.assertEquals(16, encodedNodeId.length);
Assert.assertEquals(19, serverLock.size());
final ByteBuffer readNodeId = ByteBuffer.allocate(16);
serverLock.read(readNodeId, 3);
readNodeId.flip();
Assert.assertArrayEquals(encodedNodeId, readNodeId.array());
}
Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.getNodeActivationSequence());
Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.readNodeActivationSequence());
Assert.assertEquals(3, managerDirectory.listFiles(pathname -> pathname.isFile()).length);
manager.stop();
}
@Test
public void testReplicateBackupNodeManagerStartPersistence() throws Exception {
final File managerDirectory = getTestDirfile();
FileLockNodeManager manager = new FileLockNodeManager(managerDirectory, true);
manager.start();
Set<File> files = Arrays.stream(managerDirectory.listFiles(pathname -> pathname.isFile())).collect(toSet());
Assert.assertTrue(files.isEmpty());
Assert.assertNull(manager.getNodeId());
Assert.assertNull(manager.getUUID());
Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.getNodeActivationSequence());
Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.readNodeActivationSequence());
Assert.assertEquals(0, managerDirectory.listFiles(pathname -> pathname.isFile()).length);
manager.stop();
}
@Test
public void testReplicatedStopBackupPersistence() throws Exception {
final FileLockNodeManager manager = new FileLockNodeManager(getTestDirfile(), false);
manager.start();
Assert.assertNotNull(manager.getUUID());
manager.writeNodeActivationSequence(1);
final long nodeActivationSequence = manager.getNodeActivationSequence();
Assert.assertEquals(1, nodeActivationSequence);
manager.stop();
// replicated manager read activation sequence (if any) but ignore NodeId
final FileLockNodeManager replicatedManager = new FileLockNodeManager(getTestDirfile(), true);
replicatedManager.start();
Assert.assertNull(replicatedManager.getUUID());
Assert.assertEquals(1, replicatedManager.getNodeActivationSequence());
UUID storedNodeId = UUIDGenerator.getInstance().generateUUID();
replicatedManager.setNodeID(storedNodeId.toString());
replicatedManager.setNodeActivationSequence(2);
replicatedManager.stopBackup();
replicatedManager.setNodeID(UUIDGenerator.getInstance().generateStringUUID());
replicatedManager.setNodeActivationSequence(3);
replicatedManager.stop();
// start read whatever has been persisted by stopBackup
manager.start();
Assert.assertEquals(storedNodeId, manager.getUUID());
Assert.assertEquals(2, manager.getNodeActivationSequence());
manager.stop();
}
@Test
public void testWriteNodeActivationSequence() throws Exception {
final FileLockNodeManager manager = new FileLockNodeManager(getTestDirfile(), false);
manager.start();
UUID id = manager.getUUID();
Assert.assertNotNull(manager.getUUID());
manager.writeNodeActivationSequence(1);
final long nodeActivationSequence = manager.getNodeActivationSequence();
Assert.assertEquals(1, nodeActivationSequence);
manager.stop();
final FileLockNodeManager otherManager = new FileLockNodeManager(getTestDirfile(), false);
otherManager.start();
Assert.assertEquals(id, otherManager.getUUID());
Assert.assertEquals(1, otherManager.getNodeActivationSequence());
otherManager.stop();
}
@Test @Test
public void testNIOLock() throws Exception { public void testNIOLock() throws Exception {
doTestLock(new FileLockNodeManager(getTestDirfile(), false), new FileLockNodeManager(getTestDirfile(), false)); doTestLock(new FileLockNodeManager(getTestDirfile(), false), new FileLockNodeManager(getTestDirfile(), false));