diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index 4d56a4ee4f7..57bd3b51253 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -171,7 +171,7 @@ import org.apache.hadoop.util.ShutdownHookManager; @InterfaceAudience.Public @InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */ -public final class FileContext { +public class FileContext { public static final Log LOG = LogFactory.getLog(FileContext.class); /** diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 37404342bf9..b58d5f1b6ef 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -613,6 +613,9 @@ Release 2.6.0 - UNRELEASED queue to which the apps were submitted is changed across RM restart. (Craig Welch & Chang Li via jianhe) + YARN-2566. DefaultContainerExecutor should pick a working directory randomly. + (Zhihai Xu via kasha) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dd7fa890c16..a5e746451e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -840,10 +840,10 @@ public class YarnConfiguration extends Configuration { public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE = NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage"; /** - * By default, 100% of the disk can be used before it is marked as offline. + * By default, 90% of the disk can be used before it is marked as offline. */ public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE = - 100.0F; + 90.0F; /** * The minimum space that must be available on a local dir for it to be used. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 13059f4f67e..f0b1166cc10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -948,7 +948,7 @@ for full disk. This applies to yarn-nodemanager.local-dirs and yarn.nodemanager.log-dirs. yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage - 100.0 + 90.0 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 23d2e72e340..5d1e3df550d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -103,8 +104,8 @@ public class DefaultContainerExecutor extends ContainerExecutor { createAppDirs(localDirs, user, appId); createAppLogDirs(appId, logDirs, user); - // TODO: Why pick first app dir. The same in LCE why not random? - Path appStorageDir = getFirstApplicationDir(localDirs, user, appId); + // randomly choose the local directory + Path appStorageDir = getWorkingDir(localDirs, user, appId); String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); Path tokenDst = new Path(appStorageDir, tokenFn); @@ -466,6 +467,10 @@ public class DefaultContainerExecutor extends ContainerExecutor { return getApplicationDir(new Path(localDirs.get(0)), user, appId); } + private long getDiskFreeSpace(Path base) throws IOException { + return lfs.getFsStatus(base).getRemaining(); + } + private Path getApplicationDir(Path base, String user, String appId) { return new Path(getAppcacheDir(base, user), appId); } @@ -484,6 +489,56 @@ public class DefaultContainerExecutor extends ContainerExecutor { ContainerLocalizer.FILECACHE); } + private Path getWorkingDir(List localDirs, String user, + String appId) throws IOException { + Path appStorageDir = null; + long totalAvailable = 0L; + long[] availableOnDisk = new long[localDirs.size()]; + int i = 0; + // randomly choose the app directory + // the chance of picking a directory is proportional to + // the available space on the directory. + // firstly calculate the sum of all available space on these directories + for (String localDir : localDirs) { + Path curBase = getApplicationDir(new Path(localDir), + user, appId); + long space = 0L; + try { + space = getDiskFreeSpace(curBase); + } catch (IOException e) { + LOG.warn("Unable to get Free Space for " + curBase.toString(), e); + } + availableOnDisk[i++] = space; + totalAvailable += space; + } + + // throw an IOException if totalAvailable is 0. + if (totalAvailable <= 0L) { + throw new IOException("Not able to find a working directory for " + + user); + } + + // make probability to pick a directory proportional to + // the available space on the directory. + Random r = new Random(); + long randomPosition = Math.abs(r.nextLong()) % totalAvailable; + int dir = 0; + // skip zero available space directory, + // because totalAvailable is greater than 0 and randomPosition + // is less than totalAvailable, we can find a valid directory + // with nonzero available space. + while (availableOnDisk[dir] == 0L) { + dir++; + } + while (randomPosition > availableOnDisk[dir]) { + randomPosition -= availableOnDisk[dir++]; + } + appStorageDir = getApplicationDir(new Path(localDirs.get(dir)), + user, appId); + + return appStorageDir; + } + protected void createDir(Path dirPath, FsPermission perms, boolean createParent, String user) throws IOException { lfs.mkdir(dirPath, perms, createParent); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index f6f0e9fd5ba..7db74f382cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.junit.Assert.assertTrue; +import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -41,14 +42,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Random; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; @@ -57,20 +50,30 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.After; -import org.junit.Assert; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -296,6 +299,102 @@ public class TestDefaultContainerExecutor { } } + @Test(timeout = 30000) + public void testStartLocalizer() + throws IOException, InterruptedException { + InetSocketAddress localizationServerAddress; + final Path firstDir = new Path(BASE_TMP_PATH, "localDir1"); + List localDirs = new ArrayList(); + final Path secondDir = new Path(BASE_TMP_PATH, "localDir2"); + List logDirs = new ArrayList(); + final Path logDir = new Path(BASE_TMP_PATH, "logDir"); + final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir"); + FsPermission perms = new FsPermission((short)0770); + + Configuration conf = new Configuration(); + localizationServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_LOCALIZER_ADDRESS, + YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, + YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); + + final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf)); + final FileContext.Util mockUtil = spy(mockLfs.util()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + return mockUtil; + } + }).when(mockLfs).util(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + Path dest = (Path) invocationOnMock.getArguments()[1]; + if (dest.toString().contains(firstDir.toString())) { + // throw an Exception when copy token to the first local dir + // to simulate no space on the first drive + throw new IOException("No space on this drive " + + dest.toString()); + } else { + // copy token to the second local dir + DataOutputStream tokenOut = null; + try { + Credentials credentials = new Credentials(); + tokenOut = mockLfs.create(dest, + EnumSet.of(CREATE, OVERWRITE)); + credentials.writeTokenStorageToStream(tokenOut); + } finally { + if (tokenOut != null) { + tokenOut.close(); + } + } + } + return null; + } + }).when(mockUtil).copy(any(Path.class), any(Path.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + Path p = (Path) invocationOnMock.getArguments()[0]; + // let second local directory return more free space than + // first local directory + if (p.toString().contains(firstDir.toString())) { + return new FsStatus(2000, 2000, 0); + } else { + return new FsStatus(1000, 0, 1000); + } + } + }).when(mockLfs).getFsStatus(any(Path.class)); + + DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor( + mockLfs)); + mockExec.setConf(conf); + localDirs.add(mockLfs.makeQualified(firstDir).toString()); + localDirs.add(mockLfs.makeQualified(secondDir).toString()); + logDirs.add(mockLfs.makeQualified(logDir).toString()); + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, + localDirs.toArray(new String[localDirs.size()])); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); + mockLfs.mkdir(tokenDir, perms, true); + Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens"); + String appSubmitter = "nobody"; + String appId = "APP_ID"; + String locId = "LOC_ID"; + try { + mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, + appSubmitter, appId, locId, localDirs, logDirs); + } catch (IOException e) { + Assert.fail("StartLocalizer failed to copy token file " + e); + } finally { + mockExec.deleteAsUser(appSubmitter, firstDir); + mockExec.deleteAsUser(appSubmitter, secondDir); + mockExec.deleteAsUser(appSubmitter, logDir); + deleteTmpFiles(); + } + } // @Test // public void testInit() throws IOException, InterruptedException { // Configuration conf = new Configuration();