Merge r1236386 through r1237583 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1237590 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-30 10:17:28 +00:00
commit ab824d9e9b
69 changed files with 1519 additions and 399 deletions

View File

@ -18,4 +18,4 @@
OK_RELEASEAUDIT_WARNINGS=0
OK_FINDBUGS_WARNINGS=0
OK_JAVADOC_WARNINGS=6
OK_JAVADOC_WARNINGS=13

View File

@ -142,10 +142,21 @@ Release 0.23.1 - Unreleased
HADOOP-7919. Remove the unused hadoop.logfile.* properties from the
core-default.xml file. (harsh)
HADOOP-7939. Improve Hadoop subcomponent integration in Hadoop 0.23. (rvs via tucu)
HADOOP-7988. Upper case in hostname part of the principals doesn't work with
kerberos. (jitendra)
HADOOP-8002. SecurityUtil acquired token message should be a debug rather than info.
(Arpit Gupta via mahadev)
OPTIMIZATIONS
BUG FIXES
HADOOP-7998. CheckFileSystem does not correctly honor setVerifyChecksum
(Daryn Sharp via bobby)
HADOOP-7811. TestUserGroupInformation#testGetServerSideGroups test fails in chroot.
(Jonathan Eagles via mahadev)
@ -227,6 +238,15 @@ Release 0.23.1 - Unreleased
HADOOP-7981. Improve documentation for org.apache.hadoop.io.compress.
Decompressor.getRemaining (Jonathan Eagles via mahadev)
HADOOP-7997. SequenceFile.createWriter(...createParent...) no
longer works on existing file. (Gregory Chanan via eli)
HADOOP-7993. Hadoop ignores old-style config options for enabling compressed
output. (Anupam Seth via mahadev)
HADOOP-8000. fetchdt command not available in bin/hadoop.
(Arpit Gupta via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -50,7 +50,7 @@ fi
COMMAND=$1
case $COMMAND in
#hdfs commands
namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer)
namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|fetchdt)
echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." 1>&2
echo "Instead use the hdfs command for it." 1>&2
echo "" 1>&2

View File

@ -25,9 +25,21 @@ common_bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P)
script="$(basename -- "$this")"
this="$common_bin/$script"
[ -f "$common_bin/hadoop-layout.sh" ] && . "$common_bin/hadoop-layout.sh"
HADOOP_COMMON_DIR=${HADOOP_COMMON_DIR:-"share/hadoop/common"}
HADOOP_COMMON_LIB_JARS_DIR=${HADOOP_COMMON_LIB_JARS_DIR:-"share/hadoop/common/lib"}
HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_COMMON_LIB_NATIVE_DIR:-"lib/native"}
HDFS_DIR=${HDFS_DIR:-"share/hadoop/hdfs"}
HDFS_LIB_JARS_DIR=${HDFS_LIB_JARS_DIR:-"share/hadoop/hdfs/lib"}
YARN_DIR=${YARN_DIR:-"share/hadoop/mapreduce"}
YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"}
MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
# the root of the Hadoop installation
# See HADOOP-6255 for directory structure layout
HADOOP_DEFAULT_PREFIX=`dirname "$this"`/..
HADOOP_DEFAULT_PREFIX=$(cd -P -- "$common_bin"/.. && pwd -P)
HADOOP_PREFIX=${HADOOP_PREFIX:-$HADOOP_DEFAULT_PREFIX}
export HADOOP_PREFIX
@ -144,16 +156,22 @@ CLASSPATH="${HADOOP_CONF_DIR}"
# so that filenames w/ spaces are handled correctly in loops below
IFS=
if [ "$HADOOP_COMMON_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_DIR" ]; then
HADOOP_COMMON_HOME=$HADOOP_PREFIX
fi
fi
# for releases, add core hadoop jar & webapps to CLASSPATH
if [ -d "$HADOOP_PREFIX/share/hadoop/common/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common/webapps
if [ -d "$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR
fi
if [ -d "$HADOOP_PREFIX/share/hadoop/common/lib" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common/lib'/*'
if [ -d "$HADOOP_COMMON_HOME/$HADOOP_COMMON_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/common'/*'
CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR'/*'
# add user-specified CLASSPATH last
if [ "$HADOOP_CLASSPATH" != "" ]; then
@ -185,13 +203,13 @@ fi
# setup 'java.library.path' for native-hadoop code if necessary
if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/lib/native" ]; then
if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR" ]; then
if [ -d "${HADOOP_PREFIX}/lib/native" ]; then
if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR" ]; then
if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/lib/native
JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR
else
JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib/native
JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/$HADOOP_COMMON_LIB_NATIVE_DIR
fi
fi
fi
@ -216,37 +234,56 @@ HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"
# put hdfs in classpath if present
if [ "$HADOOP_HDFS_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/share/hadoop/hdfs" ]; then
if [ -d "${HADOOP_PREFIX}/$HDFS_DIR" ]; then
HADOOP_HDFS_HOME=$HADOOP_PREFIX
fi
fi
if [ -d "$HADOOP_HDFS_HOME/share/hadoop/hdfs/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs
if [ -d "$HADOOP_HDFS_HOME/$HDFS_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR
fi
if [ -d "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib'/*'
if [ -d "$HADOOP_HDFS_HOME/$HDFS_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs'/*'
CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR'/*'
# put yarn in classpath if present
if [ "$YARN_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/share/hadoop/mapreduce" ]; then
if [ -d "${HADOOP_PREFIX}/$YARN_DIR" ]; then
YARN_HOME=$HADOOP_PREFIX
fi
fi
if [ -d "$YARN_HOME/share/hadoop/mapreduce/webapps" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce
if [ -d "$YARN_HOME/$YARN_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR
fi
if [ -d "$YARN_HOME/share/hadoop/mapreduce/lib" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib'/*'
if [ -d "$YARN_HOME/$YARN_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce'/*'
CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR'/*'
# put mapred in classpath if present AND different from YARN
if [ "$HADOOP_MAPRED_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$MAPRED_DIR" ]; then
HADOOP_MAPRED_HOME=$HADOOP_PREFIX
fi
fi
if [ "$HADOOP_MAPRED_HOME/$MAPRED_DIR" != "$YARN_HOME/$YARN_DIR" ] ; then
if [ -d "$HADOOP_MAPRED_HOME/$MAPRED_DIR/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR
fi
if [ -d "$HADOOP_MAPRED_HOME/$MAPRED_LIB_JARS_DIR" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_LIB_JARS_DIR'/*'
fi
CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR'/*'
fi
# cygwin path translation
if $cygwin; then

View File

@ -345,7 +345,17 @@ private String handleDeprecation(String name) {
}
return name;
}
private void handleDeprecation() {
LOG.debug("Handling deprecation for all properties in config...");
Set<Object> keys = new HashSet<Object>();
keys.addAll(getProps().keySet());
for (Object item: keys) {
LOG.debug("Handling deprecation for " + (String)item);
handleDeprecation((String)item);
}
}
static{
//print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
@ -1665,7 +1675,7 @@ private synchronized Document asXmlDocument() throws IOException {
Element conf = doc.createElement("configuration");
doc.appendChild(conf);
conf.appendChild(doc.createTextNode("\n"));
getProps(); // ensure properties is set
handleDeprecation(); //ensure properties is set and deprecation is handled
for (Enumeration e = properties.keys(); e.hasMoreElements();) {
String name = (String)e.nextElement();
Object object = properties.get(name);

View File

@ -304,8 +304,9 @@ public synchronized void seek(long pos) throws IOException {
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new FSDataInputStream(
new ChecksumFSInputChecker(this, f, bufferSize));
return verifyChecksum
? new FSDataInputStream(new ChecksumFSInputChecker(this, f, bufferSize))
: getRawFileSystem().open(f, bufferSize);
}
/** {@inheritDoc} */

View File

@ -467,7 +467,7 @@ public static Writer createWriter(Configuration conf, Writer.Option... opts
Metadata metadata) throws IOException {
return createWriter(FileContext.getFileContext(fs.getUri(), conf),
conf, name, keyClass, valClass, compressionType, codec,
metadata, EnumSet.of(CreateFlag.CREATE),
metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
CreateOpts.bufferSize(bufferSize),
createParent ? CreateOpts.createParent()
: CreateOpts.donotCreateParent(),

View File

@ -236,7 +236,7 @@ private static String replacePattern(String[] components, String hostname)
if (fqdn == null || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
fqdn = getLocalHostName();
}
return components[0] + "/" + fqdn + "@" + components[2];
return components[0] + "/" + fqdn.toLowerCase() + "@" + components[2];
}
static String getLocalHostName() throws UnknownHostException {
@ -409,7 +409,9 @@ public static void setTokenService(Token<?> token, InetSocketAddress addr) {
Text service = buildTokenService(addr);
if (token != null) {
token.setService(service);
LOG.info("Acquired token "+token); // Token#toString() prints service
if (LOG.isDebugEnabled()) {
LOG.debug("Acquired token "+token); // Token#toString() prints service
}
} else {
LOG.warn("Failed to get token for service "+service);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.conf;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -32,4 +34,22 @@ public void testDeprecatedKeys() throws Exception {
String scriptFile = conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY);
assertTrue(scriptFile.equals("xyz")) ;
}
//Tests reading / writing a conf file with deprecation after setting
public void testReadWriteWithDeprecatedKeys() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean("old.config.yet.to.be.deprecated", true);
Configuration.addDeprecation("old.config.yet.to.be.deprecated",
new String[]{"new.conf.to.replace.deprecated.conf"});
ByteArrayOutputStream out=new ByteArrayOutputStream();
String fileContents;
try {
conf.writeXml(out);
fileContents = out.toString();
} finally {
out.close();
}
assertTrue(fileContents.contains("old.config.yet.to.be.deprecated"));
assertTrue(fileContents.contains("new.conf.to.replace.deprecated.conf"));
}
}

View File

@ -22,12 +22,22 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
import org.apache.hadoop.conf.Configuration;
import junit.framework.TestCase;
import org.junit.*;
import static org.junit.Assert.*;
public class TestChecksumFileSystem extends TestCase {
public class TestChecksumFileSystem {
static final String TEST_ROOT_DIR
= System.getProperty("test.build.data","build/test/data/work-dir/localfs");
static LocalFileSystem localFs;
@Before
public void resetLocalFs() throws Exception {
localFs = FileSystem.getLocal(new Configuration());
localFs.setVerifyChecksum(true);
}
@Test
public void testgetChecksumLength() throws Exception {
assertEquals(8, ChecksumFileSystem.getChecksumLength(0L, 512));
assertEquals(12, ChecksumFileSystem.getChecksumLength(1L, 512));
@ -40,9 +50,8 @@ public void testgetChecksumLength() throws Exception {
ChecksumFileSystem.getChecksumLength(10000000000000L, 10));
}
@Test
public void testVerifyChecksum() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testPath");
Path testPath11 = new Path(TEST_ROOT_DIR, "testPath11");
FSDataOutputStream fout = localFs.create(testPath);
@ -68,7 +77,7 @@ public void testVerifyChecksum() throws Exception {
//copying the wrong checksum file
FileUtil.copy(localFs, localFs.getChecksumFile(testPath11), localFs,
localFs.getChecksumFile(testPath),false,true,conf);
localFs.getChecksumFile(testPath),false,true,localFs.getConf());
assertTrue("checksum exists", localFs.exists(localFs.getChecksumFile(testPath)));
boolean errorRead = false;
@ -80,20 +89,13 @@ public void testVerifyChecksum() throws Exception {
assertTrue("error reading", errorRead);
//now setting verify false, the read should succeed
try {
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing".equals(str));
} finally {
// reset for other tests
localFs.setVerifyChecksum(true);
}
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing".equals(str));
}
@Test
public void testMultiChunkFile() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testMultiChunk");
FSDataOutputStream fout = localFs.create(testPath);
for (int i = 0; i < 1000; i++) {
@ -116,9 +118,8 @@ public void testMultiChunkFile() throws Exception {
* Test to ensure that if the checksum file is truncated, a
* ChecksumException is thrown
*/
@Test
public void testTruncatedChecksum() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testtruncatedcrc");
FSDataOutputStream fout = localFs.create(testPath);
fout.write("testing truncation".getBytes());
@ -146,14 +147,60 @@ public void testTruncatedChecksum() throws Exception {
}
// telling it not to verify checksums, should avoid issue.
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing truncation".equals(str));
}
@Test
public void testStreamType() throws Exception {
Path testPath = new Path(TEST_ROOT_DIR, "testStreamType");
localFs.create(testPath).close();
FSDataInputStream in = null;
localFs.setVerifyChecksum(true);
in = localFs.open(testPath);
assertTrue("stream is input checker",
in.getWrappedStream() instanceof FSInputChecker);
localFs.setVerifyChecksum(false);
in = localFs.open(testPath);
assertFalse("stream is not input checker",
in.getWrappedStream() instanceof FSInputChecker);
}
@Test
public void testCorruptedChecksum() throws Exception {
Path testPath = new Path(TEST_ROOT_DIR, "testCorruptChecksum");
Path checksumPath = localFs.getChecksumFile(testPath);
// write a file to generate checksum
FSDataOutputStream out = localFs.create(testPath, true);
out.write("testing 1 2 3".getBytes());
out.close();
assertTrue(localFs.exists(checksumPath));
FileStatus stat = localFs.getFileStatus(checksumPath);
// alter file directly so checksum is invalid
out = localFs.getRawFileSystem().create(testPath, true);
out.write("testing stale checksum".getBytes());
out.close();
assertTrue(localFs.exists(checksumPath));
// checksum didn't change on disk
assertEquals(stat, localFs.getFileStatus(checksumPath));
Exception e = null;
try {
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024).toString();
assertTrue("read", "testing truncation".equals(str));
} finally {
// reset for other tests
localFs.setVerifyChecksum(true);
readFile(localFs, testPath, 1024);
} catch (ChecksumException ce) {
e = ce;
} finally {
assertNotNull("got checksum error", e);
}
localFs.setVerifyChecksum(false);
String str = readFile(localFs, testPath, 1024);
assertEquals("testing stale checksum", str);
}
}

View File

@ -517,6 +517,23 @@ protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize, l
assertTrue("InputStream for " + path + " should have been closed.", openedFile[0].isClosed());
}
/**
* Test that makes sure createWriter succeeds on a file that was
* already created
* @throws IOException
*/
public void testCreateWriterOnExistingFile() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path name = new Path(new Path(System.getProperty("test.build.data","."),
"createWriterOnExistingFile") , "file");
fs.create(name);
SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
RandomDatum.class, 512, (short) 1, 4096, false,
CompressionType.NONE, null, new Metadata());
}
public void testRecursiveSeqFileCreate() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);

View File

@ -89,6 +89,16 @@ public void testGetServerPrincipal() throws IOException {
Mockito.verify(notUsed, Mockito.never()).getCanonicalHostName();
}
@Test
public void testPrincipalsWithLowerCaseHosts() throws IOException {
String service = "xyz/";
String realm = "@REALM";
String principalInConf = service + SecurityUtil.HOSTNAME_PATTERN + realm;
String hostname = "FooHost";
String principal = service + hostname.toLowerCase() + realm;
verify(principalInConf, hostname, principal);
}
@Test
public void testLocalHostNameForNullOrWild() throws Exception {
String local = SecurityUtil.getLocalHostName();

View File

@ -55,8 +55,8 @@ if [ "${1}" = "stop" ]; then
fi
if [ "${HTTPFS_SILENT}" != "true" ]; then
${BASEDIR}/share/hadoop/httpfs/tomcat/bin/catalina.sh "$@"
${CATALINA_BASE:-"${BASEDIR}/share/hadoop/httpfs/tomcat"}/bin/catalina.sh "$@"
else
${BASEDIR}/share/hadoop/httpfs/tomcat/bin/catalina.sh "$@" > /dev/null
${CATALINA_BASE:-"${BASEDIR}/share/hadoop/httpfs/tomcat"}/bin/catalina.sh "$@" > /dev/null
fi

View File

@ -47,7 +47,7 @@ public void hostname() throws Exception {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
throws IOException, ServletException {
Assert.assertEquals(HostnameFilter.get(), "localhost");
Assert.assertTrue(HostnameFilter.get().contains("localhost"));
invoked.set(true);
}
};

View File

@ -295,6 +295,11 @@ Release 0.23.1 - UNRELEASED
HDFS-2837. mvn javadoc:javadoc not seeing LimitedPrivate class (revans2 via tucu)
HDFS-2840. TestHostnameFilter should work with localhost or localhost.localdomain (tucu)
HDFS-2791. If block report races with closing of file, replica is
incorrectly marked corrupt. (todd)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -22,8 +22,6 @@ bin=`which "$0"`
bin=`dirname "${bin}"`
bin=`cd "$bin"; pwd`
export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then

View File

@ -1571,7 +1571,24 @@ private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState,
}
case RBW:
case RWR:
return storedBlock.isComplete();
if (!storedBlock.isComplete()) {
return false;
} else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
return true;
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
// the block report got a little bit delayed after the pipeline
// closed. So, ignore this report, assuming we will get a
// FINALIZED replica later. See HDFS-2791
LOG.info("Received an RBW replica for block " + storedBlock +
" on " + dn.getName() + ": ignoring it, since the block is " +
"complete with the same generation stamp.");
return false;
} else {
return true;
}
}
case RUR: // should not be reported
case TEMPORARY: // should not be reported
default:

View File

@ -781,4 +781,13 @@ private void startDistributedUpgradeIfNeeded() throws IOException {
return;
}
@VisibleForTesting
DatanodeProtocol getBpNamenode() {
return bpNamenode;
}
@VisibleForTesting
void setBpNamenode(DatanodeProtocol bpNamenode) {
this.bpNamenode = bpNamenode;
}
}

View File

@ -101,7 +101,7 @@ public static FileSystem createHdfsWithDifferentUsername(final Configuration con
return DFSTestUtil.getFileSystemAs(ugi, conf);
}
static void write(OutputStream out, int offset, int length) throws IOException {
public static void write(OutputStream out, int offset, int length) throws IOException {
final byte[] bytes = new byte[length];
for(int i = 0; i < length; i++) {
bytes[i] = (byte)(offset + i);

View File

@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.Mockito;
import com.google.common.base.Preconditions;
/**
* WARNING!! This is TEST ONLY class: it never has to be used
* for ANY development purposes.
@ -42,4 +51,34 @@ public static void setHeartbeatsDisabledForTests(DataNode dn,
boolean heartbeatsDisabledForTests) {
dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
}
/**
* Insert a Mockito spy object between the given DataNode and
* the given NameNode. This can be used to delay or wait for
* RPC calls on the datanode->NN path.
*/
public static DatanodeProtocol spyOnBposToNN(
DataNode dn, NameNode nn) {
String bpid = nn.getNamesystem().getBlockPoolId();
BPOfferService bpos = null;
for (BPOfferService thisBpos : dn.getAllBpOs()) {
if (thisBpos.getBlockPoolId().equals(bpid)) {
bpos = thisBpos;
break;
}
}
Preconditions.checkArgument(bpos != null,
"No such bpid: %s", bpid);
// When protobufs are merged, the following can be converted
// to a simple spy. Because you can't spy on proxy objects,
// we have to use the DelegateAnswer trick.
DatanodeProtocol origNN = bpos.getBpNamenode();
DatanodeProtocol spy = Mockito.mock(DatanodeProtocol.class,
new GenericTestUtils.DelegateAnswer(origNN));
bpos.setBpNamenode(spy);
return spy;
}
}

View File

@ -21,7 +21,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -35,14 +37,19 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import java.io.File;
import java.io.FilenameFilter;
@ -50,6 +57,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* This test simulates a variety of situations when blocks are being
@ -491,6 +499,84 @@ public void blockReport_09() throws IOException {
resetConfiguration(); // return the initial state of the configuration
}
}
/**
* Test for the case where one of the DNs in the pipeline is in the
* process of doing a block report exactly when the block is closed.
* In this case, the block report becomes delayed until after the
* block is marked completed on the NN, and hence it reports an RBW
* replica for a COMPLETE block. Such a report should not be marked
* corrupt.
* This is a regression test for HDFS-2791.
*/
@Test
public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
final CountDownLatch brFinished = new CountDownLatch(1);
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
@Override
protected Object passThrough(InvocationOnMock invocation)
throws Throwable {
try {
return super.passThrough(invocation);
} finally {
// inform the test that our block report went through.
brFinished.countDown();
}
}
};
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
// Start a second DN for this test -- we're checking
// what happens when one of the DNs is slowed for some reason.
REPL_FACTOR = 2;
startDNandWait(null, false);
NameNode nn = cluster.getNameNode();
FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
try {
AppendTestUtil.write(out, 0, 10);
out.hflush();
// Set up a spy so that we can delay the block report coming
// from this node.
DataNode dn = cluster.getDataNodes().get(0);
DatanodeProtocol spy =
DataNodeAdapter.spyOnBposToNN(dn, nn);
Mockito.doAnswer(delayer)
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<long[]>anyObject());
// Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block
// it before it gets to the NN.
dn.scheduleAllBlockReport(0);
delayer.waitForCall();
} finally {
IOUtils.closeStream(out);
}
// Now that the stream is closed, the NN will have the block in COMPLETE
// state.
delayer.proceed();
brFinished.await();
// Verify that no replicas are marked corrupt, and that the
// file is still readable.
BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
DFSTestUtil.readFile(fs, filePath);
// Ensure that the file is readable even from the DN that we futzed with.
cluster.stopDataNode(1);
DFSTestUtil.readFile(fs, filePath);
}
private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
final boolean tooLongWait = false;

View File

@ -36,6 +36,8 @@ Release 0.23.1 - Unreleased
MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
IMPROVEMENTS
MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the
job-history related information.
@ -153,6 +155,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3737. The Web Application Proxy's is not documented very well.
(Robert Evans via mahadev)
MAPREDUCE-3699. Increased RPC handlers for all YARN servers to reasonable
values for working at scale. (Hitesh Shah via vinodkv)
MAPREDUCE-3693. Added mapreduce.admin.user.env to mapred-default.xml.
(Roman Shapshonik via acmurthy)
MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending
requests for computing user-limits. (Arun C Murthy via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -179,7 +190,13 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3718. Change default AM heartbeat interval to 1 second. (Hitesh
Shah via sseth)
MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
BUG FIXES
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
ResourceUsageMatcher. (amarrk)
MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
(Jason Lowe via bobby)
@ -558,6 +575,12 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3735. Add distcp jar to the distribution (tar).
(mahadev)
MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific
information not available at RM. (vinodkv via acmurthy)
MAPREDUCE-3742. "yarn logs" command fails with ClassNotFoundException.
(Jason Lowe via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -91,15 +91,15 @@ if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then
fi
# for releases, add core mapred jar & webapps to CLASSPATH
if [ -d "$HADOOP_PREFIX/share/hadoop/mapreduce/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop/mapreduce
if [ -d "$HADOOP_PREFIX/${MAPRED_DIR}/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/${MAPRED_DIR}
fi
for f in $HADOOP_MAPRED_HOME/share/hadoop-mapreduce/*.jar; do
for f in $HADOOP_MAPRED_HOME/${MAPRED_DIR}/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
# add libs to CLASSPATH
for f in $HADOOP_MAPRED_HOME/lib/*.jar; do
for f in $HADOOP_MAPRED_HOME/${MAPRED_LIB_JARS_DIR}/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done

View File

@ -455,10 +455,14 @@ private void setCluster(Cluster cluster) {
public String toString() {
ensureState(JobState.RUNNING);
String reasonforFailure = " ";
int numMaps = 0;
int numReduces = 0;
try {
updateStatus();
if (status.getState().equals(JobStatus.State.FAILED))
reasonforFailure = getTaskFailureEventString();
numMaps = getTaskReports(TaskType.MAP).length;
numReduces = getTaskReports(TaskType.REDUCE).length;
} catch (IOException e) {
} catch (InterruptedException ie) {
}
@ -468,6 +472,8 @@ public String toString() {
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n");
sb.append("Uber job : ").append(status.isUber()).append("\n");
sb.append("Number of maps: ").append(numMaps);
sb.append("Number of reduces: ").append(numReduces);
sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: ");

View File

@ -412,7 +412,7 @@ public interface MRJobConfig {
/** The number of threads used to handle task RPC calls.*/
public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
MR_AM_PREFIX + "job.task.listener.thread-count";
public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 10;
public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 30;
/** How often the AM should send heartbeats to the RM.*/
public static final String MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS =

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.tools;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
@ -25,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
@ -560,25 +562,28 @@ protected void displayTasks(Job job, String type, String state)
}
}
}
public void displayJobList(JobStatus[] jobs)
throws IOException, InterruptedException {
System.out.println("Total jobs:" + jobs.length);
System.out.println("JobId\tState\tStartTime\t" +
"UserName\tQueue\tPriority\tMaps\tReduces\tUsedContainers\t" +
"RsvdContainers\tUsedMem\tRsvdMem\tNeededMem\tAM info");
for (JobStatus job : jobs) {
TaskReport[] mapReports =
cluster.getJob(job.getJobID()).getTaskReports(TaskType.MAP);
TaskReport[] reduceReports =
cluster.getJob(job.getJobID()).getTaskReports(TaskType.REDUCE);
displayJobList(jobs, new PrintWriter(System.out));
}
System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%dM\t%dM\t%dM\t%s\n",
@Private
public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
@Private
public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n";
@Private
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
writer.println("Total jobs:" + jobs.length);
writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
"Queue", "Priority", "UsedContainers",
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
for (JobStatus job : jobs) {
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),
job.getPriority().name(),
mapReports.length,
reduceReports.length,
job.getNumUsedSlots(),
job.getNumReservedSlots(),
job.getUsedMem(),
@ -586,6 +591,7 @@ public void displayJobList(JobStatus[] jobs)
job.getNeededMem(),
job.getSchedulingInfo());
}
writer.flush();
}
public static void main(String[] argv) throws Exception {

View File

@ -440,6 +440,16 @@
</description>
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native</value>
<description>Expert: Additional execution environment entries for
map and reduce task processes. This is not an additive property.
You must preserve the original value if you want your map and
reduce tasks to have access to native libraries (compression, etc).
</description>
</property>
<property>
<name>mapreduce.task.tmp.dir</name>
<value>./tmp</value>
@ -1224,4 +1234,18 @@
mapreduce.job.end-notification.max.retry.interval</description>
</property>
<property>
<name>yarn.app.mapreduce.am.job.task.listener.thread-count</name>
<value>30</value>
<description>The number of threads used to handle RPC calls in the
MR AppMaster from remote tasks</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
<description>The interval in ms at which the MR AppMaster should send
heartbeats to the ResourceManager</description>
</property>
</configuration>

View File

@ -22,19 +22,24 @@
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import java.io.PrintWriter;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Assert;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class JobClientUnitTest {
public class TestJobClient extends JobClient {
@ -48,7 +53,6 @@ void setCluster(Cluster cluster) {
}
}
@SuppressWarnings("deprecation")
@Test
public void testMapTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -64,7 +68,6 @@ public void testMapTaskReportsWithNullJob() throws Exception {
verify(mockCluster).getJob(id);
}
@SuppressWarnings("deprecation")
@Test
public void testReduceTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -80,7 +83,6 @@ public void testReduceTaskReportsWithNullJob() throws Exception {
verify(mockCluster).getJob(id);
}
@SuppressWarnings("deprecation")
@Test
public void testSetupTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -96,7 +98,6 @@ public void testSetupTaskReportsWithNullJob() throws Exception {
verify(mockCluster).getJob(id);
}
@SuppressWarnings("deprecation")
@Test
public void testCleanupTaskReportsWithNullJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
@ -115,12 +116,15 @@ public void testCleanupTaskReportsWithNullJob() throws Exception {
@Test
public void testShowJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
JobID jobID = new JobID("test", 0);
long startTime = System.currentTimeMillis();
JobID jobID = new JobID(String.valueOf(startTime), 12345);
JobStatus mockJobStatus = mock(JobStatus.class);
when(mockJobStatus.getJobID()).thenReturn(jobID);
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
when(mockJobStatus.getStartTime()).thenReturn(0L);
when(mockJobStatus.getStartTime()).thenReturn(startTime);
when(mockJobStatus.getUsername()).thenReturn("mockuser");
when(mockJobStatus.getQueue()).thenReturn("mockqueue");
when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
@ -132,18 +136,21 @@ public void testShowJob() throws Exception {
when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
Job mockJob = mock(Job.class);
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(new TaskReport[0]);
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(
new TaskReport[5]);
Cluster mockCluster = mock(Cluster.class);
when(mockCluster.getJob(jobID)).thenReturn(mockJob);
client.setCluster(mockCluster);
client.displayJobList(new JobStatus[] {mockJobStatus});
ByteArrayOutputStream out = new ByteArrayOutputStream();
client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out));
String commandLineOutput = out.toString();
System.out.println(commandLineOutput);
Assert.assertTrue(commandLineOutput.contains("Total jobs:1"));
verify(mockJobStatus, atLeastOnce()).getJobID();
verify(mockJob, atLeastOnce()).getTaskReports(isA(TaskType.class));
verify(mockCluster, atLeastOnce()).getJob(jobID);
verify(mockJobStatus).getState();
verify(mockJobStatus).getStartTime();
verify(mockJobStatus).getUsername();
@ -155,5 +162,9 @@ public void testShowJob() throws Exception {
verify(mockJobStatus).getReservedMem();
verify(mockJobStatus).getNeededMem();
verify(mockJobStatus).getSchedulingInfo();
// This call should not go to each AM.
verify(mockCluster, never()).getJob(jobID);
verify(mockJob, never()).getTaskReports(isA(TaskType.class));
}
}

View File

@ -140,8 +140,8 @@ if [ -d "$YARN_HOME/build/tools" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/build/tools
fi
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/${YARN_DIR}/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/${YARN_LIB_JARS_DIR}/*
# so that filenames w/ spaces are handled correctly in loops below
IFS=
@ -194,7 +194,7 @@ elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "logs" ] ; then
CLASS=org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper
CLASS=org.apache.hadoop.yarn.logaggregation.LogDumper
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "daemonlog" ] ; then
CLASS=org.apache.hadoop.log.LogLevel

View File

@ -19,8 +19,6 @@ bin=`which "$0"`
bin=`dirname "${bin}"`
bin=`cd "$bin"; pwd`
export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}"
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then

View File

@ -90,7 +90,7 @@ public class YarnConfiguration extends Configuration {
/** The number of threads used to handle applications manager requests.*/
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10;
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 50;
/** The Kerberos principal for the resource manager.*/
public static final String RM_PRINCIPAL =
@ -106,7 +106,7 @@ public class YarnConfiguration extends Configuration {
/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
RM_PREFIX + "scheduler.client.thread-count";
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 10;
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
/** The address of the RM web application.*/
public static final String RM_WEBAPP_ADDRESS =
@ -184,7 +184,7 @@ public class YarnConfiguration extends Configuration {
/** Number of threads to handle resource tracker calls.*/
public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT =
RM_PREFIX + "resource-tracker.client.thread-count";
public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 10;
public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50;
/** The class to use as the resource scheduler.*/
public static final String RM_SCHEDULER =
@ -257,7 +257,7 @@ public class YarnConfiguration extends Configuration {
/** Number of threads container manager uses.*/
public static final String NM_CONTAINER_MGR_THREAD_COUNT =
NM_PREFIX + "container-manager.thread-count";
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 5;
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
/** Number of threads used in cleanup.*/
public static final String NM_DELETE_THREAD_COUNT =

View File

@ -67,7 +67,7 @@
<property>
<description>The number of threads used to handle applications manager requests.</description>
<name>yarn.resourcemanager.client.thread-count</name>
<value>10</value>
<value>50</value>
</property>
<property>
@ -90,7 +90,7 @@
<property>
<description>Number of threads to handle scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.client.thread-count</name>
<value>10</value>
<value>50</value>
</property>
<property>
@ -179,7 +179,7 @@
<property>
<description>Number of threads to handle resource tracker calls.</description>
<name>yarn.resourcemanager.resource-tracker.client.thread-count</name>
<value>10</value>
<value>50</value>
</property>
<property>
@ -244,7 +244,7 @@
<property>
<description>Number of threads container manager uses.</description>
<name>yarn.nodemanager.container-manager.thread-count</name>
<value>5</value>
<value>20</value>
</property>
<property>

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -39,9 +38,9 @@ public class ClusterMetrics {
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
@Metric("# of NMs") MutableGaugeInt numNMs;
@Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableCounterInt numLostNMs;
@Metric("# of active NMs") MutableGaugeInt numNMs;
@Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableGaugeInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
@ -73,8 +72,8 @@ private static void registerMetrics() {
}
}
//Total Nodemanagers
public int getNumNMs() {
//Active Nodemanagers
public int getNumActiveNMs() {
return numNMs.value();
}
@ -87,6 +86,10 @@ public void incrDecommisionedNMs() {
numDecommissionedNMs.incr();
}
public void decrDecommisionedNMs() {
numDecommissionedNMs.decr();
}
//Lost NMs
public int getNumLostNMs() {
return numLostNMs.value();
@ -96,6 +99,10 @@ public void incrNumLostNMs() {
numLostNMs.incr();
}
public void decrNumLostNMs() {
numLostNMs.decr();
}
//Unhealthy NMs
public int getUnhealthyNMs() {
return numUnhealthyNMs.value();
@ -118,6 +125,10 @@ public void incrNumRebootedNMs() {
numRebootedNMs.incr();
}
public void decrNumRebootedNMs() {
numRebootedNMs.decr();
}
public void removeNode(RMNodeEventType nodeEventType) {
numNMs.decr();
switch(nodeEventType){

View File

@ -43,6 +43,8 @@ public interface RMContext {
ApplicationsStore getApplicationsStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
ConcurrentMap<String, RMNode> getInactiveRMNodes();
ConcurrentMap<NodeId, RMNode> getRMNodes();

View File

@ -43,6 +43,9 @@ public class RMContextImpl implements RMContext {
private final ConcurrentMap<NodeId, RMNode> nodes
= new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private AMLivelinessMonitor amLivelinessMonitor;
private ContainerAllocationExpirer containerAllocationExpirer;
@ -83,6 +86,11 @@ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes;
}
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
}
@Override
public ContainerAllocationExpirer getContainerAllocationExpirer() {

View File

@ -220,10 +220,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (rmNode == null) {
/* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
// Updating the metrics directly as reboot event cannot be
// triggered on a null rmNode
ClusterMetrics.getMetrics().incrNumRebootedNMs();
return reboot;
}

View File

@ -119,7 +119,7 @@ RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
RMNodeEventType.DECOMMISSION, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
RMNodeEventType.EXPIRE, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
.addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED,
RMNodeEventType.REBOOTING, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@ -307,6 +307,21 @@ public void handle(RMNodeEvent event) {
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private void updateMetrics(RMNodeState nodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
switch (nodeState) {
case LOST:
metrics.decrNumLostNMs();
break;
case REBOOTED:
metrics.decrNumRebootedNMs();
break;
case DECOMMISSIONED:
metrics.decrDecommisionedNMs();
break;
}
}
@SuppressWarnings("unchecked")
@Override
@ -315,6 +330,13 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
RMNode node = rmNode.context.getInactiveRMNodes().get(host);
rmNode.context.getInactiveRMNodes().remove(host);
updateMetrics(node.getState());
}
ClusterMetrics.getMetrics().addNode();
}
@ -353,7 +375,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Remove the node from the system.
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Removed Node " + rmNode.nodeId);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
ClusterMetrics.getMetrics().removeNode(event.getType());
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* {@link ActiveUsersManager} tracks active users in the system.
* A user is deemed to be active if he has any running applications with
* outstanding resource requests.
*
* An active user is defined as someone with outstanding resource requests.
*/
@Private
public class ActiveUsersManager {
private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
private final QueueMetrics metrics;
private int activeUsers = 0;
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
public ActiveUsersManager(QueueMetrics metrics) {
this.metrics = metrics;
}
/**
* An application has new outstanding requests.
*
* @param user application user
* @param applicationId activated application
*/
@Lock({Queue.class, SchedulerApp.class})
synchronized public void activateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps == null) {
userApps = new HashSet<ApplicationId>();
usersApplications.put(user, userApps);
++activeUsers;
metrics.incrActiveUsers();
LOG.debug("User " + user + " added to activeUsers, currently: " +
activeUsers);
}
if (userApps.add(applicationId)) {
metrics.activateApp(user);
}
}
/**
* An application has no more outstanding requests.
*
* @param user application user
* @param applicationId deactivated application
*/
@Lock({Queue.class, SchedulerApp.class})
synchronized public void deactivateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps != null) {
if (userApps.remove(applicationId)) {
metrics.deactivateApp(user);
}
if (userApps.isEmpty()) {
usersApplications.remove(user);
--activeUsers;
metrics.decrActiveUsers();
LOG.debug("User " + user + " removed from activeUsers, currently: " +
activeUsers);
}
}
}
/**
* Get number of active users i.e. users with applications which have pending
* resource requests.
* @return number of active users
*/
@Lock({Queue.class, SchedulerApp.class})
synchronized public int getNumActiveUsers() {
return activeUsers;
}
}

View File

@ -36,12 +36,11 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
/**
* This class keeps track of all the consumption of an application. This also
@ -59,27 +58,27 @@ public class AppSchedulingInfo {
final String user;
private final AtomicInteger containerIdCounter = new AtomicInteger(0);
private final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
final Set<Priority> priorities = new TreeSet<Priority>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map<Priority, Map<String, ResourceRequest>> requests =
new HashMap<Priority, Map<String, ResourceRequest>>();
private final ApplicationStore store;
//private final ApplicationStore store;
private final ActiveUsersManager activeUsersManager;
/* Allocated by scheduler */
boolean pending = true; // for app metrics
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ApplicationStore store) {
String user, Queue queue, ActiveUsersManager activeUsersManager,
ApplicationStore store) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
this.queueName = queue.getQueueName();
this.user = user;
this.store = store;
//this.store = store;
this.activeUsersManager = activeUsersManager;
}
public ApplicationId getApplicationId() {
@ -123,7 +122,8 @@ public int getNewContainerId() {
* @param requests
* resources to be acquired
*/
synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
synchronized public void updateResourceRequests(
List<ResourceRequest> requests) {
QueueMetrics metrics = queue.getMetrics();
// Update resource requests
for (ResourceRequest request : requests) {
@ -138,6 +138,16 @@ synchronized public void updateResourceRequests(List<ResourceRequest> requests)
+ request);
}
updatePendingResources = true;
// Premature optimization?
// Assumes that we won't see more than one priority request updated
// in one call, reasonable assumption... however, it's totally safe
// to activate same application more than once.
// Thus we don't need another loop ala the one in decrementOutstanding()
// which is needed during deactivate.
if (request.getNumContainers() > 0) {
activeUsersManager.activateApplication(user, applicationId);
}
}
Map<String, ResourceRequest> asks = this.requests.get(priority);
@ -246,10 +256,7 @@ synchronized private void allocateNodeLocal(SchedulerNode node, Priority priorit
this.requests.get(priority).remove(node.getRackName());
}
// Do not remove ANY
ResourceRequest offSwitchRequest = requests.get(priority).get(
RMNode.ANY);
offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
decrementOutstanding(requests.get(priority).get(RMNode.ANY));
}
/**
@ -271,10 +278,7 @@ synchronized private void allocateRackLocal(SchedulerNode node, Priority priorit
this.requests.get(priority).remove(node.getRackName());
}
// Do not remove ANY
ResourceRequest offSwitchRequest = requests.get(priority).get(
RMNode.ANY);
offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
decrementOutstanding(requests.get(priority).get(RMNode.ANY));
}
/**
@ -291,11 +295,32 @@ synchronized private void allocateOffSwitch(SchedulerNode node, Priority priorit
allocate(container);
// Update future requirements
// Do not remove ANY
offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
decrementOutstanding(offSwitchRequest);
}
synchronized private void decrementOutstanding(
ResourceRequest offSwitchRequest) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY
offSwitchRequest.setNumContainers(numOffSwitchContainers);
// Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) {
boolean deactivate = true;
for (Priority priority : getPriorities()) {
ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
if (request.getNumContainers() > 0) {
deactivate = false;
break;
}
}
if (deactivate) {
activeUsersManager.deactivateApplication(user, applicationId);
}
}
}
synchronized private void allocate(Container container) {
// Update consumption and track allocations
//TODO: fixme sharad

View File

@ -60,6 +60,8 @@ public class QueueMetrics {
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in GiB") MutableGaugeInt reservedGB;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active users") MutableGaugeInt activeApplications;
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
static final int GB = 1024; // resource.memory is in MB
@ -287,6 +289,36 @@ public void unreserveResource(String user, Resource res) {
}
}
public void incrActiveUsers() {
activeUsers.incr();
}
public void decrActiveUsers() {
activeUsers.decr();
}
public void activateApp(String user) {
activeApplications.incr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.activateApp(user);
}
if (parent != null) {
parent.activateApp(user);
}
}
public void deactivateApp(String user) {
activeApplications.decr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.deactivateApp(user);
}
if (parent != null) {
parent.deactivateApp(user);
}
}
public int getAppsSubmitted() {
return appsSubmitted.value();
}
@ -338,4 +370,12 @@ public int getReservedGB() {
public int getReservedContainers() {
return reservedContainers.value();
}
public int getActiveUsers() {
return activeUsers.value();
}
public int getActiveApps() {
return activeApplications.value();
}
}

View File

@ -102,11 +102,12 @@ public class SchedulerApp {
private final RMContext rmContext;
public SchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue, store);
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, store);
this.queue = queue;
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -197,6 +198,12 @@ public void reinitialize(CSQueue queue, Resource clusterResource)
*/
public void updateClusterResource(Resource clusterResource);
/**
* Get the {@link ActiveUsersManager} for the queue.
* @return the <code>ActiveUsersManager</code> for the queue
*/
public ActiveUsersManager getActiveUsersManager();
/**
* Recover the state of the queue
* @param clusterResource the resource of the cluster

View File

@ -355,7 +355,8 @@ synchronized CSQueue getQueue(String queueName) {
// TODO: Fix store
SchedulerApp SchedulerApp =
new SchedulerApp(applicationAttemptId, user, queue, rmContext, null);
new SchedulerApp(applicationAttemptId, user, queue,
queue.getActiveUsersManager(), rmContext, null);
// Submit to the queue
try {

View File

@ -37,6 +37,8 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.Lock.NoLock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -58,6 +60,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
@ -120,6 +123,8 @@ public class LeafQueue implements CSQueue {
private CapacitySchedulerContext scheduler;
private final ActiveUsersManager activeUsersManager;
final static int DEFAULT_AM_RESOURCE = 2 * 1024;
public LeafQueue(CapacitySchedulerContext cs,
@ -132,7 +137,7 @@ public LeafQueue(CapacitySchedulerContext cs,
this.metrics = old != null ? old.getMetrics() :
QueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics());
this.activeUsersManager = new ActiveUsersManager(metrics);
this.minimumAllocation = cs.getMinimumResourceCapability();
this.maximumAllocation = cs.getMaximumResourceCapability();
this.minimumAllocationFactor =
@ -348,6 +353,11 @@ public synchronized int getMaximumActiveApplicationsPerUser() {
return maxActiveApplicationsPerUser;
}
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
@Override
public synchronized float getUsedCapacity() {
return usedCapacity;
@ -674,6 +684,12 @@ public synchronized void removeApplication(SchedulerApp application, User user)
// Check if we can activate more applications
activateApplications();
// Inform the activeUsersManager
synchronized (application) {
activeUsersManager.deactivateApplication(
application.getUser(), application.getApplicationId());
}
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
" user: " + application.getUser() +
@ -837,6 +853,7 @@ private synchronized boolean assignToQueue(Resource clusterResource,
return true;
}
@Lock({LeafQueue.class, SchedulerApp.class})
private Resource computeAndSetUserResourceLimit(SchedulerApp application,
Resource clusterResource, Resource required) {
String user = application.getUser();
@ -853,6 +870,7 @@ private int roundUp(int memory) {
minimumAllocation.getMemory();
}
@Lock(NoLock.class)
private Resource computeUserLimit(SchedulerApp application,
Resource clusterResource, Resource required) {
// What is our current capacity?
@ -877,11 +895,8 @@ private Resource computeUserLimit(SchedulerApp application,
// queue's configured capacity * user-limit-factor.
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
String userName = application.getUser();
final int activeUsers = users.size();
User user = getUser(userName);
final int activeUsers = activeUsersManager.getNumActiveUsers();
int limit =
roundUp(
@ -893,12 +908,13 @@ private Resource computeUserLimit(SchedulerApp application,
);
if (LOG.isDebugEnabled()) {
String userName = application.getUser();
LOG.debug("User limit computation for " + userName +
" in queue " + getQueueName() +
" userLimit=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
" consumed: " + user.getConsumedResources() +
" consumed: " + getUser(userName).getConsumedResources() +
" limit: " + limit +
" queueCapacity: " + queueCapacity +
" qconsumed: " + consumed +
@ -1308,8 +1324,10 @@ public synchronized void updateClusterResource(Resource clusterResource) {
// Update application properties
for (SchedulerApp application : activeApplications) {
computeAndSetUserResourceLimit(
application, clusterResource, Resources.none());
synchronized (application) {
computeAndSetUserResourceLimit(
application, clusterResource, Resources.none());
}
}
}

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
@ -240,6 +241,12 @@ public float getMaximumCapacity() {
return maximumCapacity;
}
@Override
public ActiveUsersManager getActiveUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}
@Override
public synchronized float getUsedCapacity() {
return usedCapacity;

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@ -124,10 +125,11 @@ public class FifoScheduler implements ResourceScheduler {
private Map<ApplicationAttemptId, SchedulerApp> applications
= new TreeMap<ApplicationAttemptId, SchedulerApp>();
private final ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
private final QueueMetrics metrics =
QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
private final QueueMetrics metrics;
private final Queue DEFAULT_QUEUE = new Queue() {
@Override
@ -174,6 +176,11 @@ public List<QueueUserACLInfo> getQueueUserAclInfo(
}
};
public FifoScheduler() {
metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
activeUsersManager = new ActiveUsersManager(metrics);
}
@Override
public Resource getMinimumResourceCapability() {
return minimumAllocation;
@ -288,7 +295,7 @@ private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String user) {
// TODO: Fix store
SchedulerApp schedulerApp =
new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
this.rmContext, null);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user);
@ -318,6 +325,12 @@ private synchronized void doneApplication(
RMContainerEventType.KILL);
}
// Inform the activeUsersManager
synchronized (application) {
activeUsersManager.deactivateApplication(
application.getUser(), application.getApplicationId());
}
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);

View File

@ -68,7 +68,7 @@ protected void render(Block html) {
th().$class("ui-state-default")._("Memory Used")._().
th().$class("ui-state-default")._("Memory Total")._().
th().$class("ui-state-default")._("Memory Reserved")._().
th().$class("ui-state-default")._("Total Nodes")._().
th().$class("ui-state-default")._("Active Nodes")._().
th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._().
@ -82,7 +82,7 @@ protected void render(Block html) {
td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getReservedMB() * BYTES_IN_MB)).
td().a(url("nodes"),String.valueOf(clusterMetrics.getTotalNodes()))._().
td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._().
td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._().
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().

View File

@ -24,6 +24,8 @@
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -36,6 +38,7 @@
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
@ -79,7 +82,19 @@ protected void render(Block html) {
if(type != null && !type.isEmpty()) {
stateFilter = RMNodeState.valueOf(type.toUpperCase());
}
for (RMNode ni : this.rmContext.getRMNodes().values()) {
Collection<RMNode> rmNodes = this.rmContext.getRMNodes().values();
boolean isInactive = false;
if (stateFilter != null) {
switch (stateFilter) {
case DECOMMISSIONED:
case LOST:
case REBOOTED:
rmNodes = this.rmContext.getInactiveRMNodes().values();
isInactive = true;
break;
}
}
for (RMNode ni : rmNodes) {
if(stateFilter != null) {
RMNodeState state = ni.getState();
if(!stateFilter.equals(state)) {
@ -89,12 +104,17 @@ protected void render(Block html) {
NodeInfo info = new NodeInfo(ni, sched);
int usedMemory = (int)info.getUsedMemory();
int availableMemory = (int)info.getAvailableMemory();
tbody.tr().
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr().
td(info.getRack()).
td(info.getState()).
td(info.getNodeId()).
td().a("http://" + info.getNodeHTTPAddress(), info.getNodeHTTPAddress())._().
td(info.getHealthStatus()).
td(info.getNodeId());
if (isInactive) {
row.td()._("N/A")._();
} else {
String httpAddress = info.getNodeHTTPAddress();
row.td().a("http://" + httpAddress, httpAddress)._();
}
row.td(info.getHealthStatus()).
td(Times.format(info.getLastHealthUpdate())).
td(info.getHealthReport()).
td(String.valueOf(info.getNumContainers())).

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletRequest;
@ -68,6 +69,7 @@
@Singleton
@Path("/ws/v1/cluster")
public class RMWebServices {
private static final String EMPTY = "";
private static final Log LOG = LogFactory.getLog(RMWebServices.class);
private final ResourceManager rm;
private static RecordFactory recordFactory = RecordFactoryProvider
@ -144,12 +146,23 @@ public NodesInfo getNodes(@QueryParam("state") String filterState,
if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance");
}
Collection<RMNode> rmNodes = this.rm.getRMContext().getRMNodes().values();
boolean isInactive = false;
if (filterState != null && !filterState.isEmpty()) {
RMNodeState nodeState = RMNodeState.valueOf(filterState.toUpperCase());
switch (nodeState) {
case DECOMMISSIONED:
case LOST:
case REBOOTED:
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
isInactive = true;
break;
}
}
NodesInfo allNodes = new NodesInfo();
for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) {
for (RMNode ni : rmNodes) {
NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (filterState != null) {
RMNodeState.valueOf(filterState);
if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) {
continue;
}
@ -165,6 +178,9 @@ public NodesInfo getNodes(@QueryParam("state") String filterState,
continue;
}
}
if (isInactive) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
allNodes.add(nodeInfo);
}
return allNodes;
@ -183,10 +199,19 @@ public NodeInfo getNode(@PathParam("nodeId") String nodeId) {
}
NodeId nid = ConverterUtils.toNodeId(nodeId);
RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
boolean isInactive = false;
if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost());
if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
}
isInactive = true;
}
return new NodeInfo(ni, sched);
NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (isInactive) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
return nodeInfo;
}
@GET

View File

@ -44,6 +44,7 @@ public class ClusterMetricsInfo {
protected int unhealthyNodes;
protected int decommissionedNodes;
protected int rebootedNodes;
protected int activeNodes;
public ClusterMetricsInfo() {
} // JAXB needs this
@ -59,12 +60,13 @@ public ClusterMetricsInfo(final ResourceManager rm, final RMContext rmContext) {
this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB;
this.containersAllocated = metrics.getAllocatedContainers();
this.totalMB = availableMB + reservedMB + allocatedMB;
this.totalNodes = clusterMetrics.getNumNMs();
this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs();
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
+ rebootedNodes;
}
public int getAppsSubmitted() {
@ -94,6 +96,10 @@ public long getTotalMB() {
public int getTotalNodes() {
return this.totalNodes;
}
public int getActiveNodes() {
return this.activeNodes;
}
public int getLostNodes() {
return this.lostNodes;

View File

@ -94,6 +94,10 @@ public String getNodeId() {
public String getNodeHTTPAddress() {
return this.nodeHTTPAddress;
}
public void setNodeHTTPAddress(String nodeHTTPAddress) {
this.nodeHTTPAddress = nodeHTTPAddress;
}
public String getHealthStatus() {
return this.healthStatus;

View File

@ -81,13 +81,20 @@ public NodeId registerNode() throws Exception {
}
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
b, ++responseId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
return nodeHeartbeat(conts, isHealthy, ++responseId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
status.setContainersStatuses(entry.getValue());
@ -97,7 +104,6 @@ public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
healthStatus.setIsNodeHealthy(isHealthy);
healthStatus.setLastHealthReportTime(1);
status.setNodeHealthStatus(healthStatus);
status.setResponseId(++responseId);
req.setNodeStatus(status);
return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
}

View File

@ -56,6 +56,17 @@ public static List<RMNode> newNodes(int racks, int nodesPerRack,
}
return list;
}
public static List<RMNode> lostNodes(int racks, int nodesPerRack,
Resource perNode) {
List<RMNode> list = Lists.newArrayList();
for (int i = 0; i < racks; ++i) {
for (int j = 0; j < nodesPerRack; ++j) {
list.add(lostNodeInfo(i, perNode, RMNodeState.LOST));
}
}
return list;
}
public static NodeId newNodeID(String host, int port) {
NodeId nid = recordFactory.newRecordInstance(NodeId.class);
@ -82,92 +93,120 @@ public static Resource newAvailResource(Resource total, Resource used) {
return rs;
}
public static RMNode newNodeInfo(int rack, final Resource perNode) {
private static class MockRMNodeImpl implements RMNode {
private NodeId nodeId;
private String hostName;
private String nodeAddr;
private String httpAddress;
private int cmdPort;
private Resource perNode;
private String rackName;
private NodeHealthStatus nodeHealthStatus;
private RMNodeState state;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, NodeHealthStatus nodeHealthStatus,
int cmdPort, String hostName, RMNodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
this.perNode = perNode;
this.rackName = rackName;
this.nodeHealthStatus = nodeHealthStatus;
this.cmdPort = cmdPort;
this.hostName = hostName;
this.state = state;
}
@Override
public NodeId getNodeID() {
return this.nodeId;
}
@Override
public String getHostName() {
return this.hostName;
}
@Override
public int getCommandPort() {
return this.cmdPort;
}
@Override
public int getHttpPort() {
return 0;
}
@Override
public String getNodeAddress() {
return this.nodeAddr;
}
@Override
public String getHttpAddress() {
return this.httpAddress;
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
return this.nodeHealthStatus;
}
@Override
public Resource getTotalCapability() {
return this.perNode;
}
@Override
public String getRackName() {
return this.rackName;
}
@Override
public Node getNode() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public RMNodeState getState() {
return this.state;
}
@Override
public List<ContainerId> getContainersToCleanUp() {
return null;
}
@Override
public List<ApplicationId> getAppsToCleanup() {
return null;
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
return null;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
final String rackName = "rack"+ rack;
final int nid = NODE_ID++;
final String hostName = "host"+ nid;
final int port = 123;
final NodeId nodeID = newNodeID(hostName, port);
final String httpAddress = "localhost:0";
final String httpAddress = httpAddr;
final NodeHealthStatus nodeHealthStatus =
recordFactory.newRecordInstance(NodeHealthStatus.class);
final Resource used = newUsedResource(perNode);
final Resource avail = newAvailResource(perNode, used);
return new RMNode() {
@Override
public NodeId getNodeID() {
return nodeID;
}
return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName,
nodeHealthStatus, nid, hostName, state);
}
@Override
public String getNodeAddress() {
return hostName;
}
public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) {
return buildRMNode(rack, perNode, state, "N/A");
}
@Override
public String getHttpAddress() {
return httpAddress;
}
@Override
public Resource getTotalCapability() {
return perNode;
}
@Override
public String getRackName() {
return rackName;
}
@Override
public Node getNode() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
return nodeHealthStatus;
}
@Override
public int getCommandPort() {
return nid;
}
@Override
public int getHttpPort() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String getHostName() {
return hostName;
}
@Override
public RMNodeState getState() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<ApplicationId> getAppsToCleanup() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<ContainerId> getContainersToCleanUp() {
// TODO Auto-generated method stub
return null;
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
// TODO Auto-generated method stub
return null;
}
};
public static RMNode newNodeInfo(int rack, final Resource perNode) {
return buildRMNode(rack, perNode, null, "localhost:0");
}
}

View File

@ -130,6 +130,12 @@ public void sendNodeStarted(MockNM nm) throws Exception {
nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
}
public void sendNodeLost(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
}
public void NMwaitForState(NodeId nodeid, RMNodeState finalState)
throws Exception {

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
@ -100,8 +101,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null);
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
}

View File

@ -157,14 +157,14 @@ public void testReboot() throws Exception {
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService());
MockNM nm2 = rm.registerNode("host2:1234", 2048);
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true);
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
checkRebootedNMCount(rm, ++initialMetricCount);
}

View File

@ -302,7 +302,8 @@ public void testHeadroom() throws Exception {
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0_0 =
spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, rmContext, null));
spy(new SchedulerApp(appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_0_0, user_0, A);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@ -320,7 +321,8 @@ public void testHeadroom() throws Exception {
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_0_1 =
spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, rmContext, null));
spy(new SchedulerApp(appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_0_1, user_0, A);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@ -338,7 +340,8 @@ public void testHeadroom() throws Exception {
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_1_0 =
spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, rmContext, null));
spy(new SchedulerApp(appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_1_0, user_1, A);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();

View File

@ -18,8 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -28,9 +38,6 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@ -48,19 +55,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestLeafQueue {
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -136,7 +141,6 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
final String Q_C1 = Q_C + "." + C1;
conf.setCapacity(Q_C1, 100);
LOG.info("Setup top-level queues a and b");
}
static LeafQueue stubLeafQueue(LeafQueue queue) {
@ -217,13 +221,15 @@ public void testSingleQueueOneUserMetrics() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
@ -264,13 +270,15 @@ public void testSingleQueueWithOneUser() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
@ -371,6 +379,99 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(1, a.getMetrics().getAvailableGB());
}
@Test
public void testUserLimits() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
/**
* Start testing...
*/
// Set user-limit
a.setUserLimit(50);
a.setUserLimitFactor(2);
// Now, only user_0 should be active since he is the only one with
// outstanding requests
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
// This commented code is key to test 'activeUsers'.
// It should fail the test if uncommented since
// it would increase 'activeUsers' to 2 and stop user_2
// Pre MAPREDUCE-3732 this test should fail without this block too
// app_2.updateResourceRequests(Collections.singletonList(
// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
}
@Test
public void testSingleQueueWithMultipleUsers() throws Exception {
@ -388,15 +489,31 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
SchedulerApp app_3 =
new SchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_3, user_2, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
@ -438,19 +555,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Submit more apps
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null);
a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
SchedulerApp app_3 =
new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null);
a.submitApplication(app_3, user_2, A);
// Submit resource requests for other apps now to 'activate' them
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
@ -558,13 +664,15 @@ public void testReservation() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
new SchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
@ -657,13 +765,15 @@ public void testReservationExchange() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
new SchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
@ -770,7 +880,8 @@ public void testLocalityScheduling() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
spy(new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
@ -899,7 +1010,8 @@ public void testApplicationPriorityScheduling() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
spy(new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
@ -1028,7 +1140,8 @@ public void testSchedulingConstraints() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
spy(new SchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -36,39 +37,65 @@
* data for all the columns in the table as specified in the header.
*/
public class TestNodesPage {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
@Test
public void testNodesBlockRender() throws Exception {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
Injector injector = WebAppTests.createMockInjector(RMContext.class,
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB),
new Module() {
private Injector injector;
@Before
public void setUp() throws Exception {
injector = WebAppTests.createMockInjector(RMContext.class, TestRMWebApp
.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB), new Module() {
@Override
public void configure(Binder binder) {
try {
binder.bind(ResourceManager.class).toInstance(TestRMWebApp.mockRm(3,
numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB));
binder.bind(ResourceManager.class).toInstance(
TestRMWebApp.mockRm(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
}
@Test
public void testNodesBlockRender() throws Exception {
injector.getInstance(NodesBlock.class).render();
PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector);
Mockito.verify(writer, Mockito.times(numberOfActualTableHeaders +
numberOfThInMetricsTable)).print(
"<th");
Mockito.verify(writer,
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
.print("<th");
Mockito.verify(
writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print("<td");
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
}
@Test
public void testNodesBlockRenderForLostNodes() {
NodesBlock nodesBlock = injector.getInstance(NodesBlock.class);
nodesBlock.set("node.state", "lost");
nodesBlock.render();
PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector);
Mockito.verify(writer,
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
.print("<th");
Mockito.verify(
writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
}
}

View File

@ -120,12 +120,23 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
for (RMNode node : nodes) {
nodesMap.put(node.getNodeID(), node);
}
final List<RMNode> lostNodes = MockNodes.lostNodes(racks, numNodes,
newResource(mbsPerNode));
final ConcurrentMap<String, RMNode> lostNodesMap = Maps.newConcurrentMap();
for (RMNode node : lostNodes) {
lostNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(new MemStore(), null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return applicationsMaps;
}
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return lostNodesMap;
}
@Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return nodesMap;
}

View File

@ -370,7 +370,8 @@ public void verifyClusterMetricsXML(String xml) throws JSONException,
WebServicesTestUtils.getXmlInt(element, "lostNodes"),
WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"),
WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"),
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"));
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"),
WebServicesTestUtils.getXmlInt(element, "activeNodes"));
}
}
@ -378,7 +379,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 11, clusterinfo.length());
assertEquals("incorrect number of elements", 12, clusterinfo.length());
verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
clusterinfo.getInt("allocatedMB"),
@ -386,13 +387,13 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"),
clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"),
clusterinfo.getInt("decommissionedNodes"),
clusterinfo.getInt("rebootedNodes"));
clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"));
}
public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
int allocMB, int containersAlloc, int totalMB, int totalNodes,
int lostNodes, int unhealthyNodes, int decommissionedNodes,
int rebootedNodes) throws JSONException, Exception {
int rebootedNodes, int activeNodes) throws JSONException, Exception {
ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics();
@ -412,8 +413,11 @@ public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
* MB_IN_GB, allocMB);
assertEquals("containersAllocated doesn't match", 0, containersAlloc);
assertEquals("totalMB doesn't match", totalMBExpect, totalMB);
assertEquals("totalNodes doesn't match", clusterMetrics.getNumNMs(),
totalNodes);
assertEquals(
"totalNodes doesn't match",
clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs()
+ clusterMetrics.getNumDecommisionedNMs()
+ clusterMetrics.getNumRebootedNMs(), totalNodes);
assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(),
lostNodes);
assertEquals("unhealthyNodes doesn't match",
@ -422,6 +426,8 @@ public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
clusterMetrics.getNumDecommisionedNMs(), decommissionedNodes);
assertEquals("rebootedNodes doesn't match",
clusterMetrics.getNumRebootedNMs(), rebootedNodes);
assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(),
activeNodes);
}
@Test

View File

@ -202,6 +202,69 @@ public void testNodesQueryStateInvalid() throws JSONException, Exception {
rm.stop();
}
}
@Test
public void testNodesQueryStateLost() throws JSONException, Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1234", 5120);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm1);
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", RMNodeState.LOST.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
JSONObject nodes = json.getJSONObject("nodes");
assertEquals("incorrect number of elements", 1, nodes.length());
JSONArray nodeArray = nodes.getJSONArray("node");
assertEquals("incorrect number of elements", 2, nodeArray.length());
for (int i = 0; i < nodeArray.length(); ++i) {
JSONObject info = nodeArray.getJSONObject(i);
String host = info.get("id").toString().split(":")[0];
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
.toString(), info.getString("state"));
}
}
@Test
public void testSingleNodeQueryStateLost() throws JSONException, Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1234", 5120);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm1);
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").path("h2:1234").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
JSONObject info = json.getJSONObject("node");
String id = info.get("id").toString();
assertEquals("Incorrect Node Information.", "h2:1234", id);
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2");
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
@Test
public void testNodesQueryHealthy() throws JSONException, Exception {

View File

@ -149,10 +149,15 @@ void finalize(JobFactory factory, String inputPath, long dataSize,
throws IOException {
numJobsInInputTrace = factory.numJobsInTrace;
endTime = System.currentTimeMillis();
Path inputTracePath = new Path(inputPath);
FileSystem fs = inputTracePath.getFileSystem(conf);
inputTraceLocation = fs.makeQualified(inputTracePath).toString();
inputTraceSignature = getTraceSignature(inputTraceLocation);
if ("-".equals(inputPath)) {
inputTraceLocation = Summarizer.NA;
inputTraceSignature = Summarizer.NA;
} else {
Path inputTracePath = new Path(inputPath);
FileSystem fs = inputTracePath.getFileSystem(conf);
inputTraceLocation = fs.makeQualified(inputTracePath).toString();
inputTraceSignature = getTraceSignature(inputPath);
}
jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
resolver = userResolver.getClass().getName();
if (dataSize > 0) {

View File

@ -314,9 +314,13 @@ public Integer run() throws Exception {
}
});
// print the run summary
System.out.print("\n\n");
System.out.println(summarizer.toString());
// print the gridmix summary if the run was successful
if (val == 0) {
// print the run summary
System.out.print("\n\n");
System.out.println(summarizer.toString());
}
return val;
}

View File

@ -101,13 +101,15 @@ public void addJobStats(Job job, JobStory jobdesc) {
}
int maps = 0;
int reds = 0;
if (jobdesc == null) {
throw new IllegalArgumentException(
" JobStory not available for job " + job.getJobName());
} else {
maps = jobdesc.getNumberMaps();
reds = jobdesc.getNumberReduces();
}
JobStats stats = new JobStats(maps,job);
JobStats stats = new JobStats(maps, reds, job);
jobMaps.put(seq,stats);
}
@ -258,15 +260,20 @@ public void abort() {
*/
static class JobStats {
private int noOfMaps;
private int noOfReds;
private Job job;
public JobStats(int noOfMaps,Job job){
public JobStats(int noOfMaps,int numOfReds, Job job){
this.job = job;
this.noOfMaps = noOfMaps;
this.noOfReds = numOfReds;
}
public int getNoOfMaps() {
return noOfMaps;
}
public int getNoOfReds() {
return noOfReds;
}
/**
* Returns the job ,

View File

@ -31,13 +31,12 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.atomic.AtomicBoolean;
public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
private final LoadStatus loadStatus = new LoadStatus();
private final Condition condUnderloaded = this.lock.newCondition();
/**
* The minimum ratio between pending+running map tasks (aka. incomplete map
* tasks) and cluster map slot capacity for us to consider the cluster is
@ -150,23 +149,32 @@ public void run() {
}
LOG.info("START STRESS @ " + System.currentTimeMillis());
while (!Thread.currentThread().isInterrupted()) {
lock.lock();
try {
while (loadStatus.overloaded()) {
//Wait while JT is overloaded.
if (LOG.isDebugEnabled()) {
LOG.debug("Cluster overloaded in run! Sleeping...");
}
// sleep
try {
condUnderloaded.await();
Thread.sleep(1000);
} catch (InterruptedException ie) {
return;
}
}
while (!loadStatus.overloaded()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cluster underloaded in run! Stressing...");
}
try {
//TODO This in-line read can block submission for large jobs.
final JobStory job = getNextJobFiltered();
if (null == job) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Job Selected: " + job.getJobID());
}
submitter.add(
jobCreator.createGridmixJob(
conf, 0L, job, scratch,
@ -175,14 +183,20 @@ public void run() {
sequence.getAndIncrement()));
// TODO: We need to take care of scenario when one map/reduce
// takes more than 1 slot.
loadStatus.mapSlotsBackfill -=
calcEffectiveIncompleteMapTasks(
loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f);
loadStatus.reduceSlotsBackfill -=
calcEffectiveIncompleteReduceTasks(
loadStatus.reduceSlotCapacity, job.getNumberReduces(),
0.0f);
--loadStatus.numJobsBackfill;
// Lock the loadjob as we are making updates
int incompleteMapTasks = (int) calcEffectiveIncompleteMapTasks(
loadStatus.getMapCapacity(),
job.getNumberMaps(), 0.0f);
loadStatus.decrementMapLoad(incompleteMapTasks);
int incompleteReduceTasks =
(int) calcEffectiveIncompleteReduceTasks(
loadStatus.getReduceCapacity(),
job.getNumberReduces(), 0.0f);
loadStatus.decrementReduceLoad(incompleteReduceTasks);
loadStatus.decrementJobLoad(1);
} catch (IOException e) {
LOG.error("Error while submitting the job ", e);
error = e;
@ -191,7 +205,7 @@ public void run() {
}
} finally {
lock.unlock();
// do nothing
}
}
} catch (InterruptedException e) {
@ -210,19 +224,11 @@ public void run() {
*/
@Override
public void update(Statistics.ClusterStats item) {
lock.lock();
ClusterStatus clusterMetrics = item.getStatus();
try {
ClusterStatus clusterMetrics = item.getStatus();
try {
checkLoadAndGetSlotsToBackfill(item,clusterMetrics);
} catch (Exception e) {
LOG.error("Couldn't get the new Status",e);
}
if (!loadStatus.overloaded()) {
condUnderloaded.signalAll();
}
} finally {
lock.unlock();
checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
} catch (Exception e) {
LOG.error("Couldn't get the new Status",e);
}
}
@ -254,18 +260,25 @@ float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
*/
private void checkLoadAndGetSlotsToBackfill(
ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException {
loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks();
loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
// update the max cluster capacity incase its updated
int mapCapacity = clusterStatus.getMaxMapTasks();
loadStatus.updateMapCapacity(mapCapacity);
loadStatus.numJobsBackfill =
(int) (maxJobTrackerRatio * clusterStatus.getTaskTrackers())
- stats.getNumRunningJob();
if (loadStatus.numJobsBackfill <= 0) {
int reduceCapacity = clusterStatus.getMaxReduceTasks();
loadStatus.updateReduceCapacity(reduceCapacity);
int numTrackers = clusterStatus.getTaskTrackers();
int jobLoad =
(int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob();
loadStatus.updateJobLoad(jobLoad);
if (loadStatus.getJobLoad() <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " Overloaded is "
LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is "
+ Boolean.TRUE.toString() + " NumJobsBackfill is "
+ loadStatus.numJobsBackfill);
+ loadStatus.getJobLoad());
}
return; // stop calculation because we know it is overloaded.
}
@ -275,56 +288,84 @@ private void checkLoadAndGetSlotsToBackfill(
float mapProgress = job.getJob().mapProgress();
int noOfMaps = job.getNoOfMaps();
incompleteMapTasks +=
calcEffectiveIncompleteMapTasks(
clusterStatus.getMaxMapTasks(), noOfMaps, mapProgress);
calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
}
loadStatus.mapSlotsBackfill =
(int) ((overloadMapTaskMapSlotRatio * clusterStatus.getMaxMapTasks())
- incompleteMapTasks);
if (loadStatus.mapSlotsBackfill <= 0) {
int mapSlotsBackFill =
(int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks);
loadStatus.updateMapLoad(mapSlotsBackFill);
if (loadStatus.getMapLoad() <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " Overloaded is "
LOG.debug(System.currentTimeMillis() + " [MAP-LOAD] Overloaded is "
+ Boolean.TRUE.toString() + " MapSlotsBackfill is "
+ loadStatus.mapSlotsBackfill);
+ loadStatus.getMapLoad());
}
return; // stop calculation because we know it is overloaded.
}
float incompleteReduceTasks = 0; // include pending & running reduce tasks.
for (JobStats job : ClusterStats.getRunningJobStats()) {
int noOfReduces = job.getJob().getNumReduceTasks();
// Cached the num-reds value in JobStats
int noOfReduces = job.getNoOfReds();
if (noOfReduces > 0) {
float reduceProgress = job.getJob().reduceProgress();
incompleteReduceTasks +=
calcEffectiveIncompleteReduceTasks(
clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces,
reduceProgress);
}
}
loadStatus.reduceSlotsBackfill =
(int) ((overloadReduceTaskReduceSlotRatio * clusterStatus.getMaxReduceTasks())
int reduceSlotsBackFill =
(int)((overloadReduceTaskReduceSlotRatio * reduceCapacity)
- incompleteReduceTasks);
if (loadStatus.reduceSlotsBackfill <= 0) {
loadStatus.updateReduceLoad(reduceSlotsBackFill);
if (loadStatus.getReduceLoad() <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " Overloaded is "
LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is "
+ Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
+ loadStatus.reduceSlotsBackfill);
+ loadStatus.getReduceLoad());
}
return; // stop calculation because we know it is overloaded.
}
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " Overloaded is "
LOG.debug(System.currentTimeMillis() + " [OVERALL] Overloaded is "
+ Boolean.FALSE.toString() + "Current load Status is "
+ loadStatus);
}
}
static class LoadStatus {
int mapSlotsBackfill;
int mapSlotCapacity;
int reduceSlotsBackfill;
int reduceSlotCapacity;
int numJobsBackfill;
/**
* Additional number of map slots that can be requested before
* declaring (by Gridmix STRESS mode) the cluster as overloaded.
*/
private volatile int mapSlotsBackfill;
/**
* Determines the total map slot capacity of the cluster.
*/
private volatile int mapSlotCapacity;
/**
* Additional number of reduce slots that can be requested before
* declaring (by Gridmix STRESS mode) the cluster as overloaded.
*/
private volatile int reduceSlotsBackfill;
/**
* Determines the total reduce slot capacity of the cluster.
*/
private volatile int reduceSlotCapacity;
/**
* Determines the max count of running jobs in the cluster.
*/
private volatile int numJobsBackfill;
// set the default to true
private AtomicBoolean overloaded = new AtomicBoolean(true);
/**
* Construct the LoadStatus in an unknown state - assuming the cluster is
@ -339,12 +380,76 @@ static class LoadStatus {
reduceSlotCapacity = -1;
}
public boolean overloaded() {
return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
|| (numJobsBackfill <= 0);
public synchronized int getMapLoad() {
return mapSlotsBackfill;
}
public String toString() {
public synchronized int getMapCapacity() {
return mapSlotCapacity;
}
public synchronized int getReduceLoad() {
return reduceSlotsBackfill;
}
public synchronized int getReduceCapacity() {
return reduceSlotCapacity;
}
public synchronized int getJobLoad() {
return numJobsBackfill;
}
public synchronized void decrementMapLoad(int mapSlotsConsumed) {
this.mapSlotsBackfill -= mapSlotsConsumed;
updateOverloadStatus();
}
public synchronized void decrementReduceLoad(int reduceSlotsConsumed) {
this.reduceSlotsBackfill -= reduceSlotsConsumed;
updateOverloadStatus();
}
public synchronized void decrementJobLoad(int numJobsConsumed) {
this.numJobsBackfill -= numJobsConsumed;
updateOverloadStatus();
}
public synchronized void updateMapCapacity(int mapSlotsCapacity) {
this.mapSlotCapacity = mapSlotsCapacity;
updateOverloadStatus();
}
public synchronized void updateReduceCapacity(int reduceSlotsCapacity) {
this.reduceSlotCapacity = reduceSlotsCapacity;
updateOverloadStatus();
}
public synchronized void updateMapLoad(int mapSlotsBackfill) {
this.mapSlotsBackfill = mapSlotsBackfill;
updateOverloadStatus();
}
public synchronized void updateReduceLoad(int reduceSlotsBackfill) {
this.reduceSlotsBackfill = reduceSlotsBackfill;
updateOverloadStatus();
}
public synchronized void updateJobLoad(int numJobsBackfill) {
this.numJobsBackfill = numJobsBackfill;
updateOverloadStatus();
}
private synchronized void updateOverloadStatus() {
overloaded.set((mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
|| (numJobsBackfill <= 0));
}
public synchronized boolean overloaded() {
return overloaded.get();
}
public synchronized String toString() {
// TODO Use StringBuilder instead
return " Overloaded = " + overloaded()
+ ", MapSlotBackfill = " + mapSlotsBackfill

View File

@ -52,15 +52,23 @@ public class ResourceUsageMatcher {
@SuppressWarnings("unchecked")
public void configure(Configuration conf, ResourceCalculatorPlugin monitor,
ResourceUsageMetrics metrics, Progressive progress) {
Class[] plugins =
conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS,
ResourceUsageEmulatorPlugin.class);
Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS);
if (plugins == null) {
System.out.println("No resource usage emulator plugins configured.");
} else {
for (Class<? extends ResourceUsageEmulatorPlugin> plugin : plugins) {
if (plugin != null) {
emulationPlugins.add(ReflectionUtils.newInstance(plugin, conf));
for (Class clazz : plugins) {
if (clazz != null) {
if (ResourceUsageEmulatorPlugin.class.isAssignableFrom(clazz)) {
ResourceUsageEmulatorPlugin plugin =
(ResourceUsageEmulatorPlugin) ReflectionUtils.newInstance(clazz,
conf);
emulationPlugins.add(plugin);
} else {
throw new RuntimeException("Misconfigured resource usage plugins. "
+ "Class " + clazz.getClass().getName() + " is not a resource "
+ "usage plugin as it does not extend "
+ ResourceUsageEmulatorPlugin.class.getName());
}
}
}
}

View File

@ -101,10 +101,17 @@ public TestMonitor(int expected, Statistics stats) {
retiredJobs = new LinkedBlockingQueue<Job>();
}
public void verify(ArrayList<JobStory> submitted) throws Exception {
public void verify(ArrayList<JobStory> submitted, Configuration clientConf)
throws Exception {
final ArrayList<Job> succeeded = new ArrayList<Job>();
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
// define the input and output path for the run
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out =
new Path(in, clientConf.get(Gridmix.GRIDMIX_OUT_DIR, "gridmix"));
for (JobStory spec : submitted) {
sub.put(spec.getJobID().toString(), spec);
}
@ -115,8 +122,7 @@ public void verify(ArrayList<JobStory> submitted) throws Exception {
Configuration conf = job.getConfiguration();
if (GenerateData.JOB_NAME.equals(jobName)) {
verifyQueue(conf, jobName);
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
assertTrue("Mismatched data gen", // +/- 100k for logs
(GENDATA << 20) < generated.getLength() + GENSLOP ||
@ -164,7 +170,7 @@ public void verify(ArrayList<JobStory> submitted) throws Exception {
final FileStatus stat =
GridmixTestUtils.dfs.getFileStatus(
new Path(GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum)));
new Path(out, "" + Integer.valueOf(jobSeqNum)));
assertEquals("Wrong owner for " + jobName, spec.getUser(),
stat.getOwner());
@ -337,8 +343,9 @@ static class DebugGridmix extends Gridmix {
private JobFactory factory;
private TestMonitor monitor;
public void checkMonitor() throws Exception {
monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted());
public void checkMonitor(Configuration conf) throws Exception {
monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted(),
conf);
}
@Override
@ -534,9 +541,11 @@ private void doSubmission(boolean useDefaultQueue,
GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
client.checkMonitor(conf);
} catch (Exception e) {
e.printStackTrace();
// fail the test if there is an exception
throw new RuntimeException(e);
} finally {
in.getFileSystem(conf).delete(in, true);
out.getFileSystem(conf).delete(out, true);

View File

@ -159,7 +159,7 @@ public void update(Object item) {
@Override
protected Thread createReaderThread() {
return null;
return new Thread();
}
}
@ -243,7 +243,7 @@ public void testExecutionSummarizer() throws IOException {
tid, es.getInputTraceSignature());
// test trace location
Path qPath = fs.makeQualified(testTraceFile);
assertEquals("Mismatch in trace signature",
assertEquals("Mismatch in trace filename",
qPath.toString(), es.getInputTraceLocation());
// test expected data size
assertEquals("Mismatch in expected data size",
@ -275,7 +275,7 @@ public void testExecutionSummarizer() throws IOException {
es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats,
conf);
// test missing expected data size
assertEquals("Mismatch in trace signature",
assertEquals("Mismatch in trace data size",
Summarizer.NA, es.getExpectedDataSize());
assertFalse("Mismatch in trace signature",
tid.equals(es.getInputTraceSignature()));
@ -295,6 +295,12 @@ public void testExecutionSummarizer() throws IOException {
assertEquals("Mismatch in trace signature",
tid, es.getInputTraceSignature());
// finalize trace identifier '-' input
es.finalize(factory, "-", 0L, resolver, dataStats, conf);
assertEquals("Mismatch in trace signature",
Summarizer.NA, es.getInputTraceSignature());
assertEquals("Mismatch in trace file location",
Summarizer.NA, es.getInputTraceLocation());
}
// test the ExecutionSummarizer
@ -332,7 +338,7 @@ public boolean isSuccessful() throws IOException, InterruptedException {
return isSuccessful;
};
};
return new JobStats(numMaps, fakeJob);
return new JobStats(numMaps, numReds, fakeJob);
}
/**