Merge r1237584 through r1239397 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1239398 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-02-01 23:24:15 +00:00
commit fe9369fb3c
83 changed files with 1567 additions and 372 deletions

299
hadoop-client/pom.xml Normal file
View File

@ -0,0 +1,299 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>0.23.1-SNAPSHOT</version>
<relativePath>../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>0.23.1-SNAPSHOT</version>
<packaging>jar</packaging>
<description>Apache Hadoop Client</description>
<name>Apache Hadoop Client</name>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
</exclusion>
<exclusion>
<groupId>jetty</groupId>
<artifactId>org.mortbay.jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
<exclusion>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.kosmosfs</groupId>
<artifactId>kfs</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-daemon</groupId>
<artifactId>commons-daemon</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>com.cenqua.clover</groupId>
<artifactId>clover</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.cenqua.clover</groupId>
<artifactId>clover</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>com.cenqua.clover</groupId>
<artifactId>clover</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>com.cenqua.clover</groupId>
<artifactId>clover</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -150,9 +150,17 @@ Release 0.23.1 - Unreleased
HADOOP-8002. SecurityUtil acquired token message should be a debug rather than info.
(Arpit Gupta via mahadev)
HADOOP-8009. Create hadoop-client and hadoop-minicluster artifacts for downstream
projects. (tucu)
HADOOP-7470. Move up to Jackson 1.8.8. (Enis Soztutar via szetszwo)
OPTIMIZATIONS
BUG FIXES
HADOOP-8006 TestFSInputChecker is failing in trunk.
(Daryn Sharp via bobby)
HADOOP-7998. CheckFileSystem does not correctly honor setVerifyChecksum
(Daryn Sharp via bobby)
@ -247,6 +255,9 @@ Release 0.23.1 - Unreleased
HADOOP-8000. fetchdt command not available in bin/hadoop.
(Arpit Gupta via mahadev)
HADOOP-7999. "hadoop archive" fails with ClassNotFoundException.
(Jason Lowe via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -20,6 +20,16 @@
# Resolve links ($0 may be a softlink) and convert a relative path
# to an absolute path. NB: The -P option requires bash built-ins
# or POSIX:2001 compliant cd and pwd.
# HADOOP_CLASSPATH Extra Java CLASSPATH entries.
#
# HADOOP_USER_CLASSPATH_FIRST When defined, the HADOOP_CLASSPATH is
# added in the beginning of the global
# classpath. Can be defined, for example,
# by doing
# export HADOOP_USER_CLASSPATH_FIRST=true
#
this="${BASH_SOURCE-$0}"
common_bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P)
script="$(basename -- "$this")"
@ -153,6 +163,10 @@ fi
# CLASSPATH initially contains $HADOOP_CONF_DIR
CLASSPATH="${HADOOP_CONF_DIR}"
if [ "$HADOOP_USER_CLASSPATH_FIRST" != "" ] && [ "$HADOOP_CLASSPATH" != "" ] ; then
CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
fi
# so that filenames w/ spaces are handled correctly in loops below
IFS=
@ -174,7 +188,7 @@ fi
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR'/*'
# add user-specified CLASSPATH last
if [ "$HADOOP_CLASSPATH" != "" ]; then
if [ "$HADOOP_USER_CLASSPATH_FIRST" = "" ] && [ "$HADOOP_CLASSPATH" != "" ]; then
CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
fi
@ -214,6 +228,9 @@ if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/$HADOOP_COMMON_L
fi
fi
# setup a default TOOL_PATH
TOOL_PATH="${TOOL_PATH:-$HADOOP_PREFIX/share/hadoop/tools/lib/*}"
# cygwin path translation
if $cygwin; then
JAVA_LIBRARY_PATH=`cygpath -p "$JAVA_LIBRARY_PATH"`

View File

@ -119,7 +119,6 @@ private static class ChecksumFSInputChecker extends FSInputChecker {
private static final int HEADER_LENGTH = 8;
private int bytesPerSum = 1;
private long fileLen = -1L;
public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
throws IOException {
@ -244,6 +243,24 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
}
return nread;
}
}
private static class FSDataBoundedInputStream extends FSDataInputStream {
private FileSystem fs;
private Path file;
private long fileLen = -1L;
FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
throws IOException {
super(in);
this.fs = fs;
this.file = file;
}
@Override
public boolean markSupported() {
return false;
}
/* Return the file length */
private long getFileLength() throws IOException {
@ -304,9 +321,16 @@ public synchronized void seek(long pos) throws IOException {
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return verifyChecksum
? new FSDataInputStream(new ChecksumFSInputChecker(this, f, bufferSize))
: getRawFileSystem().open(f, bufferSize);
FileSystem fs;
InputStream in;
if (verifyChecksum) {
fs = this;
in = new ChecksumFSInputChecker(this, f, bufferSize);
} else {
fs = getRawFileSystem();
in = fs.open(f, bufferSize);
}
return new FSDataBoundedInputStream(fs, f, in);
}
/** {@inheritDoc} */

View File

@ -228,6 +228,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2826. Add test case for HDFS-1476 (safemode can initialize
replication queues before exiting) (todd)
HDFS-2864. Remove some redundant methods and the constant METADATA_VERSION
from FSDataset. (szetszwo)
BUG FIXES
HDFS-2541. For a sufficiently large value of blocks, the DN Scanner
@ -303,6 +306,13 @@ Release 0.23.1 - UNRELEASED
HDFS-2791. If block report races with closing of file, replica is
incorrectly marked corrupt. (todd)
HDFS-2827. When the parent of a directory is the root, renaming the
directory results in leases updated incorrectly. (Uma Maheswara Rao G
via szetszwo)
HDFS-2835. Fix findbugs and javadoc issue with GetConf.java.
(suresh)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -31,13 +31,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
@ -183,7 +182,7 @@ static BlockReaderLocal newBlockReader(Configuration conf, String file,
BlockMetadataHeader header = BlockMetadataHeader
.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ blk + " ignoring ...");
}

View File

@ -42,7 +42,7 @@
@InterfaceStability.Evolving
public class BlockMetadataHeader {
static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
public static final short VERSION = 1;
/**
* Header includes everything except the checksum(s) themselves.
@ -138,7 +138,7 @@ private static void writeHeader(DataOutputStream out,
*/
static void writeHeader(DataOutputStream out, DataChecksum checksum)
throws IOException {
writeHeader(out, new BlockMetadataHeader(METADATA_VERSION, checksum));
writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
}
/**

View File

@ -425,9 +425,8 @@ private void verifyBlock(ExtendedBlock block) {
updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
// If the block does not exists anymore, then its not an error
if ( dataset.getFile(block.getBlockPoolId(), block.getLocalBlock()) == null ) {
LOG.info("Verification failed for " + block + ". Its ok since " +
"it not in datanode dataset anymore.");
if (!dataset.contains(block)) {
LOG.info(block + " is no longer in the dataset.");
deleteBlock(block.getLocalBlock());
return;
}

View File

@ -226,7 +226,7 @@ class BlockSender implements java.io.Closeable {
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ...");
}

View File

@ -470,7 +470,7 @@ private long validateIntegrity(File blockFile, long genStamp) {
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
if (version != BlockMetadataHeader.VERSION) {
DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
+ metaFile + " ignoring ...");
}
@ -945,8 +945,7 @@ private void shutdown() {
//////////////////////////////////////////////////////
//Find better place?
public static final String METADATA_EXTENSION = ".meta";
public static final short METADATA_VERSION = 1;
static final String METADATA_EXTENSION = ".meta";
static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
private static boolean isUnlinkTmpFile(File f) {
@ -1031,15 +1030,10 @@ private static long parseGenerationStamp(File blockFile, File metaFile
}
}
/** Return the block file for the given ID */
public File findBlockFile(String bpid, long blockId) {
return getFile(bpid, blockId);
}
@Override // FSDatasetInterface
public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException {
File blockfile = findBlockFile(bpid, blkid);
File blockfile = getFile(bpid, blkid);
if (blockfile == null) {
return null;
}
@ -1259,8 +1253,7 @@ public File getBlockFile(ExtendedBlock b) throws IOException {
/**
* Get File name for a given block.
*/
public File getBlockFile(String bpid, Block b)
throws IOException {
File getBlockFile(String bpid, Block b) throws IOException {
File f = validateBlockFile(bpid, b);
if(f == null) {
if (DataNode.LOG.isDebugEnabled()) {
@ -1291,7 +1284,10 @@ public InputStream getBlockInputStream(ExtendedBlock b)
*/
private File getBlockFileNoExistsCheck(ExtendedBlock b)
throws IOException {
File f = getFile(b.getBlockPoolId(), b.getLocalBlock());
final File f;
synchronized(this) {
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
}
if (f == null) {
throw new IOException("Block " + b + " is not valid");
}
@ -2021,7 +2017,10 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
*/
File validateBlockFile(String bpid, Block b) throws IOException {
//Should we check for metadata file too?
File f = getFile(bpid, b);
final File f;
synchronized(this) {
f = getFile(bpid, b.getBlockId());
}
if(f != null ) {
if(f.exists())
@ -2071,7 +2070,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
File f = null;
FSVolume v;
synchronized (this) {
f = getFile(bpid, invalidBlks[i]);
f = getFile(bpid, invalidBlks[i].getBlockId());
ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
if (dinfo == null ||
dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
@ -2130,11 +2129,10 @@ public void notifyNamenodeDeletedBlock(ExtendedBlock block){
datanode.notifyNamenodeDeletedBlock(block);
}
/**
* Turn the block identifier into a filename; ignore generation stamp!!!
*/
public synchronized File getFile(String bpid, Block b) {
return getFile(bpid, b.getBlockId());
@Override // {@link FSDatasetInterface}
public synchronized boolean contains(final ExtendedBlock block) {
final long blockId = block.getLocalBlock().getBlockId();
return getFile(block.getBlockPoolId(), blockId) != null;
}
/**
@ -2143,7 +2141,7 @@ public synchronized File getFile(String bpid, Block b) {
* @param blockId a block's id
* @return on disk data file path; null if the replica does not exist
*/
private File getFile(String bpid, long blockId) {
File getFile(final String bpid, final long blockId) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info != null) {
return info.getBlockFile();

View File

@ -19,7 +19,6 @@
import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -27,13 +26,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -303,6 +302,9 @@ public void recoverClose(ExtendedBlock b,
*/
public BlockListAsLongs getBlockReport(String bpid);
/** Does the dataset contain the block? */
public boolean contains(ExtendedBlock block);
/**
* Is the block valid?
* @param b

View File

@ -3857,7 +3857,12 @@ void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
if (destinationExisted && dinfo.isDir()) {
Path spath = new Path(src);
overwrite = spath.getParent().toString() + Path.SEPARATOR;
Path parent = spath.getParent();
if (isRoot(parent)) {
overwrite = parent.toString();
} else {
overwrite = parent.toString() + Path.SEPARATOR;
}
replaceBy = dst + Path.SEPARATOR;
} else {
overwrite = src;
@ -3867,6 +3872,10 @@ void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
leaseManager.changeLease(src, dst, overwrite, replaceBy);
}
private boolean isRoot(Path path) {
return path.getParent() == null;
}
/**
* Serializes leases.
*/

View File

@ -21,7 +21,9 @@
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@ -47,6 +49,9 @@
* {@link GetConf.Command}.
*
* See {@link GetConf.Command#NAMENODE} for example.
*
* Add for the new option added, a map entry with the corresponding
* {@link GetConf.CommandHandler}.
* </ul>
*/
public class GetConf extends Configured implements Tool {
@ -54,31 +59,40 @@ public class GetConf extends Configured implements Tool {
+ "getting configuration information from the config file.\n";
enum Command {
NAMENODE("-namenodes", new NameNodesCommandHandler(),
"gets list of namenodes in the cluster."),
SECONDARY("-secondaryNameNodes", new SecondaryNameNodesCommandHandler(),
NAMENODE("-namenodes", "gets list of namenodes in the cluster."),
SECONDARY("-secondaryNameNodes",
"gets list of secondary namenodes in the cluster."),
BACKUP("-backupNodes", new BackupNodesCommandHandler(),
"gets list of backup nodes in the cluster."),
BACKUP("-backupNodes", "gets list of backup nodes in the cluster."),
INCLUDE_FILE("-includeFile",
new CommandHandler("DFSConfigKeys.DFS_HOSTS"),
"gets the include file path that defines the datanodes " +
"that can join the cluster."),
EXCLUDE_FILE("-excludeFile",
new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE"),
"gets the exclude file path that defines the datanodes " +
"that need to decommissioned."),
NNRPCADDRESSES("-nnRpcAddresses",
new NNRpcAddressesCommandHandler(),
"gets the namenode rpc addresses");
NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses");
private static Map<String, CommandHandler> map;
static {
map = new HashMap<String, CommandHandler>();
map.put(NAMENODE.getName().toLowerCase(),
new NameNodesCommandHandler());
map.put(SECONDARY.getName().toLowerCase(),
new SecondaryNameNodesCommandHandler());
map.put(BACKUP.getName().toLowerCase(),
new BackupNodesCommandHandler());
map.put(INCLUDE_FILE.getName().toLowerCase(),
new CommandHandler("DFSConfigKeys.DFS_HOSTS"));
map.put(EXCLUDE_FILE.getName().toLowerCase(),
new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE"));
map.put(NNRPCADDRESSES.getName().toLowerCase(),
new NNRpcAddressesCommandHandler());
}
private final String cmd;
private final CommandHandler handler;
private final String description;
Command(String cmd, CommandHandler handler, String description) {
Command(String cmd, String description) {
this.cmd = cmd;
this.handler = handler;
this.description = description;
}
@ -91,12 +105,7 @@ public String getDescription() {
}
public static CommandHandler getHandler(String name) {
for (Command cmd : values()) {
if (cmd.getName().equalsIgnoreCase(name)) {
return cmd.handler;
}
}
return null;
return map.get(name.toLowerCase());
}
}

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
@ -1165,7 +1166,7 @@ static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
for(int i = 0; i < blocks.length; i++) {
FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
for(Block b : blocks[i]) {
files.add(ds.getBlockFile(poolId, b));
files.add(DataNodeTestUtils.getBlockFile(ds, poolId, b.getBlockId()));
}
}
return files;

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@ -139,7 +140,8 @@ public void testCopyOnWrite() throws IOException {
//
for (int i = 0; i < blocks.size(); i = i + 2) {
ExtendedBlock b = blocks.get(i).getBlock();
File f = dataset.getFile(b.getBlockPoolId(), b.getLocalBlock());
final File f = DataNodeTestUtils.getBlockFile(dataset,
b.getBlockPoolId(), b.getLocalBlock().getBlockId());
File link = new File(f.toString() + ".link");
System.out.println("Creating hardlink for File " + f + " to " + link);
HardLink.createHardLink(f, link);

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -831,7 +832,8 @@ public void testLeaseExpireHardLimit() throws Exception {
FSDataset dataset = (FSDataset)datanode.data;
ExtendedBlock blk = locatedblock.getBlock();
Block b = dataset.getStoredBlock(blk.getBlockPoolId(), blk.getBlockId());
File blockfile = dataset.findBlockFile(blk.getBlockPoolId(), b.getBlockId());
final File blockfile = DataNodeTestUtils.getBlockFile(dataset,
blk.getBlockPoolId(), b.getBlockId());
System.out.println("blockfile=" + blockfile);
if (blockfile != null) {
BufferedReader in = new BufferedReader(new FileReader(blockfile));

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -37,5 +38,8 @@ public class DataNodeTestUtils {
getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
return dn.getDNRegistrationForBP(bpid);
}
public static File getBlockFile(FSDataset fsdataset, String bpid, long bid) {
return fsdataset.getFile(bpid, bid);
}
}

View File

@ -81,8 +81,8 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
CHECKSUM_NULL, 16*1024 );
byte[] nullCrcHeader = checksum.getHeader();
nullCrcFileData = new byte[2 + nullCrcHeader.length];
nullCrcFileData[0] = (byte) ((FSDataset.METADATA_VERSION >>> 8) & 0xff);
nullCrcFileData[1] = (byte) (FSDataset.METADATA_VERSION & 0xff);
nullCrcFileData[0] = (byte) ((BlockMetadataHeader.VERSION >>> 8) & 0xff);
nullCrcFileData[1] = (byte) (BlockMetadataHeader.VERSION & 0xff);
for (int i = 0; i < nullCrcHeader.length; i++) {
nullCrcFileData[i+2] = nullCrcHeader[i];
}
@ -390,9 +390,7 @@ public synchronized void injectBlocks(String bpid,
Iterable<Block> injectBlocks) throws IOException {
ExtendedBlock blk = new ExtendedBlock();
if (injectBlocks != null) {
int numInjectedBlocks = 0;
for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
numInjectedBlocks++;
if (b == null) {
throw new NullPointerException("Null blocks in block list");
}
@ -555,31 +553,27 @@ public synchronized void invalidate(String bpid, Block[] invalidBlks)
}
}
private BInfo getBInfo(final ExtendedBlock b) {
final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
return map == null? null: map.get(b.getLocalBlock());
}
@Override // {@link FSDatasetInterface}
public boolean contains(ExtendedBlock block) {
return getBInfo(block) != null;
}
@Override // FSDatasetInterface
public synchronized boolean isValidBlock(ExtendedBlock b) {
final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
if (map == null) {
return false;
}
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
return false;
}
return binfo.isFinalized();
final BInfo binfo = getBInfo(b);
return binfo != null && binfo.isFinalized();
}
/* check if a block is created but not finalized */
@Override
public synchronized boolean isValidRbw(ExtendedBlock b) {
final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
if (map == null) {
return false;
}
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
return false;
}
return !binfo.isFinalized();
final BInfo binfo = getBInfo(b);
return binfo != null && !binfo.isFinalized();
}
@Override

View File

@ -352,7 +352,7 @@ private void verifyAddition(long blockId, long genStamp, long size) {
// Added block has the same file as the one created by the test
File file = new File(getBlockFile(blockId));
assertEquals(file.getName(), fds.findBlockFile(bpid, blockId).getName());
assertEquals(file.getName(), fds.getFile(bpid, blockId).getName());
// Generation stamp is same as that of created file
assertEquals(genStamp, replicainfo.getGenerationStamp());

View File

@ -101,7 +101,7 @@ public void testGetMetaData() throws IOException {
InputStream metaInput = fsdataset.getMetaDataInputStream(b);
DataInputStream metaDataInput = new DataInputStream(metaInput);
short version = metaDataInput.readShort();
assertEquals(FSDataset.METADATA_VERSION, version);
assertEquals(BlockMetadataHeader.VERSION, version);
DataChecksum checksum = DataChecksum.newDataChecksum(metaDataInput);
assertEquals(DataChecksum.CHECKSUM_NULL, checksum.getChecksumType());
assertEquals(0, checksum.getChecksumSize());

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
@ -29,21 +30,25 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
@ -479,6 +484,34 @@ public void testTxIdPersistence() throws Exception {
}
}
/**
* Test for save namespace should succeed when parent directory renamed with
* open lease and destination directory exist.
* This test is a regression for HDFS-2827
*/
@Test
public void testSaveNamespaceWithRenamedLease() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
.numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
OutputStream out = null;
try {
fs.mkdirs(new Path("/test-target"));
out = fs.create(new Path("/test-source/foo")); // don't close
fs.rename(new Path("/test-source/"), new Path("/test-target/"));
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
} finally {
IOUtils.cleanup(LOG, out, fs);
if (cluster != null) {
cluster.shutdown();
}
}
}
private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
// Make an edit
fsn.mkdirs(

View File

@ -164,6 +164,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending
requests for computing user-limits. (Arun C Murthy via vinodkv)
MAPREDUCE-3679. AM logs and others should not automatically refresh after every 1
second. (Vinod KV via mahadev)
MAPREDUCE-3754. Modified RM UI to filter applications based on state of the
applications. (vinodkv)
MAPREDUCE-3774. Moved yarn-default.xml to hadoop-yarn-common from
hadoop-server-common. (Mahadev Konar via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -193,6 +202,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
MAPREDUCE-3756. Made single shuffle limit configurable. (Hitesh Shah via
acmurthy)
BUG FIXES
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
ResourceUsageMatcher. (amarrk)
@ -581,6 +593,34 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3742. "yarn logs" command fails with ClassNotFoundException.
(Jason Lowe via mahadev)
MAPREDUCE-3703. ResourceManager should provide node lists in JMX output.
(Eric Payne via mahadev)
MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use
java.io.File.createTempFile to create temporary files as part of their
tasks. (Jonathan Eagles via vinodkv)
MAPREDUCE-3748. Changed a log in CapacityScheduler.nodeUpdate to debug.
(ramya via acmurthy)
MAPREDUCE-3764. Fixed resource usage metrics for queues and users.
(acmurthy)
MAPREDUCE-3749. ConcurrentModificationException in counter groups.
(tomwhite)
MAPREDUCE-3762. Fixed default CapacityScheduler configs. (mahadev via
acmurthy)
MAPREDUCE-3499. New MiniMR does not setup proxyuser configuration
correctly, thus tests using doAs do not work. (johnvijoe via tucu)
MAPREDUCE-3696. MR job via oozie does not work on hadoop 23.
(John George via mahadev)
MAPREDUCE-3427. Fix streaming unit tests broken after mavenization.
(Hitesh Shah via acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
@SuppressWarnings("deprecation")
@ -201,7 +202,8 @@ public static List<String> getVMCommand(
vargs.add(javaOptsSplit[i]);
}
String childTmpDir = Environment.PWD.$() + Path.SEPARATOR + "tmp";
Path childTmpDir = new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Setup the log4j prop

View File

@ -33,7 +33,6 @@ public class AppView extends TwoColumnLayout {
}
protected void commonPreHead(Page.HTML<_> html) {
html.meta_http("refresh", "10");
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(THEMESWITCHER_ID, "themeswitcher");

View File

@ -27,6 +27,10 @@ public class CountersPage extends AppView {
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
// Counters page is a summary. Helps to refresh automatically.
html.meta_http("refresh", "10");
String tid = $(TASK_ID);
String activeNav = "3";
if(tid == null || tid.isEmpty()) {

View File

@ -32,6 +32,10 @@ public class JobPage extends AppView {
set(TITLE, jobID.isEmpty() ? "Bad request: missing job ID"
: join("MapReduce Job ", $(JOB_ID)));
commonPreHead(html);
// This is a job-summary page. Helps to refresh automatically.
html.meta_http("refresh", "10");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:2}");
}

View File

@ -116,6 +116,11 @@ protected Collection<TaskAttempt> getTaskAttempts() {
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
// This page is a list of all attempts which are limited in number. Okay to
// refresh automatically.
html.meta_http("refresh", "10");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:3}");
set(DATATABLES_ID, "attempts");
set(initID(DATATABLES, "attempts"), attemptsTableInit());

View File

@ -227,15 +227,23 @@ private static void setMRFrameworkClasspath(
public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException {
boolean userClassesTakesPrecedence =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
environment,
Environment.CLASSPATH.name(),
MRJobConfig.JOB_JAR);
Apps.addToEnvironment(
environment,
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
MRApps.setMRFrameworkClasspath(environment, conf);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
}
private static final String STAGING_CONSTANT = ".staging";

View File

@ -130,13 +130,43 @@ public class TestMRApps {
Job job = Job.getInstance();
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration());
assertEquals("job.jar:$PWD/*:$HADOOP_CONF_DIR:" +
assertEquals("$HADOOP_CONF_DIR:" +
"$HADOOP_COMMON_HOME/share/hadoop/common/*:" +
"$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" +
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" +
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:" +
"$YARN_HOME/share/hadoop/mapreduce/*:" +
"$YARN_HOME/share/hadoop/mapreduce/lib/*",
"$YARN_HOME/share/hadoop/mapreduce/lib/*:" +
"job.jar:$PWD/*",
environment.get("CLASSPATH"));
}
@Test public void testSetClasspathWithUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Map<String, String> env = new HashMap<String, String>();
try {
MRApps.setClasspath(env, conf);
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
env_str.indexOf("job.jar"), 0);
}
@Test public void testSetClasspathWithNoUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
Map<String, String> env = new HashMap<String, String>();
try {
MRApps.setClasspath(env, conf);
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
env_str.indexOf("job.jar"), 0);
}
}

View File

@ -147,7 +147,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
*/
private boolean getDelegationTokenCalled = false;
/* notes the renewer that will renew the delegation token */
private Text dtRenewer = null;
private String dtRenewer = null;
/* do we need a HS delegation token for this client */
static final String HS_DELEGATION_TOKEN_REQUIRED
= "mapreduce.history.server.delegationtoken.required";
@ -600,7 +600,7 @@ public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
if (getDelegationTokenCalled) {
conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
getDelegationTokenCalled = false;
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer.toString());
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
dtRenewer = null;
}
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@ -1202,7 +1202,7 @@ public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
public Token<DelegationTokenIdentifier>
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
getDelegationTokenCalled = true;
dtRenewer = renewer;
dtRenewer = renewer.toString();
return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException,

View File

@ -118,6 +118,8 @@ public interface MRJobConfig {
public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
@ -228,7 +230,10 @@ public interface MRJobConfig {
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
public static final String SHUFFLE_MERGE_EPRCENT = "mapreduce.reduce.shuffle.merge.percent";
public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
= "mapreduce.reduce.shuffle.memory.limit.percent";
public static final String SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent";
public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent";

View File

@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
@ -56,7 +57,7 @@ public AbstractCounterGroup(String name, String displayName,
}
@Override
public synchronized String getName() {
public String getName() {
return name;
}
@ -95,7 +96,7 @@ private T addCounterImpl(String name, String displayName, long value) {
}
@Override
public T findCounter(String counterName, String displayName) {
public synchronized T findCounter(String counterName, String displayName) {
String saveName = limits.filterCounterName(counterName);
T counter = findCounterImpl(saveName, false);
if (counter == null) {
@ -109,7 +110,7 @@ public synchronized T findCounter(String counterName, boolean create) {
return findCounterImpl(limits.filterCounterName(counterName), create);
}
private T findCounterImpl(String counterName, boolean create) {
private synchronized T findCounterImpl(String counterName, boolean create) {
T counter = counters.get(counterName);
if (counter == null && create) {
String localized =
@ -142,7 +143,7 @@ protected abstract T newCounter(String counterName, String displayName,
@Override
public synchronized Iterator<T> iterator() {
return counters.values().iterator();
return ImmutableSet.copyOf(counters.values()).iterator();
}
/**

View File

@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
@ -179,13 +180,14 @@ public synchronized C findCounter(String scheme, FileSystemCounter key) {
* @return Set of counter names.
*/
public synchronized Iterable<String> getGroupNames() {
return Iterables.concat(fgroups.keySet(), groups.keySet());
return Iterables.concat(ImmutableSet.copyOf(fgroups.keySet()),
ImmutableSet.copyOf(groups.keySet()));
}
@Override
public Iterator<G> iterator() {
return Iterators.concat(fgroups.values().iterator(),
groups.values().iterator());
public synchronized Iterator<G> iterator() {
return Iterators.concat(ImmutableSet.copyOf(fgroups.values()).iterator(),
ImmutableSet.copyOf(groups.values()).iterator());
}
/**

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
@ -68,7 +67,8 @@ public class MergeManager<K, V> {
/* Maximum percentage of the in-memory limit that a single shuffle can
* consume*/
private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
= 0.25f;
private final TaskAttemptID reduceId;
@ -169,12 +169,22 @@ public MergeManager(TaskAttemptID reduceId, JobConf jobConf,
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
final float singleShuffleMemoryLimitPercent =
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
if (singleShuffleMemoryLimitPercent <= 0.0f
|| singleShuffleMemoryLimitPercent > 1.0f) {
throw new IllegalArgumentException("Invalid value for "
+ MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ singleShuffleMemoryLimitPercent);
}
this.maxSingleShuffleLimit =
(long)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
(long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold =
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
this.mergeThreshold = (long)(this.memoryLimit *
jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_EPRCENT,
jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
0.90f));
LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +

View File

@ -355,7 +355,7 @@ private static void addDeprecatedKeys() {
Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent",
new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT});
Configuration.addDeprecation("mapred.job.shuffle.merge.percent",
new String[] {MRJobConfig.SHUFFLE_MERGE_EPRCENT});
new String[] {MRJobConfig.SHUFFLE_MERGE_PERCENT});
Configuration.addDeprecation("mapred.max.reduce.failures.percent",
new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT});
Configuration.addDeprecation("mapred.reduce.child.env",

View File

@ -517,6 +517,13 @@
</description>
</property>
<property>
<name>mapreduce.reduce.shuffle.memory.limit.percent</name>
<value>0.25</value>
<description>Expert: Maximum percentage of the in-memory limit that a
single shuffle can consume</description>
</property>
<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>

View File

@ -21,9 +21,11 @@
import java.io.IOException;
import java.text.ParseException;
import java.util.Iterator;
import java.util.Random;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
@ -71,8 +73,6 @@ private void testCounter(Counters counter) throws ParseException {
// Check for recovery from string
assertEquals("Recovered counter does not match on content",
counter, recoveredCounter);
assertEquals("recovered counter has wrong hash code",
counter.hashCode(), recoveredCounter.hashCode());
}
@Test
@ -159,6 +159,28 @@ public void testLegacyNames() {
"FILE_BYTES_READ").getValue());
}
@SuppressWarnings("deprecation")
@Test
public void testCounterIteratorConcurrency() {
Counters counters = new Counters();
counters.incrCounter("group1", "counter1", 1);
Iterator<Group> iterator = counters.iterator();
counters.incrCounter("group2", "counter2", 1);
iterator.next();
}
@SuppressWarnings("deprecation")
@Test
public void testGroupIteratorConcurrency() {
Counters counters = new Counters();
counters.incrCounter("group1", "counter1", 1);
Group group = counters.getGroup("group1");
Iterator<Counter> iterator = group.iterator();
counters.incrCounter("group1", "counter2", 1);
iterator.next();
}
public static void main(String[] args) throws IOException {
new TestCounters().testCounters();
}

View File

@ -263,16 +263,20 @@ public long getTaskTrackerExpiryInterval() throws IOException,
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
// JobClient will set this flag if getDelegationToken is called, if so, get
// the delegation tokens for the HistoryServer also.
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
Token hsDT = getDelegationTokenFromHS(clientCache.
getInitializedHSProxy(), new Text(
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
ts.addToken(hsDT.getService(), hsDT);
}
/* check if we have a hsproxy, if not, no need */
MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
if (hsProxy != null) {
// JobClient will set this flag if getDelegationToken is called, if so, get
// the delegation tokens for the HistoryServer also.
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
ts.addToken(hsDT.getService(), hsDT);
}
}
// Upload only in security mode: TODO
Path applicationTokensFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
@ -404,7 +408,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
+ mergedCommand);
// Setup the CLASSPATH in environment
// i.e. add { job jar, CWD, Hadoop jars} to classpath.
// i.e. add { Hadoop jars, job jar, CWD } to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);

View File

@ -53,7 +53,7 @@ public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
fs.copyFromLocalFile(appMasterJar, appJar);
fs.setPermission(appJar, new FsPermission("700"));
fs.setPermission(appJar, new FsPermission("744"));
Job job = Job.getInstance(conf);

View File

@ -17,10 +17,17 @@
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.*;
import java.util.Iterator;
import junit.framework.TestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,20 +36,21 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Ignore;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
/**
* Class to test mapred task's
* - temp directory
* - child env
*/
@Ignore
public class TestMiniMRChildTask extends TestCase {
public class TestMiniMRChildTask {
private static final Log LOG =
LogFactory.getLog(TestMiniMRChildTask.class.getName());
@ -51,10 +59,24 @@ public class TestMiniMRChildTask extends TestCase {
private final static String MAP_OPTS_VAL = "-Xmx200m";
private final static String REDUCE_OPTS_VAL = "-Xmx300m";
private MiniMRCluster mr;
private MiniDFSCluster dfs;
private FileSystem fileSys;
private static MiniMRYarnCluster mr;
private static MiniDFSCluster dfs;
private static FileSystem fileSys;
private static Configuration conf = new Configuration();
private static FileSystem localFs;
static {
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException io) {
throw new RuntimeException("problem getting local fs", io);
}
}
private static Path TEST_ROOT_DIR = new Path("target",
TestMiniMRChildTask.class.getName() + "-tmpDir").makeQualified(localFs);
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
/**
* Map class which checks whether temp directory exists
* and check the value of java.io.tmpdir
@ -62,34 +84,26 @@ public class TestMiniMRChildTask extends TestCase {
* temp directory specified.
*/
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
Path tmpDir;
FileSystem localFs;
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String tmp = null;
if (localFs.exists(tmpDir)) {
tmp = tmpDir.makeQualified(localFs).toString();
assertEquals(tmp, new Path(System.getProperty("java.io.tmpdir")).
makeQualified(localFs).toString());
} else {
fail("Temp directory "+tmpDir +" doesnt exist.");
}
File tmpFile = File.createTempFile("test", ".tmp");
assertEquals(tmp, new Path(tmpFile.getParent()).
makeQualified(localFs).toString());
}
public void configure(JobConf job) {
tmpDir = new Path(job.get(JobContext.TASK_TEMP_DIR, "./tmp"));
try {
localFs = FileSystem.getLocal(job);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("IOException in getting localFS");
}
}
implements Mapper<LongWritable, Text, Text, IntWritable> {
Path tmpDir;
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
if (localFs.exists(tmpDir)) {
} else {
fail("Temp directory " + tmpDir +" doesnt exist.");
}
File tmpFile = File.createTempFile("test", ".tmp");
}
public void configure(JobConf job) {
tmpDir = new Path(System.getProperty("java.io.tmpdir"));
try {
localFs = FileSystem.getLocal(job);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("IOException in getting localFS");
}
}
}
// configure a job
@ -136,7 +150,7 @@ public void launchTest(JobConf conf,
Path inDir,
Path outDir,
String input)
throws IOException {
throws IOException, InterruptedException, ClassNotFoundException {
configure(conf, inDir, outDir, input,
MapClass.class, IdentityReducer.class);
@ -144,48 +158,13 @@ public void launchTest(JobConf conf,
// Launch job with default option for temp dir.
// i.e. temp dir is ./tmp
JobClient.runJob(conf);
outFs.delete(outDir, true);
final String DEFAULT_ABS_TMP_PATH = "/tmp";
final String DEFAULT_REL_TMP_PATH = "../temp";
String absoluteTempPath = null;
String relativeTempPath = null;
for (String key : new String[] { "test.temp.dir", "test.tmp.dir" }) {
String p = conf.get(key);
if (p == null || p.isEmpty()) {
continue;
}
if (new Path(p).isAbsolute()) {
if (absoluteTempPath == null) {
absoluteTempPath = p;
}
} else {
if (relativeTempPath == null) {
relativeTempPath = p;
}
}
}
if (absoluteTempPath == null) {
absoluteTempPath = DEFAULT_ABS_TMP_PATH;
}
if (relativeTempPath == null) {
relativeTempPath = DEFAULT_REL_TMP_PATH;
}
// Launch job by giving relative path to temp dir.
LOG.info("Testing with relative temp dir = "+relativeTempPath);
conf.set("mapred.child.tmp", relativeTempPath);
JobClient.runJob(conf);
outFs.delete(outDir, true);
// Launch job by giving absolute path to temp dir
LOG.info("Testing with absolute temp dir = "+absoluteTempPath);
conf.set("mapred.child.tmp", absoluteTempPath);
JobClient.runJob(conf);
Job job = new Job(conf);
job.addFileToClassPath(APP_JAR);
job.setJarByClass(TestMiniMRChildTask.class);
job.setMaxMapAttempts(1); // speed up failures
job.waitForCompletion(true);
boolean succeeded = job.waitForCompletion(true);
assertTrue(succeeded);
outFs.delete(outDir, true);
}
@ -311,20 +290,33 @@ public void reduce(WritableComparable key, Iterator<Writable> values,
}
@Override
public void setUp() {
try {
// create configuration, dfs, file system and mapred cluster
dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1);
} catch (IOException ioe) {
tearDown();
@BeforeClass
public static void setup() throws IOException {
// create configuration, dfs, file system and mapred cluster
dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
if (mr == null) {
mr = new MiniMRYarnCluster(TestMiniMRChildTask.class.getName());
Configuration conf = new Configuration();
mr.init(conf);
mr.start();
}
// Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
// workaround the absent public discache.
localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
localFs.setPermission(APP_JAR, new FsPermission("700"));
}
@Override
public void tearDown() {
@AfterClass
public static void tearDown() {
// close file system and shut down dfs and mapred cluster
try {
if (fileSys != null) {
@ -334,7 +326,8 @@ public void tearDown() {
dfs.shutdown();
}
if (mr != null) {
mr.shutdown();
mr.stop();
mr = null;
}
} catch (IOException ioe) {
LOG.info("IO exception in closing file system)" );
@ -351,9 +344,10 @@ public void tearDown() {
* the directory specified. We create a temp file and check if is is
* created in the directory specified.
*/
@Test
public void testTaskTempDir(){
try {
JobConf conf = mr.createJobConf();
JobConf conf = new JobConf(mr.getConfig());
// intialize input, output directories
Path inDir = new Path("testing/wc/input");
@ -375,9 +369,10 @@ public void testTaskTempDir(){
* - x=y (x can be a already existing env variable or a new variable)
* - x=$x:y (replace $x with the current value of x)
*/
public void testTaskEnv(){
try {
JobConf conf = mr.createJobConf();
JobConf conf = new JobConf(mr.getConfig());
// initialize input, output directories
Path inDir = new Path("testing/wc/input1");
Path outDir = new Path("testing/wc/output1");
@ -399,7 +394,7 @@ public void testTaskEnv(){
*/
public void testTaskOldEnv(){
try {
JobConf conf = mr.createJobConf();
JobConf conf = new JobConf(mr.getConfig());
// initialize input, output directories
Path inDir = new Path("testing/wc/input1");
Path outDir = new Path("testing/wc/output1");
@ -414,7 +409,7 @@ public void testTaskOldEnv(){
}
void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs)
throws IOException {
throws IOException, InterruptedException, ClassNotFoundException {
String input = "The input";
configure(conf, inDir, outDir, input,
EnvCheckMapper.class, EnvCheckReducer.class);
@ -445,8 +440,14 @@ void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs)
conf.set("path", System.getenv("PATH"));
conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);
RunningJob job = JobClient.runJob(conf);
assertTrue("The environment checker job failed.", job.isSuccessful());
Job job = new Job(conf);
job.addFileToClassPath(APP_JAR);
job.setJarByClass(TestMiniMRChildTask.class);
job.setMaxMapAttempts(1); // speed up failures
job.waitForCompletion(true);
boolean succeeded = job.waitForCompletion(true);
assertTrue("The environment checker job failed.", succeeded);
}
}

View File

@ -89,7 +89,7 @@ public void testReduceFromPartialMem() throws Exception {
job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0");
job.set(JobContext.SHUFFLE_MERGE_PERCENT, "1.0");
Counters c = runJob(job);
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();

View File

@ -24,12 +24,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -66,7 +71,27 @@ public MiniMRYarnCluster(String testName, int noOfNMs) {
public void init(Configuration conf) {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir/${user.name}/").getAbsolutePath());
"apps_staging_dir/").getAbsolutePath());
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
//mkdir the staging directory so that right permissions are set while running as proxy user
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new YarnException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
//configure the shuffle service in NM

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import java.net.InetAddress;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
public class TestMiniMRProxyUser extends TestCase {
private MiniDFSCluster dfsCluster = null;
private MiniMRCluster mrCluster = null;
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "/tmp");
}
int taskTrackers = 2;
int dataNodes = 2;
String proxyUser = System.getProperty("user.name");
String proxyGroup = "g";
StringBuilder sb = new StringBuilder();
sb.append("127.0.0.1,localhost");
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
sb.append(",").append(i.getCanonicalHostName());
}
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
conf.set("hadoop.proxyuser." + proxyUser + ".hosts", sb.toString());
conf.set("hadoop.proxyuser." + proxyUser + ".groups", proxyGroup);
String[] userGroups = new String[]{proxyGroup};
UserGroupInformation.createUserForTesting(proxyUser, userGroups);
UserGroupInformation.createUserForTesting("u1", userGroups);
UserGroupInformation.createUserForTesting("u2", new String[]{"gg"});
dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
String nnURI = fileSystem.getUri().toString();
int numDirs = 1;
String[] racks = null;
String[] hosts = null;
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
protected JobConf getJobConf() {
return mrCluster.createJobConf();
}
@Override
protected void tearDown() throws Exception {
if (mrCluster != null) {
mrCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
super.tearDown();
}
private void mrRun() throws Exception {
FileSystem fs = FileSystem.get(getJobConf());
Path inputDir = new Path("input");
fs.mkdirs(inputDir);
Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
writer.write("hello");
writer.close();
Path outputDir = new Path("output", "output");
JobConf jobConf = new JobConf(getJobConf());
jobConf.setInt("mapred.map.tasks", 1);
jobConf.setInt("mapred.map.max.attempts", 1);
jobConf.setInt("mapred.reduce.max.attempts", 1);
jobConf.set("mapred.input.dir", inputDir.toString());
jobConf.set("mapred.output.dir", outputDir.toString());
JobClient jobClient = new JobClient(jobConf);
RunningJob runJob = jobClient.submitJob(jobConf);
runJob.waitForCompletion();
assertTrue(runJob.isComplete());
assertTrue(runJob.isSuccessful());
}
public void __testCurrentUser() throws Exception {
mrRun();
}
public void testValidProxyUser() throws Exception {
UserGroupInformation ugi = UserGroupInformation.createProxyUser("u1", UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
mrRun();
return null;
}
});
}
public void ___testInvalidProxyUser() throws Exception {
UserGroupInformation ugi = UserGroupInformation.createProxyUser("u2", UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
try {
mrRun();
fail();
}
catch (RemoteException ex) {
//nop
}
catch (Exception ex) {
fail();
}
return null;
}
});
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMNMInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestRMNMInfo {
private static final Log LOG = LogFactory.getLog(TestRMNMInfo.class);
private static final int NUMNODEMANAGERS = 4;
protected static MiniMRYarnCluster mrCluster;
private static Configuration initialConf = new Configuration();
private static FileSystem localFs;
static {
try {
localFs = FileSystem.getLocal(initialConf);
} catch (IOException io) {
throw new RuntimeException("problem getting local fs", io);
}
}
private static Path TEST_ROOT_DIR =
new Path("target",TestRMNMInfo.class.getName() + "-tmpDir")
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
@BeforeClass
public static void setup() throws IOException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestRMNMInfo.class.getName(),
NUMNODEMANAGERS);
Configuration conf = new Configuration();
mrCluster.init(conf);
mrCluster.start();
}
// workaround the absent public distcache.
localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
localFs.setPermission(APP_JAR, new FsPermission("700"));
}
@AfterClass
public static void tearDown() {
if (mrCluster != null) {
mrCluster.stop();
mrCluster = null;
}
}
@Test
public void testRMNMInfo() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
RMContext rmc = mrCluster.getResourceManager().getRMContext();
ResourceScheduler rms = mrCluster.getResourceManager()
.getResourceScheduler();
RMNMInfo rmInfo = new RMNMInfo(rmc,rms);
String liveNMs = rmInfo.getLiveNodeManagers();
ObjectMapper mapper = new ObjectMapper();
JsonNode jn = mapper.readTree(liveNMs);
Assert.assertEquals("Unexpected number of live nodes:",
NUMNODEMANAGERS, jn.size());
Iterator<JsonNode> it = jn.iterator();
while (it.hasNext()) {
JsonNode n = it.next();
Assert.assertNotNull(n.get("HostName"));
Assert.assertNotNull(n.get("Rack"));
Assert.assertTrue("Node " + n.get("NodeId") + " should be RUNNING",
n.get("State").getValueAsText().contains("RUNNING"));
Assert.assertNotNull(n.get("NodeHTTPAddress"));
Assert.assertTrue("Node " + n.get("NodeId") + " should be Healthy",
n.get("HealthStatus").getValueAsText().contains("Healthy"));
Assert.assertNotNull(n.get("LastHealthUpdate"));
Assert.assertNotNull(n.get("HealthReport"));
Assert.assertNotNull(n.get("NumContainersMB"));
Assert.assertEquals(
n.get("NodeId") + ": Unexpected number of used containers",
0, n.get("NumContainersMB").getValueAsInt());
Assert.assertEquals(
n.get("NodeId") + ": Unexpected amount of used memory",
0, n.get("UsedMemoryMB").getValueAsInt());
Assert.assertNotNull(n.get("AvailableMemoryMB"));
}
}
}

View File

@ -512,6 +512,9 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
/** Container temp directory */
public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp";
public YarnConfiguration() {
super();
}

View File

@ -25,4 +25,7 @@ public interface YarnWebParams {
String CONTAINER_LOG_TYPE= "log.type";
String ENTITY_STRING = "entity.string";
String APP_OWNER = "app.owner";
String APP_STATE = "app.state";
String QUEUE_NAME = "queue.name";
String NODE_STATE = "node.state";
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@ -128,6 +129,10 @@ public int launchContainer(Container container,
// Create the container log-dirs on all disks
createContainerLogDirs(appIdStr, containerIdStr, logDirs);
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
lfs.mkdir(tmpDir, null, false);
// copy launch script to work dir
Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);

View File

@ -44,6 +44,10 @@ public class ApplicationPage extends NMView implements YarnWebParams {
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
// Per-app information. Helps to refresh automatically.
html.meta_http("refresh", "10");
set(DATATABLES_ID, "containers");
set(initID(DATATABLES, "containers"), containersTableInit());
setTableStyles(html, "containers");

View File

@ -62,7 +62,6 @@ public class ContainerLogsPage extends NMView {
String redirectUrl = $(REDIRECT_URL);
if (redirectUrl == null || redirectUrl.isEmpty()) {
set(TITLE, join("Logs for ", $(CONTAINER_ID)));
html.meta_http("refresh", "10");
} else {
if (redirectUrl.equals("false")) {
set(TITLE, join("Failed redirect for ", $(CONTAINER_ID)));

View File

@ -40,6 +40,10 @@ public class ContainerPage extends NMView implements YarnWebParams {
@Override
protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
// Per-container information. Helps to refresh automatically.
html.meta_http("refresh", "10");
setTitle("Container " + $(CONTAINER_ID));
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
}

View File

@ -33,7 +33,6 @@ public class NMView extends TwoColumnLayout {
}
protected void commonPreHead(Page.HTML<_> html) {
html.meta_http("refresh", "10");
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
set(THEMESWITCHER_ID, "themeswitcher");

View File

@ -41,6 +41,10 @@ public class NodePage extends NMView {
@Override
protected void commonPreHead(HTML<_> html) {
super.commonPreHead(html);
// Node summary page. Helps to refresh automatically.
html.meta_http("refresh", "10");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
}

View File

@ -314,6 +314,13 @@ char* get_app_log_directory(const char *log_root, const char* app_id) {
app_id);
}
/**
* Get the tmp directory under the working directory
*/
char *get_tmp_directory(const char *work_dir) {
return concatenate("%s/%s", "tmp dir", 2, work_dir, TMP_DIR);
}
/**
* Ensure that the given path and all of the parent directories are created
* with the desired permissions.
@ -357,7 +364,7 @@ int mkdirs(const char* path, mode_t perm) {
* It creates the container work and log directories.
*/
static int create_container_directories(const char* user, const char *app_id,
const char *container_id, char* const* local_dir, char* const* log_dir) {
const char *container_id, char* const* local_dir, char* const* log_dir, const char *work_dir) {
// create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
if (app_id == NULL || container_id == NULL || user == NULL) {
@ -409,6 +416,23 @@ static int create_container_directories(const char* user, const char *app_id,
}
free(combined_name);
}
if (result != 0) {
return result;
}
result = -1;
// also make the tmp directory
char *tmp_dir = get_tmp_directory(work_dir);
if (tmp_dir == NULL) {
return -1;
}
if (mkdirs(tmp_dir, perms) == 0) {
result = 0;
}
free(tmp_dir);
return result;
}
@ -823,7 +847,7 @@ int launch_container_as_user(const char *user, const char *app_id,
}
if (create_container_directories(user, app_id, container_id, local_dirs,
log_dirs) != 0) {
log_dirs, work_dir) != 0) {
fprintf(LOGFILE, "Could not create container dirs");
goto cleanup;
}

View File

@ -64,6 +64,7 @@ enum errorcodes {
#define CREDENTIALS_FILENAME "container_tokens"
#define MIN_USERID_KEY "min.user.id"
#define BANNED_USERS_KEY "banned.users"
#define TMP_DIR "tmp"
extern struct passwd *user_detail;

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.mortbay.util.ajax.JSON;
/**
* JMX bean listing statuses of all node managers.
*/
public class RMNMInfo implements RMNMInfoBeans {
private static final Log LOG = LogFactory.getLog(RMNMInfo.class);
private RMContext rmContext;
private ResourceScheduler scheduler;
/**
* Constructor for RMNMInfo registers the bean with JMX.
*
* @param rmc resource manager's context object
* @param sched resource manager's scheduler object
*/
public RMNMInfo(RMContext rmc, ResourceScheduler sched) {
this.rmContext = rmc;
this.scheduler = sched;
StandardMBean bean;
try {
bean = new StandardMBean(this,RMNMInfoBeans.class);
MBeans.register("ResourceManager", "RMNMInfo", bean);
} catch (NotCompliantMBeanException e) {
LOG.warn("Error registering RMNMInfo MBean", e);
}
LOG.info("Registered RMNMInfo MBean");
}
static class InfoMap extends LinkedHashMap<String, Object> {
private static final long serialVersionUID = 1L;
}
/**
* Implements getLiveNodeManagers()
*
* @return JSON formatted string containing statuses of all node managers
*/
@Override // RMNMInfoBeans
public String getLiveNodeManagers() {
Collection<RMNode> nodes = this.rmContext.getRMNodes().values();
List<InfoMap> nodesInfo = new ArrayList<InfoMap>();
for (final RMNode ni : nodes) {
SchedulerNodeReport report = scheduler.getNodeReport(ni.getNodeID());
InfoMap info = new InfoMap();
info.put("HostName", ni.getHostName());
info.put("Rack", ni.getRackName());
info.put("State", ni.getState().toString());
info.put("NodeId", ni.getNodeID());
info.put("NodeHTTPAddress", ni.getHttpAddress());
info.put("HealthStatus",
ni.getNodeHealthStatus().getIsNodeHealthy() ?
"Healthy" : "Unhealthy");
info.put("LastHealthUpdate",
ni.getNodeHealthStatus().getLastHealthReportTime());
info.put("HealthReport",
ni.getNodeHealthStatus().getHealthReport());
info.put("NumContainersMB", report.getNumContainers());
info.put("UsedMemoryMB", report.getUsedResource().getMemory());
info.put("AvailableMemoryMB",
report.getAvailableResource().getMemory());
nodesInfo.add(info);
}
return JSON.toString(nodesInfo);
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
public interface RMNMInfoBeans {
public String getLiveNodeManagers();
}

View File

@ -207,6 +207,8 @@ public synchronized void init(Configuration conf) {
addService(applicationMasterLauncher);
new RMNMInfo(this.rmContext, this.scheduler);
super.init(conf);
}

View File

@ -51,20 +51,19 @@ public class QueueMetrics {
@Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in GiB") MutableGaugeInt allocatedGB;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in GiB") MutableGaugeInt availableGB;
@Metric("Pending memory allocation in GiB") MutableGaugeInt pendingGB;
@Metric("Available memory in MB") MutableGaugeInt availableMB;
@Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB;
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in GiB") MutableGaugeInt reservedGB;
@Metric("# of reserved memory in MB") MutableGaugeInt reservedMB;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active users") MutableGaugeInt activeApplications;
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
static final int GB = 1024; // resource.memory is in MB
static final MetricsInfo RECORD_INFO = info("QueueMetrics",
"Metrics for the resource scheduler");
static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
@ -183,7 +182,7 @@ public void finishApp(AppSchedulingInfo app,
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(Resource limit) {
availableGB.set(limit.getMemory()/GB);
availableMB.set(limit.getMemory());
}
/**
@ -219,7 +218,7 @@ public void incrPendingResources(String user, int containers, Resource res) {
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingGB.incr(res.getMemory()/GB);
pendingMB.incr(res.getMemory());
}
public void decrPendingResources(String user, int containers, Resource res) {
@ -235,13 +234,13 @@ public void decrPendingResources(String user, int containers, Resource res) {
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingGB.decr(res.getMemory()/GB);
pendingMB.decr(res.getMemory());
}
public void allocateResources(String user, int containers, Resource res) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedGB.incr(res.getMemory()/GB * containers);
allocatedMB.incr(res.getMemory() * containers);
_decrPendingResources(containers, multiply(res, containers));
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -255,7 +254,7 @@ public void allocateResources(String user, int containers, Resource res) {
public void releaseResources(String user, int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedGB.decr(res.getMemory()/GB * containers);
allocatedMB.decr(res.getMemory() * containers);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res);
@ -267,7 +266,7 @@ public void releaseResources(String user, int containers, Resource res) {
public void reserveResource(String user, Resource res) {
reservedContainers.incr();
reservedGB.incr(res.getMemory()/GB);
reservedMB.incr(res.getMemory());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.reserveResource(user, res);
@ -279,7 +278,7 @@ public void reserveResource(String user, Resource res) {
public void unreserveResource(String user, Resource res) {
reservedContainers.decr();
reservedGB.decr(res.getMemory()/GB);
reservedMB.decr(res.getMemory());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.unreserveResource(user, res);
@ -343,28 +342,28 @@ public int getAppsFailed() {
return appsFailed.value();
}
public int getAllocatedGB() {
return allocatedGB.value();
public int getAllocatedMB() {
return allocatedMB.value();
}
public int getAllocatedContainers() {
return allocatedContainers.value();
}
public int getAvailableGB() {
return availableGB.value();
public int getAvailableMB() {
return availableMB.value();
}
public int getPendingGB() {
return pendingGB.value();
public int getPendingMB() {
return pendingMB.value();
}
public int getPendingContainers() {
return pendingContainers.value();
}
public int getReservedGB() {
return reservedGB.value();
public int getReservedMB() {
return reservedMB.value();
}
public int getReservedContainers() {

View File

@ -531,8 +531,10 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
SchedulerNode node = getNode(nm.getNodeID());
// Processing the newly launched containers

View File

@ -24,6 +24,8 @@ public class AppPage extends RmView {
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
// App page is per-app information. Helps to refresh automatically.
html.meta_http("refresh", "10");
}
@Override protected Class<? extends SubView> content() {

View File

@ -19,10 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.util.StringHelper.sjoin;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@ -56,7 +59,12 @@ class AppsBlock extends HtmlBlock {
th(".note", "Note")._()._().
tbody();
int i = 0;
String reqState = $(APP_STATE);
reqState = (reqState == null ? "" : reqState);
for (RMApp app : list.apps.values()) {
if (!reqState.isEmpty() && app.getState() != RMAppState.valueOf(reqState)) {
continue;
}
AppInfo appInfo = new AppInfo(app, true);
String percent = String.format("%.1f", appInfo.getProgress());
tbody.
@ -86,7 +94,7 @@ class AppsBlock extends HtmlBlock {
if (list.rendering == Render.JS_ARRAY) {
echo("<script type='text/javascript'>\n",
"var appsData=");
list.toDataTableArrays(writer());
list.toDataTableArrays(reqState, writer());
echo("\n</script>\n");
}
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
import org.apache.hadoop.yarn.webapp.ToJSON;
@ -51,10 +52,14 @@ class AppsList implements ToJSON {
apps = rmContext.getRMApps();
}
void toDataTableArrays(PrintWriter out) {
void toDataTableArrays(String requiredAppState, PrintWriter out) {
out.append('[');
boolean first = true;
for (RMApp app : apps.values()) {
if (requiredAppState != null && !requiredAppState.isEmpty()
&& app.getState() != RMAppState.valueOf(requiredAppState)) {
continue;
}
AppInfo appInfo = new AppInfo(app, false);
if (first) {
first = false;
@ -84,7 +89,7 @@ void toDataTableArrays(PrintWriter out) {
@Override
public void toJSON(PrintWriter out) {
out.print("{\"aaData\":");
toDataTableArrays(out);
toDataTableArrays(null, out);
out.print("}\n");
}
}

View File

@ -18,18 +18,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
public class NavBlock extends HtmlBlock {
@Override public void render(Block html) {
html.
UL<DIV<Hamlet>> mainList = html.
div("#nav").
h3("Cluster").
ul().
li().a(url("cluster"), "About")._().
li().a(url("nodes"), "Nodes")._().
li().a(url("apps"), "Applications")._().
li().a(url("nodes"), "Nodes")._();
UL<LI<UL<DIV<Hamlet>>>> subAppsList = mainList.
li().a(url("apps"), "Applications").
ul();
subAppsList.li()._();
for (RMAppState state : RMAppState.values()) {
subAppsList.
li().a(url("apps", state.toString()), state.toString())._();
}
subAppsList._()._();
mainList.
li().a(url("scheduler"), "Scheduler")._()._().
h3("Tools").
ul().

View File

@ -25,14 +25,12 @@
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
/**
* The RM webapp
*/
public class RMWebApp extends WebApp {
static final String APP_ID = "app.id";
static final String QUEUE_NAME = "queue.name";
static final String NODE_STATE = "node.state";
public class RMWebApp extends WebApp implements YarnWebParams {
private final ResourceManager rm;
@ -53,9 +51,9 @@ public void setup() {
}
route("/", RmController.class);
route(pajoin("/nodes", NODE_STATE), RmController.class, "nodes");
route("/apps", RmController.class);
route(pajoin("/apps", APP_STATE), RmController.class);
route("/cluster", RmController.class, "about");
route(pajoin("/app", APP_ID), RmController.class, "app");
route(pajoin("/app", APPLICATION_ID), RmController.class, "app");
route("/scheduler", RmController.class, "scheduler");
route(pajoin("/queue", QUEUE_NAME), RmController.class, "queue");
}

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.APP_ID;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.QUEUE_NAME;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
import javax.servlet.http.HttpServletResponse;
@ -64,7 +64,7 @@ public void about() {
}
public void app() {
String aid = $(APP_ID);
String aid = $(APPLICATION_ID);
if (aid.isEmpty()) {
setStatus(HttpServletResponse.SC_BAD_REQUEST);
setTitle("Bad request: requires application ID");

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.view.TwoColumnLayout;
import static org.apache.hadoop.yarn.util.StringHelper.sjoin;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
// Do NOT rename/refactor this to RMView as it will wreak havoc
@ -36,10 +38,14 @@ protected void preHead(Page.HTML<_> html) {
set(DATATABLES_ID, "apps");
set(initID(DATATABLES, "apps"), appsTableInit());
setTableStyles(html, "apps", ".queue {width:6em}", ".ui {width:8em}");
// Set the correct title.
String reqState = $(APP_STATE);
reqState = (reqState == null || reqState.isEmpty() ? "All" : reqState);
setTitle(sjoin(reqState, "Applications"));
}
protected void commonPreHead(Page.HTML<_> html) {
//html.meta_http("refresh", "20");
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
set(THEMESWITCHER_ID, "themeswitcher");

View File

@ -31,8 +31,6 @@
@XmlAccessorType(XmlAccessType.FIELD)
public class ClusterMetricsInfo {
private static final long MB_IN_GB = 1024;
protected int appsSubmitted;
protected long reservedMB;
protected long availableMB;
@ -55,9 +53,9 @@ public ClusterMetricsInfo(final ResourceManager rm, final RMContext rmContext) {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
this.appsSubmitted = metrics.getAppsSubmitted();
this.reservedMB = metrics.getReservedGB() * MB_IN_GB;
this.availableMB = metrics.getAvailableGB() * MB_IN_GB;
this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB;
this.reservedMB = metrics.getReservedMB();
this.availableMB = metrics.getAvailableMB();
this.allocatedMB = metrics.getAllocatedMB();
this.containersAllocated = metrics.getAllocatedContainers();
this.totalMB = availableMB + reservedMB + allocatedMB;
this.activeNodes = clusterMetrics.getNumActiveNMs();

View File

@ -31,8 +31,6 @@
@XmlAccessorType(XmlAccessType.FIELD)
public class UserMetricsInfo {
private static final long MB_IN_GB = 1024;
protected int appsSubmitted;
protected int runningContainers;
protected int pendingContainers;
@ -60,9 +58,9 @@ public UserMetricsInfo(final ResourceManager rm, final RMContext rmContext,
this.runningContainers = userMetrics.getAllocatedContainers();
this.pendingContainers = userMetrics.getPendingContainers();
this.reservedContainers = userMetrics.getReservedContainers();
this.reservedMB = userMetrics.getReservedGB() * MB_IN_GB;
this.pendingMB = userMetrics.getPendingGB() * MB_IN_GB;
this.allocatedMB = userMetrics.getAllocatedGB() * MB_IN_GB;
this.reservedMB = userMetrics.getReservedMB();
this.pendingMB = userMetrics.getPendingMB();
this.allocatedMB = userMetrics.getAllocatedMB();
}
}

View File

@ -53,9 +53,9 @@
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>-1</value>
<value>100</value>
<description>
The maximum capacity of the default queue. A value of -1 disables this.
The maximum capacity of the default queue.
</description>
</property>

View File

@ -22,14 +22,14 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -41,7 +41,6 @@
@InterfaceAudience.Private
public abstract class MockAsm extends MockApps {
static final int DT = 1000000; // ms
public static class AppMasterBase implements ApplicationMaster {
@Override
@ -232,9 +231,10 @@ public static RMApp newApplication(int i) {
final String user = newUserName();
final String name = newAppName();
final String queue = newQueue();
final long start = System.currentTimeMillis() - (int)(Math.random()*DT);
final long finish = Math.random() < 0.5 ? 0 :
System.currentTimeMillis() + (int)(Math.random()*DT);
final long start = 123456 + i * 1000;
final long finish = 234567 + i * 1000;
RMAppState[] allStates = RMAppState.values();
final RMAppState state = allStates[i % allStates.length];
return new ApplicationBase() {
@Override
public ApplicationId getApplicationId() {
@ -270,7 +270,7 @@ public String getTrackingUrl() {
}
@Override
public RMAppState getState() {
return RMAppState.RUNNING;
return state;
}
@Override
public StringBuilder getDiagnostics() {

View File

@ -57,16 +57,16 @@ public class TestQueueMetrics {
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
metrics.incrAppsRunning(user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB));
checkResources(queueSource, 6, 3, 3, 0, 100, 9, 2, 0, 0);
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB));
checkResources(queueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -92,20 +92,20 @@ public class TestQueueMetrics {
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10, 15, 5, 0, 0);
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
metrics.incrAppsRunning(user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB));
checkResources(queueSource, 6, 3, 3, 0, 100, 9, 2, 0, 0);
checkResources(userSource, 6, 3, 3, 0, 10, 9, 2, 0, 0);
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB));
checkResources(queueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
checkResources(userSource, 4, 2, 3, 1, 10, 9, 2, 0, 0);
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -141,10 +141,10 @@ public class TestQueueMetrics {
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
checkResources(queueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10, 15, 5, 0, 0);
checkResources(parentUserSource, 0, 0, 0, 0, 10, 15, 5, 0, 0);
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
metrics.incrAppsRunning(user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
@ -154,17 +154,17 @@ public class TestQueueMetrics {
metrics.reserveResource(user, Resources.createResource(3*GB));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 6, 3, 3, 0, 100, 9, 2, 3, 1);
checkResources(parentQueueSource, 6, 3, 3, 0, 100, 9, 2, 3, 1);
checkResources(userSource, 6, 3, 3, 0, 10, 9, 2, 3, 1);
checkResources(parentUserSource, 6, 3, 3, 0, 10, 9, 2, 3, 1);
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1);
checkResources(parentQueueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1);
checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
metrics.releaseResources(user, 1, Resources.createResource(2*GB));
metrics.unreserveResource(user, Resources.createResource(3*GB));
checkResources(queueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
checkResources(parentQueueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
checkResources(userSource, 4, 2, 3, 1, 10, 9, 2, 0, 0);
checkResources(parentUserSource, 4, 2, 3, 1, 10, 9, 2, 0, 0);
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -184,18 +184,19 @@ public static void checkApps(MetricsSource source, int submitted, int pending,
assertCounter("AppsKilled", killed, rb);
}
public static void checkResources(MetricsSource source, int allocGB,
int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, int availGB, int pendingGB, int pendingCtnrs,
int reservedGB, int reservedCtnrs) {
public static void checkResources(MetricsSource source, int allocatedMB,
int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs,
int availableMB, int pendingMB, int pendingCtnrs,
int reservedMB, int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedGB", allocGB, rb);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableGB", availGB, rb);
assertGauge("PendingGB", pendingGB, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedGB", reservedGB, rb);
assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb);
}

View File

@ -251,7 +251,7 @@ public void testSingleQueueOneUserMetrics() throws Exception {
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(7, a.getMetrics().getAvailableGB());
assertEquals(7*GB, a.getMetrics().getAvailableMB());
}
@ -307,9 +307,9 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(1, a.getMetrics().getAllocatedGB());
assertEquals(0, a.getMetrics().getAvailableGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
@ -317,16 +317,16 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(2, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(2, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
@ -334,16 +334,16 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(3, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(4, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
// Test max-capacity
// Now - no more allocs since we are at max-cap
@ -352,8 +352,8 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(4, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
@ -363,8 +363,8 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(1, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
// Release each container from app_1
for (RMContainer rmContainer : app_1.getLiveContainers()) {
@ -374,9 +374,9 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(0, a.getMetrics().getAllocatedGB());
assertEquals(1, a.getMetrics().getAvailableGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(0*GB, a.getMetrics().getAllocatedMB());
assertEquals(1*GB, a.getMetrics().getAvailableMB());
}
@Test
@ -700,9 +700,9 @@ public void testReservation() throws Exception {
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(1, a.getMetrics().getAllocatedGB());
assertEquals(0, a.getMetrics().getAvailableGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
@ -710,8 +710,8 @@ public void testReservation() throws Exception {
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(2, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
@ -720,8 +720,8 @@ public void testReservation() throws Exception {
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(2*GB, node_0.getUsedResource().getMemory());
assertEquals(4, a.getMetrics().getReservedGB());
assertEquals(2, a.getMetrics().getAllocatedGB());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now free 1 container from app_0 i.e. 1G
a.completedContainer(clusterResource, app_0, node_0,
@ -732,8 +732,8 @@ public void testReservation() throws Exception {
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(1*GB, node_0.getUsedResource().getMemory());
assertEquals(4, a.getMetrics().getReservedGB());
assertEquals(1, a.getMetrics().getAllocatedGB());
assertEquals(4*GB, a.getMetrics().getReservedMB());
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
// Now finish another container from app_0 and fulfill the reservation
a.completedContainer(clusterResource, app_0, node_0,
@ -744,8 +744,8 @@ public void testReservation() throws Exception {
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_0.getUsedResource().getMemory());
assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(4, a.getMetrics().getAllocatedGB());
assertEquals(0*GB, a.getMetrics().getReservedMB());
assertEquals(4*GB, a.getMetrics().getAllocatedMB());
}
@Test

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.MockAsm;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -45,6 +46,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Test;
@ -74,7 +76,7 @@ public void configure(Binder binder) {
@Test public void testView() {
Injector injector = WebAppTests.createMockInjector(RMContext.class,
mockRMContext(3, 1, 2, 8*GiB),
mockRMContext(15, 1, 2, 8*GiB),
new Module() {
@Override
public void configure(Binder binder) {
@ -85,7 +87,9 @@ public void configure(Binder binder) {
}
}
});
injector.getInstance(RmView.class).render();
RmView rmViewInstance = injector.getInstance(RmView.class);
rmViewInstance.set(YarnWebParams.APP_STATE, RMAppState.RUNNING.toString());
rmViewInstance.render();
WebAppTests.flushOutput(injector);
}

View File

@ -398,19 +398,19 @@ public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics();
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
final long MB_IN_GB = 1024;
long totalMBExpect = (metrics.getReservedGB() * MB_IN_GB)
+ (metrics.getAvailableGB() * MB_IN_GB)
+ (metrics.getAllocatedGB() * MB_IN_GB);
long totalMBExpect =
metrics.getReservedMB()+ metrics.getAvailableMB()
+ metrics.getAllocatedMB();
assertEquals("appsSubmitted doesn't match", metrics.getAppsSubmitted(), sub);
assertEquals("appsSubmitted doesn't match",
metrics.getAppsSubmitted(), sub);
assertEquals("reservedMB doesn't match",
metrics.getReservedGB() * MB_IN_GB, reservedMB);
assertEquals("availableMB doesn't match", metrics.getAvailableGB()
* MB_IN_GB, availableMB);
assertEquals("allocatedMB doesn't match", metrics.getAllocatedGB()
* MB_IN_GB, allocMB);
metrics.getReservedMB(), reservedMB);
assertEquals("availableMB doesn't match",
metrics.getAvailableMB(), availableMB);
assertEquals("allocatedMB doesn't match",
metrics.getAllocatedMB(), allocMB);
assertEquals("containersAllocated doesn't match", 0, containersAlloc);
assertEquals("totalMB doesn't match", totalMBExpect, totalMB);
assertEquals(

View File

@ -81,6 +81,6 @@ wagon-http.version=1.0-beta-2
xmlenc.version=0.52
xerces.version=1.4.4
jackson.version=1.8.2
jackson.version=1.8.8
yarn.version=0.23.1-SNAPSHOT
hadoop-mapreduce.version=0.23.1-SNAPSHOT

View File

@ -433,18 +433,6 @@
</description>
</property>
<property>
<name>mapreduce.task.tmp.dir</name>
<value>./tmp</value>
<description> To set the value of tmp directory for map and reduce tasks.
If the value is an absolute path, it is directly assigned. Otherwise, it is
prepended with task's working directory. The java tasks are executed with
option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and
streaming are set with environment variable,
TMPDIR='the absolute path of the tmp dir'
</description>
</property>
<property>
<name>mapreduce.map.log.level</name>
<value>INFO</value>

105
hadoop-minicluster/pom.xml Normal file
View File

@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>0.23.1-SNAPSHOT</version>
<relativePath>../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>0.23.1-SNAPSHOT</version>
<packaging>jar</packaging>
<description>Apache Hadoop Mini-Cluster</description>
<name>Apache Hadoop Mini-Cluster</name>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<scope>compile</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>compile</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -524,7 +524,7 @@
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.7.1</version>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>

View File

@ -29,7 +29,7 @@
<properties>
<hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
<test.exclude.pattern>%regex[.*(TestStreamingBadRecords|TestStreamingStatus|TestUlimit).*]</test.exclude.pattern>
<test.exclude.pattern>%regex[.*(TestStreamingStatus).*]</test.exclude.pattern>
</properties>
<dependencies>

View File

@ -154,6 +154,10 @@ private void validateOutput(RunningJob runningJob, boolean validateCount)
}
}
/*
* Disable test as skipping bad records not supported in 0.23
*/
/*
public void testSkip() throws Exception {
JobConf clusterConf = createJobConf();
createInput();
@ -195,7 +199,12 @@ public void testSkip() throws Exception {
//validate that there is no skip directory as it has been set to "none"
assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)==null);
}
*/
/*
* Disable test as skipping bad records not supported in 0.23
*/
/*
public void testNarrowDown() throws Exception {
createInput();
JobConf clusterConf = createJobConf();
@ -231,6 +240,11 @@ public void testNarrowDown() throws Exception {
validateOutput(job.running_, true);
assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)!=null);
}
*/
public void testNoOp() {
// Added to avoid warnings when running this disabled test
}
static class App{
boolean isReducer;

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.StringUtils;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
@ -52,7 +53,6 @@ public class TestUlimit {
private static String SET_MEMORY_LIMIT = "786432"; // 768MB
String[] genArgs(String memLimit) {
String strJobtracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
String strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
return new String[] {
"-input", inputPath.toString(),
@ -63,7 +63,6 @@ String[] genArgs(String memLimit) {
"-jobconf", MRJobConfig.NUM_MAPS + "=1",
"-jobconf", JobConf.MAPRED_MAP_TASK_ULIMIT + "=" + memLimit,
"-jobconf", strNamenode,
"-jobconf", strJobtracker,
"-jobconf", "stream.tmpdir=" +
System.getProperty("test.build.data","/tmp"),
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
@ -79,6 +78,7 @@ String[] genArgs(String memLimit) {
* is expected to be a failure.
*/
@Test
@Ignore
public void testCommandLine() {
if (UtilTest.isCygwin()) {
return;

View File

@ -79,6 +79,8 @@
<module>hadoop-mapreduce-project</module>
<module>hadoop-tools</module>
<module>hadoop-dist</module>
<module>hadoop-client</module>
<module>hadoop-minicluster</module>
</modules>
<build>