Merging trunk after fixing conflict with HDFS-4434.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1470089 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-04-20 00:02:43 +00:00
commit 9af0babe7e
38 changed files with 1342 additions and 532 deletions

View File

@@ -163,6 +163,9 @@ Trunk (Unreleased)
     HADOOP-9258 Add stricter tests to FileSystemContractTestBase (stevel)
 
+    HADOOP-9486. Promoted Windows and Shell related utils from YARN to Hadoop
+    Common. (Chris Nauroth via vinodkv)
+
   BUG FIXES
 
     HADOOP-9451. Fault single-layer config if node group topology is enabled.
@@ -367,6 +370,10 @@ Trunk (Unreleased)
     HADOOP-9433 TestLocalFileSystem#testHasFileDescriptor leaks file handle
     (Chris Nauroth via sanjay)
 
+    HADOOP-9488. FileUtil#createJarWithClassPath only substitutes environment
+    variables from current process environment/does not support overriding
+    when launching new process (Chris Nauroth via bikas)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)

View File

@@ -1039,15 +1039,17 @@ public class FileUtil {
    *
    * @param inputClassPath String input classpath to bundle into the jar manifest
    * @param pwd Path to working directory to save jar
+   * @param callerEnv Map<String, String> caller's environment variables to use
+   * for expansion
    * @return String absolute path to new jar
    * @throws IOException if there is an I/O error while writing the jar file
    */
-  public static String createJarWithClassPath(String inputClassPath, Path pwd)
-      throws IOException {
+  public static String createJarWithClassPath(String inputClassPath, Path pwd,
+      Map<String, String> callerEnv) throws IOException {
     // Replace environment variables, case-insensitive on Windows
     @SuppressWarnings("unchecked")
-    Map<String, String> env = Shell.WINDOWS ?
-      new CaseInsensitiveMap(System.getenv()) : System.getenv();
+    Map<String, String> env = Shell.WINDOWS ? new CaseInsensitiveMap(callerEnv) :
+      callerEnv;
     String[] classPathEntries = inputClassPath.split(File.pathSeparator);
     for (int i = 0; i < classPathEntries.length; ++i) {
       classPathEntries[i] = StringUtils.replaceTokens(classPathEntries[i],
@@ -1078,9 +1080,22 @@ public class FileUtil {
         }
       }
     } else {
-      // Append just this jar
-      classPathEntryList.add(new File(classPathEntry).toURI().toURL()
-        .toExternalForm());
+      // Append just this entry
+      String classPathEntryUrl = new File(classPathEntry).toURI().toURL()
+        .toExternalForm();
+      // File.toURI only appends trailing '/' if it can determine that it is a
+      // directory that already exists. (See JavaDocs.) If this entry had a
+      // trailing '/' specified by the caller, then guarantee that the
+      // classpath entry in the manifest has a trailing '/', and thus refers to
+      // a directory instead of a file. This can happen if the caller is
+      // creating a classpath jar referencing a directory that hasn't been
+      // created yet, but will definitely be created before running.
+      if (classPathEntry.endsWith(Path.SEPARATOR) &&
+          !classPathEntryUrl.endsWith(Path.SEPARATOR)) {
+        classPathEntryUrl = classPathEntryUrl + Path.SEPARATOR;
+      }
+      classPathEntryList.add(classPathEntryUrl);
     }
   }
   String jarClassPath = StringUtils.join(" ", classPathEntryList);
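A minimal sketch of calling the new overload (the paths and the environment override are hypothetical; this is not part of the patch): a caller preparing a child process can now pass that child's environment for variable expansion, and a classpath entry ending in '/' keeps its trailing separator in the manifest even when the directory does not exist yet.

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class ClassPathJarSketch {
      public static void main(String[] args) throws IOException {
        // Environment the *child* process will see; previously only
        // System.getenv() (the current process) could be used for expansion.
        Map<String, String> childEnv = new HashMap<String, String>(System.getenv());
        childEnv.put("APP_HOME", "/opt/app"); // hypothetical override

        // "conf/" has a trailing separator: the manifest entry keeps it, so the
        // JVM treats it as a directory even if it is created only later.
        String inputClassPath = "lib/a.jar" + File.pathSeparator + "conf/";
        String jarPath = FileUtil.createJarWithClassPath(inputClassPath,
            new Path("/tmp/work"), childEnv);
        System.out.println("classpath jar: " + jarPath);
      }
    }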

View File

@@ -123,6 +123,56 @@ abstract public class Shell {
       : new String[] { "ln", "-s", target, link };
   }
 
+  /** Return a command for determining if process with specified pid is alive. */
+  public static String[] getCheckProcessIsAliveCommand(String pid) {
+    return Shell.WINDOWS ?
+      new String[] { Shell.WINUTILS, "task", "isAlive", pid } :
+      new String[] { "kill", "-0", isSetsidAvailable ? "-" + pid : pid };
+  }
+
+  /** Return a command to send a signal to a given pid */
+  public static String[] getSignalKillCommand(int code, String pid) {
+    return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } :
+      new String[] { "kill", "-" + code, isSetsidAvailable ? "-" + pid : pid };
+  }
+
+  /**
+   * Returns a File referencing a script with the given basename, inside the
+   * given parent directory. The file extension is inferred by platform: ".cmd"
+   * on Windows, or ".sh" otherwise.
+   *
+   * @param parent File parent directory
+   * @param basename String script file basename
+   * @return File referencing the script in the directory
+   */
+  public static File appendScriptExtension(File parent, String basename) {
+    return new File(parent, appendScriptExtension(basename));
+  }
+
+  /**
+   * Returns a script file name with the given basename. The file extension is
+   * inferred by platform: ".cmd" on Windows, or ".sh" otherwise.
+   *
+   * @param basename String script file basename
+   * @return String script file name
+   */
+  public static String appendScriptExtension(String basename) {
+    return basename + (WINDOWS ? ".cmd" : ".sh");
+  }
+
+  /**
+   * Returns a command to run the given script. The script interpreter is
+   * inferred by platform: cmd on Windows or bash otherwise.
+   *
+   * @param script File script to run
+   * @return String[] command to run the script
+   */
+  public static String[] getRunScriptCommand(File script) {
+    String absolutePath = script.getAbsolutePath();
+    return WINDOWS ? new String[] { "cmd", "/c", absolutePath } :
+      new String[] { "/bin/bash", absolutePath };
+  }
+
   /** a Unix command to set permission */
   public static final String SET_PERMISSION_COMMAND = "chmod";
   /** a Unix command to set owner */
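A short usage sketch for the script helpers (illustrative only; the script path is hypothetical). ShellCommandExecutor is the executor already defined in this class:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.util.Shell;
    import org.apache.hadoop.util.Shell.ShellCommandExecutor;

    public class RunScriptSketch {
      public static void main(String[] args) throws IOException {
        // "launch" resolves to launch.cmd on Windows, launch.sh elsewhere.
        File script = Shell.appendScriptExtension(new File("/tmp"), "launch");

        // {"cmd", "/c", path} on Windows, {"/bin/bash", path} otherwise.
        String[] cmd = Shell.getRunScriptCommand(script);
        ShellCommandExecutor executor = new ShellCommandExecutor(cmd);
        executor.execute();
        System.out.println("exit code: " + executor.getExitCode());
      }
    }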
@@ -243,6 +293,26 @@ abstract public class Shell {
     return winUtilsPath;
   }
 
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    if (Shell.WINDOWS) {
+      return false;
+    }
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+    }
+    return setsidSupported;
+  }
+
   /** Token separator regex used to parse Shell tool outputs */
   public static final String TOKEN_SEPARATOR_REGEX
                 = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
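To see how isSetsidAvailable changes the generated commands, a small sketch (the pid is hypothetical): with setsid, the launched shell becomes a session and process-group leader, so signalling "-pid" reaches the whole group rather than a single process.

    import java.util.Arrays;
    import org.apache.hadoop.util.Shell;

    public class SignalSketch {
      public static void main(String[] args) {
        String pid = "12345"; // hypothetical pid of a launched task

        // Unix with setsid available: {"kill", "-9", "-12345"} (whole group);
        // without setsid: {"kill", "-9", "12345"}; Windows: winutils task kill.
        System.out.println(Arrays.toString(Shell.getSignalKillCommand(9, pid)));

        // Liveness probe: kill -0 sends no signal, only checks existence.
        System.out.println(
            Arrays.toString(Shell.getCheckProcessIsAliveCommand(pid)));
      }
    }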

View File

@@ -24,6 +24,10 @@
 #define ERROR_TASK_NOT_ALIVE 1
 
+// This exit code for killed processes is compatible with Unix, where a killed
+// process exits with 128 + signal. For SIGKILL, this would be 128 + 9 = 137.
+#define KILLED_PROCESS_EXIT_CODE 137
+
 // List of different task related command line options supported by
 // winutils.
 typedef enum TaskCommandOptionType
@@ -264,7 +268,7 @@ DWORD killTask(_TCHAR* jobObjName)
     return err;
   }
 
-  if(TerminateJobObject(jobObject, 1) == 0)
+  if(TerminateJobObject(jobObject, KILLED_PROCESS_EXIT_CODE) == 0)
   {
     return GetLastError();
   }
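A one-line Java sketch of the Unix convention the new constant mirrors (not part of the patch): a caller that sees an exit code above 128 can recover the signal number by subtracting 128.

    int exitCode = 137;                                // KILLED_PROCESS_EXIT_CODE
    int signal = exitCode > 128 ? exitCode - 128 : 0;  // 137 -> 9 (SIGKILL)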

View File

@@ -755,11 +755,13 @@ public class TestFileUtil {
     // create classpath jar
     String wildcardPath = tmp.getCanonicalPath() + File.separator + "*";
+    String nonExistentSubdir = tmp.getCanonicalPath() + Path.SEPARATOR + "subdir"
+      + Path.SEPARATOR;
     List<String> classPaths = Arrays.asList("cp1.jar", "cp2.jar", wildcardPath,
-      "cp3.jar");
+      "cp3.jar", nonExistentSubdir);
     String inputClassPath = StringUtils.join(File.pathSeparator, classPaths);
     String classPathJar = FileUtil.createJarWithClassPath(inputClassPath,
-      new Path(tmp.getCanonicalPath()));
+      new Path(tmp.getCanonicalPath()), System.getenv());
 
     // verify classpath by reading manifest from jar file
     JarFile jarFile = null;
@@ -774,15 +776,20 @@ public class TestFileUtil {
       Assert.assertNotNull(classPathAttr);
       List<String> expectedClassPaths = new ArrayList<String>();
       for (String classPath: classPaths) {
-        if (!wildcardPath.equals(classPath)) {
-          expectedClassPaths.add(new File(classPath).toURI().toURL()
-            .toExternalForm());
-        } else {
+        if (wildcardPath.equals(classPath)) {
           // add wildcard matches
           for (File wildcardMatch: wildcardMatches) {
             expectedClassPaths.add(wildcardMatch.toURI().toURL()
               .toExternalForm());
           }
+        } else if (nonExistentSubdir.equals(classPath)) {
+          // expect to maintain trailing path separator if present in input, even
+          // if directory doesn't exist yet
+          expectedClassPaths.add(new File(classPath).toURI().toURL()
+            .toExternalForm() + Path.SEPARATOR);
+        } else {
+          expectedClassPaths.add(new File(classPath).toURI().toURL()
+            .toExternalForm());
         }
       }
       List<String> actualClassPaths = Arrays.asList(classPathAttr.split(" "));

View File

@@ -6,6 +6,8 @@ Trunk (Unreleased)
     HDFS-3034. Remove the deprecated DFSOutputStream.sync() method. (szetszwo)
 
+    HDFS-4434. Provide a mapping from INodeId to INode. (suresh)
+
   NEW FEATURES
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
@@ -2558,6 +2560,9 @@ Release 0.23.8 - UNRELEASED
     HDFS-4477. Secondary namenode may retain old tokens (daryn via kihwal)
 
+    HDFS-4699. TestPipelinesFailover#testPipelineRecoveryStress fails
+    sporadically (Chris Nauroth via kihwal)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES

View File

@@ -67,7 +67,10 @@ class BlocksMap {
   void close() {
-    blocks.clear();
+    if (blocks != null) {
+      blocks.clear();
+      blocks = null;
+    }
   }
 
   BlockCollection getBlockCollection(Block b) {

View File

@@ -1286,7 +1286,10 @@ public class DataNode extends Configured
     LOG.warn("checkDiskError: exception: ", e);
     if (e instanceof SocketException || e instanceof SocketTimeoutException
         || e instanceof ClosedByInterruptException
-        || e.getMessage().startsWith("Broken pipe")) {
+        || e.getMessage().startsWith("An established connection was aborted")
+        || e.getMessage().startsWith("Broken pipe")
+        || e.getMessage().startsWith("Connection reset")
+        || e.getMessage().contains("java.nio.channels.SocketChannel")) {
       LOG.info("Not checking disk as checkDiskError was called on a network" +
           " related exception");
       return;

View File

@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -60,8 +61,10 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.Content.CountsMap;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -70,6 +73,8 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlExce
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotException;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.hdfs.util.GSet;
+import org.apache.hadoop.hdfs.util.LightWeightGSet;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -86,7 +91,7 @@ import com.google.common.base.Preconditions;
 public class FSDirectory implements Closeable {
   private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
     final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
-        namesystem.allocateNewInodeId(),
+        INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
         namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)));
     final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
@@ -94,6 +99,16 @@ public class FSDirectory implements Closeable {
     return s;
   }
 
+  @VisibleForTesting
+  static boolean CHECK_RESERVED_FILE_NAMES = true;
+  public final static String DOT_RESERVED_STRING = ".reserved";
+  public final static String DOT_RESERVED_PATH_PREFIX = Path.SEPARATOR
+      + DOT_RESERVED_STRING;
+  public final static byte[] DOT_RESERVED =
+      DFSUtil.string2Bytes(DOT_RESERVED_STRING);
+  public final static String DOT_INODES_STRING = ".inodes";
+  public final static byte[] DOT_INODES =
+      DFSUtil.string2Bytes(DOT_INODES_STRING);
   INodeDirectoryWithQuota rootDir;
   FSImage fsImage;
   private final FSNamesystem namesystem;
@@ -101,6 +116,7 @@ public class FSDirectory implements Closeable {
   private final int maxComponentLength;
   private final int maxDirItems;
   private final int lsLimit; // max list limit
+  private GSet<INode, INode> inodeMap; // Synchronized by dirLock
 
   // lock to protect the directory and BlockMap
   private ReentrantReadWriteLock dirLock;
@@ -141,6 +157,7 @@ public class FSDirectory implements Closeable {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     this.cond = dirLock.writeLock().newCondition();
     rootDir = createRoot(ns);
+    inodeMap = initInodeMap(rootDir);
     this.fsImage = fsImage;
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
@@ -164,6 +181,16 @@ public class FSDirectory implements Closeable {
     namesystem = ns;
   }
 
+  @VisibleForTesting
+  static LightWeightGSet<INode, INode> initInodeMap(INodeDirectory rootDir) {
+    // Compute the map capacity by allocating 1% of total memory
+    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
+    LightWeightGSet<INode, INode> map = new LightWeightGSet<INode, INode>(
+        capacity);
+    map.put(rootDir);
+    return map;
+  }
+
   private FSNamesystem getFSNamesystem() {
     return namesystem;
   }
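As a rough sketch of the sizing idea (a simplified approximation, not the actual LightWeightGSet.computeCapacity, which also detects the JVM's reference size and logs the chosen capacity under the map name):

    // Assume a 64-bit JVM with 8-byte references: spend ~1% of the maximum
    // heap on the hash table's reference array, rounded down to a power of two.
    long maxMemory = Runtime.getRuntime().maxMemory();
    long references = (long) (maxMemory * 0.01) / 8;
    int capacity = Integer.highestOneBit((int) Math.min(references, 1 << 30));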
@@ -272,9 +299,8 @@ public class FSDirectory implements Closeable {
     if (!mkdirs(parent.toString(), permissions, true, modTime)) {
       return null;
     }
-    long id = namesystem.allocateNewInodeId();
     INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
-        id,
+        namesystem.allocateNewInodeId(),
         permissions,replication,
         preferredBlockSize, modTime, clientName,
         clientMachine, clientNode);
@@ -1329,6 +1355,7 @@ public class FSDirectory implements Closeable {
       // collect block
       if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
         targetNode.destroyAndCollectBlocks(collectedBlocks);
+        removeAllFromInodesFromMap(targetNode);
       } else {
         Quota.Counts counts = targetNode.cleanSubtree(null, latestSnapshot,
             collectedBlocks);
@@ -1393,6 +1420,7 @@ public class FSDirectory implements Closeable {
     Preconditions.checkState(hasWriteLock());
 
     oldnode.getParent().replaceChild(oldnode, newnode);
+    inodeMap.put(newnode);
     oldnode.clear();
 
     /* Currently oldnode and newnode are assumed to contain the same
@@ -1909,6 +1937,15 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  private INode getFromINodeMap(INode inode) {
+    readLock();
+    try {
+      return inodeMap.get(inode);
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Add the given child to the namespace.
    * @param src The full path name of the child node.
@@ -2082,6 +2119,17 @@ public class FSDirectory implements Closeable {
   private boolean addChild(INodesInPath iip, int pos,
       INode child, boolean checkQuota) throws QuotaExceededException {
     final INode[] inodes = iip.getINodes();
+    // Disallow creation of /.reserved. This may be created when loading
+    // editlog/fsimage during upgrade since /.reserved was a valid name in older
+    // release. This may also be called when a user tries to create a file
+    // or directory /.reserved.
+    if (pos == 1 && inodes[0] == rootDir && isReservedName(child)) {
+      throw new HadoopIllegalArgumentException(
+          "File name \"" + child.getLocalName() + "\" is reserved and cannot "
+              + "be created. If this is during upgrade change the name of the "
+              + "existing file or directory to another name before upgrading "
+              + "to the new release.");
+    }
     // The filesystem limits are not really quotas, so this check may appear
     // odd. It's because a rename operation deletes the src, tries to add
     // to the dest, if that fails, re-adds the src from whence it came.
@@ -2106,6 +2154,7 @@ public class FSDirectory implements Closeable {
     } else {
       // update parent node
       iip.setINode(pos - 1, child.getParent());
+      inodeMap.put(child);
     }
     return added;
   }
@@ -2135,9 +2184,10 @@ public class FSDirectory implements Closeable {
     if (!parent.removeChild(last, latestSnapshot)) {
       return -1;
     }
-
+    inodeMap.remove(last);
     if (parent != last.getParent()) {
       // parent is changed
+      inodeMap.put(last.getParent());
       iip.setINode(-2, last.getParent());
     }
 
@@ -2181,6 +2231,29 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  /** This method is always called with writeLock held */
+  final void addToInodeMapUnprotected(INode inode) {
+    inodeMap.put(inode);
+  }
+
+  /* This method is always called with writeLock held */
+  private final void removeFromInodeMap(INode inode) {
+    inodeMap.remove(inode);
+  }
+
+  /** Remove all the inodes under given inode from the map */
+  private void removeAllFromInodesFromMap(INode inode) {
+    removeFromInodeMap(inode);
+    if (!inode.isDirectory()) {
+      return;
+    }
+    INodeDirectory dir = (INodeDirectory) inode;
+    for (INode child : dir.getChildrenList(null)) {
+      removeAllFromInodesFromMap(child);
+    }
+    dir.clearChildren();
+  }
+
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
    * Sets quota for a directory.
@@ -2286,7 +2359,7 @@ public class FSDirectory implements Closeable {
     boolean status = false;
     writeLock();
     try {
-      status = unprotectedSetTimes(src, inode, mtime, atime, force, latest);
+      status = unprotectedSetTimes(inode, mtime, atime, force, latest);
     } finally {
       writeUnlock();
     }
@@ -2299,11 +2372,11 @@ public class FSDirectory implements Closeable {
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
     final INodesInPath i = getLastINodeInPath(src);
-    return unprotectedSetTimes(src, i.getLastINode(), mtime, atime, force,
+    return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
         i.getLatestSnapshot());
   }
 
-  private boolean unprotectedSetTimes(String src, INode inode, long mtime,
+  private boolean unprotectedSetTimes(INode inode, long mtime,
       long atime, boolean force, Snapshot latest) throws QuotaExceededException {
     assert hasWriteLock();
     boolean status = false;
@@ -2496,5 +2569,128 @@ public class FSDirectory implements Closeable {
   void shutdown() {
     nameCache.reset();
+    inodeMap.clear();
+    inodeMap = null;
+  }
+
+  @VisibleForTesting
+  INode getInode(long id) {
+    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
+        "", "", new FsPermission((short) 0)), 0, 0) {
+      @Override
+      INode recordModification(Snapshot latest) throws QuotaExceededException {
+        return null;
+      }
+
+      @Override
+      public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks) {
+        // Nothing to do
+      }
+
+      @Override
+      public Counts computeQuotaUsage(Counts counts, boolean useCache) {
+        return null;
+      }
+
+      @Override
+      public Content.Counts computeContentSummary(Content.Counts counts) {
+        return null;
+      }
+
+      @Override
+      public CountsMap computeContentSummary(CountsMap countsMap) {
+        return null;
+      }
+
+      @Override
+      public Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+          BlocksMapUpdateInfo collectedBlocks) throws QuotaExceededException {
+        return null;
+      }
+    };
+    return getFromINodeMap(inode);
+  }
+
+  /**
+   * Given an INode get all the path components leading to it from the root.
+   * If an Inode corresponding to C is given in /A/B/C, the returned
+   * path components will be {root, A, B, C}
+   */
+  static byte[][] getPathComponents(INode inode) {
+    List<byte[]> components = new ArrayList<byte[]>();
+    components.add(0, inode.getLocalNameBytes());
+    while (inode.getParent() != null) {
+      components.add(0, inode.getParent().getLocalNameBytes());
+      inode = inode.getParent();
+    }
+    return components.toArray(new byte[components.size()][]);
+  }
+
+  /**
+   * @return path components for reserved path, else null.
+   */
+  static byte[][] getPathComponentsForReservedPath(String src) {
+    return !isReservedName(src) ? null : INode.getPathComponents(src);
+  }
+
+  /**
+   * Resolve the path of /.reserved/.inodes/<inodeid>/... to a regular path
+   *
+   * @param src path that is being processed
+   * @param pathComponents path components corresponding to the path
+   * @param fsd FSDirectory
+   * @return if the path indicates an inode, return path after replacing up to
+   *         <inodeid> with the corresponding path of the inode, else the path
+   *         in {@code src} as is.
+   * @throws FileNotFoundException if inodeid is invalid
+   */
+  static String resolvePath(String src, byte[][] pathComponents, FSDirectory fsd)
+      throws FileNotFoundException {
+    if (pathComponents == null || pathComponents.length <= 3) {
+      return src;
+    }
+    // Not /.reserved/.inodes
+    if (!Arrays.equals(DOT_RESERVED, pathComponents[1])
+        || !Arrays.equals(DOT_INODES, pathComponents[2])) { // Not .inodes path
+      return src;
+    }
+    final String inodeId = DFSUtil.bytes2String(pathComponents[3]);
+    long id = 0;
+    try {
+      id = Long.valueOf(inodeId);
+    } catch (NumberFormatException e) {
+      throw new FileNotFoundException(
+          "File for given inode path does not exist: " + src);
+    }
+    if (id == INodeId.ROOT_INODE_ID && pathComponents.length == 4) {
+      return Path.SEPARATOR;
+    }
+    StringBuilder path = id == INodeId.ROOT_INODE_ID ? new StringBuilder()
+        : new StringBuilder(fsd.getInode(id).getFullPathName());
+    for (int i = 4; i < pathComponents.length; i++) {
+      path.append(Path.SEPARATOR).append(DFSUtil.bytes2String(pathComponents[i]));
+    }
+    if (NameNode.LOG.isDebugEnabled()) {
+      NameNode.LOG.debug("Resolved path is " + path);
+    }
+    return path.toString();
+  }
+
+  @VisibleForTesting
+  int getInodeMapSize() {
+    return inodeMap.size();
+  }
+
+  /** Check if a given inode name is reserved */
+  public static boolean isReservedName(INode inode) {
+    return CHECK_RESERVED_FILE_NAMES
+        && Arrays.equals(inode.getLocalNameBytes(), DOT_RESERVED);
+  }
+
+  /** Check if a given path is reserved */
+  public static boolean isReservedName(String src) {
+    return src.startsWith(DOT_RESERVED_PATH_PREFIX);
+  }
 }
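A hedged worked example of the resolution (the inode id and paths are hypothetical):

    // src            = "/.reserved/.inodes/16386/data/part-0"
    // pathComponents = {"", ".reserved", ".inodes", "16386", "data", "part-0"}
    // If inode 16386 is the directory /user/alice, the components from index 4
    // onward are appended to its full path name:
    //   resolved     = "/user/alice/data/part-0"
    // A request for the root inode id with exactly 4 components returns "/".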

View File

@@ -735,7 +735,7 @@ public class FSImage implements Closeable {
     } finally {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
-      updateCountForQuota(target.dir.rootDir);
+      updateCountForQuota(target.dir, target.dir.rootDir);
     }
     return lastAppliedTxId - prevLastAppliedTxId;
   }
@@ -748,20 +748,22 @@ public class FSImage implements Closeable {
    * This is an update of existing state of the filesystem and does not
    * throw QuotaExceededException.
    */
-  static void updateCountForQuota(INodeDirectoryWithQuota root) {
-    updateCountForQuotaRecursively(root, new Quota.Counts());
+  static void updateCountForQuota(FSDirectory fsd,
+      INodeDirectoryWithQuota root) {
+    updateCountForQuotaRecursively(fsd, root, new Quota.Counts());
   }
 
-  private static void updateCountForQuotaRecursively(INodeDirectory dir,
-      Quota.Counts counts) {
+  private static void updateCountForQuotaRecursively(FSDirectory fsd,
+      INodeDirectory dir, Quota.Counts counts) {
     final long parentNamespace = counts.get(Quota.NAMESPACE);
     final long parentDiskspace = counts.get(Quota.DISKSPACE);
 
     dir.computeQuotaUsage4CurrentDirectory(counts);
 
     for (INode child : dir.getChildrenList(null)) {
+      fsd.addToInodeMapUnprotected(child);
       if (child.isDirectory()) {
-        updateCountForQuotaRecursively(child.asDirectory(), counts);
+        updateCountForQuotaRecursively(fsd, child.asDirectory(), counts);
       } else {
         // file or symlink: count here to reduce recursive calls.
         child.computeQuotaUsage(counts, false);

View File

@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -526,6 +527,13 @@ public class FSImageFormat {
    * modification time update and space count update are not needed.
    */
   private void addToParent(INodeDirectory parent, INode child) {
+    FSDirectory fsDir = namesystem.dir;
+    if (parent == fsDir.rootDir && FSDirectory.isReservedName(child)) {
+      throw new HadoopIllegalArgumentException("File name \""
+          + child.getLocalName() + "\" is reserved. Please "
+          + "change the name of the existing file or directory to another "
+          + "name before upgrading to this release.");
+    }
     // NOTE: This does not update space counts for parents
     if (!parent.addChild(child)) {
       return;

View File

@@ -912,7 +912,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-
+  @Override
   public void checkOperation(OperationCategory op) throws StandbyException {
     if (haContext != null) {
       // null in some unit tests
@@ -1217,12 +1217,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
      checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
       resultingStat = getAuditFileInfo(src, false);
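This setPermission change introduces the two-step pattern that repeats throughout the rest of this file: compute the reserved-path components before taking the lock, then rewrite src to the inode's real path under the lock, before any permission check or directory operation sees it. Condensed (names as in this patch):

    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      // ... operation-specific safe-mode check ...
      src = FSDirectory.resolvePath(src, pathComponents, dir);
      // ... permission checks and the operation body ...
    } finally {
      writeUnlock();
    }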
@@ -1254,12 +1256,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       if (!pc.isSuperUser()) {
         if (username != null && !pc.getUser().equals(username)) {
@@ -1355,6 +1359,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws FileNotFoundException,
       UnresolvedLinkException, IOException {
     FSPermissionChecker pc = getPermissionChecker();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     for (int attempt = 0; attempt < 2; attempt++) {
       boolean isReadOp = (attempt == 0);
       if (isReadOp) { // first attempt is with readlock
@@ -1364,6 +1369,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         checkOperation(OperationCategory.WRITE);
         writeLock(); // writelock is needed to set accesstime
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       try {
         if (isReadOp) {
           checkOperation(OperationCategory.READ);
@@ -1413,6 +1419,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Moves all the blocks from srcs and appends them to trg
    * To avoid rollbacks we will verify validity of ALL of the args
    * before we start actual move.
+   *
+   * This does not support ".inodes" relative path
    * @param target
    * @param srcs
    * @throws IOException
@@ -1603,12 +1611,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set times " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
@@ -1635,7 +1645,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
     if (!DFSUtil.isValidName(link)) {
-      throw new InvalidPathException("Invalid file name: " + link);
+      throw new InvalidPathException("Invalid link name: " + link);
+    }
+    if (FSDirectory.isReservedName(target)) {
+      throw new InvalidPathException("Invalid target name: " + target);
     }
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
@@ -1655,12 +1668,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(link);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot create symlink " + link, safeMode);
       }
+      link = FSDirectory.resolvePath(link, pathComponents, dir);
       if (!createParent) {
         verifyParentDir(link);
       }
@@ -1707,18 +1722,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  private boolean setReplicationInt(final String src, final short replication)
+  private boolean setReplicationInt(String src, final short replication)
       throws IOException {
     blockManager.verifyReplication(src, replication, null);
     final boolean isFile;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.WRITE);
       }
@@ -1744,9 +1761,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(filename);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
+      filename = FSDirectory.resolvePath(filename, pathComponents, dir);
       if (isPermissionEnabled) {
         checkTraverse(pc, filename);
       }
@@ -1819,8 +1838,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final HdfsFileStatus stat;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot create file" + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
       stat = dir.getFileInfo(src, false);
@@ -1863,10 +1888,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     assert hasWriteLock();
-    checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot create file" + src, safeMode);
-    }
     // Verify that the destination does not exist as a directory already.
     final INodesInPath iip = dir.getINodesInPath4Write(src);
     final INode inode = iip.getLastINode();
@@ -2003,6 +2024,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean skipSync = false;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2010,6 +2032,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new SafeModeException(
             "Cannot recover the lease of " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
         return true;
@@ -2127,6 +2150,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
+          + ", holder=" + holder
+          + ", clientMachine=" + clientMachine);
+    }
     boolean skipSync = false;
     if (!supportAppends) {
       throw new UnsupportedOperationException(
@@ -2145,8 +2173,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     LocatedBlock lb = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot append to file" + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       lb = startFileInternal(pc, src, null, holder, clientMachine,
           EnumSet.of(CreateFlag.APPEND),
           false, blockManager.maxReplication, 0);
@@ -2210,9 +2244,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     // Part I. Analyze the state of the file with respect to the input data.
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
       final INode[] inodes = analyzeFileState(
           src, fileId, clientName, previous, onRetryBlock).getINodes();
@@ -2384,7 +2420,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
-  LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+  LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
       final DatanodeInfo[] existings, final HashMap<Node, Node> excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
@@ -2395,6 +2431,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final long preferredblocksize;
     final List<DatanodeDescriptor> chosen;
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2403,6 +2440,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new SafeModeException("Cannot add datanode; src=" + src
             + ", blk=" + blk, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
 
       //check lease
       final INodeFileUnderConstruction file = checkLease(src, clientName);
@@ -2442,6 +2480,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         + "of file " + src);
     }
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2449,6 +2488,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new SafeModeException("Cannot abandon block " + b +
             " for file " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+
       //
       // Remove the block from the pending creates list
       //
@@ -2520,8 +2561,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     checkBlock(last);
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot complete file " + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       success = completeFileInternal(src, holder,
           ExtendedBlock.getLocalBlock(last));
     } finally {
@@ -2537,11 +2584,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot complete file " + src, safeMode);
-    }
-
     final INodesInPath iip = dir.getLastINodeInPath(src);
     final INodeFileUnderConstruction pendingFile;
     try {
@@ -2687,10 +2729,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     boolean status = false;
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot rename " + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, srcComponents, dir);
+      dst = FSDirectory.resolvePath(dst, dstComponents, dir);
+      checkOperation(OperationCategory.WRITE);
       status = renameToInternal(pc, src, dst);
       if (status) {
         resultingStat = getAuditFileInfo(dst, false);
@@ -2710,10 +2761,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
-    checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot rename " + src, safeMode);
-    }
     if (isPermissionEnabled) {
       //We should not be doing this. This is move() not renameTo().
       //but for now,
@@ -2744,9 +2791,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot rename " + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, srcComponents, dir);
+      dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       renameToInternal(pc, src, dst, options);
       resultingStat = getAuditFileInfo(dst, false);
     } finally {
@@ -2765,10 +2820,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void renameToInternal(FSPermissionChecker pc, String src, String dst,
       Options.Rename... options) throws IOException {
     assert hasWriteLock();
-    checkOperation(OperationCategory.WRITE);
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot rename " + src, safeMode);
-    }
     if (isPermissionEnabled) {
       checkParentAccess(pc, src, FsAction.WRITE);
       checkAncestorAccess(pc, dst, FsAction.WRITE);
@@ -2829,12 +2880,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
         throw new IOException(src + " is non empty");
       }
@@ -2961,9 +3014,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    if (!DFSUtil.isValidName(src)) {
+      throw new InvalidPathException("Invalid file name: " + src);
+    }
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkTraverse(pc, src);
       }
@@ -3028,10 +3086,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     HdfsFileStatus resultingStat = null;
     boolean status = false;
     writeLock();
     try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot create directory " + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       status = mkdirsInternal(pc, src, permissions, createParent);
       if (status) {
         resultingStat = dir.getFileInfo(src, false);
@ -3053,10 +3117,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
PermissionStatus permissions, boolean createParent) PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create directory " + src, safeMode);
}
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkTraverse(pc, src); checkTraverse(pc, src);
} }
@ -3087,9 +3147,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FileNotFoundException, UnresolvedLinkException, StandbyException { FileNotFoundException, UnresolvedLinkException, StandbyException {
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE); checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
} }
@ -3103,6 +3165,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Set the namespace quota and diskspace quota for a directory. * Set the namespace quota and diskspace quota for a directory.
* See {@link ClientProtocol#setQuota(String, long, long)} for the * See {@link ClientProtocol#setQuota(String, long, long)} for the
* contract. * contract.
*
* Note: This does not support ".inodes" relative path.
*/ */
void setQuota(String path, long nsQuota, long dsQuota) void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
@ -3132,12 +3196,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName); NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot fsync file " + src, safeMode); throw new SafeModeException("Cannot fsync file " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
INodeFileUnderConstruction pendingFile = checkLease(src, clientName); INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
if (lastBlockLength > 0) { if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength); pendingFile.updateLengthOfLastBlock(lastBlockLength);
@ -3488,9 +3554,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DirectoryListing dl; DirectoryListing dl;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
if (dir.isDir(src)) { if (dir.isDir(src)) {
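Every FSNamesystem operation touched above follows the same shape: compute the reserved-path components before taking the lock, then resolve the path under the lock so that /.reserved/.inodes paths map onto regular paths. A condensed sketch of that recurring pattern (abridged, assuming the usual unlock-in-finally structure; not a complete method):

    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    writeLock();  // readLock() for read-only operations
    try {
      checkOperation(OperationCategory.WRITE);
      src = FSDirectory.resolvePath(src, pathComponents, dir);
      // ... operation-specific work on the resolved path ...
    } finally {
      writeUnlock();
    }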
@ -21,7 +21,6 @@ import java.io.PrintStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -39,6 +38,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.Diff; import org.apache.hadoop.hdfs.util.Diff;
import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -50,7 +50,7 @@ import com.google.common.base.Preconditions;
* directory inodes. * directory inodes.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class INode implements Diff.Element<byte[]> { public abstract class INode implements Diff.Element<byte[]>, LinkedElement {
public static final Log LOG = LogFactory.getLog(INode.class); public static final Log LOG = LogFactory.getLog(INode.class);
/** parent is either an {@link INodeDirectory} or an {@link INodeReference}.*/ /** parent is either an {@link INodeDirectory} or an {@link INodeReference}.*/
@ -108,6 +108,7 @@ public abstract class INode implements Diff.Element<byte[]> {
* @return group name * @return group name
*/ */
abstract String getGroupName(Snapshot snapshot); abstract String getGroupName(Snapshot snapshot);
protected LinkedElement next = null;
/** The same as getGroupName(null). */ /** The same as getGroupName(null). */
public final String getGroupName() { public final String getGroupName() {
@ -612,13 +613,13 @@ public abstract class INode implements Diff.Element<byte[]> {
if (that == null || !(that instanceof INode)) { if (that == null || !(that instanceof INode)) {
return false; return false;
} }
return Arrays.equals(this.getLocalNameBytes(), return getId() == ((INode) that).getId();
((INode)that).getLocalNameBytes());
} }
@Override @Override
public final int hashCode() { public final int hashCode() {
return Arrays.hashCode(getLocalNameBytes()); long id = getId();
return (int)(id^(id>>>32));
} }
/** /**
@ -698,4 +699,14 @@ public abstract class INode implements Diff.Element<byte[]> {
toDeleteList.clear(); toDeleteList.clear();
} }
} }
@Override
public void setNext(LinkedElement next) {
this.next = next;
}
@Override
public LinkedElement getNext() {
return next;
}
} }
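With identity now based on the inode id, the hash folds the 64-bit id into 32 bits by XORing its halves, the same recipe as java.lang.Long.hashCode. A quick check with a hypothetical id (not taken from the patch):

    long id = 0x0000000100000002L;        // hypothetical inode id
    int hash = (int) (id ^ (id >>> 32));  // XOR high and low 32-bit words
    System.out.println(hash);             // prints 3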
@ -474,8 +474,11 @@ public class INodeDirectory extends INodeWithAdditionalFields {
/** Set the children list to null. */ /** Set the children list to null. */
public void clearChildren() { public void clearChildren() {
if (children != null) {
this.children.clear();
this.children = null; this.children = null;
} }
}
@Override @Override
public void clear() { public void clear() {
@ -31,9 +31,11 @@ import org.apache.hadoop.util.SequentialNumber;
@InterfaceAudience.Private @InterfaceAudience.Private
public class INodeId extends SequentialNumber { public class INodeId extends SequentialNumber {
/** /**
* The last reserved inode id. * The last reserved inode id. InodeIDs are allocated from LAST_RESERVED_ID +
* 1.
*/ */
public static final long LAST_RESERVED_ID = 1000L; public static final long LAST_RESERVED_ID = 2 << 14 - 1;
public static final long ROOT_INODE_ID = LAST_RESERVED_ID + 1;
/** /**
* The inode id validation of lease check will be skipped when the request * The inode id validation of lease check will be skipped when the request
@ -55,6 +57,6 @@ public class INodeId extends SequentialNumber {
} }
INodeId() { INodeId() {
super(LAST_RESERVED_ID); super(ROOT_INODE_ID);
} }
} }
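One subtlety worth noting: Java's shift operator binds less tightly than subtraction, so 2 << 14 - 1 parses as 2 << 13, making LAST_RESERVED_ID 16384 and ROOT_INODE_ID 16385. A quick check:

    System.out.println(2 << 14 - 1);        // 16384, the last reserved id
    System.out.println((2 << 14 - 1) + 1);  // 16385, the root inode id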
@ -154,7 +154,7 @@ public class TestFSImageWithSnapshot {
fsn.getFSDirectory().writeLock(); fsn.getFSDirectory().writeLock();
try { try {
loader.load(imageFile); loader.load(imageFile);
FSImage.updateCountForQuota( FSImage.updateCountForQuota(fsn.getFSDirectory(),
(INodeDirectoryWithQuota)fsn.getFSDirectory().getINode("/")); (INodeDirectoryWithQuota)fsn.getFSDirectory().getINode("/"));
} finally { } finally {
fsn.getFSDirectory().writeUnlock(); fsn.getFSDirectory().writeUnlock();
@ -20,17 +20,28 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException; import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -38,10 +49,17 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
public class TestINodeFile { public class TestINodeFile {
public static final Log LOG = LogFactory.getLog(TestINodeFile.class);
static final short BLOCKBITS = 48; static final short BLOCKBITS = 48;
static final long BLKSIZE_MAXVALUE = ~(0xffffL << BLOCKBITS); static final long BLKSIZE_MAXVALUE = ~(0xffffL << BLOCKBITS);
@ -293,6 +311,7 @@ public class TestINodeFile {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(PathIsNotDirectoryException e) { } catch(PathIsNotDirectoryException e) {
// Expected
} }
} }
@ -314,7 +333,8 @@ public class TestINodeFile {
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(PathIsNotDirectoryException e) { } catch(PathIsNotDirectoryException expected) {
// expected
} }
} }
@ -345,13 +365,10 @@ public class TestINodeFile {
} }
/** /**
* Verify root always has inode id 1001 and new formated fsimage has last * This test verifies inode ID counter and inode map functionality.
* allocated inode id 1000. Validate correct lastInodeId is persisted.
* @throws IOException
*/ */
@Test @Test
public void testInodeId() throws IOException { public void testInodeId() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT); DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
@ -361,55 +378,83 @@ public class TestINodeFile {
cluster.waitActive(); cluster.waitActive();
FSNamesystem fsn = cluster.getNamesystem(); FSNamesystem fsn = cluster.getNamesystem();
assertTrue(fsn.getLastInodeId() == 1001); long lastId = fsn.getLastInodeId();
// Create one directory and the last inode id should increase to 1002 // Ensure root has the correct inode ID
// Last inode ID should be root inode ID and inode map size should be 1
int inodeCount = 1;
long expectedLastInodeId = INodeId.ROOT_INODE_ID;
assertEquals(fsn.dir.rootDir.getId(), INodeId.ROOT_INODE_ID);
assertEquals(expectedLastInodeId, lastId);
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Create a directory
// Last inode ID and inode map size should increase by 1
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
Path path = new Path("/test1"); Path path = new Path("/test1");
assertTrue(fs.mkdirs(path)); assertTrue(fs.mkdirs(path));
assertTrue(fsn.getLastInodeId() == 1002); assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
int fileLen = 1024; // Create a file
Path filePath = new Path("/test1/file"); // Last inode ID and inode map size should increase by 1
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0); NamenodeProtocols nnrpc = cluster.getNameNodeRpc();
assertTrue(fsn.getLastInodeId() == 1003); DFSTestUtil.createFile(fs, new Path("/test1/file"), 1024, (short) 1, 0);
assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
// Rename doesn't increase inode id // Ensure right inode ID is returned in file status
HdfsFileStatus fileStatus = nnrpc.getFileInfo("/test1/file");
assertEquals(expectedLastInodeId, fileStatus.getFileId());
// Rename a directory
// Last inode ID and inode map size should not change
Path renamedPath = new Path("/test2"); Path renamedPath = new Path("/test2");
fs.rename(path, renamedPath); assertTrue(fs.rename(path, renamedPath));
assertTrue(fsn.getLastInodeId() == 1003); assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Delete test2/file and test2 and ensure inode map size decreases
assertTrue(fs.delete(renamedPath, true));
inodeCount -= 2;
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
cluster.restartNameNode();
cluster.waitActive();
// Make sure empty editlog can be handled // Make sure empty editlog can be handled
cluster.restartNameNode(); cluster.restartNameNode();
cluster.waitActive(); cluster.waitActive();
fsn = cluster.getNamesystem(); fsn = cluster.getNamesystem();
assertTrue(fsn.getLastInodeId() == 1003); assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
DFSTestUtil.createFile(fs, new Path("/test2/file2"), fileLen, (short) 1, // Create two inodes test2 and test2/file2
0); DFSTestUtil.createFile(fs, new Path("/test2/file2"), 1024, (short) 1, 0);
long id = fsn.getLastInodeId(); expectedLastInodeId += 2;
assertTrue(id == 1004); inodeCount += 2;
fs.delete(new Path("/test2"), true); assertEquals(expectedLastInodeId, fsn.getLastInodeId());
// create a file under construction assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// create /test3, and /test3/file.
// /test3/file is a file under construction
FSDataOutputStream outStream = fs.create(new Path("/test3/file")); FSDataOutputStream outStream = fs.create(new Path("/test3/file"));
assertTrue(outStream != null); assertTrue(outStream != null);
assertTrue(fsn.getLastInodeId() == 1006); expectedLastInodeId += 2;
inodeCount += 2;
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Apply editlogs to fsimage, test fsimage with inodeUnderConstruction can // Apply editlogs to fsimage, ensure inodeUnderConstruction is handled
// be handled
fsn.enterSafeMode(false); fsn.enterSafeMode(false);
fsn.saveNamespace(); fsn.saveNamespace();
fsn.leaveSafeMode(); fsn.leaveSafeMode();
outStream.close(); outStream.close();
// The lastInodeId in fsimage should remain 1006 after reboot // The lastInodeId in fsimage should remain the same after reboot
cluster.restartNameNode(); cluster.restartNameNode();
cluster.waitActive(); cluster.waitActive();
fsn = cluster.getNamesystem(); fsn = cluster.getNamesystem();
assertTrue(fsn.getLastInodeId() == 1006); assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
} finally { } finally {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
@ -419,7 +464,6 @@ public class TestINodeFile {
@Test @Test
public void testWriteToRenamedFile() throws IOException { public void testWriteToRenamedFile() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build(); .build();
@ -450,10 +494,368 @@ public class TestINodeFile {
fail("Write should fail after rename"); fail("Write should fail after rename");
} catch (Exception e) { } catch (Exception e) {
/* Ignore */ /* Ignore */
} finally {
cluster.shutdown();
}
}
private Path getInodePath(long inodeId, String remainingPath) {
StringBuilder b = new StringBuilder();
b.append(Path.SEPARATOR).append(FSDirectory.DOT_RESERVED_STRING)
.append(Path.SEPARATOR).append(FSDirectory.DOT_INODES_STRING)
.append(Path.SEPARATOR).append(inodeId).append(Path.SEPARATOR)
.append(remainingPath);
Path p = new Path(b.toString());
LOG.info("Inode path is " + p);
return p;
}
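For illustration, the helper above assembles reserved-inode paths of the form /.reserved/.inodes/&lt;inodeID&gt;/&lt;remainingPath&gt;; with the root inode id from this patch the call below yields the path shown in the comment:

    Path p = getInodePath(INodeId.ROOT_INODE_ID, "testInodeIdBasedPaths");
    // p is /.reserved/.inodes/16385/testInodeIdBasedPaths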
/**
* Tests for addressing files using /.reserved/.inodes/<inodeID> in file system
* operations.
*/
@Test
public void testInodeIdBasedPaths() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
NamenodeProtocols nnRpc = cluster.getNameNodeRpc();
// FileSystem#mkdirs "/testInodeIdBasedPaths"
Path baseDir = getInodePath(INodeId.ROOT_INODE_ID, "testInodeIdBasedPaths");
Path baseDirRegPath = new Path("/testInodeIdBasedPaths");
fs.mkdirs(baseDir);
assertTrue(fs.exists(baseDir));
long baseDirFileId = nnRpc.getFileInfo(baseDir.toString()).getFileId();
// FileSystem#create file and FileSystem#close
Path testFileInodePath = getInodePath(baseDirFileId, "test1");
Path testFileRegularPath = new Path(baseDir, "test1");
final int testFileBlockSize = 1024;
FileSystemTestHelper.createFile(fs, testFileInodePath, 1, testFileBlockSize);
assertTrue(fs.exists(testFileInodePath));
// FileSystem#setPermission
FsPermission perm = new FsPermission((short)0666);
fs.setPermission(testFileInodePath, perm);
// FileSystem#getFileStatus and FileSystem#getPermission
FileStatus fileStatus = fs.getFileStatus(testFileInodePath);
assertEquals(perm, fileStatus.getPermission());
// FileSystem#setOwner
fs.setOwner(testFileInodePath, fileStatus.getOwner(), fileStatus.getGroup());
// FileSystem#setTimes
fs.setTimes(testFileInodePath, 0, 0);
fileStatus = fs.getFileStatus(testFileInodePath);
assertEquals(0, fileStatus.getModificationTime());
assertEquals(0, fileStatus.getAccessTime());
// FileSystem#setReplication
fs.setReplication(testFileInodePath, (short)3);
fileStatus = fs.getFileStatus(testFileInodePath);
assertEquals(3, fileStatus.getReplication());
fs.setReplication(testFileInodePath, (short)1);
// ClientProtocol#getPreferredBlockSize
assertEquals(testFileBlockSize,
nnRpc.getPreferredBlockSize(testFileInodePath.toString()));
// symbolic link related tests
// Reserved path is not allowed as a target
String invalidTarget = new Path(baseDir, "invalidTarget").toString();
String link = new Path(baseDir, "link").toString();
testInvalidSymlinkTarget(nnRpc, invalidTarget, link);
// Test creating a link using reserved inode path
String validTarget = "/validtarget";
testValidSymlinkTarget(nnRpc, validTarget, link);
// FileSystem#append
fs.append(testFileInodePath);
// DistributedFileSystem#recoverLease
fs.recoverLease(testFileInodePath);
// Namenode#getBlockLocations
LocatedBlocks l1 = nnRpc.getBlockLocations(testFileInodePath.toString(),
0, Long.MAX_VALUE);
LocatedBlocks l2 = nnRpc.getBlockLocations(testFileRegularPath.toString(),
0, Long.MAX_VALUE);
checkEquals(l1, l2);
// FileSystem#rename - both the variants
Path renameDst = getInodePath(baseDirFileId, "test2");
fileStatus = fs.getFileStatus(testFileInodePath);
// Rename variant 1: rename and rename back
fs.rename(testFileInodePath, renameDst);
fs.rename(renameDst, testFileInodePath);
assertEquals(fileStatus, fs.getFileStatus(testFileInodePath));
// Rename variant 2: rename and rename back
fs.rename(testFileInodePath, renameDst, Rename.OVERWRITE);
fs.rename(renameDst, testFileInodePath, Rename.OVERWRITE);
assertEquals(fileStatus, fs.getFileStatus(testFileInodePath));
// FileSystem#getContentSummary
assertEquals(fs.getContentSummary(testFileRegularPath).toString(),
fs.getContentSummary(testFileInodePath).toString());
// FileSystem#listFiles
checkEquals(fs.listFiles(baseDirRegPath, false),
fs.listFiles(baseDir, false));
// FileSystem#delete
fs.delete(testFileInodePath, true);
assertFalse(fs.exists(testFileInodePath));
} finally { } finally {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
} }
} }
private void testInvalidSymlinkTarget(NamenodeProtocols nnRpc,
String invalidTarget, String link) throws IOException {
try {
FsPermission perm = FsPermission.createImmutable((short)0755);
nnRpc.createSymlink(invalidTarget, link, perm, false);
fail("Symbolic link creation of target " + invalidTarget + " should fail");
} catch (InvalidPathException expected) {
// Expected
}
}
private void testValidSymlinkTarget(NamenodeProtocols nnRpc, String target,
String link) throws IOException {
FsPermission perm = FsPermission.createImmutable((short)0755);
nnRpc.createSymlink(target, link, perm, false);
assertEquals(target, nnRpc.getLinkTarget(link));
}
private static void checkEquals(LocatedBlocks l1, LocatedBlocks l2) {
List<LocatedBlock> list1 = l1.getLocatedBlocks();
List<LocatedBlock> list2 = l2.getLocatedBlocks();
assertEquals(list1.size(), list2.size());
for (int i = 0; i < list1.size(); i++) {
LocatedBlock b1 = list1.get(i);
LocatedBlock b2 = list2.get(i);
assertEquals(b1.getBlock(), b2.getBlock());
assertEquals(b1.getBlockSize(), b2.getBlockSize());
}
}
private static void checkEquals(RemoteIterator<LocatedFileStatus> i1,
RemoteIterator<LocatedFileStatus> i2) throws IOException {
while (i1.hasNext()) {
assertTrue(i2.hasNext());
// Compare all the fields but the path name, which is relative
// to the original path from listFiles.
LocatedFileStatus l1 = i1.next();
LocatedFileStatus l2 = i2.next();
assertEquals(l1.getAccessTime(), l2.getAccessTime());
assertEquals(l1.getBlockSize(), l2.getBlockSize());
assertEquals(l1.getGroup(), l2.getGroup());
assertEquals(l1.getLen(), l2.getLen());
assertEquals(l1.getModificationTime(), l2.getModificationTime());
assertEquals(l1.getOwner(), l2.getOwner());
assertEquals(l1.getPermission(), l2.getPermission());
assertEquals(l1.getReplication(), l2.getReplication());
}
assertFalse(i2.hasNext());
}
/**
* Check /.reserved path is reserved and cannot be created.
*/
@Test
public void testReservedFileNames() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
// First start a cluster with reserved file names check turned off
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// Creation of directory or file with reserved path names is disallowed
ensureReservedFileNamesCannotBeCreated(fs, "/.reserved", false);
ensureReservedFileNamesCannotBeCreated(fs, "/.reserved", false);
Path reservedPath = new Path("/.reserved");
// Loading of fsimage or editlog with /.reserved directory should fail
// Mkdir "/.reserved reserved path with reserved path check turned off
FSDirectory.CHECK_RESERVED_FILE_NAMES = false;
fs.mkdirs(reservedPath);
assertTrue(fs.isDirectory(reservedPath));
ensureReservedFileNamesCannotBeLoaded(cluster);
// Loading of fsimage or editlog with /.reserved file should fail
// Create file "/.reserved reserved path with reserved path check turned off
FSDirectory.CHECK_RESERVED_FILE_NAMES = false;
ensureClusterRestartSucceeds(cluster);
fs.delete(reservedPath, true);
DFSTestUtil.createFile(fs, reservedPath, 10, (short)1, 0L);
assertTrue(!fs.isDirectory(reservedPath));
ensureReservedFileNamesCannotBeLoaded(cluster);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void ensureReservedFileNamesCannotBeCreated(FileSystem fs, String name,
boolean isDir) {
// Creation of directory or file with reserved path names is disallowed
Path reservedPath = new Path(name);
try {
if (isDir) {
fs.mkdirs(reservedPath);
} else {
DFSTestUtil.createFile(fs, reservedPath, 10, (short) 1, 0L);
}
fail((isDir ? "mkdir" : "create file") + " should be disallowed");
} catch (Exception expected) {
// ignored
}
}
private void ensureReservedFileNamesCannotBeLoaded(MiniDFSCluster cluster)
throws IOException {
// Turn on reserved file name checking. Loading of edits should fail
FSDirectory.CHECK_RESERVED_FILE_NAMES = true;
ensureClusterRestartFails(cluster);
// Turn off reserved file name checking and successfully load edits
FSDirectory.CHECK_RESERVED_FILE_NAMES = false;
ensureClusterRestartSucceeds(cluster);
// Turn on reserved file name checking. Loading of fsimage should fail
FSDirectory.CHECK_RESERVED_FILE_NAMES = true;
ensureClusterRestartFails(cluster);
}
private void ensureClusterRestartFails(MiniDFSCluster cluster) {
try {
cluster.restartNameNode();
fail("Cluster should not have successfully started");
} catch (Exception expected) {
LOG.info("Expected exception thrown " + expected);
}
assertFalse(cluster.isClusterUp());
}
private void ensureClusterRestartSucceeds(MiniDFSCluster cluster)
throws IOException {
cluster.restartNameNode();
cluster.waitActive();
assertTrue(cluster.isClusterUp());
}
/**
* For a given path, build a tree of INodes and return the leaf node.
*/
private INode createTreeOfInodes(String path) throws QuotaExceededException {
byte[][] components = INode.getPathComponents(path);
FsPermission perm = FsPermission.createImmutable((short)0755);
PermissionStatus permstatus = PermissionStatus.createImmutable("", "", perm);
long id = 0;
INodeDirectory prev = new INodeDirectory(++id, new byte[0], permstatus, 0);
INodeDirectory dir = null;
for (byte[] component : components) {
if (component.length == 0) {
continue;
}
System.out.println("Adding component " + DFSUtil.bytes2String(component));
dir = new INodeDirectory(++id, component, permstatus, 0);
prev.addChild(dir, false, null);
prev = dir;
}
return dir; // Last Inode in the chain
}
private static void checkEquals(byte[][] expected, byte[][] actual) {
assertEquals(expected.length, actual.length);
int i = 0;
for (byte[] e : expected) {
assertTrue(Arrays.equals(e, actual[i++]));
}
}
/**
* Test for {@link FSDirectory#getPathComponents(INode)}
*/
@Test
public void testGetPathFromInode() throws QuotaExceededException {
String path = "/a/b/c";
INode inode = createTreeOfInodes(path);
byte[][] expected = INode.getPathComponents(path);
byte[][] actual = FSDirectory.getPathComponents(inode);
checkEquals(expected, actual);
}
/**
* Tests for {@link FSDirectory#resolvePath(String, byte[][], FSDirectory)}
*/
@Test
public void testInodePath() throws IOException {
// For a non .inodes path the regular components are returned
String path = "/a/b/c";
INode inode = createTreeOfInodes(path);
// For any inode lookup, return the inode corresponding to "c" from /a/b/c
FSDirectory fsd = Mockito.mock(FSDirectory.class);
Mockito.doReturn(inode).when(fsd).getInode(Mockito.anyLong());
// Null components
assertEquals("/test", FSDirectory.resolvePath("/test", null, fsd));
// Tests for FSDirectory#resolvePath()
// Non inode regular path
byte[][] components = INode.getPathComponents(path);
String resolvedPath = FSDirectory.resolvePath(path, components, fsd);
assertEquals(path, resolvedPath);
// Inode path with no trailing separator
components = INode.getPathComponents("/.reserved/.inodes/1");
resolvedPath = FSDirectory.resolvePath(path, components, fsd);
assertEquals(path, resolvedPath);
// Inode path with trailing separator
components = INode.getPathComponents("/.reserved/.inodes/1/");
assertEquals(path, resolvedPath);
// Inode relative path
components = INode.getPathComponents("/.reserved/.inodes/1/d/e/f");
resolvedPath = FSDirectory.resolvePath(path, components, fsd);
assertEquals("/a/b/c/d/e/f", resolvedPath);
// A path with just .inodes returns the path as is
String testPath = "/.reserved/.inodes";
components = INode.getPathComponents(testPath);
resolvedPath = FSDirectory.resolvePath(testPath, components, fsd);
assertEquals(testPath, resolvedPath);
// Root inode path
testPath = "/.reserved/.inodes/" + INodeId.ROOT_INODE_ID;
components = INode.getPathComponents(testPath);
resolvedPath = FSDirectory.resolvePath(testPath, components, fsd);
assertEquals("/", resolvedPath);
// An invalid inode path should remain unresolved
testPath = "/.invalid/.inodes/1";
components = INode.getPathComponents(testPath);
resolvedPath = FSDirectory.resolvePath(testPath, components, fsd);
assertEquals(testPath, resolvedPath);
}
} }
@ -422,6 +422,11 @@ public class TestPipelinesFailover {
// Disable permissions so that another user can recover the lease. // Disable permissions so that another user can recover the lease.
harness.conf.setBoolean( harness.conf.setBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
// This test triggers rapid NN failovers. The client retry policy uses an
// exponential backoff. This can quickly lead to long sleep times and even
// timeout the whole test. Cap the sleep time at 1s to prevent this.
harness.conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
1000);
final MiniDFSCluster cluster = harness.startCluster(); final MiniDFSCluster cluster = harness.startCluster();
try { try {
@ -537,11 +542,10 @@ public class TestPipelinesFailover {
} }
/** /**
* Try to cover the lease on the given file for up to 30 * Try to recover the lease on the given file for up to 60 seconds.
* seconds.
* @param fsOtherUser the filesystem to use for the recoverLease call * @param fsOtherUser the filesystem to use for the recoverLease call
* @param testPath the path on which to run lease recovery * @param testPath the path on which to run lease recovery
* @throws TimeoutException if lease recover does not succeed within 30 * @throws TimeoutException if lease recover does not succeed within 60
* seconds * seconds
* @throws InterruptedException if the thread is interrupted * @throws InterruptedException if the thread is interrupted
*/ */
@ -564,7 +568,7 @@ public class TestPipelinesFailover {
} }
return success; return success;
} }
}, 1000, 30000); }, 1000, 60000);
} catch (TimeoutException e) { } catch (TimeoutException e) {
throw new TimeoutException("Timed out recovering lease for " + throw new TimeoutException("Timed out recovering lease for " +
testPath); testPath);
@ -134,6 +134,10 @@ Trunk (Unreleased)
MAPREDUCE-4885. Streaming tests have multiple failures on Windows. (Chris MAPREDUCE-4885. Streaming tests have multiple failures on Windows. (Chris
Nauroth via bikas) Nauroth via bikas)
MAPREDUCE-4987. TestMRJobs#testDistributedCache fails on Windows due to
classpath problems and unexpected behavior of symlinks (Chris Nauroth via
bikas)
BREAKDOWN OF HADOOP-8562 SUBTASKS BREAKDOWN OF HADOOP-8562 SUBTASKS
MAPREDUCE-4739. Some MapReduce tests fail to find winutils. MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
@ -320,6 +324,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-4932. mapreduce.job#getTaskCompletionEvents incompatible with MAPREDUCE-4932. mapreduce.job#getTaskCompletionEvents incompatible with
Hadoop 1. (rkanter via tucu) Hadoop 1. (rkanter via tucu)
MAPREDUCE-5163. Update MR App to not use API utility methods for collections
after YARN-441. (Xuan Gong via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -882,6 +889,9 @@ Release 0.23.8 - UNRELEASED
MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI
(Aleksey Gorshkov via tgraves) (Aleksey Gorshkov via tgraves)
MAPREDUCE-5147. Maven build should create
hadoop-mapreduce-client-app-VERSION.jar directly (Robert Parker via tgraves)
Release 0.23.7 - UNRELEASED Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -76,8 +76,6 @@
</dependencies> </dependencies>
<build> <build>
<!-- local name for links -->
<finalName>mr-app</finalName>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>
@ -90,26 +88,6 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-mr-app-symlinks</id>
<phase>package</phase>
<configuration>
<target>
<symlink link="${applink.base}.jar"
resource="mr-app.jar" failonerror="false"/>
<symlink link="${applink.base}-3.0.0-SNAPSHOT.jar"
resource="mr-app.jar" failonerror="false"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>
@ -157,8 +157,9 @@ public class ContainerLauncherImpl extends AbstractService implements
startRequest.setContainer(event.getAllocatedContainer()); startRequest.setContainer(event.getAllocatedContainer());
StartContainerResponse response = proxy.startContainer(startRequest); StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = response ByteBuffer portInfo =
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); response.getAllServiceResponse().get(
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1; int port = -1;
if(portInfo != null) { if(portInfo != null) {
port = ShuffleHandler.deserializeMetaData(portInfo); port = ShuffleHandler.deserializeMetaData(portInfo);
@ -26,7 +26,11 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
@ -58,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestContainerLauncherImpl { public class TestContainerLauncherImpl {
@ -65,6 +70,15 @@ public class TestContainerLauncherImpl {
private static final RecordFactory recordFactory = private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private Map<String, ByteBuffer> serviceResponse =
new HashMap<String, ByteBuffer>();
@Before
public void setup() throws IOException {
serviceResponse.clear();
serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
}
private static class ContainerLauncherImplUnderTest extends private static class ContainerLauncherImplUnderTest extends
ContainerLauncherImpl { ContainerLauncherImpl {
@ -145,8 +159,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, startResp.setAllServiceResponse(serviceResponse);
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting launch event"); LOG.info("inserting launch event");
@ -210,8 +223,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, startResp.setAllServiceResponse(serviceResponse);
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting cleanup event"); LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent = ContainerLauncherEvent mockCleanupEvent =
@ -275,8 +287,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, startResp.setAllServiceResponse(serviceResponse);
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting launch event"); LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent = ContainerRemoteLaunchEvent mockLaunchEvent =
@ -333,8 +344,7 @@ public class TestContainerLauncherImpl {
String cmAddress = "127.0.0.1:8000"; String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp = StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, startResp.setAllServiceResponse(serviceResponse);
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting launch event"); LOG.info("inserting launch event");
@ -18,11 +18,13 @@
package org.apache.hadoop.mapreduce.v2; package org.apache.hadoop.mapreduce.v2;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.StringReader;
import java.net.URI; import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.HashMap; import java.util.HashMap;
@ -47,6 +49,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -71,6 +74,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -93,13 +97,6 @@ public class TestMRJobs {
} catch (IOException io) { } catch (IOException io) {
throw new RuntimeException("problem getting local fs", io); throw new RuntimeException("problem getting local fs", io);
} }
try {
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).racks(null).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
} }
private static Path TEST_ROOT_DIR = new Path("target", private static Path TEST_ROOT_DIR = new Path("target",
@ -110,6 +107,13 @@ public class TestMRJobs {
@BeforeClass @BeforeClass
public static void setup() throws IOException { public static void setup() throws IOException {
try {
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).racks(null).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@ -215,7 +219,7 @@ public class TestMRJobs {
} }
} }
@Test (timeout = 30000) @Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException, public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException { ClassNotFoundException {
@ -277,7 +281,7 @@ public class TestMRJobs {
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
} }
@Test (timeout = 30000) @Test (timeout = 60000)
public void testFailingMapper() throws IOException, InterruptedException, public void testFailingMapper() throws IOException, InterruptedException,
ClassNotFoundException { ClassNotFoundException {
@ -359,7 +363,7 @@ public class TestMRJobs {
return job; return job;
} }
//@Test (timeout = 30000) //@Test (timeout = 60000)
public void testSleepJobWithSecurityOn() throws IOException, public void testSleepJobWithSecurityOn() throws IOException,
InterruptedException, ClassNotFoundException { InterruptedException, ClassNotFoundException {
@ -467,9 +471,47 @@ public class TestMRJobs {
// Check that the symlink for the Job Jar was created in the cwd and // Check that the symlink for the Job Jar was created in the cwd and
// points to the extracted directory // points to the extracted directory
File jobJarDir = new File("job.jar"); File jobJarDir = new File("job.jar");
if (Shell.WINDOWS) {
Assert.assertTrue(isWindowsSymlinkedDirectory(jobJarDir));
} else {
Assert.assertTrue(FileUtils.isSymlink(jobJarDir)); Assert.assertTrue(FileUtils.isSymlink(jobJarDir));
Assert.assertTrue(jobJarDir.isDirectory()); Assert.assertTrue(jobJarDir.isDirectory());
} }
}
/**
* Used on Windows to determine if the specified file is a symlink that
* targets a directory. On most platforms, these checks can be done using
* commons-io. On Windows, the commons-io implementation is unreliable and
* always returns false. Instead, this method checks the output of the dir
* command. After migrating to Java 7, this method can be removed in favor
* of the new method java.nio.file.Files.isSymbolicLink, which is expected to
* work cross-platform.
*
* @param file File to check
* @return boolean true if the file is a symlink that targets a directory
* @throws IOException thrown for any I/O error
*/
private static boolean isWindowsSymlinkedDirectory(File file)
throws IOException {
String dirOut = Shell.execCommand("cmd", "/c", "dir",
file.getAbsoluteFile().getParent());
StringReader sr = new StringReader(dirOut);
BufferedReader br = new BufferedReader(sr);
try {
String line = br.readLine();
while (line != null) {
if (line.contains(file.getName()) && line.contains("<SYMLINKD>")) {
return true;
}
line = br.readLine();
}
return false;
} finally {
IOUtils.closeStream(br);
IOUtils.closeStream(sr);
}
}
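As the javadoc above notes, Java 7 makes the dir-parsing workaround unnecessary. A hedged sketch of the eventual replacement (method name hypothetical; java.nio.file.Files.isSymbolicLink is the standard API):

    private static boolean isSymlinkedDirectory(File file) {
      return java.nio.file.Files.isSymbolicLink(file.toPath())
          && file.isDirectory();
    }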
/** /**
* Returns a mapping of the final component of each path to the corresponding * Returns a mapping of the final component of each path to the corresponding
@ -542,7 +584,7 @@ public class TestMRJobs {
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
} }
@Test (timeout = 300000) @Test (timeout = 600000)
public void testDistributedCache() throws Exception { public void testDistributedCache() throws Exception {
// Test with a local (file:///) Job Jar // Test with a local (file:///) Job Jar
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()); Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
@ -34,6 +34,13 @@ Trunk - Unreleased
YARN-487. Modify path manipulation in LocalDirsHandlerService to let YARN-487. Modify path manipulation in LocalDirsHandlerService to let
TestDiskFailures pass on Windows. (Chris Nauroth via vinodkv) TestDiskFailures pass on Windows. (Chris Nauroth via vinodkv)
YARN-493. Fixed some shell related flaws in YARN on Windows. (Chris Nauroth
via vinodkv)
YARN-593. container launch on Windows does not correctly populate
classpath with new process's environment variables and localized resources
(Chris Nauroth via bikas)
BREAKDOWN OF HADOOP-8562 SUBTASKS BREAKDOWN OF HADOOP-8562 SUBTASKS
YARN-158. Yarn creating package-info.java must not depend on sh. YARN-158. Yarn creating package-info.java must not depend on sh.
@ -85,6 +92,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-444. Moved special container exit codes from YarnConfiguration to API YARN-444. Moved special container exit codes from YarnConfiguration to API
where they belong. (Sandy Ryza via vinodkv) where they belong. (Sandy Ryza via vinodkv)
YARN-441. Removed unused utility methods for collections from two API
records. (Xuan Gong via vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.
@ -20,10 +20,8 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -120,36 +118,16 @@ public interface AllocateRequest {
@Stable @Stable
List<ResourceRequest> getAskList(); List<ResourceRequest> getAskList();
@Private
@Unstable
ResourceRequest getAsk(int index);
@Private
@Unstable
int getAskCount();
/** /**
* Add list of <code>ResourceRequest</code> to update the * Set list of <code>ResourceRequest</code> to update the
* <code>ResourceManager</code> about the application's resource requirements. * <code>ResourceManager</code> about the application's resource requirements.
* @param resourceRequest list of <code>ResourceRequest</code> to update the * @param resourceRequests list of <code>ResourceRequest</code> to update the
* <code>ResourceManager</code> about the application's * <code>ResourceManager</code> about the application's
* resource requirements * resource requirements
*/ */
@Public @Public
@Stable @Stable
void addAllAsks(List<ResourceRequest> resourceRequest); void setAskList(List<ResourceRequest> resourceRequests);
@Private
@Unstable
void addAsk(ResourceRequest request);
@Private
@Unstable
void removeAsk(int index);
@Private
@Unstable
void clearAsks();
/** /**
* Get the list of <code>ContainerId</code> of containers being * Get the list of <code>ContainerId</code> of containers being
@ -161,16 +139,8 @@ public interface AllocateRequest {
@Stable @Stable
List<ContainerId> getReleaseList(); List<ContainerId> getReleaseList();
@Private
@Unstable
ContainerId getRelease(int index);
@Private
@Unstable
int getReleaseCount();
/** /**
* Add the list of <code>ContainerId</code> of containers being * Set the list of <code>ContainerId</code> of containers being
* released by the <code>ApplicationMaster</code> * released by the <code>ApplicationMaster</code>
* @param releaseContainers list of <code>ContainerId</code> of * @param releaseContainers list of <code>ContainerId</code> of
* containers being released by the < * containers being released by the <
@ -178,17 +148,5 @@ public interface AllocateRequest {
*/ */
@Public @Public
@Stable @Stable
void addAllReleases(List<ContainerId> releaseContainers); void setReleaseList(List<ContainerId> releaseContainers);
@Private
@Unstable
void addRelease(ContainerId container);
@Private
@Unstable
void removeRelease(int index);
@Private
@Unstable
void clearReleases();
} }
@ -45,43 +45,11 @@ public interface StartContainerResponse {
Map<String, ByteBuffer> getAllServiceResponse(); Map<String, ByteBuffer> getAllServiceResponse();
/** /**
* Get the response from a single auxiliary service running on the * Set to the list of auxiliary services which have been started on the
* <code>NodeManager</code>
*
* @param key The auxiliary service name whose response is desired.
* @return The opaque blob <code>ByteBuffer</code> returned by the auxiliary
* service.
*/
ByteBuffer getServiceResponse(String key);
/**
* Add to the list of auxiliary services which have been started on the
* <code>NodeManager</code>. This is done only once when the * <code>NodeManager</code>. This is done only once when the
* <code>NodeManager</code> starts up * <code>NodeManager</code> starts up
* @param serviceResponse A map from auxiliary service names to the opaque * @param serviceResponses A map from auxiliary service names to the opaque
* blob <code>ByteBuffer</code>s for that auxiliary service * blob <code>ByteBuffer</code>s for that auxiliary service
*/ */
void addAllServiceResponse(Map<String, ByteBuffer> serviceResponse); void setAllServiceResponse(Map<String, ByteBuffer> serviceResponses);
/**
* Add to the list of auxiliary services which have been started on the
* <code>NodeManager</code>. This is done only once when the
* <code>NodeManager</code> starts up
*
* @param key The auxiliary service name
* @param value The opaque blob <code>ByteBuffer</code> managed by the
* auxiliary service
*/
void setServiceResponse(String key, ByteBuffer value);
/**
* Remove a single auxiliary service from the StartContainerResponse object
* @param key The auxiliary service to remove
*/
void removeServiceResponse(String key);
/**
* Remove all the auxiliary services from the StartContainerResponse object
*/
void clearServiceResponse();
} }
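Callers now hand over the full service map in one call instead of setting entries one by one. A minimal sketch of the new usage, reusing the shuffle service id seen elsewhere in this patch (port value hypothetical):

    Map<String, ByteBuffer> responses = new HashMap<String, ByteBuffer>();
    responses.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
        ShuffleHandler.serializeMetaData(8080));
    startResp.setAllServiceResponse(responses);
    // Per-service lookup now goes through the full map:
    ByteBuffer meta = startResp.getAllServiceResponse()
        .get(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);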
@ -25,7 +25,6 @@ import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -144,14 +143,13 @@ public class AllocateRequestPBImpl extends ProtoBase<AllocateRequestProto> imple
return this.ask; return this.ask;
} }
@Override @Override
public ResourceRequest getAsk(int index) { public void setAskList(final List<ResourceRequest> resourceRequests) {
initAsks(); if(resourceRequests == null) {
return this.ask.get(index); return;
} }
@Override
public int getAskCount() {
initAsks(); initAsks();
return this.ask.size(); this.ask.clear();
this.ask.addAll(resourceRequests);
} }
private void initAsks() { private void initAsks() {
@ -167,14 +165,6 @@ public class AllocateRequestPBImpl extends ProtoBase<AllocateRequestProto> imple
} }
} }
@Override
public void addAllAsks(final List<ResourceRequest> ask) {
if (ask == null)
return;
initAsks();
this.ask.addAll(ask);
}
private void addAsksToProto() { private void addAsksToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearAsk(); builder.clearAsk();
@ -209,34 +199,18 @@ public class AllocateRequestPBImpl extends ProtoBase<AllocateRequestProto> imple
builder.addAllAsk(iterable); builder.addAllAsk(iterable);
} }
@Override @Override
public void addAsk(ResourceRequest ask) {
initAsks();
this.ask.add(ask);
}
@Override
public void removeAsk(int index) {
initAsks();
this.ask.remove(index);
}
@Override
public void clearAsks() {
initAsks();
this.ask.clear();
}
@Override
public List<ContainerId> getReleaseList() { public List<ContainerId> getReleaseList() {
initReleases(); initReleases();
return this.release; return this.release;
} }
@Override @Override
public ContainerId getRelease(int index) { public void setReleaseList(List<ContainerId> releaseContainers) {
initReleases(); if(releaseContainers == null) {
return this.release.get(index); return;
} }
@Override
public int getReleaseCount() {
initReleases(); initReleases();
return this.release.size(); this.release.clear();
this.release.addAll(releaseContainers);
} }
private void initReleases() { private void initReleases() {
@ -252,14 +226,6 @@ public class AllocateRequestPBImpl extends ProtoBase<AllocateRequestProto> imple
} }
} }
@Override
public void addAllReleases(final List<ContainerId> release) {
if (release == null)
return;
initReleases();
this.release.addAll(release);
}
private void addReleasesToProto() { private void addReleasesToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearRelease(); builder.clearRelease();
@ -293,21 +259,6 @@ public class AllocateRequestPBImpl extends ProtoBase<AllocateRequestProto> imple
}; };
builder.addAllRelease(iterable); builder.addAllRelease(iterable);
} }
@Override
public void addRelease(ContainerId release) {
initReleases();
this.release.add(release);
}
@Override
public void removeRelease(int index) {
initReleases();
this.release.remove(index);
}
@Override
public void clearReleases() {
initReleases();
this.release.clear();
}
private ApplicationAttemptIdPBImpl convertFromProtoFormat(ApplicationAttemptIdProto p) { private ApplicationAttemptIdPBImpl convertFromProtoFormat(ApplicationAttemptIdProto p) {
return new ApplicationAttemptIdPBImpl(p); return new ApplicationAttemptIdPBImpl(p);
@ -84,9 +84,14 @@ public class StartContainerResponsePBImpl extends ProtoBase<StartContainerRespon
return this.serviceResponse; return this.serviceResponse;
} }
@Override @Override
public synchronized ByteBuffer getServiceResponse(String key) { public synchronized void setAllServiceResponse(
Map<String, ByteBuffer> serviceResponses) {
if(serviceResponses == null) {
return;
}
initServiceResponse(); initServiceResponse();
return this.serviceResponse.get(key); this.serviceResponse.clear();
this.serviceResponse.putAll(serviceResponses);
} }
private synchronized void initServiceResponse() { private synchronized void initServiceResponse() {
@ -102,14 +107,6 @@ public class StartContainerResponsePBImpl extends ProtoBase<StartContainerRespon
} }
} }
@Override
public synchronized void addAllServiceResponse(final Map<String, ByteBuffer> serviceResponse) {
if (serviceResponse == null)
return;
initServiceResponse();
this.serviceResponse.putAll(serviceResponse);
}
private synchronized void addServiceResponseToProto() { private synchronized void addServiceResponseToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearServiceResponse(); builder.clearServiceResponse();
@ -143,19 +140,4 @@ public class StartContainerResponsePBImpl extends ProtoBase<StartContainerRespon
}; };
builder.addAllServiceResponse(iterable); builder.addAllServiceResponse(iterable);
} }
@Override
public synchronized void setServiceResponse(String key, ByteBuffer val) {
initServiceResponse();
this.serviceResponse.put(key, val);
}
@Override
public synchronized void removeServiceResponse(String key) {
initServiceResponse();
this.serviceResponse.remove(key);
}
@Override
public synchronized void clearServiceResponse() {
initServiceResponse();
this.serviceResponse.clear();
}
} }
@@ -393,8 +393,8 @@ public class BuilderUtils {
     allocateRequest.setApplicationAttemptId(applicationAttemptId);
     allocateRequest.setResponseId(responseID);
     allocateRequest.setProgress(appProgress);
-    allocateRequest.addAllAsks(resourceAsk);
-    allocateRequest.addAllReleases(containersToBeReleased);
+    allocateRequest.setAskList(resourceAsk);
+    allocateRequest.setReleaseList(containersToBeReleased);
     return allocateRequest;
   }
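Callers that build requests through BuilderUtils are unaffected; only the helper's body changes. A sketch, assuming the parameter order implied by the setter calls in this hunk:

    import java.util.List;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.util.BuilderUtils;

    public class AllocateRequestExample {
      static AllocateRequest build(ApplicationAttemptId attemptId,
          int responseId, float progress, List<ResourceRequest> asks,
          List<ContainerId> releases) {
        // Internally routes through setAskList/setReleaseList now.
        return BuilderUtils.newAllocateRequest(attemptId, responseId,
            progress, asks, releases);
      }
    }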
View File
@@ -222,19 +222,6 @@ public abstract class ContainerExecutor implements Configurable {
   }
-  /** Return a command for determining if process with specified pid is alive. */
-  protected static String[] getCheckProcessIsAliveCommand(String pid) {
-    return Shell.WINDOWS ?
-      new String[] { Shell.WINUTILS, "task", "isAlive", pid } :
-      new String[] { "kill", "-0", pid };
-  }
-  /** Return a command to send a signal to a given pid */
-  protected static String[] getSignalKillCommand(int code, String pid) {
-    return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } :
-      new String[] { "kill", "-" + code, pid };
-  }
   /**
    * Is the container still active?
    * @param containerId
@@ -303,26 +290,6 @@ public abstract class ContainerExecutor implements Configurable {
     return pid;
   }
-  public static final boolean isSetsidAvailable = isSetsidSupported();
-  private static boolean isSetsidSupported() {
-    if (Shell.WINDOWS) {
-      return true;
-    }
-    ShellCommandExecutor shexec = null;
-    boolean setsidSupported = true;
-    try {
-      String[] args = {"setsid", "bash", "-c", "echo $$"};
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (IOException ioe) {
-      LOG.warn("setsid is not available on this machine. So not using it.");
-      setsidSupported = false;
-    } finally { // handle the exit code
-      LOG.info("setsid exited with exit code " + shexec.getExitCode());
-    }
-    return setsidSupported;
-  }
   public static class DelayedProcessKiller extends Thread {
     private final String user;
     private final String pid;
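Both removed helpers move to org.apache.hadoop.util.Shell (HADOOP-9486 in the changelog), which already knows whether to build a winutils command or a kill(1) command. A sketch of a liveness probe built on the relocated helpers:

    import java.io.IOException;
    import org.apache.hadoop.util.Shell;
    import org.apache.hadoop.util.Shell.ExitCodeException;
    import org.apache.hadoop.util.Shell.ShellCommandExecutor;

    public class ProcessLivenessExample {
      // "winutils task isAlive <pid>" on Windows, "kill -0 <pid>" elsewhere.
      static boolean isAlive(String pid) throws IOException {
        try {
          new ShellCommandExecutor(
              Shell.getCheckProcessIsAliveCommand(pid)).execute();
          return true;  // exit code 0: the process exists
        } catch (ExitCodeException e) {
          return false; // non-zero exit code: no such process
        }
      }
    }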
View File
@@ -50,6 +50,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
 public class DefaultContainerExecutor extends ContainerExecutor {
   private static final Log LOG = LogFactory
@@ -237,8 +239,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout);
-    protected LocalWrapperScriptBuilder(Path wrapperScriptPath) {
-      this.wrapperScriptPath = wrapperScriptPath;
+    protected LocalWrapperScriptBuilder(Path containerWorkDir) {
+      this.wrapperScriptPath = new Path(containerWorkDir,
+          Shell.appendScriptExtension("default_container_executor"));
     }
   }
@@ -246,7 +249,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
       extends LocalWrapperScriptBuilder {
     public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
-      super(new Path(containerWorkDir, "default_container_executor.sh"));
+      super(containerWorkDir);
     }
     @Override
@@ -260,7 +263,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
       pout.println();
       pout.println("echo $$ > " + pidFile.toString() + ".tmp");
       pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
-      String exec = ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec";
+      String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
       pout.println(exec + " /bin/bash -c \"" +
          launchDst.toUri().getPath().toString() + "\"");
     }
@@ -274,7 +277,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     public WindowsLocalWrapperScriptBuilder(String containerIdStr,
         Path containerWorkDir) {
-      super(new Path(containerWorkDir, "default_container_executor.cmd"));
+      super(containerWorkDir);
       this.containerIdStr = containerIdStr;
     }
@@ -297,18 +300,15 @@ public class DefaultContainerExecutor extends ContainerExecutor {
   @Override
   public boolean signalContainer(String user, String pid, Signal signal)
       throws IOException {
-    final String sigpid = ContainerExecutor.isSetsidAvailable
-        ? "-" + pid
-        : pid;
-    LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
+    LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
        + " as user " + user);
-    if (!containerIsAlive(sigpid)) {
+    if (!containerIsAlive(pid)) {
       return false;
     }
     try {
-      killContainer(sigpid, signal);
+      killContainer(pid, signal);
     } catch (IOException e) {
-      if (!containerIsAlive(sigpid)) {
+      if (!containerIsAlive(pid)) {
         return false;
       }
       throw e;
@@ -322,9 +322,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
    * @param pid String pid
    * @return boolean true if the process is alive
    */
-  private boolean containerIsAlive(String pid) throws IOException {
+  @VisibleForTesting
+  public static boolean containerIsAlive(String pid) throws IOException {
     try {
-      new ShellCommandExecutor(getCheckProcessIsAliveCommand(pid)).execute();
+      new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
+          .execute();
       // successful execution means process is alive
       return true;
     }
@@ -342,7 +344,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
    * (for logging).
    */
   private void killContainer(String pid, Signal signal) throws IOException {
-    new ShellCommandExecutor(getSignalKillCommand(signal.getValue(), pid))
+    new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
        .execute();
   }
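Because containerIsAlive is now public static (and @VisibleForTesting), tests can probe liveness directly instead of smuggling a NULL signal through exec.signalContainer, which is exactly what the test changes further down do. A sketch:

    import java.io.IOException;
    import junit.framework.Assert;
    import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;

    public class LivenessAssertExample {
      // Works on both platforms: kill -0 on Unix, winutils task isAlive on
      // Windows, via the Shell helpers shown above.
      static void assertAlive(String pid) throws IOException {
        Assert.assertTrue("Process is not alive!",
            DefaultContainerExecutor.containerIsAlive(pid));
      }
    }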
View File
@@ -468,7 +468,7 @@ public class ContainerManagerImpl extends CompositeService implements
     StartContainerResponse response =
         recordFactory.newRecordInstance(StartContainerResponse.class);
-    response.addAllServiceResponse(auxiliaryServices.getMeta());
+    response.setAllServiceResponse(auxiliaryServices.getMeta());
     // TODO launchedContainer misplaced -> doesn't necessarily mean a container
     // launch. A finished Application will not launch containers.
     metrics.launchedContainer();
View File
@@ -28,6 +28,7 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -72,8 +73,8 @@ public class ContainerLaunch implements Callable<Integer> {
   private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
-  public static final String CONTAINER_SCRIPT = Shell.WINDOWS ?
-      "launch_container.cmd" : "launch_container.sh";
+  public static final String CONTAINER_SCRIPT =
+      Shell.appendScriptExtension("launch_container");
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
   private static final String PID_FILE_NAME_FMT = "%s.pid";
@@ -211,7 +212,7 @@ public class ContainerLaunch implements Callable<Integer> {
         FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
     // Sanitize the container's environment
-    sanitizeEnv(environment, containerWorkDir, appDirs);
+    sanitizeEnv(environment, containerWorkDir, appDirs, localResources);
     // Write out the environment
     writeLaunchEnv(containerScriptOutStream, environment, localResources,
@@ -506,9 +507,17 @@ public class ContainerLaunch implements Callable<Integer> {
     @Override
     protected void link(Path src, Path dst) throws IOException {
+      File srcFile = new File(src.toUri().getPath());
+      String srcFileStr = srcFile.getPath();
+      String dstFileStr = new File(dst.toString()).getPath();
+      // If not on Java7+ on Windows, then copy file instead of symlinking.
+      // See also FileUtil#symLink for full explanation.
+      if (!Shell.isJava7OrAbove() && srcFile.isFile()) {
+        line(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr));
+      } else {
         line(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS,
-            new File(dst.toString()).getPath(),
-            new File(src.toUri().getPath()).getPath()));
+            dstFileStr, srcFileStr));
+      }
     }
     @Override
@@ -532,7 +541,8 @@ public class ContainerLaunch implements Callable<Integer> {
   }
   public void sanitizeEnv(Map<String, String> environment,
-      Path pwd, List<Path> appDirs) throws IOException {
+      Path pwd, List<Path> appDirs, Map<Path, List<String>> resources)
+      throws IOException {
     /**
      * Non-modifiable environment variables
      */
@@ -566,16 +576,6 @@ public class ContainerLaunch implements Callable<Integer> {
       environment.put("JVM_PID", "$$");
     }
-    // TODO: Remove Windows check and use this approach on all platforms after
-    // additional testing. See YARN-358.
-    if (Shell.WINDOWS) {
-      String inputClassPath = environment.get(Environment.CLASSPATH.name());
-      if (inputClassPath != null && !inputClassPath.isEmpty()) {
-        environment.put(Environment.CLASSPATH.name(),
-            FileUtil.createJarWithClassPath(inputClassPath, pwd));
-      }
-    }
     /**
      * Modifiable environment variables
      */
@@ -594,6 +594,57 @@ public class ContainerLaunch implements Callable<Integer> {
         YarnConfiguration.NM_ADMIN_USER_ENV,
         YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV)
     );
+    // TODO: Remove Windows check and use this approach on all platforms after
+    // additional testing. See YARN-358.
+    if (Shell.WINDOWS) {
+      String inputClassPath = environment.get(Environment.CLASSPATH.name());
+      if (inputClassPath != null && !inputClassPath.isEmpty()) {
+        StringBuilder newClassPath = new StringBuilder(inputClassPath);
+        // Localized resources do not exist at the desired paths yet, because the
+        // container launch script has not run to create symlinks yet. This
+        // means that FileUtil.createJarWithClassPath can't automatically expand
+        // wildcards to separate classpath entries for each file in the manifest.
+        // To resolve this, append classpath entries explicitly for each
+        // resource.
+        for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
+          boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
+              .isDirectory();
+          for (String linkName : entry.getValue()) {
+            // Append resource.
+            newClassPath.append(File.pathSeparator).append(pwd.toString())
+                .append(Path.SEPARATOR).append(linkName);
+            // FileUtil.createJarWithClassPath must use File.toURI to convert
+            // each file to a URI to write into the manifest's classpath. For
+            // directories, the classpath must have a trailing '/', but
+            // File.toURI only appends the trailing '/' if it is a directory that
+            // already exists. To resolve this, add the classpath entries with
+            // explicit trailing '/' here for any localized resource that targets
+            // a directory. Then, FileUtil.createJarWithClassPath will guarantee
+            // that the resulting entry in the manifest's classpath will have a
+            // trailing '/', and thus refer to a directory instead of a file.
+            if (targetIsDirectory) {
+              newClassPath.append(Path.SEPARATOR);
+            }
+          }
+        }
+        // When the container launches, it takes the parent process's environment
+        // and then adds/overwrites with the entries from the container launch
+        // context. Do the same thing here for correct substitution of
+        // environment variables in the classpath jar manifest.
+        Map<String, String> mergedEnv = new HashMap<String, String>(
+            System.getenv());
+        mergedEnv.putAll(environment);
+        String classPathJar = FileUtil.createJarWithClassPath(
+            newClassPath.toString(), pwd, mergedEnv);
+        environment.put(Environment.CLASSPATH.name(), classPathJar);
+      }
+    }
   }
 static void writeLaunchEnv(OutputStream out,
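The ordering in that last hunk matters: the manifest classpath is expanded with the environment the launched container will actually see, which is the node manager's own environment overlaid with the launch context's entries. A condensed sketch of just that substitution step, using the three-argument createJarWithClassPath added by HADOOP-9488:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class ClassPathJarExample {
      static String makeClassPathJar(String classPath, Path pwd,
          Map<String, String> containerEnv) throws IOException {
        // Parent (NM) environment first; container entries overwrite, so
        // %VAR% references expand the same way they will in the new process.
        Map<String, String> mergedEnv =
            new HashMap<String, String>(System.getenv());
        mergedEnv.putAll(containerEnv);
        return FileUtil.createJarWithClassPath(classPath, pwd, mergedEnv);
      }
    }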
View File
@@ -22,12 +22,13 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ import junit.framework.Assert;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -81,6 +84,7 @@ public class TestNodeManagerShutdown {
       .getRecordFactory(null);
   static final String user = "nobody";
   private FileContext localFS;
+  private ContainerId cId;
   private CyclicBarrier syncBarrier = new CyclicBarrier(2);
   @Before
@@ -90,6 +94,9 @@ public class TestNodeManagerShutdown {
     logsDir.mkdirs();
     remoteLogsDir.mkdirs();
     nmLocalDir.mkdirs();
+    // Construct the Container-id
+    cId = createContainerId();
   }
   @After
@@ -115,9 +122,15 @@ public class TestNodeManagerShutdown {
     nm.stop();
-    // Now verify the contents of the file
-    // Script generates a message when it receives a sigterm
-    // so we look for that
+    // Now verify the contents of the file. Script generates a message when it
+    // receives a sigterm so we look for that. We cannot perform this check on
+    // Windows, because the process is not notified when killed by winutils.
+    // There is no way for the process to trap and respond. Instead, we can
+    // verify that the job object with ID matching container ID no longer exists.
+    if (Shell.WINDOWS) {
+      Assert.assertFalse("Process is still alive!",
+          DefaultContainerExecutor.containerIsAlive(cId.toString()));
+    } else {
       BufferedReader reader =
           new BufferedReader(new FileReader(processStartFile));
@@ -135,6 +148,7 @@ public class TestNodeManagerShutdown {
       Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
       reader.close();
     }
+  }
 @SuppressWarnings("unchecked")
 @Test
@@ -162,8 +176,6 @@ public class TestNodeManagerShutdown {
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
     Container mockContainer = mock(Container.class);
-    // Construct the Container-id
-    ContainerId cId = createContainerId();
     when(mockContainer.getId()).thenReturn(cId);
     containerLaunchContext.setUser(user);
@@ -184,9 +196,7 @@ public class TestNodeManagerShutdown {
     localResources.put(destinationFile, localResource);
     containerLaunchContext.setLocalResources(localResources);
     containerLaunchContext.setUser(containerLaunchContext.getUser());
-    List<String> commands = new ArrayList<String>();
-    commands.add("/bin/bash");
-    commands.add(scriptFile.getAbsolutePath());
+    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
     Resource resource = BuilderUtils.newResource(1024, 1);
     when(mockContainer.getResource()).thenReturn(resource);
@@ -234,16 +244,24 @@ public class TestNodeManagerShutdown {
    * stopped by external means.
    */
   private File createUnhaltingScriptFile() throws IOException {
-    File scriptFile = new File(tmpDir, "scriptFile.sh");
-    BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    if (Shell.WINDOWS) {
+      fileWriter.println("@echo \"Running testscript for delayed kill\"");
+      fileWriter.println("@echo \"Writing pid to start file\"");
+      fileWriter.println("@echo " + cId + ">> " + processStartFile);
+      fileWriter.println("@pause");
+    } else {
       fileWriter.write("#!/bin/bash\n\n");
       fileWriter.write("echo \"Running testscript for delayed kill\"\n");
       fileWriter.write("hello=\"Got SIGTERM\"\n");
       fileWriter.write("umask 0\n");
-      fileWriter.write("trap \"echo $hello >> " + processStartFile + "\" SIGTERM\n");
+      fileWriter.write("trap \"echo $hello >> " + processStartFile +
+          "\" SIGTERM\n");
       fileWriter.write("echo \"Writing pid to start file\"\n");
       fileWriter.write("echo $$ >> " + processStartFile + "\n");
       fileWriter.write("while true; do\ndate >> /dev/null;\n done\n");
+    }
     fileWriter.close();
     return scriptFile;
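The test now leans on two Shell helpers instead of hard-coded .sh handling: appendScriptExtension picks the platform suffix, and getRunScriptCommand produces the argv that runs the script. A sketch (the exact argv is an implementation detail of Shell; the tests only rely on it running the script):

    import java.io.File;
    import org.apache.hadoop.util.Shell;

    public class ScriptCommandExample {
      static String[] runScriptCommand(File dir) {
        // "scriptFile.cmd" on Windows, "scriptFile.sh" elsewhere.
        File script = Shell.appendScriptExtension(dir, "scriptFile");
        return Shell.getRunScriptCommand(script);
      }
    }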
View File
@@ -76,15 +76,15 @@ public abstract class BaseContainerManagerTest {
   public BaseContainerManagerTest() throws UnsupportedFileSystemException {
     localFS = FileContext.getLocalFSFileContext();
     localDir =
-        new File("target", this.getClass().getName() + "-localDir")
+        new File("target", this.getClass().getSimpleName() + "-localDir")
             .getAbsoluteFile();
     localLogDir =
-        new File("target", this.getClass().getName() + "-localLogDir")
+        new File("target", this.getClass().getSimpleName() + "-localLogDir")
             .getAbsoluteFile();
     remoteLogDir =
-        new File("target", this.getClass().getName() + "-remoteLogDir")
+        new File("target", this.getClass().getSimpleName() + "-remoteLogDir")
             .getAbsoluteFile();
-    tmpDir = new File("target", this.getClass().getName() + "-tmpDir");
+    tmpDir = new File("target", this.getClass().getSimpleName() + "-tmpDir");
   }
   protected static Log LOG = LogFactory
View File
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -196,22 +198,29 @@ public class TestContainerManager extends BaseContainerManagerTest {
       InterruptedException {
     containerManager.start();
-    File scriptFile = new File(tmpDir, "scriptFile.sh");
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
     PrintWriter fileWriter = new PrintWriter(scriptFile);
     File processStartFile =
         new File(tmpDir, "start_file.txt").getAbsoluteFile();
+    // ////// Construct the Container-id
+    ContainerId cId = createContainerId();
+    if (Shell.WINDOWS) {
+      fileWriter.println("@echo Hello World!> " + processStartFile);
+      fileWriter.println("@echo " + cId + ">> " + processStartFile);
+      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
       fileWriter.write("\numask 0"); // So that start file is readable by the test
       fileWriter.write("\necho Hello World! > " + processStartFile);
       fileWriter.write("\necho $$ >> " + processStartFile);
       fileWriter.write("\nexec sleep 100");
+    }
     fileWriter.close();
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    // ////// Construct the Container-id
-    ContainerId cId = createContainerId();
     containerLaunchContext.setUser(user);
     URL resource_alpha =
@@ -230,14 +239,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
     localResources.put(destinationFile, rsrc_alpha);
     containerLaunchContext.setLocalResources(localResources);
     containerLaunchContext.setUser(containerLaunchContext.getUser());
-    List<String> commands = new ArrayList<String>();
-    commands.add("/bin/bash");
-    commands.add(scriptFile.getAbsolutePath());
+    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
     Container mockContainer = mock(Container.class);
     when(mockContainer.getId()).thenReturn(cId);
     when(mockContainer.getResource()).thenReturn(
-        BuilderUtils.newResource(100 * 1024 * 1024, 1));
+        BuilderUtils.newResource(100, 1)); // MB
     StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
     startRequest.setContainerLaunchContext(containerLaunchContext);
     startRequest.setContainer(mockContainer);
@@ -264,12 +271,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // Assert that the process is alive
     Assert.assertTrue("Process is not alive!",
-        exec.signalContainer(user,
-            pid, Signal.NULL));
+        DefaultContainerExecutor.containerIsAlive(pid));
     // Once more
     Assert.assertTrue("Process is not alive!",
-        exec.signalContainer(user,
-            pid, Signal.NULL));
+        DefaultContainerExecutor.containerIsAlive(pid));
     StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
     stopRequest.setContainerId(cId);
@@ -283,38 +288,46 @@ public class TestContainerManager extends BaseContainerManagerTest {
     gcsRequest.setContainerId(cId);
     ContainerStatus containerStatus =
         containerManager.getContainerStatus(gcsRequest).getStatus();
-    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
-        containerStatus.getExitStatus());
+    int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
+        ExitCode.TERMINATED.getExitCode();
+    Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
     // Assert that the process is not alive anymore
     Assert.assertFalse("Process is still alive!",
-        exec.signalContainer(user,
-            pid, Signal.NULL));
+        DefaultContainerExecutor.containerIsAlive(pid));
   }
   private void testContainerLaunchAndExit(int exitCode) throws IOException, InterruptedException {
-    File scriptFile = new File(tmpDir, "scriptFile.sh");
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
     PrintWriter fileWriter = new PrintWriter(scriptFile);
     File processStartFile =
         new File(tmpDir, "start_file.txt").getAbsoluteFile();
+    // ////// Construct the Container-id
+    ContainerId cId = createContainerId();
+    if (Shell.WINDOWS) {
+      fileWriter.println("@echo Hello World!> " + processStartFile);
+      fileWriter.println("@echo " + cId + ">> " + processStartFile);
+      if (exitCode != 0) {
+        fileWriter.println("@exit " + exitCode);
+      }
+    } else {
       fileWriter.write("\numask 0"); // So that start file is readable by the test
       fileWriter.write("\necho Hello World! > " + processStartFile);
       fileWriter.write("\necho $$ >> " + processStartFile);
       // Have script throw an exit code at the end
       if (exitCode != 0) {
         fileWriter.write("\nexit "+exitCode);
       }
+    }
     fileWriter.close();
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    // ////// Construct the Container-id
-    ContainerId cId = createContainerId();
     containerLaunchContext.setUser(user);
     URL resource_alpha =
@@ -333,14 +346,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
     localResources.put(destinationFile, rsrc_alpha);
     containerLaunchContext.setLocalResources(localResources);
     containerLaunchContext.setUser(containerLaunchContext.getUser());
-    List<String> commands = new ArrayList<String>();
-    commands.add("/bin/bash");
-    commands.add(scriptFile.getAbsolutePath());
+    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
     Container mockContainer = mock(Container.class);
     when(mockContainer.getId()).thenReturn(cId);
     when(mockContainer.getResource()).thenReturn(
-        BuilderUtils.newResource(100 * 1024 * 1024, 1));
+        BuilderUtils.newResource(100, 1)); // MB
     StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
     startRequest.setContainerLaunchContext(containerLaunchContext);
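The platform-dependent exit code shows up in several of these tests: a container stopped on Unix traps SIGTERM and reports TERMINATED, while winutils kills the Windows job object outright and the process cannot respond, yielding FORCE_KILLED. The expectation reduces to:

    import org.apache.hadoop.util.Shell;
    import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;

    public class ExpectedExitCodeExample {
      // Exit status a test should expect after stopping a running container.
      static int expectedExitCodeAfterStop() {
        return Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode()
            : ExitCode.TERMINATED.getExitCode();
      }
    }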
View File
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -88,13 +89,15 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     File shellFile = null;
     File tempFile = null;
-    String badSymlink = "foo@zz%_#*&!-+= bar()";
+    String badSymlink = Shell.WINDOWS ? "foo@zz_#!-+bar.cmd" :
+        "foo@zz%_#*&!-+= bar()";
     File symLinkFile = null;
     try {
-      shellFile = new File(tmpDir, "hello.sh");
-      tempFile = new File(tmpDir, "temp.sh");
-      String timeoutCommand = "echo \"hello\"";
+      shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+      tempFile = Shell.appendScriptExtension(tmpDir, "temp");
+      String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
+          "echo \"hello\"";
       PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
       shellFile.setExecutable(true);
       writer.println(timeoutCommand);
@@ -109,7 +112,13 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
       Map<String, String> env = new HashMap<String, String>();
       List<String> commands = new ArrayList<String>();
+      if (Shell.WINDOWS) {
+        commands.add("cmd");
+        commands.add("/c");
+        commands.add("\"" + badSymlink + "\"");
+      } else {
        commands.add("/bin/sh ./\\\"" + badSymlink + "\\\"");
+      }
       ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
       fos.flush();
@@ -145,6 +154,19 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
   // this is a dirty hack - but should be ok for a unittest.
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public static void setNewEnvironmentHack(Map<String, String> newenv) throws Exception {
+    try {
+      Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
+      Field field = cl.getDeclaredField("theEnvironment");
+      field.setAccessible(true);
+      Map<String, String> env = (Map<String, String>)field.get(null);
+      env.clear();
+      env.putAll(newenv);
+      Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
+      ciField.setAccessible(true);
+      Map<String, String> cienv = (Map<String, String>)ciField.get(null);
+      cienv.clear();
+      cienv.putAll(newenv);
+    } catch (NoSuchFieldException e) {
      Class[] classes = Collections.class.getDeclaredClasses();
      Map<String, String> env = System.getenv();
      for (Class cl : classes) {
@@ -158,6 +180,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
        }
      }
    }
+  }
 /**
  * See if environment variable is forwarded using sanitizeEnv.
@@ -172,22 +195,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     envWithDummy.put(Environment.MALLOC_ARENA_MAX.name(), "99");
     setNewEnvironmentHack(envWithDummy);
-    String malloc = System.getenv(Environment.MALLOC_ARENA_MAX.name());
-    File scriptFile = new File(tmpDir, "scriptFile.sh");
-    PrintWriter fileWriter = new PrintWriter(scriptFile);
-    File processStartFile =
-        new File(tmpDir, "env_vars.txt").getAbsoluteFile();
-    fileWriter.write("\numask 0"); // So that start file is readable by the test
-    fileWriter.write("\necho $" + Environment.MALLOC_ARENA_MAX.name() + " > " + processStartFile);
-    fileWriter.write("\necho $$ >> " + processStartFile);
-    fileWriter.write("\nexec sleep 100");
-    fileWriter.close();
-    assert(malloc != null && !"".equals(malloc));
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
     Container mockContainer = mock(Container.class);
     // ////// Construct the Container-id
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
@@ -200,6 +207,30 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     ContainerId cId =
         recordFactory.newRecordInstance(ContainerId.class);
     cId.setApplicationAttemptId(appAttemptId);
+    String malloc = System.getenv(Environment.MALLOC_ARENA_MAX.name());
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    File processStartFile =
+        new File(tmpDir, "env_vars.txt").getAbsoluteFile();
+    if (Shell.WINDOWS) {
+      fileWriter.println("@echo " + Environment.MALLOC_ARENA_MAX.$() + "> " +
+          processStartFile);
+      fileWriter.println("@echo " + cId + ">> " + processStartFile);
+      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
+      fileWriter.write("\numask 0"); // So that start file is readable by the test
+      fileWriter.write("\necho " + Environment.MALLOC_ARENA_MAX.$() + " > " +
+          processStartFile);
+      fileWriter.write("\necho $$ >> " + processStartFile);
+      fileWriter.write("\nexec sleep 100");
+    }
+    fileWriter.close();
+    assert(malloc != null && !"".equals(malloc));
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
     when(mockContainer.getId()).thenReturn(cId);
     containerLaunchContext.setUser(user);
@@ -223,9 +254,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     // set up the rest of the container
     containerLaunchContext.setUser(containerLaunchContext.getUser());
-    List<String> commands = new ArrayList<String>();
-    commands.add("/bin/bash");
-    commands.add(scriptFile.getAbsolutePath());
+    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
     when(mockContainer.getResource()).thenReturn(
         BuilderUtils.newResource(1024, 1));
@@ -255,12 +284,10 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     // Assert that the process is alive
     Assert.assertTrue("Process is not alive!",
-        exec.signalContainer(user,
-            pid, Signal.NULL));
+        DefaultContainerExecutor.containerIsAlive(pid));
     // Once more
     Assert.assertTrue("Process is not alive!",
-        exec.signalContainer(user,
-            pid, Signal.NULL));
+        DefaultContainerExecutor.containerIsAlive(pid));
     StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
     stopRequest.setContainerId(cId);
@@ -274,38 +301,19 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     gcsRequest.setContainerId(cId);
     ContainerStatus containerStatus =
         containerManager.getContainerStatus(gcsRequest).getStatus();
-    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
-        containerStatus.getExitStatus());
+    int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
+        ExitCode.TERMINATED.getExitCode();
+    Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
     // Assert that the process is not alive anymore
     Assert.assertFalse("Process is still alive!",
-        exec.signalContainer(user,
-            pid, Signal.NULL));
+        DefaultContainerExecutor.containerIsAlive(pid));
   }
   @Test
   public void testDelayedKill() throws Exception {
     containerManager.start();
-    File processStartFile =
-        new File(tmpDir, "pid.txt").getAbsoluteFile();
-    // setup a script that can handle sigterm gracefully
-    File scriptFile = new File(tmpDir, "testscript.sh");
-    PrintWriter writer = new PrintWriter(new FileOutputStream(scriptFile));
-    writer.println("#!/bin/bash\n\n");
-    writer.println("echo \"Running testscript for delayed kill\"");
-    writer.println("hello=\"Got SIGTERM\"");
-    writer.println("umask 0");
-    writer.println("trap \"echo $hello >> " + processStartFile + "\" SIGTERM");
-    writer.println("echo \"Writing pid to start file\"");
-    writer.println("echo $$ >> " + processStartFile);
-    writer.println("while true; do\nsleep 1s;\ndone");
-    writer.close();
-    scriptFile.setExecutable(true);
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
     Container mockContainer = mock(Container.class);
     // ////// Construct the Container-id
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
@@ -318,6 +326,33 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     ContainerId cId =
         recordFactory.newRecordInstance(ContainerId.class);
     cId.setApplicationAttemptId(appAttemptId);
+    File processStartFile =
+        new File(tmpDir, "pid.txt").getAbsoluteFile();
+    // setup a script that can handle sigterm gracefully
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "testscript");
+    PrintWriter writer = new PrintWriter(new FileOutputStream(scriptFile));
+    if (Shell.WINDOWS) {
+      writer.println("@echo \"Running testscript for delayed kill\"");
+      writer.println("@echo \"Writing pid to start file\"");
+      writer.println("@echo " + cId + "> " + processStartFile);
+      writer.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
+      writer.println("#!/bin/bash\n\n");
+      writer.println("echo \"Running testscript for delayed kill\"");
+      writer.println("hello=\"Got SIGTERM\"");
+      writer.println("umask 0");
+      writer.println("trap \"echo $hello >> " + processStartFile + "\" SIGTERM");
+      writer.println("echo \"Writing pid to start file\"");
+      writer.println("echo $$ >> " + processStartFile);
+      writer.println("while true; do\nsleep 1s;\ndone");
+    }
+    writer.close();
+    scriptFile.setExecutable(true);
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
     when(mockContainer.getId()).thenReturn(cId);
     containerLaunchContext.setUser(user);
@@ -341,8 +376,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     // set up the rest of the container
     containerLaunchContext.setUser(containerLaunchContext.getUser());
-    List<String> commands = new ArrayList<String>();
-    commands.add(scriptFile.getAbsolutePath());
+    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
     when(mockContainer.getResource()).thenReturn(
         BuilderUtils.newResource(1024, 1));
@@ -376,9 +410,15 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
         containerStatus.getExitStatus());
-    // Now verify the contents of the file
-    // Script generates a message when it receives a sigterm
-    // so we look for that
+    // Now verify the contents of the file. Script generates a message when it
+    // receives a sigterm so we look for that. We cannot perform this check on
+    // Windows, because the process is not notified when killed by winutils.
+    // There is no way for the process to trap and respond. Instead, we can
+    // verify that the job object with ID matching container ID no longer exists.
+    if (Shell.WINDOWS) {
+      Assert.assertFalse("Process is still alive!",
+          DefaultContainerExecutor.containerIsAlive(cId.toString()));
+    } else {
       BufferedReader reader =
           new BufferedReader(new FileReader(processStartFile));
@@ -396,5 +436,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
       Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
       reader.close();
     }
+  }
 }
View File
@@ -497,7 +497,7 @@ public class TestContainerManagerSecurity {
         .getAllocatedContainers();
     // Modify ask to request no more.
-    allocateRequest.clearAsks();
+    allocateRequest.setAskList(new ArrayList<ResourceRequest>());
     int waitCounter = 0;
     while ((allocatedContainers == null || allocatedContainers.size() == 0)