ARTEMIS-4143 improve mitigation against split-brain with shared-storage
Configurations employing shared storage with NFS are susceptible to split-brain in certain scenarios. For example:

 1) Primary loses its network connection to the NFS share.
 2) Backup activates.
 3) Primary reconnects to the NFS share.
 4) Split-brain.

In reality this situation is unlikely due to the timing involved, but the possibility still exists. Currently the file lock held by the primary broker on the NFS share is essentially worthless in this situation. This commit adds logic by which the timestamp of the lock file is updated during activation and then routinely checked during runtime to ensure consistency. This effectively mitigates split-brain in this situation (and likely others). Here's how it works now:

 1) Primary loses its network connection to the NFS share.
 2) Backup activates.
 3) Primary reconnects to the NFS share.
 4) Primary detects that the lock file's timestamp has been updated and shuts itself down.

When the primary shuts down in step 4 the Topology on the backup can be damaged. Protections were added for this via ARTEMIS-2868, but only for the replicated use-case. This commit applies the same protection to removeMember() so that the Topology remains intact.

There are no tests for these changes as I cannot determine how to properly simulate this use-case. However, there have never been robust, automated tests for these kinds of NFS use-cases, so this is not a departure from the norm.
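The idea, reduced to a minimal, self-contained sketch (this is not the Artemis code; the class name LockTimestampGuard and its methods are hypothetical — the real logic lives in FileLockNodeManager in the diff below):

import java.io.File;

public class LockTimestampGuard {

   private final File lockFile;
   private long lastModifiedAtActivation;

   public LockTimestampGuard(File lockFile) {
      this.lockFile = lockFile;
   }

   // the broker that activates "touches" the shared lock file so a previous
   // owner that later reconnects to the NFS share can see it lost the lock
   public void touchOnActivation() {
      lockFile.setLastModified(System.currentTimeMillis());
      lastModifiedAtActivation = lockFile.lastModified();
   }

   // checked routinely at runtime: a newer timestamp (or a missing file)
   // means another broker activated while we were disconnected
   public boolean lockLost() {
      File fresh = new File(lockFile.getAbsolutePath());
      return !fresh.exists() || fresh.lastModified() > lastModifiedAtActivation;
   }
}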
parent d1b3610f68
commit 8f30347b18
ClusterConnection.java

@@ -28,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
 
 public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
 
    SimpleString getName();
 
ClusterConnectionImpl.java

@@ -555,7 +555,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
     */
    @Override
    public boolean removeMember(final long uniqueEventID, final String nodeId) {
-      if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) {
+      if (nodeId.equals(nodeManager.getNodeId().toString())) {
          ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId);
          return false;
       }
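The effect of this change, as a stand-alone sketch (RemoveMemberGuard, localNodeId, and the map-backed topology are illustrative stand-ins for the Artemis types, not the actual API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RemoveMemberGuard {

   private final String localNodeId;
   private final Map<String, Object> topology = new ConcurrentHashMap<>();

   public RemoveMemberGuard(String localNodeId) {
      this.localNodeId = localNodeId;
   }

   public boolean removeMember(long uniqueEventID, String nodeId) {
      // a request to remove our own node ID is a symptom of split-brain
      // (e.g. the old primary shutting down); refuse it so the backup's
      // Topology keeps its entry intact (cf. ARTEMIS-2868)
      if (nodeId.equals(localNodeId)) {
         System.out.println("WARN: possible split-brain for node " + nodeId);
         return false;
      }
      return topology.remove(nodeId) != null;
   }
}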
FileBasedNodeManager.java

@@ -42,6 +42,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
    public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence";
    private static final String ACCESS_MODE = "rw";
    private final File directory;
+   protected File serverLockFile;
    private final Path activationSequencePath;
    protected FileChannel channel;
    protected FileChannel activationSequenceChannel;

@@ -134,7 +135,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
     * </ol>
     */
    protected synchronized void setUpServerLockFile() throws IOException {
-      File serverLockFile = newFile(SERVER_LOCK_NAME);
+      serverLockFile = newFile(SERVER_LOCK_NAME);
 
       boolean fileCreated = false;
 
FileLockNodeManager.java

@@ -22,6 +22,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
+import java.util.Date;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;

@@ -45,8 +46,6 @@ public class FileLockNodeManager extends FileBasedNodeManager {
 
    private static final int BACKUP_LOCK_POS = 2;
 
-   private static final long LOCK_LENGTH = 1;
-
    private static final byte LIVE = 'L';
 
    private static final byte FAILINGBACK = 'F';

@@ -65,6 +64,8 @@ public class FileLockNodeManager extends FileBasedNodeManager {
 
    private final FileChannel[] lockChannels = new FileChannel[3];
 
+   private long serverLockLastModified;
+
    private final long lockAcquisitionTimeoutNanos;
 
    protected boolean interrupted = false;

@@ -110,6 +111,7 @@ public class FileLockNodeManager extends FileBasedNodeManager {
       super.setUpServerLockFile();
 
       lockChannels[0] = channel;
+      serverLockLastModified = serverLockFile.lastModified();
 
       for (int i = 1; i < 3; i++) {
          if (lockChannels[i] != null && lockChannels[i].isOpen()) {

@@ -189,33 +191,32 @@ public class FileLockNodeManager extends FileBasedNodeManager {
       logger.debug("awaiting live node...");
       do {
          byte state = getState();
-         while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("awaiting live node startup state='{}'", state);
-            }
+         while (state == NOT_STARTED || state == FIRST_TIME_START) {
+            logger.debug("awaiting live node startup state = '{}'", (char) state);
+
             Thread.sleep(2000);
             state = getState();
          }
 
-         liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
+         liveLock = lock(LIVE_LOCK_POS);
          if (interrupted) {
            interrupted = false;
            throw new InterruptedException("Lock was interrupted");
         }
         state = getState();
-         if (state == FileLockNodeManager.PAUSED) {
+         if (state == PAUSED) {
            liveLock.release();
            logger.debug("awaiting live node restarting");
            Thread.sleep(2000);
-         } else if (state == FileLockNodeManager.FAILINGBACK) {
+         } else if (state == FAILINGBACK) {
            liveLock.release();
            logger.debug("awaiting live node failing back");
            Thread.sleep(2000);
-         } else if (state == FileLockNodeManager.LIVE) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("acquired live node lock state = {}", (char) state);
-            }
+         } else if (state == LIVE) {
+            // if the backup acquires the file lock and the state is 'L' that means the primary died
+            logger.debug("acquired live node lock state = {}", (char) state);
+            serverLockFile.setLastModified(System.currentTimeMillis());
+            logger.debug("touched {}; new time: {}", serverLockFile.getAbsoluteFile(), serverLockFile.lastModified());
            break;
         }
       }
@@ -305,7 +306,7 @@ public class FileLockNodeManager extends FileBasedNodeManager {
    }
 
    private void setLive() throws NodeManagerException {
-      writeFileLockStatus(FileLockNodeManager.LIVE);
+      writeFileLockStatus(LIVE);
    }
 
    private void setFailingBack() throws NodeManagerException {

@@ -318,16 +319,14 @@ public class FileLockNodeManager extends FileBasedNodeManager {
 
    /**
     * @param status
-    * @throws ActiveMQLockAcquisitionTimeoutException,IOException
+    * @throws NodeManagerException
     */
    private void writeFileLockStatus(byte status) throws NodeManagerException {
       if (replicatedBackup && channel == null) {
          return;
       }
 
-      if (logger.isDebugEnabled()) {
-         logger.debug("writing status: {}", status);
-      }
+      logger.debug("writing status: {}", (char) status);
       ByteBuffer bb = ByteBuffer.allocateDirect(1);
       bb.put(status);
       bb.position(0);
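For context, getState() and writeFileLockStatus() read and write a single status byte ('L' live, 'F' failing back, 'P' paused, ...) in the shared lock file. A simplified sketch of that record — the position-0 layout and the bare RandomAccessFile setup are assumptions here; the real code also coordinates the write with a region FileLock:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class StatusByteSketch {

   public static void writeStatus(FileChannel channel, byte status) throws IOException {
      ByteBuffer bb = ByteBuffer.allocateDirect(1);
      bb.put(status);
      bb.position(0);
      channel.write(bb, 0);   // single status byte at the start of server.lock
      channel.force(true);    // flush the change out to the (shared) filesystem
   }

   public static byte readStatus(FileChannel channel) throws IOException {
      ByteBuffer bb = ByteBuffer.allocateDirect(1);
      if (channel.read(bb, 0) <= 0) {
         return (byte) 'N'; // NOT_STARTED-style default when nothing was written yet
      }
      bb.position(0);
      return bb.get();
   }

   public static void main(String[] args) throws IOException {
      try (RandomAccessFile raf = new RandomAccessFile("server.lock", "rw")) {
         writeStatus(raf.getChannel(), (byte) 'L'); // 'L' = live, as in the diff
         System.out.println((char) readStatus(raf.getChannel()));
      }
   }
}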
@@ -345,6 +344,8 @@ public class FileLockNodeManager extends FileBasedNodeManager {
                lock.release();
             }
          }
+         serverLockLastModified = serverLockFile.lastModified();
+         logger.debug("Modified {} at {}", serverLockFile.getName(), serverLockLastModified);
       } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
          throw new NodeManagerException(e);
       }

@@ -371,9 +372,7 @@ public class FileLockNodeManager extends FileBasedNodeManager {
          }
       }
 
-      if (logger.isDebugEnabled()) {
-         logger.debug("state: {}", result);
-      }
+      logger.debug("state: {}", (char) result);
       return result;
    } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
       throw new NodeManagerException(e);

@@ -400,17 +399,13 @@ public class FileLockNodeManager extends FileBasedNodeManager {
 
    protected FileLock tryLock(final int lockPos) throws IOException {
       try {
-         if (logger.isDebugEnabled()) {
-            logger.debug("trying to lock position: {}", lockPos);
-         }
+         logger.debug("trying to lock position: {}", lockPos);
 
         FileLock lock = lockChannels[lockPos].tryLock();
-         if (logger.isDebugEnabled()) {
-            if (lock != null) {
-               logger.debug("locked position: {}", lockPos);
-            } else {
-               logger.debug("failed to lock position: {}", lockPos);
-            }
-         }
+         if (lock != null) {
+            logger.debug("locked position: {}", lockPos);
+         } else {
+            logger.debug("failed to lock position: {}", lockPos);
+         }
 
         return lock;

@@ -429,16 +424,19 @@ public class FileLockNodeManager extends FileBasedNodeManager {
          FileLock lock = tryLock(lockPosition);
          isRecurringFailure = false;
 
+         logger.debug("lock: {}", lock);
+
+         // even if the lock is valid it may have taken too long to acquire
+         if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
+            throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos));
+         }
+
          if (lock == null) {
             try {
                Thread.sleep(500);
             } catch (InterruptedException e) {
                return null;
             }
-
-            if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
-               throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
-            }
          } else {
             return lock;
          }

@@ -456,7 +454,7 @@ public class FileLockNodeManager extends FileBasedNodeManager {
       if (this.lockAcquisitionTimeoutNanos != -1) {
          final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
          if (remainingTime <= 0) {
-            throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
+            throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos));
          }
          waitTime = Math.min(waitTime, remainingTime);
       }
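The two hunks above move and reword the acquisition-timeout check: elapsed time is now measured even when tryLock() succeeds, since a lock that took longer than the configured limit should still count as a failure. The retry pattern, sketched with placeholder types — AcquisitionTimeoutException stands in for ActiveMQLockAcquisitionTimeoutException, and releasing a late lock before throwing is an extra safety step in this sketch, not taken from the diff:

import java.nio.channels.FileLock;
import java.util.concurrent.TimeUnit;

abstract class LockAcquisitionSketch {

   static class AcquisitionTimeoutException extends Exception {
      AcquisitionTimeoutException(String message) {
         super(message);
      }
   }

   // stand-in for FileLockNodeManager#tryLock(int)
   protected abstract FileLock tryLock(int lockPosition) throws Exception;

   protected FileLock lock(int lockPosition, long timeoutNanos) throws Exception {
      final long start = System.nanoTime();
      while (true) {
         FileLock lock = tryLock(lockPosition);

         // even if the lock is valid it may have taken too long to acquire
         if (timeoutNanos != -1 && (System.nanoTime() - start) > timeoutNanos) {
            if (lock != null) {
               lock.release(); // don't keep a lock we are about to reject
            }
            throw new AcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
         }

         if (lock != null) {
            return lock;
         }
         Thread.sleep(500); // brief back-off before retrying
      }
   }
}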
@@ -529,19 +527,23 @@ public class FileLockNodeManager extends FileBasedNodeManager {
             }
             lostLock = isLiveLockLost();
             if (!lostLock) {
-               logger.debug("Server still has the lock, double check status is live");
-               // Java always thinks the lock is still valid even when there is no filesystem
-               // so we do another check
+               /*
+                * Java always thinks the lock is still valid even when there is no filesystem
+                * so we perform additional checks...
+                */
 
-               // Should be able to retrieve the status unless something is wrong
-               // When EFS is gone, this locks. Which can be solved but is a lot of threading
-               // work where we need to
-               // manage the timeout ourselves and interrupt the thread used to claim the lock.
+               /*
+                * We should be able to retrieve the status unless something is wrong. When EFS is
+                * gone, this locks. Which can be solved but is a lot of threading work where we
+                * need to manage the timeout ourselves and interrupt the thread used to claim the
+                * lock.
+                */
+               logger.debug("Lock appears to be valid; double check by reading status");
                byte state = getState();
                if (state == LIVE) {
                   logger.debug("Status is set to live");
                } else {
                   logger.debug("Status is not live");
                }
+
+               logger.debug("Lock appears to be valid; triple check by comparing timestamp");
+               if (hasBeenModified(state)) {
+                  lostLock = true;
+               }
             }
          } catch (Exception exception) {

@@ -554,9 +556,26 @@ public class FileLockNodeManager extends FileBasedNodeManager {
            logger.warn("Lost the lock according to the monitor, notifying listeners");
            notifyLostLock();
         }
      }
+
+      private boolean hasBeenModified(byte state) {
+         boolean modified = false;
+
+         // Create a new instance of the File object so we can get the most up-to-date information on the file.
+         File freshServerLockFile = new File(serverLockFile.getAbsolutePath());
+
+         if (freshServerLockFile.exists()) {
+            // the other broker competing for the lock may modify the state as 'F' when it starts so ensure the state is 'L' before returning true
+            if (freshServerLockFile.lastModified() > serverLockLastModified && state == LIVE) {
+               logger.debug("Lock file {} originally locked at {} was modified at {}", serverLockFile.getAbsolutePath(), new Date(serverLockLastModified), new Date(freshServerLockFile.lastModified()));
+               modified = true;
+            }
+         } else {
+            logger.debug("Lock file {} does not exist", serverLockFile.getAbsolutePath());
+            modified = true;
+         }
+
+         return modified;
+      }
   }
 }
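hasBeenModified() is the piece the periodic monitor calls. Roughly, the runtime check can be wired up as below — the two-second period, file name, and shutdown reaction are illustrative assumptions, not the Artemis scheduling code:

import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LockMonitorSketch {

   public static void main(String[] args) {
      File lockFile = new File("server.lock");   // assumed location of the shared lock file
      long lockedAt = lockFile.lastModified();   // recorded when this broker activated

      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
      scheduler.scheduleWithFixedDelay(() -> {
         // re-resolve the file so lastModified() is fetched fresh from the filesystem
         File fresh = new File(lockFile.getAbsolutePath());
         if (!fresh.exists() || fresh.lastModified() > lockedAt) {
            // another broker activated while we were disconnected: stop competing
            System.out.println("lock file modified by a peer; shutting down");
            scheduler.shutdown();
         }
      }, 2, 2, TimeUnit.SECONDS);
   }
}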
pom.xml

@@ -316,7 +316,7 @@
       <scope>test</scope>
       <!-- License: EPL 1.0 -->
    </dependency>
 
    <dependency>
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
FileLockTimeoutTest.java

@@ -96,6 +96,6 @@ public class FileLockTimeoutTest extends ActiveMQTestBase {
       service.shutdown();
 
       assertTrue("Expected to find AMQ224000", AssertionLoggerHandler.findText("AMQ224000"));
-      assertTrue("Expected to find \"timed out waiting for lock\"", AssertionLoggerHandler.findText("timed out waiting for lock"));
+      assertTrue("Expected to find \"Timed out waiting for lock\"", AssertionLoggerHandler.findText("Timed out waiting for lock"));
    }
 }