diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 419fbf9e25f..297f8f8feb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -445,6 +445,9 @@ Release 2.0.3-alpha - Unreleased HDFS-3916. libwebhdfs testing code cleanup. (Jing Zhao via suresh) + HDFS-4143. Change blocks to private in INodeFile and renames isLink() to + isSymlink() in INode. (szetszwo) + OPTIMIZATIONS BUG FIXES @@ -536,6 +539,12 @@ Release 2.0.3-alpha - Unreleased HDFS-3809. Make BKJM use protobufs for all serialization with ZK. (Ivan Kelly via umamahesh) + HDFS-3804. TestHftpFileSystem fails intermittently with JDK7 + (Trevor Robinson via daryn) + + HDFS-4132. When libwebhdfs is not enabled, nativeMiniDfsClient frees + uninitialized memory (Colin Patrick McCabe via todd) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 0e57ff6ce37..37aeacb960d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -313,7 +313,7 @@ public class FSDirectory implements Closeable { } if(newParent == null) return null; - if(!newNode.isDirectory() && !newNode.isLink()) { + if(!newNode.isDirectory() && !newNode.isSymlink()) { // Add file->block mapping INodeFile newF = (INodeFile)newNode; BlockInfo[] blocks = newF.getBlocks(); @@ -533,7 +533,7 @@ public class FSDirectory implements Closeable { if (dst.equals(src)) { return true; } - if (srcInode.isLink() && + if (srcInode.isSymlink() && dst.equals(((INodeSymlink)srcInode).getLinkValue())) { throw new FileAlreadyExistsException( "Cannot rename symlink "+src+" to its target "+dst); @@ -655,7 +655,7 @@ public class FSDirectory implements Closeable { throw new FileAlreadyExistsException( "The source "+src+" and destination "+dst+" are the same"); } - if (srcInode.isLink() && + if (srcInode.isSymlink() && dst.equals(((INodeSymlink)srcInode).getLinkValue())) { throw new FileAlreadyExistsException( "Cannot rename symlink "+src+" to its target "+dst); @@ -819,7 +819,7 @@ public class FSDirectory implements Closeable { if (inode == null) { return null; } - assert !inode.isLink(); + assert !inode.isSymlink(); if (inode.isDirectory()) { return null; } @@ -851,7 +851,7 @@ public class FSDirectory implements Closeable { if (inode == null) { throw new FileNotFoundException("File does not exist: " + filename); } - if (inode.isDirectory() || inode.isLink()) { + if (inode.isDirectory() || inode.isSymlink()) { throw new IOException("Getting block size of non-file: "+ filename); } return ((INodeFile)inode).getPreferredBlockSize(); @@ -868,7 +868,7 @@ public class FSDirectory implements Closeable { if (inode == null) { return false; } - return inode.isDirectory() || inode.isLink() + return inode.isDirectory() || inode.isSymlink() ? 
true : ((INodeFile)inode).getBlocks() != null; } finally { @@ -968,7 +968,7 @@ public class FSDirectory implements Closeable { for(String src : srcs) { INodeFile srcInode = (INodeFile)getINode(src); allSrcInodes[i++] = srcInode; - totalBlocks += srcInode.blocks.length; + totalBlocks += srcInode.numBlocks(); } trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks @@ -977,7 +977,7 @@ public class FSDirectory implements Closeable { for(INodeFile nodeToRemove: allSrcInodes) { if(nodeToRemove == null) continue; - nodeToRemove.blocks = null; + nodeToRemove.setBlocks(null); trgParent.removeChild(nodeToRemove); count++; } @@ -1232,7 +1232,7 @@ public class FSDirectory implements Closeable { return null; if (targetNode.isDirectory()) return null; - if (targetNode.isLink()) + if (targetNode.isSymlink()) return null; return ((INodeFile)targetNode).getBlocks(); } finally { @@ -1846,7 +1846,7 @@ public class FSDirectory implements Closeable { if (child.isDirectory()) { updateCountForINodeWithQuota((INodeDirectory)child, counts, nodesInPath); - } else if (child.isLink()) { + } else if (child.isSymlink()) { counts.nsCount += 1; } else { // reduce recursive calls counts.nsCount += 1; @@ -2075,7 +2075,7 @@ public class FSDirectory implements Closeable { node.getFsPermission(), node.getUserName(), node.getGroupName(), - node.isLink() ? ((INodeSymlink)node).getSymlink() : null, + node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null, path); } @@ -2111,7 +2111,7 @@ public class FSDirectory implements Closeable { node.getFsPermission(), node.getUserName(), node.getGroupName(), - node.isLink() ? ((INodeSymlink)node).getSymlink() : null, + node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null, path, loc); } @@ -2182,7 +2182,7 @@ public class FSDirectory implements Closeable { */ void cacheName(INode inode) { // Name is cached only for files - if (inode.isDirectory() || inode.isLink()) { + if (inode.isDirectory() || inode.isSymlink()) { return; } ByteArray name = new ByteArray(inode.getLocalNameBytes()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index 173d96c9693..defd6e0f217 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -162,7 +162,7 @@ public class FSImageSerialization { PermissionStatus.write(out, node.getUserName(), node.getGroupName(), filePerm); - } else if (node.isLink()) { + } else if (node.isSymlink()) { out.writeShort(0); // replication out.writeLong(0); // modification time out.writeLong(0); // access time diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 8961282dd86..0e91c9c6473 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1394,7 +1394,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + target + " is under construction"); } // per design target shouldn't be empty and all the blocks same size - 
if(trgInode.blocks.length == 0) { + if(trgInode.numBlocks() == 0) { throw new HadoopIllegalArgumentException("concat: target file " + target + " is empty"); } @@ -1406,10 +1406,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, long blockSize = trgInode.getPreferredBlockSize(); // check the end block to be full - if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) { + final BlockInfo last = trgInode.getLastBlock(); + if(blockSize != last.getNumBytes()) { throw new HadoopIllegalArgumentException("The last block in " + target - + " is not full; last block size = " - + trgInode.blocks[trgInode.blocks.length-1].getNumBytes() + + " is not full; last block size = " + last.getNumBytes() + " but file block size = " + blockSize); } @@ -1426,7 +1426,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src); if(src.isEmpty() || srcInode.isUnderConstruction() - || srcInode.blocks.length == 0) { + || srcInode.numBlocks() == 0) { throw new HadoopIllegalArgumentException("concat: source file " + src + " is invalid or empty or underConstruction"); } @@ -1443,15 +1443,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, //boolean endBlock=false; // verify that all the blocks are of the same length as target // should be enough to check the end blocks - int idx = srcInode.blocks.length-1; + final BlockInfo[] srcBlocks = srcInode.getBlocks(); + int idx = srcBlocks.length-1; if(endSrc) - idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full - if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) { + idx = srcBlocks.length-2; // end block of endSrc is OK not to be full + if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) { throw new HadoopIllegalArgumentException("concat: the soruce file " + src + " and the target file " + target + " should have the same blocks sizes: target block size is " + blockSize + " but the size of source block " + idx + " is " - + srcInode.blocks[idx].getNumBytes()); + + srcBlocks[idx].getNumBytes()); } si.add(srcInode); @@ -1686,7 +1687,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (parentNode == null) { throw new FileNotFoundException("Parent directory doesn't exist: " + parent.toString()); - } else if (!parentNode.isDirectory() && !parentNode.isLink()) { + } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) { throw new ParentNotDirectoryException("Parent path is not a directory: " + parent.toString()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index a337080d446..0455a7cabb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -183,7 +183,9 @@ public abstract class INode implements Comparable { /** * Check whether it's a directory */ - abstract boolean isDirectory(); + public boolean isDirectory() { + return false; + } /** * Collect all the blocks in all children of this INode. 
@@ -337,7 +339,7 @@ public abstract class INode implements Comparable { /** * Check whether it's a symlink */ - public boolean isLink() { + public boolean isSymlink() { return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java index ca870feb13b..dc5a25eae15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java @@ -81,11 +81,9 @@ public class INodeDirectory extends INode { this.children = other.getChildren(); } - /** - * Check whether it's a directory - */ + /** @return true unconditionally. */ @Override - public boolean isDirectory() { + public final boolean isDirectory() { return true; } @@ -214,7 +212,7 @@ public class INodeDirectory extends INode { if (index >= 0) { existing.addNode(curNode); } - if (curNode.isLink() && (!lastComp || (lastComp && resolveLink))) { + if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) { final String path = constructPath(components, 0, components.length); final String preceding = constructPath(components, 0, count); final String remainder = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index b721f31a06d..34d4d008a6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -55,7 +55,7 @@ public class INodeFile extends INode implements BlockCollection { private long header; - protected BlockInfo[] blocks = null; + protected BlockInfo[] blocks; INodeFile(PermissionStatus permissions, BlockInfo[] blklist, short replication, long modificationTime, @@ -63,7 +63,7 @@ public class INodeFile extends INode implements BlockCollection { super(permissions, modificationTime, atime); this.setFileReplication(replication); this.setPreferredBlockSize(preferredBlockSize); - blocks = blklist; + this.blocks = blklist; } protected INodeFile(INodeFile f) { @@ -82,11 +82,6 @@ public class INodeFile extends INode implements BlockCollection { super.setPermission(permission.applyUMask(UMASK)); } - @Override - boolean isDirectory() { - return false; - } - /** @return the replication factor of the file. 
*/ public final short getFileReplication() { return (short) ((header & HEADERMASK) >> BLOCKBITS); @@ -138,7 +133,7 @@ public class INodeFile extends INode implements BlockCollection { for(BlockInfo bi: newlist) { bi.setBlockCollection(this); } - this.blocks = newlist; + setBlocks(newlist); } /** @@ -146,14 +141,13 @@ public class INodeFile extends INode implements BlockCollection { */ void addBlock(BlockInfo newblock) { if (this.blocks == null) { - this.blocks = new BlockInfo[1]; - this.blocks[0] = newblock; + this.setBlocks(new BlockInfo[]{newblock}); } else { int size = this.blocks.length; BlockInfo[] newlist = new BlockInfo[size + 1]; System.arraycopy(this.blocks, 0, newlist, 0, size); newlist[size] = newblock; - this.blocks = newlist; + this.setBlocks(newlist); } } @@ -162,6 +156,11 @@ public class INodeFile extends INode implements BlockCollection { this.blocks[idx] = blk; } + /** Set the blocks. */ + public void setBlocks(BlockInfo[] blocks) { + this.blocks = blocks; + } + @Override protected int collectSubtreeBlocksAndClear(List v) { parent = null; @@ -171,7 +170,7 @@ public class INodeFile extends INode implements BlockCollection { blk.setBlockCollection(null); } } - blocks = null; + setBlocks(null); return 1; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java index fe1cd8518bd..4005ee69686 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -28,8 +29,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import com.google.common.base.Joiner; - /** * I-node for file being written. */ @@ -109,9 +108,9 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc // use the modification time as the access time // INodeFile convertToInodeFile() { - assert allBlocksComplete() : - "Can't finalize inode " + this + " since it contains " + - "non-complete blocks! Blocks are: " + blocksAsString(); + assert allBlocksComplete() : "Can't finalize inode " + this + + " since it contains non-complete blocks! Blocks are " + + Arrays.asList(getBlocks()); //TODO SNAPSHOT: may convert to INodeFileWithLink INodeFile obj = new INodeFile(getPermissionStatus(), getBlocks(), @@ -127,7 +126,7 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc * @return true if all of the blocks in this file are marked as completed. */ private boolean allBlocksComplete() { - for (BlockInfo b : blocks) { + for (BlockInfo b : getBlocks()) { if (!b.isComplete()) { return false; } @@ -140,6 +139,7 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc * the last one on the list. 
*/ void removeLastBlock(Block oldblock) throws IOException { + final BlockInfo[] blocks = getBlocks(); if (blocks == null) { throw new IOException("Trying to delete non-existant block " + oldblock); } @@ -151,7 +151,7 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc //copy to a new list BlockInfo[] newlist = new BlockInfo[size_1]; System.arraycopy(blocks, 0, newlist, 0, size_1); - blocks = newlist; + setBlocks(newlist); } /** @@ -160,11 +160,9 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc */ @Override public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock, - DatanodeDescriptor[] targets) - throws IOException { - if (blocks == null || blocks.length == 0) { - throw new IOException("Trying to update non-existant block. " + - "File is empty."); + DatanodeDescriptor[] targets) throws IOException { + if (numBlocks() == 0) { + throw new IOException("Failed to set last block: File is empty."); } BlockInfoUnderConstruction ucBlock = lastBlock.convertToBlockUnderConstruction( @@ -173,8 +171,4 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc setBlock(numBlocks()-1, ucBlock); return ucBlock; } - - private String blocksAsString() { - return Joiner.on(",").join(this.blocks); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java index f71d515fc21..e48ece057a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java @@ -49,7 +49,7 @@ public class INodeSymlink extends INode { } @Override - public boolean isLink() { + public boolean isSymlink() { return true; } @@ -81,9 +81,4 @@ public class INodeSymlink extends INode { summary[1]++; // Increment the file count return summary; } - - @Override - public boolean isDirectory() { - return false; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c index 175d9471205..a1e786450f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c @@ -44,11 +44,11 @@ struct NativeMiniDfsCluster { struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) { struct NativeMiniDfsCluster* cl = NULL; - jobject bld = NULL, bld2 = NULL, cobj = NULL; + jobject bld = NULL, cobj = NULL, cluster = NULL; jvalue val; JNIEnv *env = getJNIEnv(); jthrowable jthr; - jstring jconfStr; + jstring jconfStr = NULL; if (!env) { fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n"); @@ -63,14 +63,14 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) if (jthr) { printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: new Configuration"); - goto error_free_cl; + goto error; } if (conf->webhdfsEnabled) { jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr); if (jthr) { printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: new String"); - goto error_dlr_cobj; + goto error; } jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF, "setBoolean", "(Ljava/lang/String;Z)V", @@ -78,7 +78,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) if (jthr) { 
printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: Configuration::setBoolean"); - goto error_dlr_cobj; + goto error; } } jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER, @@ -86,58 +86,53 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) if (jthr) { printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: NativeMiniDfsCluster#Builder#Builder"); - goto error_dlr_cobj; + goto error; } jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat); if (jthr) { printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " "Builder::format"); - goto error_dlr_bld; + goto error; } - bld2 = val.l; + (*env)->DeleteLocalRef(env, val.l); if (conf->webhdfsEnabled) { - jthr = invokeMethod(env, &val, INSTANCE, bld2, MINIDFS_CLUSTER_BUILDER, + jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->namenodeHttpPort); if (jthr) { printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: " "Builder::nameNodeHttpPort"); - goto error_dlr_bld2; + goto error; } + (*env)->DeleteLocalRef(env, val.l); } jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "build", "()L" MINIDFS_CLUSTER ";"); if (jthr) { printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: Builder#build"); - goto error_dlr_bld2; + goto error; } - cl->obj = (*env)->NewGlobalRef(env, val.l); + cluster = val.l; + cl->obj = (*env)->NewGlobalRef(env, val.l); if (!cl->obj) { printPendingExceptionAndFree(env, PRINT_EXC_ALL, "nmdCreate: NewGlobalRef"); - goto error_dlr_val; + goto error; } - (*env)->DeleteLocalRef(env, val.l); - (*env)->DeleteLocalRef(env, bld2); + (*env)->DeleteLocalRef(env, cluster); (*env)->DeleteLocalRef(env, bld); (*env)->DeleteLocalRef(env, cobj); (*env)->DeleteLocalRef(env, jconfStr); return cl; -error_dlr_val: - (*env)->DeleteLocalRef(env, val.l); -error_dlr_bld2: - (*env)->DeleteLocalRef(env, bld2); -error_dlr_bld: +error: + (*env)->DeleteLocalRef(env, cluster); (*env)->DeleteLocalRef(env, bld); -error_dlr_cobj: (*env)->DeleteLocalRef(env, cobj); (*env)->DeleteLocalRef(env, jconfStr); -error_free_cl: free(cl); -error: return NULL; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java index 6cb0ad1ce83..af62f3ca30e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -42,18 +43,17 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.util.ServletUtil; import org.apache.log4j.Level; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; public class TestHftpFileSystem { private static final Random RAN = new Random(); private static Configuration config = null; private static 
MiniDFSCluster cluster = null; - private static FileSystem hdfs = null; - private static HftpFileSystem hftpFs = null; private static String blockPoolId = null; + private static String hftpUri = null; + private FileSystem hdfs = null; + private HftpFileSystem hftpFs = null; private static Path[] TEST_PATHS = new Path[] { // URI does not encode, Request#getPathInfo returns /foo @@ -93,26 +93,33 @@ public class TestHftpFileSystem { config = new Configuration(); cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build(); - hdfs = cluster.getFileSystem(); blockPoolId = cluster.getNamesystem().getBlockPoolId(); - final String hftpUri = + hftpUri = "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); - hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config); } @AfterClass public static void tearDown() throws IOException { - if (hdfs != null) { - hdfs.close(); - } - if (hftpFs != null) { - hftpFs.close(); - } if (cluster != null) { cluster.shutdown(); } } + + @Before + public void initFileSystems() throws IOException { + hdfs = cluster.getFileSystem(); + hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config); + // clear out the namespace + for (FileStatus stat : hdfs.listStatus(new Path("/"))) { + hdfs.delete(stat.getPath(), true); + } + } + @After + public void resetFileSystems() throws IOException { + FileSystem.closeAll(); + } + /** * Test file creation and access with file names that need encoding. */ @@ -280,19 +287,8 @@ public class TestHftpFileSystem { assertEquals("Stream closed", ioe.getMessage()); } - public void resetFileSystem() throws IOException { - // filesystem caching has a quirk/bug that it caches based on the user's - // given uri. the result is if a filesystem is instantiated with no port, - // it gets the default port. then if the default port is changed, - // and another filesystem is instantiated with no port, the prior fs - // is returned, not a new one using the changed port. so let's flush - // the cache between tests... 
- FileSystem.closeAll(); - } - @Test public void testHftpDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); URI uri = URI.create("hftp://localhost"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); @@ -309,7 +305,6 @@ public class TestHftpFileSystem { @Test public void testHftpCustomDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); conf.setInt("dfs.http.port", 123); conf.setInt("dfs.https.port", 456); @@ -329,7 +324,6 @@ public class TestHftpFileSystem { @Test public void testHftpCustomUriPortWithDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); URI uri = URI.create("hftp://localhost:123"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); @@ -346,7 +340,6 @@ public class TestHftpFileSystem { @Test public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); conf.setInt("dfs.http.port", 123); conf.setInt("dfs.https.port", 456); @@ -368,7 +361,6 @@ public class TestHftpFileSystem { @Test public void testHsftpDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); URI uri = URI.create("hsftp://localhost"); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); @@ -385,7 +377,6 @@ public class TestHftpFileSystem { @Test public void testHsftpCustomDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); conf.setInt("dfs.http.port", 123); conf.setInt("dfs.https.port", 456); @@ -405,7 +396,6 @@ public class TestHftpFileSystem { @Test public void testHsftpCustomUriPortWithDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); URI uri = URI.create("hsftp://localhost:123"); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); @@ -422,7 +412,6 @@ public class TestHftpFileSystem { @Test public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException { - resetFileSystem(); Configuration conf = new Configuration(); conf.setInt("dfs.http.port", 123); conf.setInt("dfs.https.port", 456); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java index 85d7125c53a..0e1183e48ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSck; @@ -678,11 +679,11 @@ public class TestFsck { DFSTestUtil.waitReplication(fs, filePath, (short)1); // intentionally corrupt NN data structure - INodeFile node = - (INodeFile)cluster.getNamesystem().dir.rootDir.getNode(fileName, - true); - assertEquals(node.blocks.length, 1); - node.blocks[0].setNumBytes(-1L); // set the block length to be negative + INodeFile node = 
(INodeFile)cluster.getNamesystem().dir.rootDir.getNode( + fileName, true); + final BlockInfo[] blocks = node.getBlocks(); + assertEquals(blocks.length, 1); + blocks[0].setNumBytes(-1L); // set the block length to be negative // run fsck and expect a failure with -1 as the error code String outStr = runFsck(conf, -1, true, fileName); diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b2f9a793636..e7e7ef12193 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -623,6 +623,15 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4724. job history web ui applications page should be sorted to display last app first (tgraves via bobby) + MAPREDUCE-4746. The MR Application Master does not have a config to set + environment variables (Rob Parker via bobby) + + MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod + Kumar Vavilapalli via jlowe) + + MAPREDUCE-4763 repair test TestUmbilicalProtocolWithJobToken (Ivan A. + Veselovsky via bobby) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 1191f8d789b..51415ee0874 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -23,14 +23,17 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.EventReader; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; @@ -89,6 +95,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.SystemClock; @@ -826,16 +833,21 @@ public class MRAppMaster extends CompositeService { @Override public void start() { + amInfos = new LinkedList(); + // Pull 
completedTasks etc from recovery if (inRecovery) { completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); amInfos = recoveryServ.getAMInfos(); + } else { + // Get the amInfos anyways irrespective of whether recovery is enabled or + // not IF this is not the first AM generation + if (appAttemptID.getAttemptId() != 1) { + amInfos.addAll(readJustAMInfos()); + } } - // / Create the AMInfo for the current AppMaster - if (amInfos == null) { - amInfos = new LinkedList(); - } + // Current an AMInfo for the current AM generation. AMInfo amInfo = MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, nmHttpPort); @@ -893,6 +905,51 @@ public class MRAppMaster extends CompositeService { startJobs(); } + private List readJustAMInfos() { + List amInfos = new ArrayList(); + FSDataInputStream inputStream = null; + try { + inputStream = + RecoveryService.getPreviousJobHistoryFileStream(getConfig(), + appAttemptID); + EventReader jobHistoryEventReader = new EventReader(inputStream); + + // All AMInfos are contiguous. Track when the first AMStartedEvent + // appears. + boolean amStartedEventsBegan = false; + + HistoryEvent event; + while ((event = jobHistoryEventReader.getNextEvent()) != null) { + if (event.getEventType() == EventType.AM_STARTED) { + if (!amStartedEventsBegan) { + // First AMStartedEvent. + amStartedEventsBegan = true; + } + AMStartedEvent amStartedEvent = (AMStartedEvent) event; + amInfos.add(MRBuilderUtils.newAMInfo( + amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(), + amStartedEvent.getContainerId(), + StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()), + amStartedEvent.getNodeManagerPort(), + amStartedEvent.getNodeManagerHttpPort())); + } else if (amStartedEventsBegan) { + // This means AMStartedEvents began and this event is a + // non-AMStarted event. + // No need to continue reading all the other events. + break; + } + } + } catch (IOException e) { + LOG.warn("Could not parse the old history file. " + + "Will not have old AMinfos ", e); + } finally { + if (inputStream != null) { + IOUtils.closeQuietly(inputStream); + } + } + return amInfos; + } + /** * This can be overridden to instantiate multiple jobs and create a * workflow. 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 5e5699ce0ad..4e36b906acc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; @@ -178,26 +177,13 @@ public class RecoveryService extends CompositeService implements Recovery { } private void parse() throws IOException { - // TODO: parse history file based on startCount - String jobName = - TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString(); - String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig()); - FSDataInputStream in = null; - Path historyFile = null; - Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified( - new Path(jobhistoryDir)); - FileContext fc = FileContext.getFileContext(histDirPath.toUri(), - getConfig()); - //read the previous history file - historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile( - histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); - LOG.info("History file is at " + historyFile); - in = fc.open(historyFile); + FSDataInputStream in = + getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId); JobHistoryParser parser = new JobHistoryParser(in); jobInfo = parser.parse(); Exception parseException = parser.getParseException(); if (parseException != null) { - LOG.info("Got an error parsing job-history file " + historyFile + + LOG.info("Got an error parsing job-history file" + ", ignoring incomplete events.", parseException); } Map taskInfos = jobInfo @@ -213,6 +199,28 @@ public class RecoveryService extends CompositeService implements Recovery { LOG.info("Read completed tasks from history " + completedTasks.size()); } + + public static FSDataInputStream getPreviousJobHistoryFileStream( + Configuration conf, ApplicationAttemptId applicationAttemptId) + throws IOException { + FSDataInputStream in = null; + Path historyFile = null; + String jobName = + TypeConverter.fromYarn(applicationAttemptId.getApplicationId()) + .toString(); + String jobhistoryDir = + JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf); + Path histDirPath = + FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir)); + FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf); + // read the previous history file + historyFile = + fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath, + jobName, (applicationAttemptId.getAttemptId() - 1))); + LOG.info("History file is at " + historyFile); + in = fc.open(historyFile); + return in; + } protected Dispatcher createRecoveryDispatcher() { return new RecoveryDispatcher(); diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java new file mode 100644 index 00000000000..41387739ec9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java @@ -0,0 +1,84 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.util.Iterator; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.TestRecovery.MRAppWithHistory; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.junit.Test; + +public class TestAMInfos { + + @Test + public void testAMInfosWithoutRecoveryEnabled() throws Exception { + int runCount = 0; + MRApp app = + new MRAppWithHistory(1, 0, false, this.getClass().getName(), true, + ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + + long am1StartTime = app.getAllAMInfos().get(0).getStartTime(); + + Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + TaskAttempt taskAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(taskAttempt, TaskAttemptState.RUNNING); + + // stop the app + app.stop(); + + // rerun + app = + new MRAppWithHistory(1, 0, false, this.getClass().getName(), false, + ++runCount); + conf = new Configuration(); + // in rerun the AMInfo will be recovered from previous run even if recovery + // is not enabled. 
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask = it.next(); + // There should be two AMInfos + List amInfos = app.getAllAMInfos(); + Assert.assertEquals(2, amInfos.size()); + AMInfo amInfoOne = amInfos.get(0); + Assert.assertEquals(am1StartTime, amInfoOne.getStartTime()); + app.stop(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 0427aa3bd81..cf865cf8720 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -486,6 +486,9 @@ public interface MRJobConfig { public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; + public static final String MR_AM_ENV = + MR_AM_PREFIX + "env"; + public static final String MAPRED_MAP_ADMIN_JAVA_OPTS = "mapreduce.admin.map.child.java.opts"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 92aeba6ad8e..6979855adbd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -839,6 +839,33 @@ mapreduce.job.end-notification.max.retry.interval + + yarn.app.mapreduce.am.env + + User added environment variables for the MR App Master + processes. Example : + 1) A=foo This will set the env variable A to foo + 2) B=$B:c This is inherit tasktracker's B env variable. + + + + + yarn.app.mapreduce.am.command-opts + -Xmx1024m + Java opts for the MR App Master processes. + The following symbol, if present, will be interpolated: @taskid@ is replaced + by current TaskID. Any other occurrences of '@' will go unchanged. + For example, to enable verbose gc logging to a file named for the taskid in + /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: + -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc + + Usage of -Djava.library.path can cause programs to no longer function if + hadoop native libraries are used. These values should instead be set as part + of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and + mapreduce.reduce.env config settings. 
+ + + yarn.app.mapreduce.am.job.task.listener.thread-count 30 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index b5008ee6181..834959967f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -397,7 +397,7 @@ public class YARNRunner implements ClientProtocol { Vector vargsFinal = new Vector(8); - // Final commmand + // Final command StringBuilder mergedCommand = new StringBuilder(); for (CharSequence str : vargs) { mergedCommand.append(str).append(" "); @@ -411,6 +411,10 @@ public class YARNRunner implements ClientProtocol { // i.e. add { Hadoop jars, job jar, CWD } to classpath. Map environment = new HashMap(); MRApps.setClasspath(environment, conf); + + // Setup the environment variables (LD_LIBRARY_PATH, etc) + MRApps.setEnvFromInputString(environment, + conf.get(MRJobConfig.MR_AM_ENV)); // Parse distributed cache MRApps.setupDistributedCache(jobConf, localResources); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java index 4736d1935b2..8b3ba3a3aa6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java @@ -51,11 +51,14 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Level; -import org.junit.Ignore; import org.junit.Test; -/** Unit tests for using Job Token over RPC. */ -@Ignore +/** Unit tests for using Job Token over RPC. + * + * System properties required: + * -Djava.security.krb5.conf=.../hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/target/test-classes/krb5.conf + * -Djava.net.preferIPv4Stack=true + */ public class TestUmbilicalProtocolWithJobToken { private static final String ADDRESS = "0.0.0.0";
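
Usage note (not part of the patch): the MAPREDUCE-4746 hunks above add the yarn.app.mapreduce.am.env property (MRJobConfig.MR_AM_ENV) and wire it through YARNRunner via MRApps.setEnvFromInputString, alongside the mapred-default.xml description. The sketch below shows how a client job might set the property; the comma-separated NAME=VALUE value format and the AmEnvExample class are illustrative assumptions, not something this patch defines.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class AmEnvExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed value format: comma-separated NAME=VALUE pairs, parsed by
    // MRApps.setEnvFromInputString() when YARNRunner builds the MR
    // Application Master container environment (see the YARNRunner hunk).
    conf.set("yarn.app.mapreduce.am.env",
        "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native/lib,A=foo");
    Job job = Job.getInstance(conf, "am-env-example");
    // ... configure mapper/reducer, input and output paths as usual ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}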