YARN-1781. Modified NodeManagers to allow admins to specify max disk utilization for local disks so as to be able to offline full disks. Contributed by Varun Vasudev.

svn merge --ignore-ancestry -c 1575463 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575464 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-08 00:52:32 +00:00
parent fd3e554ff0
commit 2fbec50fed
6 changed files with 301 additions and 24 deletions

View File

@ -256,6 +256,10 @@ Release 2.4.0 - UNRELEASED
YARN-1525. Web UI should redirect to active RM when HA is enabled. (Cindy Li YARN-1525. Web UI should redirect to active RM when HA is enabled. (Cindy Li
via kasha) via kasha)
YARN-1781. Modified NodeManagers to allow admins to specify max disk
utilization for local disks so as to be able to offline full disks. (Varun
Vasudev via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -718,15 +718,18 @@ public class YarnConfiguration extends Configuration {
public static final String NM_CONTAINER_MON_PROCESS_TREE = public static final String NM_CONTAINER_MON_PROCESS_TREE =
NM_PREFIX + "container-monitor.process-tree.class"; NM_PREFIX + "container-monitor.process-tree.class";
/** Prefix for all node manager disk health checker configs. */
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
"yarn.nodemanager.disk-health-checker.";
/** /**
* Enable/Disable disks' health checker. Default is true. * Enable/Disable disks' health checker. Default is true. An expert level
* An expert level configuration property. * configuration property.
*/ */
public static final String NM_DISK_HEALTH_CHECK_ENABLE = public static final String NM_DISK_HEALTH_CHECK_ENABLE =
NM_PREFIX + "disk-health-checker.enable"; NM_DISK_HEALTH_CHECK_PREFIX + "enable";
/** Frequency of running disks' health checker. */ /** Frequency of running disks' health checker. */
public static final String NM_DISK_HEALTH_CHECK_INTERVAL_MS = public static final String NM_DISK_HEALTH_CHECK_INTERVAL_MS =
NM_PREFIX + "disk-health-checker.interval-ms"; NM_DISK_HEALTH_CHECK_PREFIX + "interval-ms";
/** By default, disks' health is checked every 2 minutes. */ /** By default, disks' health is checked every 2 minutes. */
public static final long DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS = public static final long DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS =
2 * 60 * 1000; 2 * 60 * 1000;
@ -736,13 +739,37 @@ public class YarnConfiguration extends Configuration {
* to launch new containers. This applies to nm-local-dirs and nm-log-dirs. * to launch new containers. This applies to nm-local-dirs and nm-log-dirs.
*/ */
public static final String NM_MIN_HEALTHY_DISKS_FRACTION = public static final String NM_MIN_HEALTHY_DISKS_FRACTION =
NM_PREFIX + "disk-health-checker.min-healthy-disks"; NM_DISK_HEALTH_CHECK_PREFIX + "min-healthy-disks";
/** /**
* By default, at least 25% of disks are to be healthy to say that the node * By default, at least 25% of disks are to be healthy to say that the node is
* is healthy in terms of disks. * healthy in terms of disks.
*/ */
public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION = 0.25F;
= 0.25F;
/**
* The maximum percentage of disk space that can be used after which a disk is
* marked as offline. Values can range from 0.0 to 100.0. If the value is
* greater than or equal to 100, NM will check for full disk, i.e. a disk is
* only marked as offline once it is completely full. This applies to
* nm-local-dirs and nm-log-dirs.
*/
public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
/**
* Default value for the maximum per-disk utilization percentage. By default,
* 100% of the disk can be used before it is marked as offline.
*/
public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
100.0F;
/**
* The minimum space, in MB, that must be available on a local dir for it to
* be used. This applies to nm-local-dirs and nm-log-dirs.
*/
public static final String NM_MIN_PER_DISK_FREE_SPACE_MB =
NM_DISK_HEALTH_CHECK_PREFIX + "min-free-space-per-disk-mb";
/**
* Default value for the minimum per-disk free space: 0 MB. By default, no
* minimum amount of free space is required, so all of the disk can be used
* before it is marked as offline.
*/
public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0;
/** Frequency of running node health script.*/ /** Frequency of running node health script.*/
public static final String NM_HEALTH_CHECK_INTERVAL_MS = public static final String NM_HEALTH_CHECK_INTERVAL_MS =

View File

@ -870,6 +870,24 @@
<value>0.25</value> <value>0.25</value>
</property> </property>
<property>
<description>The maximum percentage of disk space utilization allowed after
which a disk is marked as bad. Values can range from 0.0 to 100.0.
If the value is greater than or equal to 100, the nodemanager will check
for full disk. This applies to yarn.nodemanager.local-dirs and
yarn.nodemanager.log-dirs.</description>
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
<value>100.0</value>
</property>
<property>
<description>The minimum space, in MB, that must be available on a disk for
it to be used. This applies to yarn.nodemanager.local-dirs and
yarn.nodemanager.log-dirs.</description>
<name>yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb</name>
<value>0</value>
</property>
<property> <property>
<description>The path to the Linux container executor.</description> <description>The path to the Linux container executor.</description>
<name>yarn.nodemanager.linux-container-executor.path</name> <name>yarn.nodemanager.linux-container-executor.path</name>

View File

@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -44,9 +45,79 @@ class DirectoryCollection {
private List<String> failedDirs; private List<String> failedDirs;
private int numFailures; private int numFailures;
private float diskUtilizationPercentageCutoff;
private long diskUtilizationSpaceCutoff;
/**
* Create collection for the directories specified. Delegates to the
* three-argument constructor with a utilization cutoff of 100% and a minimum
* free space of 0 MB, so a dir is only rejected for space reasons when its
* disk is completely full.
*
* @param dirs
* directories to be monitored
*/
public DirectoryCollection(String[] dirs) { public DirectoryCollection(String[] dirs) {
this(dirs, 100.0F, 0);
}
/**
 * Builds a collection for the given directories with a caller-supplied
 * ceiling on disk utilization. The minimum-free-space check is effectively
 * disabled because a cutoff of 0 MB is passed on.
 *
 * @param dirs
 *          directories to be monitored
 * @param utilizationPercentageCutOff
 *          highest percentage of the disk that may be in use; once exceeded
 *          the dir is dropped from the good dirs list
 */
public DirectoryCollection(String[] dirs, float utilizationPercentageCutOff) {
  this(dirs, utilizationPercentageCutOff, 0L);
}
/**
 * Builds a collection for the given directories with a caller-supplied
 * minimum amount of free space. The utilization percentage cutoff passed on
 * is 100%.
 *
 * @param dirs
 *          directories to be monitored
 * @param utilizationSpaceCutOff
 *          free space, in MB, that the disk must still have for the dir to
 *          be marked as good
 */
public DirectoryCollection(String[] dirs, long utilizationSpaceCutOff) {
  this(dirs, 100.0f, utilizationSpaceCutOff);
}
/**
 * Create collection for the directories specified. Users must specify the
 * maximum percentage of disk utilization allowed and the minimum amount of
 * free space that must be available for the dir to be used. If either check
 * fails the dir is removed from the good dirs list.
 *
 * <p>Out-of-range arguments are clamped rather than rejected: the percentage
 * is forced into [0.0, 100.0] and a negative space cutoff becomes 0.
 *
 * @param dirs
 *          directories to be monitored
 * @param utilizationPercentageCutOff
 *          percentage of disk that can be used before the dir is taken out of
 *          the good dirs list
 * @param utilizationSpaceCutOff
 *          minimum space, in MB, that must be available on the disk for the
 *          dir to be marked as good
 */
public DirectoryCollection(String[] dirs,
    float utilizationPercentageCutOff,
    long utilizationSpaceCutOff) {
  localDirs = new CopyOnWriteArrayList<String>(dirs);
  failedDirs = new CopyOnWriteArrayList<String>();
  // Store only the clamped values; the raw arguments are never kept.
  diskUtilizationPercentageCutoff =
      utilizationPercentageCutOff < 0.0F ? 0.0F
          : (utilizationPercentageCutOff > 100.0F ? 100.0F
              : utilizationPercentageCutOff);
  diskUtilizationSpaceCutoff =
      utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
}
/** /**
@ -103,20 +174,56 @@ class DirectoryCollection {
*/ */
synchronized boolean checkDirs() { synchronized boolean checkDirs() {
// Re-validates every entry of localDirs; entries that fail are moved to
// failedDirs. Returns true when numFailures grew during this call.
int oldNumFailures = numFailures; int oldNumFailures = numFailures;
// Failures are collected here during the scan and applied to the lists
// afterwards, in the second loop below.
HashSet<String> checkFailedDirs = new HashSet<String>();
for (final String dir : localDirs) { for (final String dir : localDirs) {
try { try {
DiskChecker.checkDir(new File(dir)); File testDir = new File(dir);
DiskChecker.checkDir(testDir);
// NOTE(review): despite its name, isDiskUsageUnderPercentageLimit returns
// true when the used percentage is ABOVE the cutoff (or the disk is full).
if (isDiskUsageUnderPercentageLimit(testDir)) {
LOG.warn("Directory " + dir
+ " error, used space above threshold of "
+ diskUtilizationPercentageCutoff
+ "%, removing from the list of valid directories.");
checkFailedDirs.add(dir);
// NOTE(review): likewise, isDiskFreeSpaceWithinLimit returns true when the
// free space is BELOW the configured minimum.
} else if (isDiskFreeSpaceWithinLimit(testDir)) {
LOG.warn("Directory " + dir + " error, free space below limit of "
+ diskUtilizationSpaceCutoff
+ "MB, removing from the list of valid directories.");
checkFailedDirs.add(dir);
}
} catch (DiskErrorException de) { } catch (DiskErrorException de) {
LOG.warn("Directory " + dir + " error " + LOG.warn("Directory " + dir + " error " + de.getMessage()
de.getMessage() + ", removing from the list of valid directories."); + ", removing from the list of valid directories.");
checkFailedDirs.add(dir);
}
}
// Apply the collected failures: each failed dir leaves localDirs, joins
// failedDirs and bumps the failure counter once.
for (String dir : checkFailedDirs) {
localDirs.remove(dir); localDirs.remove(dir);
failedDirs.add(dir); failedDirs.add(dir);
numFailures++; numFailures++;
} }
}
return numFailures > oldNumFailures; return numFailures > oldNumFailures;
} }
/**
 * Reports whether the disk holding {@code dir} has used too much space.
 * Note that, contrary to what the name suggests, {@code true} means the dir
 * FAILS the check: its used percentage is above the configured cutoff, or
 * the disk is completely full.
 *
 * @param dir
 *          directory whose disk is inspected
 * @return true when usage exceeds the cutoff or has reached 100%
 */
private boolean isDiskUsageUnderPercentageLimit(File dir) {
  final long usableBytes = dir.getUsableSpace();
  final float totalBytes = (float) dir.getTotalSpace();
  final float freePercentage = 100 * (usableBytes / totalBytes);
  final float usedPercentage = 100.0F - freePercentage;
  return usedPercentage >= 100.0F
      || usedPercentage > diskUtilizationPercentageCutoff;
}
/**
 * Reports whether the disk holding {@code dir} is short on free space.
 * Note that, contrary to what the name suggests, {@code true} means the dir
 * FAILS the check: its usable space, in whole MB, is below the configured
 * minimum.
 *
 * @param dir
 *          directory whose disk is inspected
 * @return true when the usable space in MB is less than the cutoff
 */
private boolean isDiskFreeSpaceWithinLimit(File dir) {
  final long usableMegabytes = dir.getUsableSpace() / (1024 * 1024);
  return usableMegabytes < this.diskUtilizationSpaceCutoff;
}
private void createDir(FileContext localFs, Path dir, FsPermission perm) private void createDir(FileContext localFs, Path dir, FsPermission perm)
throws IOException { throws IOException {
if (dir == null) { if (dir == null) {
@ -132,4 +239,26 @@ class DirectoryCollection {
} }
} }
} }
/**
 * @return the maximum disk utilization percentage currently stored in this
 *         collection
 */
public float getDiskUtilizationPercentageCutoff() {
  return this.diskUtilizationPercentageCutoff;
}
/**
 * Updates the maximum disk utilization percentage. Values below 0.0 are
 * stored as 0.0 and values above 100.0 are stored as 100.0.
 *
 * @param diskUtilizationPercentageCutoff
 *          requested cutoff, as a percentage
 */
public void setDiskUtilizationPercentageCutoff(
    float diskUtilizationPercentageCutoff) {
  float clamped = diskUtilizationPercentageCutoff;
  if (clamped < 0.0F) {
    clamped = 0.0F;
  } else if (clamped > 100.0F) {
    clamped = 100.0F;
  }
  this.diskUtilizationPercentageCutoff = clamped;
}
/**
 * @return the minimum free space cutoff currently stored in this collection
 */
public long getDiskUtilizationSpaceCutoff() {
  return this.diskUtilizationSpaceCutoff;
}
/**
 * Updates the minimum free space cutoff. A negative value is stored as 0.
 *
 * @param diskUtilizationSpaceCutoff
 *          requested minimum free space
 */
public void setDiskUtilizationSpaceCutoff(long diskUtilizationSpaceCutoff) {
  this.diskUtilizationSpaceCutoff = Math.max(0L, diskUtilizationSpaceCutoff);
}
} }

View File

@ -89,10 +89,22 @@ public class LocalDirsHandlerService extends AbstractService {
private final class MonitoringTimerTask extends TimerTask { private final class MonitoringTimerTask extends TimerTask {
public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException { public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
localDirs = new DirectoryCollection( float maxUsableSpacePercentagePerDisk =
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS))); conf.getFloat(
logDirs = new DirectoryCollection( YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))); YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE);
long minFreeSpacePerDiskMB =
conf.getLong(YarnConfiguration.NM_MIN_PER_DISK_FREE_SPACE_MB,
YarnConfiguration.DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB);
localDirs =
new DirectoryCollection(
validatePaths(conf
.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)),
maxUsableSpacePercentagePerDisk, minFreeSpacePerDiskMB);
logDirs =
new DirectoryCollection(
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)),
maxUsableSpacePercentagePerDisk, minFreeSpacePerDiskMB);
localDirsAllocator = new LocalDirAllocator( localDirsAllocator = new LocalDirAllocator(
YarnConfiguration.NM_LOCAL_DIRS); YarnConfiguration.NM_LOCAL_DIRS);
logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS); logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -55,8 +56,11 @@ public class TestDirectoryCollection {
@Test @Test
public void testConcurrentAccess() throws IOException { public void testConcurrentAccess() throws IOException {
// Initialize DirectoryCollection with a file instead of a directory // Initialize DirectoryCollection with a file instead of a directory
Configuration conf = new Configuration();
String[] dirs = {testFile.getPath()}; String[] dirs = {testFile.getPath()};
DirectoryCollection dc = new DirectoryCollection(dirs); DirectoryCollection dc = new DirectoryCollection(dirs,
conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
// Create an iterator before checkDirs is called to reliable test case // Create an iterator before checkDirs is called to reliable test case
List<String> list = dc.getGoodDirs(); List<String> list = dc.getGoodDirs();
@ -88,7 +92,9 @@ public class TestDirectoryCollection {
localFs.setPermission(pathC, permDirC); localFs.setPermission(pathC, permDirC);
String[] dirs = { dirA, dirB, dirC }; String[] dirs = { dirA, dirB, dirC };
DirectoryCollection dc = new DirectoryCollection(dirs); DirectoryCollection dc = new DirectoryCollection(dirs,
conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
FsPermission defaultPerm = FsPermission.getDefault() FsPermission defaultPerm = FsPermission.getDefault()
.applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); .applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm); boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm);
@ -104,4 +110,85 @@ public class TestDirectoryCollection {
Assert.assertEquals("existing local directory permissions modified", Assert.assertEquals("existing local directory permissions modified",
permDirC, status.getPermission()); permDirC, status.getPermission());
} }
@Test
public void testDiskSpaceUtilizationLimit() throws IOException {
  String[] dirs = { new File(testDir, "dirA").getPath() };

  // A utilization cutoff of 0%: the dir is expected to be rejected.
  DirectoryCollection collection = new DirectoryCollection(dirs, 0.0F);
  collection.checkDirs();
  Assert.assertEquals(0, collection.getGoodDirs().size());
  Assert.assertEquals(1, collection.getFailedDirs().size());

  // A utilization cutoff of 100%: the dir is expected to stay good.
  collection = new DirectoryCollection(dirs, 100.0F);
  collection.checkDirs();
  Assert.assertEquals(1, collection.getGoodDirs().size());
  Assert.assertEquals(0, collection.getFailedDirs().size());

  // Demand as much free space (in MB) as the whole partition holds: the dir
  // is expected to be rejected.
  long totalSpaceMb = testDir.getTotalSpace() / (1024 * 1024);
  collection = new DirectoryCollection(dirs, totalSpaceMb);
  collection.checkDirs();
  Assert.assertEquals(0, collection.getGoodDirs().size());
  Assert.assertEquals(1, collection.getFailedDirs().size());

  // Both limits at their most permissive values: the dir is expected to
  // stay good.
  collection = new DirectoryCollection(dirs, 100.0F, 0);
  collection.checkDirs();
  Assert.assertEquals(1, collection.getGoodDirs().size());
  Assert.assertEquals(0, collection.getFailedDirs().size());
}
@Test
public void testDiskLimitsCutoffSetters() {
  final float tolerance = 0.1F;
  DirectoryCollection collection =
      new DirectoryCollection(new String[] { "dir" }, 0.0F, 100);

  // An in-range percentage is stored unchanged.
  float percentage = 57.5F;
  collection.setDiskUtilizationPercentageCutoff(percentage);
  Assert.assertEquals(percentage,
      collection.getDiskUtilizationPercentageCutoff(), tolerance);

  // A negative percentage is raised to 0.
  percentage = -57.5F;
  collection.setDiskUtilizationPercentageCutoff(percentage);
  Assert.assertEquals(0.0F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);

  // A percentage above 100 is lowered to 100.
  percentage = 157.5F;
  collection.setDiskUtilizationPercentageCutoff(percentage);
  Assert.assertEquals(100.0F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);

  // A non-negative space cutoff is stored unchanged.
  long space = 57;
  collection.setDiskUtilizationSpaceCutoff(space);
  Assert.assertEquals(space, collection.getDiskUtilizationSpaceCutoff());

  // A negative space cutoff is raised to 0.
  space = -57;
  collection.setDiskUtilizationSpaceCutoff(space);
  Assert.assertEquals(0, collection.getDiskUtilizationSpaceCutoff());
}
@Test
public void testConstructors() {
  final float tolerance = 0.1F;
  final String[] dirs = { "dir" };

  // Dirs only: 100% utilization cutoff, no free-space cutoff.
  DirectoryCollection collection = new DirectoryCollection(dirs);
  Assert.assertEquals(100.0F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);
  Assert.assertEquals(0, collection.getDiskUtilizationSpaceCutoff());

  // Float argument: selects the percentage overload.
  collection = new DirectoryCollection(dirs, 57.5F);
  Assert.assertEquals(57.5F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);
  Assert.assertEquals(0, collection.getDiskUtilizationSpaceCutoff());

  // Integer argument: selects the free-space overload.
  collection = new DirectoryCollection(dirs, 57);
  Assert.assertEquals(100.0F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);
  Assert.assertEquals(57, collection.getDiskUtilizationSpaceCutoff());

  // Both limits given and in range.
  collection = new DirectoryCollection(dirs, 57.5F, 67);
  Assert.assertEquals(57.5F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);
  Assert.assertEquals(67, collection.getDiskUtilizationSpaceCutoff());

  // Negative limits are clamped to 0.
  collection = new DirectoryCollection(dirs, -57.5F, -67);
  Assert.assertEquals(0.0F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);
  Assert.assertEquals(0, collection.getDiskUtilizationSpaceCutoff());

  // A percentage above 100 is clamped to 100.
  collection = new DirectoryCollection(dirs, 157.5F, -67);
  Assert.assertEquals(100.0F, collection.getDiskUtilizationPercentageCutoff(),
      tolerance);
  Assert.assertEquals(0, collection.getDiskUtilizationSpaceCutoff());
}
} }