Remove node from cluster when node locks broken (#61400)
In #52680 we introduced a mechanism that will allow nodes to remove themselves from the cluster if they locally determine themselves to be unhealthy. The only check today is that their data paths are all empirically writeable. This commit extends this check to consider a failure of `NodeEnvironment#assertEnvIsLocked()` to be an indication of unhealthiness. Closes #58373
This commit is contained in:
parent
aa0dc56412
commit
bc6bea5924
|
@ -58,6 +58,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
|
|||
private static final Logger logger = LogManager.getLogger(FsHealthService.class);
|
||||
private final ThreadPool threadPool;
|
||||
private volatile boolean enabled;
|
||||
private volatile boolean brokenLock;
|
||||
private final TimeValue refreshInterval;
|
||||
private volatile TimeValue slowPathLoggingThreshold;
|
||||
private final NodeEnvironment nodeEnv;
|
||||
|
@ -117,6 +118,8 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
|
|||
Set<Path> unhealthyPaths = this.unhealthyPaths;
|
||||
if (enabled == false) {
|
||||
statusInfo = new StatusInfo(HEALTHY, "health check disabled");
|
||||
} else if (brokenLock) {
|
||||
statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
|
||||
} else if (unhealthyPaths == null) {
|
||||
statusInfo = new StatusInfo(HEALTHY, "health check passed");
|
||||
} else {
|
||||
|
@ -150,7 +153,16 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
|
|||
|
||||
private void monitorFSHealth() {
|
||||
Set<Path> currentUnhealthyPaths = null;
|
||||
for (Path path : nodeEnv.nodeDataPaths()) {
|
||||
Path[] paths = null;
|
||||
try {
|
||||
paths = nodeEnv.nodeDataPaths();
|
||||
} catch (IllegalStateException e) {
|
||||
logger.error("health check failed", e);
|
||||
brokenLock = true;
|
||||
return;
|
||||
}
|
||||
|
||||
for (Path path : paths) {
|
||||
long executionStartTime = currentTimeMillisSupplier.getAsLong();
|
||||
try {
|
||||
if (Files.exists(path)) {
|
||||
|
@ -176,6 +188,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
|
|||
}
|
||||
}
|
||||
unhealthyPaths = currentUnhealthyPaths;
|
||||
brokenLock = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,8 +42,8 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.FileSystem;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.nio.file.attribute.FileAttribute;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -231,6 +231,36 @@ public class FsHealthServiceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testFailsHealthOnUnexpectedLockFileSize() throws IOException {
|
||||
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
|
||||
final Settings settings = Settings.EMPTY;
|
||||
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
|
||||
FileSystemUnexpectedLockFileSizeProvider unexpectedLockFileSizeFileSystemProvider = new FileSystemUnexpectedLockFileSizeProvider(
|
||||
fileSystem, 1, testThreadPool);
|
||||
fileSystem = unexpectedLockFileSizeFileSystemProvider.getFileSystem(null);
|
||||
PathUtilsForTesting.installMock(fileSystem);
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
try (NodeEnvironment env = newNodeEnvironment()) {
|
||||
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
|
||||
fsHealthService.new FsHealthMonitor().run();
|
||||
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
|
||||
assertEquals("health check passed", fsHealthService.getHealth().getInfo());
|
||||
|
||||
// enabling unexpected file size injection
|
||||
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true);
|
||||
|
||||
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
|
||||
fsHealthService.new FsHealthMonitor().run();
|
||||
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
|
||||
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed due to broken node lock"));
|
||||
assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount());
|
||||
} finally {
|
||||
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(false);
|
||||
PathUtilsForTesting.teardown();
|
||||
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider {
|
||||
|
||||
AtomicBoolean injectIOException = new AtomicBoolean();
|
||||
|
@ -254,7 +284,8 @@ public class FsHealthServiceTests extends ESTestCase {
|
|||
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
|
||||
if (injectIOException.get()){
|
||||
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
|
||||
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
|
||||
if (path.toString().startsWith(pathPrefix) && path.toString().
|
||||
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
|
||||
injectedPaths.incrementAndGet();
|
||||
throw new IOException("fake IOException");
|
||||
}
|
||||
|
@ -289,7 +320,8 @@ public class FsHealthServiceTests extends ESTestCase {
|
|||
public void force(boolean metaData) throws IOException {
|
||||
if (injectIOException.get()) {
|
||||
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
|
||||
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
|
||||
if (path.toString().startsWith(pathPrefix) && path.toString().
|
||||
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
|
||||
injectedPaths.incrementAndGet();
|
||||
throw new IOException("fake IOException");
|
||||
}
|
||||
|
@ -341,4 +373,39 @@ public class FsHealthServiceTests extends ESTestCase {
|
|||
};
|
||||
}
|
||||
}
|
||||
|
||||
private static class FileSystemUnexpectedLockFileSizeProvider extends FilterFileSystemProvider {
|
||||
|
||||
AtomicBoolean injectUnexpectedFileSize = new AtomicBoolean();
|
||||
AtomicInteger injectedPaths = new AtomicInteger();
|
||||
|
||||
private final long size;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
FileSystemUnexpectedLockFileSizeProvider(FileSystem inner, long size, ThreadPool threadPool) {
|
||||
super("disrupt_fs_health://", inner);
|
||||
this.size = size;
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public int getInjectedPathCount(){
|
||||
return injectedPaths.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
|
||||
return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
|
||||
@Override
|
||||
public long size() throws IOException {
|
||||
if (injectUnexpectedFileSize.get()) {
|
||||
if (path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME)) {
|
||||
injectedPaths.incrementAndGet();
|
||||
return size;
|
||||
}
|
||||
}
|
||||
return super.size();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue