Merge r1240450 through r1241553 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1241555 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-02-07 19:12:19 +00:00
commit 88bf529cfd
90 changed files with 1907 additions and 577 deletions

View File

@ -71,6 +71,14 @@
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>bin</directory>
<outputDirectory>sbin</outputDirectory>
<includes>
<include>mr-jobhistory-daemon.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>hadoop-yarn/conf</directory>
<outputDirectory>etc/hadoop</outputDirectory>

View File

@ -153,13 +153,18 @@ Release 0.23.1 - Unreleased
HADOOP-8002. SecurityUtil acquired token message should be a debug rather than info.
(Arpit Gupta via mahadev)
HADOOP-8009. Create hadoop-client and hadoop-minicluster artifacts for downstream
projects. (tucu)
HADOOP-8009. Create hadoop-client and hadoop-minicluster artifacts for downstream
projects. (tucu)
HADOOP-7470. Move up to Jackson 1.8.8. (Enis Soztutar via szetszwo)
HADOOP-8027. Visiting /jmx on the daemon web interfaces may print
unnecessary error in logs. (atm)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
BUG FIXES
HADOOP-8013. ViewFileSystem does not honor setVerifyChecksum
(Daryn Sharp via bobby)

View File

@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.lang.reflect.Field;
import java.nio.ByteOrder;
import java.security.AccessController;
import java.security.PrivilegedAction;
import sun.misc.Unsafe;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;
/**
* Utility code to do optimized byte-array comparison.
* This is borrowed and slightly modified from Guava's {@link UnsignedBytes}
* class to be able to compare arrays that start at non-zero offsets.
*/
abstract class FastByteComparisons {
/**
* Lexicographically compare two byte arrays.
*/
public static int compareTo(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
return LexicographicalComparerHolder.BEST_COMPARER.compareTo(
b1, s1, l1, b2, s2, l2);
}
private interface Comparer<T> {
abstract public int compareTo(T buffer1, int offset1, int length1,
T buffer2, int offset2, int length2);
}
private static Comparer<byte[]> lexicographicalComparerJavaImpl() {
return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
}
/**
* Provides a lexicographical comparer implementation; either a Java
* implementation or a faster implementation based on {@link Unsafe}.
*
* <p>Uses reflection to gracefully fall back to the Java implementation if
* {@code Unsafe} isn't available.
*/
private static class LexicographicalComparerHolder {
static final String UNSAFE_COMPARER_NAME =
LexicographicalComparerHolder.class.getName() + "$UnsafeComparer";
static final Comparer<byte[]> BEST_COMPARER = getBestComparer();
/**
* Returns the Unsafe-using Comparer, or falls back to the pure-Java
* implementation if unable to do so.
*/
static Comparer<byte[]> getBestComparer() {
try {
Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
// yes, UnsafeComparer does implement Comparer<byte[]>
@SuppressWarnings("unchecked")
Comparer<byte[]> comparer =
(Comparer<byte[]>) theClass.getEnumConstants()[0];
return comparer;
} catch (Throwable t) { // ensure we really catch *everything*
return lexicographicalComparerJavaImpl();
}
}
private enum PureJavaComparer implements Comparer<byte[]> {
INSTANCE;
@Override
public int compareTo(byte[] buffer1, int offset1, int length1,
byte[] buffer2, int offset2, int length2) {
// Short circuit equal case
if (buffer1 == buffer2 &&
offset1 == offset2 &&
length1 == length2) {
return 0;
}
// Bring WritableComparator code local
int end1 = offset1 + length1;
int end2 = offset2 + length2;
for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
int a = (buffer1[i] & 0xff);
int b = (buffer2[j] & 0xff);
if (a != b) {
return a - b;
}
}
return length1 - length2;
}
}
@SuppressWarnings("unused") // used via reflection
private enum UnsafeComparer implements Comparer<byte[]> {
INSTANCE;
static final Unsafe theUnsafe;
/** The offset to the first element in a byte array. */
static final int BYTE_ARRAY_BASE_OFFSET;
static {
theUnsafe = (Unsafe) AccessController.doPrivileged(
new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return f.get(null);
} catch (NoSuchFieldException e) {
// It doesn't matter what we throw;
// it's swallowed in getBestComparer().
throw new Error();
} catch (IllegalAccessException e) {
throw new Error();
}
}
});
BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
// sanity check - this should never fail
if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
throw new AssertionError();
}
}
static final boolean littleEndian =
ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
/**
* Returns true if x1 is less than x2, when both values are treated as
* unsigned.
*/
static boolean lessThanUnsigned(long x1, long x2) {
return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
}
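// Note (explanatory comment, not in the original change): adding Long.MIN_VALUE
// flips the sign bit of both operands, so the signed comparison above yields the
// unsigned ordering. For example, comparing 0xFFFFFFFFFFFFFFFFL (signed -1,
// unsigned maximum) against 1L: the shifted values are Long.MAX_VALUE and
// Long.MIN_VALUE + 1, so the method correctly returns false.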
/**
* Lexicographically compare two arrays.
*
* @param buffer1 left operand
* @param buffer2 right operand
* @param offset1 Where to start comparing in the left buffer
* @param offset2 Where to start comparing in the right buffer
* @param length1 How much to compare from the left buffer
* @param length2 How much to compare from the right buffer
* @return 0 if equal, < 0 if left is less than right, etc.
*/
@Override
public int compareTo(byte[] buffer1, int offset1, int length1,
byte[] buffer2, int offset2, int length2) {
// Short circuit equal case
if (buffer1 == buffer2 &&
offset1 == offset2 &&
length1 == length2) {
return 0;
}
int minLength = Math.min(length1, length2);
int minWords = minLength / Longs.BYTES;
int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
/*
* Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
* time is no slower than comparing 4 bytes at a time even on 32-bit.
* On the other hand, it is substantially faster on 64-bit.
*/
for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
long diff = lw ^ rw;
if (diff != 0) {
if (!littleEndian) {
return lessThanUnsigned(lw, rw) ? -1 : 1;
}
// Use binary search
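// over the xor of the two words to find the lowest-order byte in which they
// differ (on little-endian hardware that is the first differing byte in array
// order); n ends up as the bit offset of that byte within the 64-bit word.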
int n = 0;
int y;
int x = (int) diff;
if (x == 0) {
x = (int) (diff >>> 32);
n = 32;
}
y = x << 16;
if (y == 0) {
n += 16;
} else {
x = y;
}
y = x << 8;
if (y == 0) {
n += 8;
}
return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
}
}
// The epilogue to cover the last (minLength % 8) elements.
for (int i = minWords * Longs.BYTES; i < minLength; i++) {
int result = UnsignedBytes.compare(
buffer1[offset1 + i],
buffer2[offset2 + i]);
if (result != 0) {
return result;
}
}
return length1 - length2;
}
}
}
}
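A minimal usage sketch (illustrative, not part of this change): FastByteComparisons is package-private, so callers go through the public WritableComparator.compareBytes entry point, which the next hunk changes to delegate here. The class name and values below are hypothetical.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.WritableComparator;

public class CompareBytesSketch {
  public static void main(String[] args) {
    byte[] a = "apple".getBytes(StandardCharsets.UTF_8);
    byte[] b = "apples".getBytes(StandardCharsets.UTF_8);
    // Prints a negative number: "apple" is a strict prefix of "apples", so the
    // shorter array sorts first (the length1 - length2 fallback in the comparers above).
    System.out.println(WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
  }
}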

View File

@ -151,16 +151,7 @@ public class WritableComparator implements RawComparator {
/** Lexicographic order of binary data. */
public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int end1 = s1 + l1;
int end2 = s2 + l2;
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
int a = (b1[i] & 0xff);
int b = (b2[j] & 0xff);
if (a != b) {
return a - b;
}
}
return l1 - l2;
return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
}
/** Compute hash for binary data. */

View File

@ -34,6 +34,7 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.RuntimeMBeanException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.TabularData;
@ -312,6 +313,15 @@ public class JMXJsonServlet extends HttpServlet {
Object value = null;
try {
value = mBeanServer.getAttribute(oname, attName);
} catch (RuntimeMBeanException e) {
// UnsupportedOperationExceptions happen in the normal course of business,
// so no need to log them as errors all the time.
if (e.getCause() instanceof UnsupportedOperationException) {
LOG.debug("getting attribute "+attName+" of "+oname+" threw an exception", e);
} else {
LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
}
return;
} catch (AttributeNotFoundException e) {
//Ignored the attribute was not found, which should never happen because the bean
//just told us that it has this attribute, but if this happens just don't output
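The new catch branch above exists because the MBeanServer wraps runtime exceptions thrown by attribute getters in javax.management.RuntimeMBeanException. A hypothetical MXBean whose getter would take that path (names are illustrative, not from this change):

interface ExampleMXBean {
  long getOpenFileDescriptorCount();
}

class Example implements ExampleMXBean {
  @Override
  public long getOpenFileDescriptorCount() {
    // mBeanServer.getAttribute(...) surfaces this as a RuntimeMBeanException
    // whose cause is the UnsupportedOperationException, so the servlet now
    // logs it at DEBUG rather than ERROR.
    throw new UnsupportedOperationException("not supported on this platform");
  }
}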

View File

@ -214,6 +214,11 @@ Release 0.23.1 - UNRELEASED
HDFS-2784. Update hftp and hdfs for host-based token support.
(Kihwal Lee via jitendra)
HDFS-2785. Update webhdfs and httpfs for host-based token support.
(Robert Joseph Evans via jitendra)
HDFS-2868. Expose xceiver counts via the DataNode MXBean. (harsh)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
@ -324,6 +329,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2889. getNumCurrentReplicas is package private but should be public on
0.23 (see HDFS-2408). (Gregory Chanan via atm)
HDFS-2893. The start/stop scripts don't start/stop the 2NN when
using the default configuration. (eli)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
@ -1667,6 +1675,8 @@ Release 0.22.1 - Unreleased
HDFS-2718. Optimize OP_ADD in edits loading. (shv)
HDFS-2886. CreateEditLogs should generate a realistic edit log. (shv)
BUG FIXES
HDFS-2877. If locking of a storage dir fails, it will remove the other

View File

@ -59,7 +59,7 @@ echo "Starting namenodes on [$NAMENODES]"
--script "$bin/hdfs" start namenode $nameStartOpt
#---------------------------------------------------------
# datanodes (using defalut slaves file)
# datanodes (using default slaves file)
if [ -n "$HADOOP_SECURE_DN_USER" ]; then
echo \
@ -74,22 +74,13 @@ fi
#---------------------------------------------------------
# secondary namenodes (if any)
# if there are no secondary namenodes configured it returns
# 0.0.0.0 or empty string
SECONDARY_NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -secondarynamenodes 2>&-)
SECONDARY_NAMENODES=${SECONDARY_NAMENODES:='0.0.0.0'}
if [ "$SECONDARY_NAMENODES" = '0.0.0.0' ] ; then
echo \
"Secondary namenodes are not configured. " \
"Cannot start secondary namenodes."
else
echo "Starting secondary namenodes [$SECONDARY_NAMENODES]"
echo "Starting secondary namenodes [$SECONDARY_NAMENODES]"
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
--config "$HADOOP_CONF_DIR" \
--hostnames "$SECONDARY_NAMENODES" \
--script "$bin/hdfs" start secondarynamenode
fi
# eof

View File

@ -50,22 +50,13 @@ fi
#---------------------------------------------------------
# secondary namenodes (if any)
# if there are no secondary namenodes configured it returns
# 0.0.0.0 or empty string
SECONDARY_NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -secondarynamenodes 2>&-)
SECONDARY_NAMENODES=${SECONDARY_NAMENODES:-'0.0.0.0'}
if [ "$SECONDARY_NAMENODES" = '0.0.0.0' ] ; then
echo \
"Secondary namenodes are not configured. " \
"Cannot stop secondary namenodes."
else
echo "Stopping secondary namenodes [$SECONDARY_NAMENODES]"
echo "Stopping secondary namenodes [$SECONDARY_NAMENODES]"
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
--config "$HADOOP_CONF_DIR" \
--hostnames "$SECONDARY_NAMENODES" \
--script "$bin/hdfs" stop secondarynamenode
fi
# eof

View File

@ -253,9 +253,7 @@
The secondary NameNode merges the fsimage and the edits log files periodically
and keeps edits log size within a limit. It is usually run on a
different machine than the primary NameNode since its memory requirements
are on the same order as the primary NameNode. The secondary
NameNode is started by <code>bin/start-dfs.sh</code> on the nodes
specified in <code>conf/masters</code> file.
are on the same order as the primary NameNode.
</p>
<p>
The start of the checkpoint process on the secondary NameNode is

View File

@ -1297,7 +1297,8 @@ public class DataNode extends Configured
}
/** Number of concurrent xceivers per node. */
int getXceiverCount() {
@Override // DataNodeMXBean
public int getXceiverCount() {
return threadGroup == null ? 0 : threadGroup.activeCount();
}

View File

@ -70,5 +70,10 @@ public interface DataNodeMXBean {
* @return the cluster id
*/
public String getClusterId();
/**
* Returns an estimate of the number of Datanode threads
* actively transferring blocks.
*/
public int getXceiverCount();
}

View File

@ -308,12 +308,9 @@ public class FSDirectory implements Closeable {
*/
void updateFile(INodeFile file,
String path,
PermissionStatus permissions,
BlockInfo[] blocks,
short replication,
long mtime,
long atime,
long preferredBlockSize) throws IOException {
long atime) throws IOException {
// Update the salient file attributes.
file.setAccessTime(atime);

View File

@ -203,9 +203,8 @@ public class FSEditLogLoader {
addCloseOp.mtime, addCloseOp.atime, blockSize,
addCloseOp.clientName, addCloseOp.clientMachine);
} else {
fsDir.updateFile(oldFile,
addCloseOp.path, permissions, blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize);
fsDir.updateFile(oldFile, addCloseOp.path, blocks,
addCloseOp.mtime, addCloseOp.atime);
if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) { // OP_CLOSE
assert oldFile.isUnderConstruction() :
"File is not under construction: " + addCloseOp.path;

View File

@ -141,6 +141,7 @@ public class WebHdfsFileSystem extends FileSystem
private final UserGroupInformation ugi;
private InetSocketAddress nnAddr;
private URI uri;
private Token<?> delegationToken;
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
private Path workingDir;
@ -158,7 +159,11 @@ public class WebHdfsFileSystem extends FileSystem
) throws IOException {
super.initialize(uri, conf);
setConf(conf);
try {
this.uri = new URI(uri.getScheme(), uri.getAuthority(), null, null, null);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
this.nnAddr = NetUtils.createSocketAddr(uri.toString());
this.workingDir = getHomeDirectory();
@ -203,12 +208,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public URI getUri() {
try {
return new URI(SCHEME, null, nnAddr.getHostName(), nnAddr.getPort(),
null, null, null);
} catch (URISyntaxException e) {
return null;
}
return this.uri;
}
/** @return the home directory. */
@ -810,8 +810,7 @@ public class WebHdfsFileSystem extends FileSystem
final Token<?> token, final Configuration conf
) throws IOException, InterruptedException, URISyntaxException {
final InetSocketAddress nnAddr = NetUtils.createSocketAddr(
token.getService().toString());
final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
return (WebHdfsFileSystem)FileSystem.get(uri, conf);
}
@ -821,7 +820,7 @@ public class WebHdfsFileSystem extends FileSystem
) throws IOException, InterruptedException {
final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
// update the kerberos credentials, if they are coming from a keytab
ugi.checkTGTAndReloginFromKeytab();
ugi.reloginFromKeytab();
try {
WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);

View File

@ -65,6 +65,11 @@ public class TestDataNodeMXBean {
String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo");
Assert.assertEquals(replaceDigits(datanode.getVolumeInfo()),
replaceDigits(volumeInfo));
// Ensure mxbean's XceiverCount is same as the DataNode's
// live value.
int xceiverCount = (Integer)mbs.getAttribute(mxbeanName,
"XceiverCount");
Assert.assertEquals(datanode.getXceiverCount(), xceiverCount);
} finally {
if (cluster != null) {cluster.shutdown();}
}

View File

@ -93,7 +93,9 @@ public class CreateEditsLog {
dirInode = new INodeDirectory(p, 0L);
editLog.logMkDir(currentDir, dirInode);
}
editLog.logOpenFile(filePath, inode);
editLog.logOpenFile(filePath,
new INodeFileUnderConstruction(
p, replication, 0, blockSize, "", "", null));
editLog.logCloseFile(filePath, inode);
if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks

View File

@ -133,8 +133,8 @@ public class TestEditLog extends TestCase {
for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
p, replication, blockSize, 0, "", "", null);
editLog.logOpenFile("/filename" + startIndex + i, inode);
editLog.logCloseFile("/filename" + startIndex + i, inode);
editLog.logOpenFile("/filename" + (startIndex + i), inode);
editLog.logCloseFile("/filename" + (startIndex + i), inode);
editLog.logSync();
}
}

View File

@ -212,7 +212,13 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3756. Made single shuffle limit configurable. (Hitesh Shah via
acmurthy)
MAPREDUCE-3811. Made jobclient-to-AM retries configurable. (sseth via
acmurthy)
BUG FIXES
MAPREDUCE-3804. yarn webapp interface vulnerable to cross scripting attacks
(Dave Thompson via bobby)
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
ResourceUsageMatcher. (amarrk)
@ -659,6 +665,57 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3803. Fix broken build of raid contrib due to HDFS-2864.
(Ravi Prakash via suresh)
MAPREDUCE-3791. can't build site in hadoop-yarn-server-common.
(mahadev)
MAPREDUCE-3723. TestAMWebServicesJobs & TestHSWebServicesJobs
incorrectly asserting tests (Bhallamudi Venkata Siva Kamesh
via mahadev)
MAPREDUCE-3795. "job -status" command line output is malformed.
(vinodkv via mahadev)
MAPREDUCE-3759. ClassCastException thrown in -list-active-trackers when
there are a few unhealthy nodes (vinodkv via mahadev)
MAPREDUCE-3775. Change MiniYarnCluster to escape special chars in testname.
(Hitesh Shah via mahadev)
MAPREDUCE-3765. FifoScheduler does not respect yarn.scheduler.fifo.minimum-
allocation-mb setting (Hitesh Shah via mahadev)
MAPREDUCE-3747. Initialize queue metrics upfront and added start/finish
time to RM Web-UI. (acmurthy)
MAPREDUCE-3814. Fixed MRV1 compilation. (Arun C Murthy via vinodkv)
MAPREDUCE-3810. Performance tweaks - reduced logging in AM and defined
hascode/equals for ResourceRequest & Priority. (vinodkv via acmurthy)
MAPREDUCE-3813. Added a cache for resolved racks. (vinodkv via acmurthy)
MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps
but no reduces. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3354. Changed scripts so that jobhistory server is started by
bin/mapred instead of bin/yarn. (Jonathan Eagles via acmurthy)
MAPREDUCE-3809. Ensure that there is no needless sleep in Task at the end
of the task. (sseth via acmurthy)
MAPREDUCE-3794. Support mapred.Task.Counter and mapred.JobInProgress.Counter
enums for compatibility (Tom White via mahadev)
MAPREDUCE-3697. Support binary compatibility for Counters after
MAPREDUCE-901. (mahadev via acmurthy)
MAPREDUCE-3817. Fixed bin/mapred to allow running of distcp and archive
jobs. (Arpit Gupta via acmurthy)
MAPREDUCE-3709. TestDistributedShell is failing. (Hitesh Shah via
mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -59,12 +59,12 @@ Step 10) sbin/yarn-daemon.sh start resourcemanager
Step 11) sbin/yarn-daemon.sh start nodemanager
Step 12) sbin/yarn-daemon.sh start historyserver
Step 12) sbin/mr-jobhistory-daemon.sh start historyserver
Step 13) You are all set, an example on how to run a mapreduce job is:
cd $HADOOP_MAPRED_HOME
ant examples -Dresolvers=internal
$HADOOP_COMMON_HOME/bin/hadoop jar $HADOOP_MAPRED_HOME/build/hadoop-mapreduce-examples-0.23.0-SNAPSHOT.jar randomwriter -Dmapreduce.job.user.name=$USER -Dmapreduce.clientfactory.class.name=org.apache.hadoop.mapred.YarnClientFactory -Dmapreduce.randomwriter.bytespermap=10000 -Ddfs.blocksize=536870912 -Ddfs.block.size=536870912 -libjars $YARN_HOME/modules/hadoop-mapreduce-client-jobclient-0.23.0-SNAPSHOT.jar output
$HADOOP_COMMON_HOME/bin/hadoop jar $HADOOP_MAPRED_HOME/build/hadoop-mapreduce-examples-*.jar randomwriter -Dmapreduce.job.user.name=$USER -Dmapreduce.clientfactory.class.name=org.apache.hadoop.mapred.YarnClientFactory -Dmapreduce.randomwriter.bytespermap=10000 -Ddfs.blocksize=536870912 -Ddfs.block.size=536870912 -libjars $YARN_HOME/modules/hadoop-mapreduce-client-jobclient-*.jar output
The output on the command line should be almost similar to what you see in the JT/TT setup (Hadoop 0.20/0.21)

View File

@ -36,6 +36,9 @@ function print_usage(){
echo " classpath prints the class path needed for running"
echo " mapreduce subcommands"
echo " groups get the groups which users belong to"
echo " historyserver run job history servers as a standalone daemon"
echo " distcp <srcurl> <desturl> copy file or directories recursively"
echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
echo ""
echo "Most commands print help when invoked w/o parameters."
}
@ -48,6 +51,8 @@ fi
COMMAND=$1
shift
HADOOP_JOB_HISTORYSERVER_OPTS="-Dmapred.jobsummary.logger=${HADOOP_JHS_LOGGER:-INFO,console}"
if [ "$COMMAND" = "job" ] ; then
CLASS=org.apache.hadoop.mapred.JobClient
elif [ "$COMMAND" = "queue" ] ; then
@ -63,6 +68,9 @@ elif [ "$COMMAND" = "classpath" ] ; then
elif [ "$COMMAND" = "groups" ] ; then
CLASS=org.apache.hadoop.mapred.tools.GetGroups
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "historyserver" ] ; then
CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOB_HISTORYSERVER_OPTS"
elif [ "$COMMAND" = "mradmin" ] \
|| [ "$COMMAND" = "jobtracker" ] \
|| [ "$COMMAND" = "tasktracker" ] ; then
@ -70,6 +78,14 @@ elif [ "$COMMAND" = "mradmin" ] \
echo "You may find similar functionality with the \"yarn\" shell command."
print_usage
exit
elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.tools.DistCp
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "archive" ] ; then
CLASS=org.apache.hadoop.tools.HadoopArchives
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
else
echo $COMMAND - invalid command
print_usage
@ -103,6 +119,11 @@ for f in $HADOOP_MAPRED_HOME/${MAPRED_LIB_JARS_DIR}/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
# add modules to CLASSPATH
for f in $HADOOP_MAPRED_HOME/modules/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
if $cygwin; then
CLASSPATH=`cygpath -p -w "$CLASSPATH"`
fi
@ -112,12 +133,7 @@ if [ "$COMMAND" = "classpath" ] ; then
exit
fi
#turn security logger on the jobtracker
if [ $COMMAND = "jobtracker" ]; then
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,DRFAS}"
else
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
fi
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
export CLASSPATH
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"

View File

@ -0,0 +1,144 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Runs a yarn command as a daemon.
#
# Environment Variables
#
# YARN_CONF_DIR Alternate conf dir. Default is ${YARN_HOME}/conf.
# YARN_LOG_DIR Where log files are stored. PWD by default.
# YARN_MASTER host:path where hadoop code should be rsync'd from
# YARN_PID_DIR The pid files are stored. /tmp by default.
# YARN_IDENT_STRING A string representing this instance of hadoop. $USER by default
# YARN_NICENESS The scheduling priority for daemons. Defaults to 0.
##
usage="Usage: mr-jobhistory-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <mapred-command> "
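# For example (matching the install steps updated elsewhere in this change):
#   sbin/mr-jobhistory-daemon.sh start historyserver
#   sbin/mr-jobhistory-daemon.sh stop historyserver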
# if no args specified, show usage
if [ $# -le 1 ]; then
echo $usage
exit 1
fi
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin"; pwd`
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/yarn-config.sh
# get arguments
startStop=$1
shift
command=$1
shift
hadoop_rotate_log ()
{
log=$1;
num=5;
if [ -n "$2" ]; then
num=$2
fi
if [ -f "$log" ]; then # rotate logs
while [ $num -gt 1 ]; do
prev=`expr $num - 1`
[ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
num=$prev
done
mv "$log" "$log.$num";
fi
}
if [ -f "${YARN_CONF_DIR}/yarn-env.sh" ]; then
. "${YARN_CONF_DIR}/yarn-env.sh"
fi
if [ "$YARN_IDENT_STRING" = "" ]; then
export YARN_IDENT_STRING="$USER"
fi
# get log directory
if [ "$YARN_LOG_DIR" = "" ]; then
export YARN_LOG_DIR="$YARN_HOME/logs"
fi
mkdir -p "$YARN_LOG_DIR"
chown $YARN_IDENT_STRING $YARN_LOG_DIR
if [ "$YARN_PID_DIR" = "" ]; then
YARN_PID_DIR=/tmp
fi
# some variables
export YARN_LOGFILE=yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.log
export YARN_ROOT_LOGGER=${YARN_ROOT_LOGGER:-INFO,DRFA}
log=$YARN_LOG_DIR/yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.out
pid=$YARN_PID_DIR/yarn-$YARN_IDENT_STRING-$command.pid
# Set default scheduling priority
if [ "$YARN_NICENESS" = "" ]; then
export YARN_NICENESS=0
fi
case $startStop in
(start)
mkdir -p "$YARN_PID_DIR"
if [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo $command running as process `cat $pid`. Stop it first.
exit 1
fi
fi
if [ "$YARN_MASTER" != "" ]; then
echo rsync from $YARN_MASTER
rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' $YARN_MASTER/ "$YARN_HOME"
fi
hadoop_rotate_log $log
echo starting $command, logging to $log
cd "$YARN_HOME"
nohup nice -n $YARN_NICENESS "$YARN_HOME"/bin/mapred --config $YARN_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &
echo $! > $pid
sleep 1; head "$log"
;;
(stop)
if [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo stopping $command
kill `cat $pid`
else
echo no $command to stop
fi
else
echo no $command to stop
fi
;;
(*)
echo $usage
exit 1
;;
esac

View File

@ -525,43 +525,11 @@
dest.dir="${test.mapred.build.classes}"
classpath="test.classpath"/>
<javac
encoding="${build.encoding}"
srcdir="${test.src.dir}/mapred/testjar"
includes="*.java"
destdir="${test.mapred.build.testjar}"
debug="${javac.debug}"
optimize="${javac.optimize}"
target="${javac.version}"
source="${javac.version}"
deprecation="${javac.deprecation}">
<compilerarg line="${javac.args} ${javac.args.warnings}" />
<classpath refid="test.classpath"/>
</javac>
<delete file="${test.mapred.build.testjar}/testjob.jar"/>
<jar jarfile="${test.mapred.build.testjar}/testjob.jar"
basedir="${test.mapred.build.testjar}">
</jar>
<javac
encoding="${build.encoding}"
srcdir="${test.src.dir}/mapred/testshell"
includes="*.java"
destdir="${test.mapred.build.testshell}"
debug="${javac.debug}"
optimize="${javac.optimize}"
target="${javac.version}"
source="${javac.version}"
deprecation="${javac.deprecation}">
<compilerarg line="${javac.args} ${javac.args.warnings}"/>
<classpath refid="test.classpath"/>
</javac>
<delete file="${test.mapred.build.testshell}/testshell.jar"/>
<jar jarfile="${test.mapred.build.testshell}/testshell.jar"
basedir="${test.mapred.build.testshell}">
</jar>
<delete dir="${test.cache.data}"/>
<mkdir dir="${test.cache.data}"/>
<delete dir="${test.concat.data}"/>
@ -618,8 +586,6 @@
<jar jarfile="${hadoop-mapred-test-sources.jar}">
<fileset dir="${test.src.dir}/mapred" includes="org/apache/hadoop/**/*.java" />
<fileset dir="${test.src.dir}/unit" includes="org/apache/hadoop/**/*.java" />
<fileset dir="${test.src.dir}/mapred/testjar" includes="*.java" />
<fileset dir="${test.src.dir}/mapred/testshell" includes="*.java" />
</jar>
</target>

View File

@ -127,7 +127,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
/**
* Implementation of TaskAttempt interface.
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
@SuppressWarnings({ "rawtypes" })
public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
@ -910,8 +910,10 @@ public abstract class TaskAttemptImpl implements
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
LOG.info("Processing " + event.getTaskAttemptID() +
" of type " + event.getType());
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
+ event.getType());
}
writeLock.lock();
try {
final TaskAttemptState oldState = getState();
@ -1278,15 +1280,11 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEvent event) {
//set the finish time
taskAttempt.setFinishTime();
String taskType =
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
LOG.info("In TaskAttemptImpl taskType: " + taskType);
long slotMillis = computeSlotMillis(taskAttempt);
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
.getJobId());
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(
taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
taskId.getTaskType() == TaskType.MAP ?
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
taskAttempt.eventHandler.handle(jce);

View File

@ -81,7 +81,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
/**
* Implementation of Task interface.
*/
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@SuppressWarnings({ "rawtypes", "unchecked" })
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
@ -505,7 +505,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// This is always called in the Write Lock
private void addAndScheduleAttempt() {
TaskAttempt attempt = createAttempt();
LOG.info("Created attempt " + attempt.getID());
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
switch (attempts.size()) {
case 0:
attempts = Collections.singletonMap(attempt.getID(), attempt);
@ -537,7 +539,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
LOG.debug("Processing " + event.getTaskID() + " of type " + event.getType());
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getTaskID() + " of type "
+ event.getType());
}
try {
writeLock.lock();
TaskState oldState = getState();

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@ -46,9 +45,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -149,7 +148,7 @@ public abstract class RMCommunicator extends AbstractService {
LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
} catch (Exception are) {
LOG.info("Exception while registering", are);
LOG.error("Exception while registering", are);
throw new YarnException(are);
}
}
@ -183,7 +182,7 @@ public abstract class RMCommunicator extends AbstractService {
request.setTrackingUrl(historyUrl);
scheduler.finishApplicationMaster(request);
} catch(Exception are) {
LOG.info("Exception while unregistering ", are);
LOG.error("Exception while unregistering ", are);
}
}
@ -205,7 +204,7 @@ public abstract class RMCommunicator extends AbstractService {
try {
allocatorThread.join();
} catch (InterruptedException ie) {
LOG.info("InterruptedException while stopping", ie);
LOG.warn("InterruptedException while stopping", ie);
}
unregister();
super.stop();
@ -228,7 +227,7 @@ public abstract class RMCommunicator extends AbstractService {
// TODO: for other exceptions
}
} catch (InterruptedException e) {
LOG.info("Allocated thread interrupted. Returning.");
LOG.warn("Allocated thread interrupted. Returning.");
return;
}
}
@ -255,7 +254,9 @@ public abstract class RMCommunicator extends AbstractService {
if (UserGroupInformation.isSecurityEnabled()) {
String tokenURLEncodedStr = System.getenv().get(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
if (LOG.isDebugEnabled()) {
LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
}
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
try {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -254,28 +255,30 @@ public class RMContainerAllocator extends RMContainerRequestor
@SuppressWarnings({ "unchecked" })
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
LOG.info("Processing the event " + event.toString());
recalculateReduceSchedule = true;
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
JobId jobId = getJob().getID();
int supportedMaxContainerCapability =
getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
if (mapResourceReqt == 0) {
mapResourceReqt = reqEvent.getCapability().getMemory();
int minSlotMemSize = getMinContainerCapability().getMemory();
mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
* minSlotMemSize;
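// Rounds the map request up to the next multiple of the minimum slot size,
// e.g. (illustrative numbers) a 1500 MB ask against a 1024 MB minimum becomes
// ceil(1500/1024) * 1024 = 2048 MB.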
eventHandler.handle(new JobHistoryEvent(getJob().getID(),
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceReqt)));
LOG.info("mapResourceReqt:"+mapResourceReqt);
if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
if (mapResourceReqt > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " +
"max container capability in the cluster. Killing the Job. mapResourceReqt: " +
mapResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
getJob().getID(), diagMsg));
eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
//set the rounded off memory
@ -288,20 +291,20 @@ public class RMContainerAllocator extends RMContainerRequestor
//round off on slotsize
reduceResourceReqt = (int) Math.ceil((float)
reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
eventHandler.handle(new JobHistoryEvent(getJob().getID(),
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceReqt)));
LOG.info("reduceResourceReqt:"+reduceResourceReqt);
if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
if (reduceResourceReqt > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " +
"Job. reduceResourceReqt: " + reduceResourceReqt +
" maxContainerCapability:" + getMaxContainerCapability().getMemory();
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
getJob().getID(), diagMsg));
eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
//set the rounded off memory
@ -317,6 +320,9 @@ public class RMContainerAllocator extends RMContainerRequestor
} else if (
event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
LOG.info("Processing the event " + event.toString());
TaskAttemptId aId = event.getAttemptID();
boolean removed = scheduledRequests.remove(aId);
@ -579,7 +585,7 @@ public class RMContainerAllocator extends RMContainerRequestor
computeIgnoreBlacklisting();
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont);
LOG.info("Received completed container " + cont.getContainerId());
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id "
@ -664,7 +670,9 @@ public class RMContainerAllocator extends RMContainerRequestor
mapsHostMapping.put(host, list);
}
list.add(event.getAttemptID());
LOG.info("Added attempt req to host " + host);
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to host " + host);
}
}
for (String rack: event.getRacks()) {
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
@ -673,7 +681,9 @@ public class RMContainerAllocator extends RMContainerRequestor
mapsRackMapping.put(rack, list);
}
list.add(event.getAttemptID());
LOG.info("Added attempt req to rack " + rack);
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to rack " + rack);
}
}
request = new ContainerRequest(event, PRIORITY_MAP);
}
@ -694,18 +704,21 @@ public class RMContainerAllocator extends RMContainerRequestor
containersAllocated += allocatedContainers.size();
while (it.hasNext()) {
Container allocated = it.next();
LOG.info("Assigning container " + allocated.getId() +
" with priority " + allocated.getPriority() +
" to NM " + allocated.getNodeId());
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container " + allocated.getId()
+ " with priority " + allocated.getPriority() + " to NM "
+ allocated.getNodeId());
}
// check if allocated container meets memory requirements
// and whether we have any scheduled tasks that need
// a container to be assigned
boolean isAssignable = true;
Priority priority = allocated.getPriority();
int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
if (allocated.getResource().getMemory() < mapResourceReqt
if (allocatedMemory < mapResourceReqt
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
@ -716,7 +729,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
if (allocated.getResource().getMemory() < reduceResourceReqt
if (allocatedMemory < reduceResourceReqt
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
@ -730,15 +743,17 @@ public class RMContainerAllocator extends RMContainerRequestor
boolean blackListed = false;
ContainerRequest assigned = null;
ContainerId allocatedContainerId = allocated.getId();
if (isAssignable) {
// do not assign if allocated container is on a
// blacklisted host
blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
String allocatedHost = allocated.getNodeId().getHost();
blackListed = isNodeBlacklisted(allocatedHost);
if (blackListed) {
// we need to request for a new container
// and release the current one
LOG.info("Got allocated container on a blacklisted "
+ " host "+allocated.getNodeId().getHost()
+ " host "+allocatedHost
+". Releasing container " + allocated);
// find the request matching this allocated container
@ -775,11 +790,13 @@ public class RMContainerAllocator extends RMContainerRequestor
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
assigned.attemptID, allocated, applicationACLs));
assignedRequests.add(allocated.getId(), assigned.attemptID);
assignedRequests.add(allocatedContainerId, assigned.attemptID);
LOG.info("Assigned container (" + allocated + ") " +
" to task " + assigned.attemptID +
" on node " + allocated.getNodeId().toString());
if (LOG.isDebugEnabled()) {
LOG.info("Assigned container (" + allocated + ") "
+ " to task " + assigned.attemptID + " on node "
+ allocated.getNodeId().toString());
}
}
else {
//not assigned to any request, release the container
@ -794,7 +811,7 @@ public class RMContainerAllocator extends RMContainerRequestor
// or if we could not assign it
if (blackListed || assigned == null) {
containersReleased++;
release(allocated.getId());
release(allocatedContainerId);
}
}
}
@ -807,10 +824,14 @@ public class RMContainerAllocator extends RMContainerRequestor
LOG.info("Assigning container " + allocated + " to fast fail map");
assigned = assignToFailedMap(allocated);
} else if (PRIORITY_REDUCE.equals(priority)) {
LOG.info("Assigning container " + allocated + " to reduce");
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container " + allocated + " to reduce");
}
assigned = assignToReduce(allocated);
} else if (PRIORITY_MAP.equals(priority)) {
LOG.info("Assigning container " + allocated + " to map");
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container " + allocated + " to map");
}
assigned = assignToMap(allocated);
} else {
LOG.warn("Container allocated at unwanted priority: " + priority +
@ -897,7 +918,9 @@ public class RMContainerAllocator extends RMContainerRequestor
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
LOG.info("Host matched to the request list " + host);
if (LOG.isDebugEnabled()) {
LOG.debug("Host matched to the request list " + host);
}
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
@ -906,7 +929,9 @@ public class RMContainerAllocator extends RMContainerRequestor
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
eventHandler.handle(jce);
hostLocalAssigned++;
LOG.info("Assigned based on host match " + host);
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned based on host match " + host);
}
break;
}
}
@ -922,7 +947,9 @@ public class RMContainerAllocator extends RMContainerRequestor
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
eventHandler.handle(jce);
rackLocalAssigned++;
LOG.info("Assigned based on rack match " + rack);
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned based on rack match " + rack);
}
break;
}
}
@ -933,7 +960,9 @@ public class RMContainerAllocator extends RMContainerRequestor
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
eventHandler.handle(jce);
LOG.info("Assigned based on * match");
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned based on * match");
}
break;
}
}
@ -953,8 +982,7 @@ public class RMContainerAllocator extends RMContainerRequestor
new HashSet<TaskAttemptId>();
void add(ContainerId containerId, TaskAttemptId tId) {
LOG.info("Assigned container " + containerId.toString()
+ " to " + tId);
LOG.info("Assigned container " + containerId.toString() + " to " + tId);
containerToAttemptMap.put(containerId, tId);
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
maps.put(tId, containerId);
@ -963,6 +991,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
@SuppressWarnings("unchecked")
void preemptReduce(int toPreempt) {
List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
(reduces.keySet());

View File

@ -155,13 +155,14 @@ public abstract class RMContainerRequestor extends RMCommunicator {
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() +
" newContainers=" + response.getAllocatedContainers().size() +
" finishedContainers=" +
response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources +
" knownNMs=" + clusterNmCount);
if (ask.size() > 0 || release.size() > 0) {
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() + " newContainers="
+ response.getAllocatedContainers().size() + " finishedContainers="
+ response.getCompletedContainersStatuses().size()
+ " resourcelimit=" + availableResources + " knownNMs="
+ clusterNmCount);
}
ask.clear();
release.clear();
@ -172,6 +173,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
// knownNodeCount is based on node managers, not hosts. blacklisting is
// currently based on hosts.
protected void computeIgnoreBlacklisting() {
if (!nodeBlacklistingEnabled) {
return;
}
if (blacklistDisablePercent != -1
&& (blacklistedNodeCount != blacklistedNodes.size() ||
clusterNmCount != lastClusterNmCount)) {
@ -200,7 +204,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
return;
}
if (blacklistedNodes.contains(hostName)) {
LOG.info("Host " + hostName + " is already blacklisted.");
if (LOG.isDebugEnabled()) {
LOG.debug("Host " + hostName + " is already blacklisted.");
}
return; //already blacklisted
}
Integer failures = nodeFailures.remove(hostName);
@ -293,7 +299,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
if (remoteRequests == null) {
remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
this.remoteRequestsTable.put(priority, remoteRequests);
LOG.info("Added priority=" + priority);
if (LOG.isDebugEnabled()) {
LOG.debug("Added priority=" + priority);
}
}
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
if (reqMap == null) {
@ -313,10 +321,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
// Note this down for next interaction with ResourceManager
ask.add(remoteRequest);
LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
+ " priority=" + priority.getPriority() + " resourceName=" + resourceName
+ " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+ ask.size());
if (LOG.isDebugEnabled()) {
LOG.debug("addResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
}
}
private void decResourceRequest(Priority priority, String resourceName,
@ -328,16 +338,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
// as we modify the resource requests by filtering out blacklisted hosts
// when they are added, this value may be null when being
// decremented
LOG.debug("Not decrementing resource as " + resourceName
+ " is not present in request table");
if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as " + resourceName
+ " is not present in request table");
}
return;
}
ResourceRequest remoteRequest = reqMap.get(capability);
LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
+ " priority=" + priority.getPriority() + " resourceName=" + resourceName
+ " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+ ask.size());
if (LOG.isDebugEnabled()) {
LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
if (remoteRequest.getNumContainers() == 0) {
@ -355,10 +369,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
//already have it.
}
LOG.info("AFTER decResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
if (LOG.isDebugEnabled()) {
LOG.info("AFTER decResourceRequest:" + " applicationId="
+ applicationId.getId() + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
}
}
protected void release(ContainerId containerId) {

View File

@ -436,7 +436,7 @@ public class MRApp extends MRAppMaster {
return new ClientService(){
@Override
public InetSocketAddress getBindAddress() {
return null;
return NetUtils.createSocketAddr("localhost:9876");
}
@Override

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -29,16 +31,30 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;
public class MRAppBenchmark {
@ -167,17 +183,89 @@ public class MRAppBenchmark {
}
}
@Test
public void benchmark1() throws Exception {
int maps = 100000;
int reduces = 100;
int maps = 100; // Adjust for benchmarking. Start with thousands.
int reduces = 0;
System.out.println("Running benchmark with maps:"+maps +
" reduces:"+reduces);
run(new MRApp(maps, reduces, true, this.getClass().getName(), true));
run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new RMContainerAllocator(clientService, context) {
@Override
protected AMRMProtocol createSchedulerProxy() {
return new AMRMProtocol() {
@Override
public RegisterApplicationMasterResponse
registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnRemoteException {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(BuilderUtils
.newResource(1024));
response.setMaximumResourceCapability(BuilderUtils
.newResource(10240));
return response;
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnRemoteException {
FinishApplicationMasterResponse response =
Records.newRecord(FinishApplicationMasterResponse.class);
return response;
}
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException {
AllocateResponse response =
Records.newRecord(AllocateResponse.class);
List<ResourceRequest> askList = request.getAskList();
List<Container> containers = new ArrayList<Container>();
for (ResourceRequest req : askList) {
if (req.getHostName() != "*") {
continue;
}
int numContainers = req.getNumContainers();
for (int i = 0; i < numContainers; i++) {
ContainerId containerId =
BuilderUtils.newContainerId(
request.getApplicationAttemptId(),
request.getResponseId() + i);
containers.add(BuilderUtils
.newContainer(containerId, BuilderUtils.newNodeId("host"
+ containerId.getId(), 2345),
"host" + containerId.getId() + ":5678", req
.getCapability(), req.getPriority(), null));
}
}
AMResponse amResponse = Records.newRecord(AMResponse.class);
amResponse.setAllocatedContainers(containers);
amResponse.setResponseId(request.getResponseId() + 1);
response.setAMResponse(amResponse);
response.setNumClusterNodes(350);
return response;
}
};
}
};
}
});
}
@Test
public void benchmark2() throws Exception {
int maps = 4000;
int reduces = 1000;
int maps = 100; // Adjust for benchmarking, start with a couple of thousands
int reduces = 50;
int maxConcurrentRunningTasks = 500;
System.out.println("Running benchmark with throttled running tasks with " +

View File

@ -777,7 +777,7 @@ public class TestAMWebServicesJobs extends JerseyTest {
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i);
JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("counter name not set",
(counterName != null && !counterName.isEmpty()));

View File

@ -21,8 +21,14 @@ package org.apache.hadoop.mapred;
import static org.apache.hadoop.mapreduce.util.CountersStrings.parseEscapedCompactString;
import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -36,6 +42,9 @@ import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.util.CountersStrings;
import com.google.common.collect.Iterators;
/**
* A set of named counters.
@ -51,7 +60,9 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
@InterfaceStability.Stable
public class Counters
extends AbstractCounters<Counters.Counter, Counters.Group> {
public static int MAX_COUNTER_LIMIT = Limits.COUNTERS_MAX;
public Counters() {
super(groupFactory);
}
@ -69,17 +80,82 @@ public class Counters
return new Counters(newCounters);
}
public synchronized Group getGroup(String groupName) {
return super.getGroup(groupName);
}
@SuppressWarnings("unchecked")
public synchronized Collection<String> getGroupNames() {
return IteratorUtils.toList(super.getGroupNames().iterator());
}
public synchronized String makeCompactString() {
return CountersStrings.toEscapedCompactString(this);
}
/**
* A counter record, comprising its name and value.
*/
public interface Counter extends org.apache.hadoop.mapreduce.Counter {
public static class Counter implements org.apache.hadoop.mapreduce.Counter {
org.apache.hadoop.mapreduce.Counter realCounter;
Counter(org.apache.hadoop.mapreduce.Counter counter) {
this.realCounter = counter;
}
public Counter() {
this(new GenericCounter());
}
@SuppressWarnings("deprecation")
@Override
public void setDisplayName(String displayName) {
realCounter.setDisplayName(displayName);
}
@Override
public String getName() {
return realCounter.getName();
}
@Override
public String getDisplayName() {
return realCounter.getDisplayName();
}
@Override
public long getValue() {
return realCounter.getValue();
}
@Override
public void setValue(long value) {
realCounter.setValue(value);
}
@Override
public void increment(long incr) {
realCounter.increment(incr);
}
@Override
public void write(DataOutput out) throws IOException {
realCounter.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
realCounter.readFields(in);
}
/**
* Returns the compact stringified version of the counter in the format
* [(actual-name)(display-name)(value)]
* @return the stringified result
*/
String makeEscapedCompactString();
public String makeEscapedCompactString() {
return toEscapedCompactString(realCounter);
}
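// Illustrative example (not from this patch): a counter obtained via
// findCounter("MyGroup", "MY_COUNTER") with value 42 and no explicit display
// name is expected to render roughly as [(MY_COUNTER)(MY_COUNTER)(42)];
// special characters in names are escaped by CountersStrings, whose exact
// escaping rules are assumed here rather than shown by this change.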
/**
* Checks for (content) equality of two (basic) counters
@ -88,39 +164,42 @@ public class Counters
* @deprecated
*/
@Deprecated
boolean contentEquals(Counter counter);
public boolean contentEquals(Counter counter) {
return realCounter.equals(counter.getUnderlyingCounter());
}
/**
* @return the value of the counter
*/
long getCounter();
}
static class OldCounterImpl extends GenericCounter implements Counter {
OldCounterImpl() {
}
OldCounterImpl(String name, String displayName, long value) {
super(name, displayName, value);
}
@Override
public synchronized String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public boolean contentEquals(Counter counter) {
return equals(counter);
}
@Override
public long getCounter() {
return getValue();
return realCounter.getValue();
}
@Override
public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
return realCounter;
}
@Override
public synchronized boolean equals(Object genericRight) {
if (genericRight instanceof Counter) {
synchronized (genericRight) {
Counter right = (Counter) genericRight;
return getName().equals(right.getName()) &&
getDisplayName().equals(right.getDisplayName()) &&
getValue() == right.getValue();
}
}
return false;
}
@Override
public int hashCode() {
return realCounter.hashCode();
}
}
/**
* <code>Group</code> of counters, comprising counters from a particular
* counter {@link Enum} class.
@ -128,21 +207,38 @@ public class Counters
* <p><code>Group</code> handles localization of the class name and the
* counter names.</p>
*/
public static interface Group extends CounterGroupBase<Counter> {
public static class Group implements CounterGroupBase<Counter> {
private CounterGroupBase<Counter> realGroup;
Group(GenericGroup group) {
this.realGroup = group;
}
Group(FSGroupImpl group) {
this.realGroup = group;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
Group(FrameworkGroupImpl group) {
this.realGroup = group;
}
/**
* @param counterName the name of the counter
* @return the value of the specified counter, or 0 if the counter does
* not exist.
*/
long getCounter(String counterName);
public long getCounter(String counterName) {
return getCounterValue(realGroup, counterName);
}
/**
* @return the compact stringified version of the group in the format
* {(actual-name)(display-name)(value)[][][]} where [] are compact strings
* for the counters within.
*/
String makeEscapedCompactString();
public String makeEscapedCompactString() {
return toEscapedCompactString(realGroup);
}
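// Illustrative example (not from this patch): following the format described
// above, a group holding that single counter would serialize to roughly
// {(MyGroup)(MyGroup)...[(MY_COUNTER)(MY_COUNTER)(42)]}, i.e. the group header
// followed by one bracketed compact string per counter; the exact output of
// CountersStrings is an assumption here.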
/**
* Get the counter for the given id and create it if it doesn't exist.
@ -152,172 +248,184 @@ public class Counters
* @deprecated use {@link #findCounter(String)} instead
*/
@Deprecated
Counter getCounter(int id, String name);
public Counter getCounter(int id, String name) {
return findCounter(name);
}
/**
* Get the counter for the given name and create it if it doesn't exist.
* @param name the internal counter name
* @return the counter
*/
Counter getCounterForName(String name);
public Counter getCounterForName(String name) {
return findCounter(name);
}
@Override
public void write(DataOutput out) throws IOException {
realGroup.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
realGroup.readFields(in);
}
@Override
public Iterator<Counter> iterator() {
return realGroup.iterator();
}
@Override
public String getName() {
return realGroup.getName();
}
@Override
public String getDisplayName() {
return realGroup.getDisplayName();
}
@Override
public void setDisplayName(String displayName) {
realGroup.setDisplayName(displayName);
}
@Override
public void addCounter(Counter counter) {
realGroup.addCounter(counter);
}
@Override
public Counter addCounter(String name, String displayName, long value) {
return realGroup.addCounter(name, displayName, value);
}
@Override
public Counter findCounter(String counterName, String displayName) {
return realGroup.findCounter(counterName, displayName);
}
@Override
public Counter findCounter(String counterName, boolean create) {
return realGroup.findCounter(counterName, create);
}
@Override
public Counter findCounter(String counterName) {
return realGroup.findCounter(counterName);
}
@Override
public int size() {
return realGroup.size();
}
@Override
public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
realGroup.incrAllCounters(rightGroup);
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return realGroup;
}
@Override
public synchronized boolean equals(Object genericRight) {
if (genericRight instanceof CounterGroupBase<?>) {
@SuppressWarnings("unchecked")
CounterGroupBase<Counter> right = ((CounterGroupBase<Counter>)
genericRight).getUnderlyingGroup();
return Iterators.elementsEqual(iterator(), right.iterator());
}
return false;
}
@Override
public int hashCode() {
return realGroup.hashCode();
}
}
// All the group impls need this for legacy group interface
static long getCounterValue(Group group, String counterName) {
static long getCounterValue(CounterGroupBase<Counter> group, String counterName) {
Counter counter = group.findCounter(counterName, false);
if (counter != null) return counter.getValue();
return 0L;
}
// Mix the generic group implementation into the Group interface
private static class GenericGroup extends AbstractCounterGroup<Counter>
implements Group {
private static class GenericGroup extends AbstractCounterGroup<Counter> {
GenericGroup(String name, String displayName, Limits limits) {
super(name, displayName, limits);
}
@Override
public long getCounter(String counterName) {
return getCounterValue(this, counterName);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override
public Counter getCounter(int id, String name) {
return findCounter(name);
}
@Override
public Counter getCounterForName(String name) {
return findCounter(name);
}
@Override
protected Counter newCounter(String counterName, String displayName,
long value) {
return new OldCounterImpl(counterName, displayName, value);
return new Counter(new GenericCounter(counterName, displayName, value));
}
@Override
protected Counter newCounter() {
return new OldCounterImpl();
return new Counter();
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix the framework group implementation into the Group interface
private static class FrameworkGroupImpl<T extends Enum<T>>
extends FrameworkCounterGroup<T, Counter> implements Group {
// Mix the framework counter implmementation into the Counter interface
class FrameworkCounterImpl extends FrameworkCounter implements Counter {
extends FrameworkCounterGroup<T, Counter> {
// Mix the framework counter implementation into the Counter interface
class FrameworkCounterImpl extends FrameworkCounter {
FrameworkCounterImpl(T key) {
super(key);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override
public boolean contentEquals(Counter counter) {
return equals(counter);
}
@Override
public long getCounter() {
return getValue();
}
}
FrameworkGroupImpl(Class<T> cls) {
super(cls);
}
@Override
public long getCounter(String counterName) {
return getCounterValue(this, counterName);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public Counter getCounter(int id, String name) {
return findCounter(name);
}
@Override
public Counter getCounterForName(String name) {
return findCounter(name);
}
@Override
protected Counter newCounter(T key) {
return new FrameworkCounterImpl(key);
return new Counter(new FrameworkCounterImpl(key));
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix the file system counter group implementation into the Group interface
private static class FSGroupImpl extends FileSystemCounterGroup<Counter>
implements Group {
private static class FSGroupImpl extends FileSystemCounterGroup<Counter> {
private class FSCounterImpl extends FSCounter implements Counter {
private class FSCounterImpl extends FSCounter {
FSCounterImpl(String scheme, FileSystemCounter key) {
super(scheme, key);
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public boolean contentEquals(Counter counter) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getCounter() {
return getValue();
}
}
@Override
protected Counter newCounter(String scheme, FileSystemCounter key) {
return new FSCounterImpl(scheme, key);
return new Counter(new FSCounterImpl(scheme, key));
}
@Override
public long getCounter(String counterName) {
return getCounterValue(this, counterName);
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
@Override
public String makeEscapedCompactString() {
return toEscapedCompactString(this);
}
@Override @Deprecated
public Counter getCounter(int id, String name) {
return findCounter(name);
}
@Override
public Counter getCounterForName(String name) {
return findCounter(name);
}
}
public synchronized Counter findCounter(String group, String name) {
@ -342,7 +450,7 @@ public class Counters
FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
return new FrameworkGroupFactory<Group>() {
@Override public Group newGroup(String name) {
return new FrameworkGroupImpl<T>(cls); // impl in this package
return new Group(new FrameworkGroupImpl<T>(cls)); // impl in this package
}
};
}
@ -350,12 +458,12 @@ public class Counters
@Override
protected Group newGenericGroup(String name, String displayName,
Limits limits) {
return new GenericGroup(name, displayName, limits);
return new Group(new GenericGroup(name, displayName, limits));
}
@Override
protected Group newFileSystemGroup() {
return new FSGroupImpl();
return new Group(new FSGroupImpl());
}
}

View File

@ -85,18 +85,21 @@ public class FileOutputCommitter extends OutputCommitter {
*/
@Private
Path getJobAttemptPath(JobContext context) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getJobAttemptPath(context, getOutputPath(context));
Path out = getOutputPath(context);
return out == null ? null :
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getJobAttemptPath(context, out);
}
@Private
Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
return getTaskAttemptPath(context, getOutputPath(context));
Path out = getOutputPath(context);
return out == null ? null : getTaskAttemptPath(context, out);
}
private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
if(workPath == null) {
if(workPath == null && out != null) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getTaskAttemptPath(context, out);
}
@ -110,14 +113,17 @@ public class FileOutputCommitter extends OutputCommitter {
* @return the path where the output of a committed task is stored until
* the entire job is committed.
*/
@Private
Path getCommittedTaskPath(TaskAttemptContext context) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getCommittedTaskPath(context, getOutputPath(context));
Path out = getOutputPath(context);
return out == null ? null :
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getCommittedTaskPath(context, out);
}
public Path getWorkPath(TaskAttemptContext context, Path outputPath)
throws IOException {
return getTaskAttemptPath(context, outputPath);
return outputPath == null ? null : getTaskAttemptPath(context, outputPath);
}
@Override
@ -156,6 +162,7 @@ public class FileOutputCommitter extends OutputCommitter {
getWrapped(context).abortJob(context, state);
}
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
getWrapped(context).setupTask(context);
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobCounter;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobInProgress {
/**
* @deprecated Provided for compatibility. Use {@link JobCounter} instead.
*/
@Deprecated
public static enum Counter {
NUM_FAILED_MAPS,
NUM_FAILED_REDUCES,
TOTAL_LAUNCHED_MAPS,
TOTAL_LAUNCHED_REDUCES,
OTHER_LOCAL_MAPS,
DATA_LOCAL_MAPS,
RACK_LOCAL_MAPS,
SLOTS_MILLIS_MAPS,
SLOTS_MILLIS_REDUCES,
FALLOW_SLOTS_MILLIS_MAPS,
FALLOW_SLOTS_MILLIS_REDUCES
}
}

View File

@ -80,6 +80,33 @@ abstract public class Task implements Writable, Configurable {
public static String MERGED_OUTPUT_PREFIX = ".merged";
public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
/**
* @deprecated Provided for compatibility. Use {@link TaskCounter} instead.
*/
@Deprecated
public static enum Counter {
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
MAP_INPUT_BYTES,
MAP_OUTPUT_BYTES,
MAP_OUTPUT_MATERIALIZED_BYTES,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
REDUCE_SHUFFLE_BYTES,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
REDUCE_SKIPPED_RECORDS,
SPILLED_RECORDS,
SPLIT_RAW_BYTES,
CPU_MILLISECONDS,
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES,
COMMITTED_HEAP_BYTES
}
/**
* Counters to measure the usage of the different file systems.
@ -656,14 +683,13 @@ abstract public class Task implements Writable, Configurable {
try {
boolean taskFound = true; // whether TT knows about this task
// sleep for a bit
try {
Thread.sleep(PROGRESS_INTERVAL);
}
catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(getTaskID() + " Progress/ping thread exiting " +
"since it got interrupted");
synchronized(lock) {
if (taskDone.get()) {
break;
}
lock.wait(PROGRESS_INTERVAL);
}
if (taskDone.get()) {
break;
}
@ -721,7 +747,14 @@ abstract public class Task implements Writable, Configurable {
}
public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) {
synchronized (lock) {
// Intent of the lock is to not send an interrupt in the middle of an
// umbilical.ping or umbilical.statusUpdate
synchronized(lock) {
//Interrupt if sleeping. Otherwise wait for the RPC call to return.
lock.notify();
}
synchronized (lock) {
while (!done) {
lock.wait();
}
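The two hunks above replace the communication thread's Thread.sleep() with lock.wait(), so that stopCommunicationThread() can wake the thread with lock.notify() instead of interrupting it in the middle of an umbilical.ping or umbilical.statusUpdate call. A minimal, self-contained sketch of that handshake (class and member names here are illustrative, not the actual Task fields):

import java.util.concurrent.atomic.AtomicBoolean;

public class PingLoopSketch {
  private final Object lock = new Object();
  private final AtomicBoolean taskDone = new AtomicBoolean(false);

  // Worker side: wake up every interval, or immediately when notified.
  void pingLoop(long intervalMs) throws InterruptedException {
    while (true) {
      synchronized (lock) {
        if (taskDone.get()) {
          break;
        }
        lock.wait(intervalMs); // timed wait replaces Thread.sleep()
      }
      // ... issue the umbilical ping / status update outside the lock ...
    }
  }

  // Caller side: mark completion and nudge the worker out of its wait.
  void stop() {
    taskDone.set(true);
    synchronized (lock) {
      lock.notify();
    }
  }
}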
@ -820,7 +853,8 @@ abstract public class Task implements Writable, Configurable {
return; // nothing to do.
}
Counter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
org.apache.hadoop.mapred.Counters.Counter gcCounter =
counters.findCounter(TaskCounter.GC_TIME_MILLIS);
if (null != gcCounter) {
gcCounter.increment(getElapsedGc());
}

View File

@ -72,4 +72,10 @@ public interface Counter extends Writable {
* @param incr the value to increase this counter by
*/
void increment(long incr);
/**
* Return the underlying object if this is a facade.
* @return the underlying object.
*/
Counter getUnderlyingCounter();
}
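The new getUnderlyingCounter() hook lets code that receives a wrapping facade (such as the mapred Counters.Counter shown earlier) unwrap it before an implementation-specific check or cast. A small hedged sketch (the helper class name is illustrative):

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.counters.GenericCounter;

class CounterFacadeSketch {
  // True if the (possibly wrapped) counter is ultimately a plain in-memory counter.
  static boolean isGeneric(Counter counter) {
    return counter.getUnderlyingCounter() instanceof GenericCounter;
  }
}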

View File

@ -52,6 +52,11 @@ public class Counters extends AbstractCounters<Counter, CounterGroup> {
protected FrameworkCounter newCounter(T key) {
return new FrameworkCounter(key);
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix generic group implementation into CounterGroup interface
@ -72,6 +77,11 @@ public class Counters extends AbstractCounters<Counter, CounterGroup> {
protected Counter newCounter() {
return new GenericCounter();
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
// Mix file system group implementation into the CounterGroup interface
@ -82,6 +92,11 @@ public class Counters extends AbstractCounters<Counter, CounterGroup> {
protected Counter newCounter(String scheme, FileSystemCounter key) {
return new FSCounter(scheme, key);
}
@Override
public CounterGroupBase<Counter> getUnderlyingGroup() {
return this;
}
}
/**

View File

@ -472,8 +472,8 @@ public class Job extends JobContextImpl implements JobContext {
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n");
sb.append("Uber job : ").append(status.isUber()).append("\n");
sb.append("Number of maps: ").append(numMaps);
sb.append("Number of reduces: ").append(numReduces);
sb.append("Number of maps: ").append(numMaps).append("\n");
sb.append("Number of reduces: ").append(numReduces).append("\n");
sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: ");

View File

@ -320,6 +320,13 @@ public interface MRJobConfig {
public static final String MR_AM_PREFIX = MR_PREFIX + "am.";
/** The number of client retries to the AM before reconnecting to the RM
* to fetch Application State.
*/
public static final String MR_CLIENT_TO_AM_IPC_MAX_RETRIES =
MR_PREFIX + "client-am.ipc.max-retries";
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
/** The staging directory for map reduce.*/
public static final String MR_AM_STAGING_DIR =
MR_AM_PREFIX+"staging-dir";
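Clients can override the new retry setting through the usual Configuration mechanism; a minimal sketch (the value 10 is only an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

class AmRetryConfigSketch {
  static Configuration withMoreAmRetries() {
    Configuration conf = new Configuration();
    // Try the AM connection 10 times before falling back to the RM for job status.
    conf.setInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES, 10);
    return conf;
  }
}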

View File

@ -172,7 +172,8 @@ public abstract class AbstractCounters<C extends Counter,
@InterfaceAudience.Private
public synchronized C findCounter(String scheme, FileSystemCounter key) {
return ((FileSystemCounterGroup<C>) getGroup(
FileSystemCounter.class.getName())).findCounter(scheme, key);
FileSystemCounter.class.getName()).getUnderlyingGroup()).
findCounter(scheme, key);
}
/**
@ -243,11 +244,11 @@ public abstract class AbstractCounters<C extends Counter,
WritableUtils.writeVInt(out, groupFactory.version());
WritableUtils.writeVInt(out, fgroups.size()); // framework groups first
for (G group : fgroups.values()) {
if (group instanceof FrameworkCounterGroup<?, ?>) {
if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
group.write(out);
} else if (group instanceof FileSystemCounterGroup<?>) {
} else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
group.write(out);
}

View File

@ -98,4 +98,10 @@ public interface CounterGroupBase<T extends Counter>
* @param rightGroup the group to be added to this group
*/
void incrAllCounters(CounterGroupBase<T> rightGroup);
/**
* Exposes the underlying group type if a facade.
* @return the underlying object that this object is wrapping up.
*/
CounterGroupBase<T> getUnderlyingGroup();
}

View File

@ -110,6 +110,11 @@ public abstract class FileSystemCounterGroup<C extends Counter>
public void readFields(DataInput in) throws IOException {
assert false : "shouldn't be called";
}
@Override
public Counter getUnderlyingCounter() {
return this;
}
}
@Override
@ -231,10 +236,10 @@ public abstract class FileSystemCounterGroup<C extends Counter>
@Override
@SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other, "other group")
if (checkNotNull(other.getUnderlyingGroup(), "other group")
instanceof FileSystemCounterGroup<?>) {
for (Counter counter : other) {
FSCounter c = (FSCounter) counter;
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
findCounter(c.scheme, c.key) .increment(counter.getValue());
}
}
@ -253,7 +258,7 @@ public abstract class FileSystemCounterGroup<C extends Counter>
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
FSCounter c = (FSCounter) counter;
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}

View File

@ -18,21 +18,24 @@
package org.apache.hadoop.mapreduce.counters;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import static com.google.common.base.Preconditions.*;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.util.ResourceBundles;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
/**
* An abstract class to provide common implementation for the framework
* counter group in both mapred and mapreduce packages.
@ -43,7 +46,8 @@ import org.apache.hadoop.mapreduce.util.ResourceBundles;
@InterfaceAudience.Private
public abstract class FrameworkCounterGroup<T extends Enum<T>,
C extends Counter> implements CounterGroupBase<C> {
private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
private final Class<T> enumClass; // for Enum.valueOf
private final Object[] counters; // local casts are OK and save a class ref
private String displayName = null;
@ -95,6 +99,11 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
public void readFields(DataInput in) throws IOException {
assert false : "shouldn't be called";
}
@Override
public Counter getUnderlyingCounter() {
return this;
}
}
@SuppressWarnings("unchecked")

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
/**
* A generic counter implementation
@ -101,4 +102,9 @@ public class GenericCounter extends AbstractCounter {
public synchronized void increment(long incr) {
value += incr;
}
@Override
public Counter getUnderlyingCounter() {
return this;
}
}

View File

@ -495,36 +495,40 @@ public class FileOutputCommitter extends OutputCommitter {
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
context.progress();
TaskAttemptID attemptId = context.getTaskAttemptID();
int previousAttempt = getAppAttemptId(context) - 1;
if (previousAttempt < 0) {
throw new IOException ("Cannot recover task output for first attempt...");
}
Path committedTaskPath = getCommittedTaskPath(context);
Path previousCommittedTaskPath = getCommittedTaskPath(
previousAttempt, context);
FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+ " into " + committedTaskPath);
if (fs.exists(previousCommittedTaskPath)) {
if(fs.exists(committedTaskPath)) {
if(!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete "+committedTaskPath);
if(hasOutputPath()) {
context.progress();
TaskAttemptID attemptId = context.getTaskAttemptID();
int previousAttempt = getAppAttemptId(context) - 1;
if (previousAttempt < 0) {
throw new IOException ("Cannot recover task output for first attempt...");
}
Path committedTaskPath = getCommittedTaskPath(context);
Path previousCommittedTaskPath = getCommittedTaskPath(
previousAttempt, context);
FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+ " into " + committedTaskPath);
if (fs.exists(previousCommittedTaskPath)) {
if(fs.exists(committedTaskPath)) {
if(!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete "+committedTaskPath);
}
}
//Rename can fail if the parent directory does not yet exist.
Path committedParent = committedTaskPath.getParent();
fs.mkdirs(committedParent);
if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
throw new IOException("Could not rename " + previousCommittedTaskPath +
" to " + committedTaskPath);
}
LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
} else {
LOG.warn(attemptId+" had no output to recover.");
}
//Rename can fail if the parent directory does not yet exist.
Path committedParent = committedTaskPath.getParent();
fs.mkdirs(committedParent);
if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
throw new IOException("Could not rename " + previousCommittedTaskPath +
" to " + committedTaskPath);
}
LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
} else {
LOG.warn(attemptId+" had no output to recover.");
LOG.warn("Output Path is null in recoverTask()");
}
}
}

View File

@ -1255,4 +1255,11 @@
heartbeats to the ResourceManager</description>
</property>
<property>
<name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
<value>1</value>
<description>The number of client retries to the AM before reconnecting
to the RM to fetch Application Status.</description>
</property>
</configuration>

View File

@ -24,6 +24,8 @@ import java.text.ParseException;
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapreduce.FileSystemCounter;
@ -37,6 +39,7 @@ import org.junit.Test;
public class TestCounters {
enum myCounters {TEST1, TEST2};
private static final long MAX_VALUE = 10;
private static final Log LOG = LogFactory.getLog(TestCounters.class);
// Generates enum based counters
private Counters getEnumCounters(Enum[] keys) {
@ -132,23 +135,43 @@ public class TestCounters {
@SuppressWarnings("deprecation")
@Test
public void testLegacyNames() {
public void testReadWithLegacyNames() {
Counters counters = new Counters();
counters.incrCounter(TaskCounter.MAP_INPUT_RECORDS, 1);
counters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
counters.findCounter("file", FileSystemCounter.BYTES_READ).increment(1);
checkLegacyNames(counters);
}
@SuppressWarnings("deprecation")
@Test
public void testWriteWithLegacyNames() {
Counters counters = new Counters();
counters.incrCounter(Task.Counter.MAP_INPUT_RECORDS, 1);
counters.incrCounter(JobInProgress.Counter.DATA_LOCAL_MAPS, 1);
counters.findCounter("FileSystemCounter", "FILE_BYTES_READ").increment(1);
checkLegacyNames(counters);
}
@SuppressWarnings("deprecation")
private void checkLegacyNames(Counters counters) {
assertEquals("New name", 1, counters.findCounter(
TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy enum", 1,
counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());
assertEquals("New name", 1, counters.findCounter(
JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.JobInProgress$Counter",
"DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy enum", 1,
counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());
assertEquals("New name", 1, counters.findCounter(
FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());

View File

@ -104,7 +104,9 @@ public class TestFileOutputCommitter extends TestCase {
writeOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
if(committer.needsTaskCommit(tContext)) {
committer.commitTask(tContext);
}
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd1 = new File(jobTempDir1.toUri().getPath());
assertTrue(jtd1.exists());
@ -188,7 +190,9 @@ public class TestFileOutputCommitter extends TestCase {
writeOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
if(committer.needsTaskCommit(tContext)) {
committer.commitTask(tContext);
}
committer.commitJob(jContext);
// validate output
@ -214,7 +218,9 @@ public class TestFileOutputCommitter extends TestCase {
writeMapFileOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
if(committer.needsTaskCommit(tContext)) {
committer.commitTask(tContext);
}
committer.commitJob(jContext);
// validate output
@ -222,6 +228,28 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testMapOnlyNoOutput() throws Exception {
JobConf conf = new JobConf();
// The output path is intentionally not set here: FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
if(committer.needsTaskCommit(tContext)) {
// do commit
committer.commitTask(tContext);
}
committer.commitJob(jContext);
// validate output
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testAbort() throws IOException, InterruptedException {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.mapreduce;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@ -34,6 +36,7 @@ import java.io.StringReader;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.log4j.Layout;
@ -88,6 +91,7 @@ public class TestJobMonitorAndPrint extends TestCase {
}
).when(job).getTaskCompletionEvents(anyInt(), anyInt());
doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);
// setup the logger to capture all logs
Layout layout =
@ -106,21 +110,25 @@ public class TestJobMonitorAndPrint extends TestCase {
boolean foundHundred = false;
boolean foundComplete = false;
boolean foundUber = false;
String match_1 = "uber mode : true";
String match_2 = "map 100% reduce 100%";
String match_3 = "completed successfully";
String uberModeMatch = "uber mode : true";
String progressMatch = "map 100% reduce 100%";
String completionMatch = "completed successfully";
while ((line = r.readLine()) != null) {
if (line.contains(match_1)) {
if (line.contains(uberModeMatch)) {
foundUber = true;
}
foundHundred = line.contains(match_2);
foundHundred = line.contains(progressMatch);
if (foundHundred)
break;
}
line = r.readLine();
foundComplete = line.contains(match_3);
foundComplete = line.contains(completionMatch);
assertTrue(foundUber);
assertTrue(foundHundred);
assertTrue(foundComplete);
System.out.println("The output of job.toString() is : \n" + job.toString());
assertTrue(job.toString().contains("Number of maps: 5\n"));
assertTrue(job.toString().contains("Number of reduces: 5\n"));
}
}

View File

@ -617,7 +617,7 @@ public class TestHsWebServicesJobs extends JerseyTest {
assertTrue("name not set", (name != null && !name.isEmpty()));
JSONArray counters = counterGroup.getJSONArray("counter");
for (int j = 0; j < counters.length(); j++) {
JSONObject counter = counters.getJSONObject(i);
JSONObject counter = counters.getJSONObject(j);
String counterName = counter.getString("name");
assertTrue("counter name not set",
(counterName != null && !counterName.isEmpty()));

View File

@ -101,7 +101,9 @@ public class ClientServiceDelegate {
this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS.
this.conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 3);
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
this.rm = rm;
this.jobId = jobId;
this.historyServerProxy = historyServerProxy;

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
@ -25,11 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -40,12 +39,14 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* This is an wordcount application that tests the count of records

View File

@ -49,6 +49,9 @@ public class TestMiniMRClientCluster {
private static Path[] inFiles = new Path[5];
private static MiniMRClientCluster mrCluster;
private class InternalClass {
}
@BeforeClass
public static void setup() throws IOException {
final Configuration conf = new Configuration();
@ -73,7 +76,7 @@ public class TestMiniMRClientCluster {
// create the mini cluster to be used for the tests
mrCluster = MiniMRClientClusterFactory.create(
TestMiniMRClientCluster.class, 1, new Configuration());
InternalClass.class, 1, new Configuration());
}
@AfterClass

View File

@ -31,7 +31,5 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start resourcemanager
# start nodeManager
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR start nodemanager
# start historyserver
#"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start historyserver
# start proxyserver
#"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start proxyserver

View File

@ -31,7 +31,5 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop resourcemanager
# stop nodeManager
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR stop nodemanager
# stop historyServer
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop historyserver
# stop proxy server
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop proxyserver

View File

@ -59,7 +59,6 @@ if [ $# = 0 ]; then
echo "where COMMAND is one of:"
echo " resourcemanager run the ResourceManager"
echo " nodemanager run a nodemanager on each slave"
echo " historyserver run job history servers as a standalone daemon"
echo " rmadmin admin tools"
echo " version print the version"
echo " jar <jar> run a jar file"
@ -154,8 +153,6 @@ if [ "$YARN_LOGFILE" = "" ]; then
YARN_LOGFILE='yarn.log'
fi
YARN_JOB_HISTORYSERVER_OPTS="-Dmapred.jobsummary.logger=${YARN_JHS_LOGGER:-INFO,console}"
# restore ordinary behaviour
unset IFS
@ -181,9 +178,6 @@ elif [ "$COMMAND" = "nodemanager" ] ; then
else
YARN_OPTS="$YARN_OPTS -server $YARN_NODEMANAGER_OPTS"
fi
elif [ "$COMMAND" = "historyserver" ] ; then
CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
YARN_OPTS="$YARN_OPTS $YARN_JOB_HISTORYSERVER_OPTS"
elif [ "$COMMAND" = "proxyserver" ] ; then
CLASS='org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer'
YARN_OPTS="$YARN_OPTS $YARN_PROXYSERVER_OPTS"

View File

@ -91,7 +91,6 @@ fi
# some variables
export YARN_LOGFILE=yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.log
export YARN_ROOT_LOGGER=${YARN_ROOT_LOGGER:-INFO,DRFA}
export YARN_JHS_LOGGER=${YARN_JHS_LOGGER:-INFO,JSA}
log=$YARN_LOG_DIR/yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.out
pid=$YARN_PID_DIR/yarn-$YARN_IDENT_STRING-$command.pid

View File

@ -161,6 +161,12 @@
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler" />
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Don't care if putIfAbsent value is ignored -->
<Match>

View File

@ -23,7 +23,7 @@ package org.apache.hadoop.yarn.api.records;
* allocation
*
*/
public interface Priority extends Comparable<Priority> {
public abstract class Priority implements Comparable<Priority> {
/**
* Get the assigned priority
@ -37,4 +37,31 @@ public interface Priority extends Comparable<Priority> {
*/
public abstract void setPriority(int priority);
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getPriority();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Priority other = (Priority) obj;
if (getPriority() != other.getPriority())
return false;
return true;
}
@Override
public int compareTo(Priority other) {
return this.getPriority() - other.getPriority();
}
}
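With equals/hashCode/compareTo pulled up into the abstract record, every implementation (including the protobuf-backed PriorityPBImpl changed below) now compares by value. A quick sketch of the intended behaviour, assuming the usual org.apache.hadoop.yarn.util.Records factory:

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records;

class PriorityValueSemanticsSketch {
  static void demo() {
    Priority a = Records.newRecord(Priority.class);
    Priority b = Records.newRecord(Priority.class);
    a.setPriority(1);
    b.setPriority(1);
    // Distinct instances with the same value: equal, same hash, compareTo == 0.
    assert a.equals(b) && a.hashCode() == b.hashCode() && a.compareTo(b) == 0;
  }
}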

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.api.AMRMProtocol;
*/
@Public
@Stable
public interface ResourceRequest extends Comparable<ResourceRequest> {
public abstract class ResourceRequest implements Comparable<ResourceRequest> {
/**
* Get the <code>Priority</code> of the request.
* @return <code>Priority</code> of the request
@ -121,4 +121,79 @@ public interface ResourceRequest extends Comparable<ResourceRequest> {
@Public
@Stable
public abstract void setNumContainers(int numContainers);
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
Resource capability = getCapability();
String hostName = getHostName();
Priority priority = getPriority();
result =
prime * result + ((capability == null) ? 0 : capability.hashCode());
result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + getNumContainers();
result = prime * result + ((priority == null) ? 0 : priority.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ResourceRequest other = (ResourceRequest) obj;
Resource capability = getCapability();
if (capability == null) {
if (other.getCapability() != null)
return false;
} else if (!capability.equals(other.getCapability()))
return false;
String hostName = getHostName();
if (hostName == null) {
if (other.getHostName() != null)
return false;
} else if (!hostName.equals(other.getHostName()))
return false;
if (getNumContainers() != other.getNumContainers())
return false;
Priority priority = getPriority();
if (priority == null) {
if (other.getPriority() != null)
return false;
} else if (!priority.equals(other.getPriority()))
return false;
return true;
}
@Override
public int compareTo(ResourceRequest other) {
int priorityComparison = this.getPriority().compareTo(other.getPriority());
if (priorityComparison == 0) {
int hostNameComparison =
this.getHostName().compareTo(other.getHostName());
if (hostNameComparison == 0) {
int capabilityComparison =
this.getCapability().compareTo(other.getCapability());
if (capabilityComparison == 0) {
int numContainersComparison =
this.getNumContainers() - other.getNumContainers();
if (numContainersComparison == 0) {
return 0;
} else {
return numContainersComparison;
}
} else {
return capabilityComparison;
}
} else {
return hostNameComparison;
}
} else {
return priorityComparison;
}
}
}
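The compareTo above orders requests by priority first, then host name, then capability, then number of containers, and because the ordering now lives in the abstract record it applies to any implementation. A tiny sketch (the class name is illustrative):

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

class AskOrderingSketch {
  // Sort a scheduler's ask list; works for any ResourceRequest implementation,
  // e.g. ResourceRequestPBImpl, since Comparable is implemented in the base class.
  static void sortAsks(List<ResourceRequest> asks) {
    Collections.sort(asks);
  }
}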

View File

@ -18,15 +18,11 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProtoOrBuilder;
public class PriorityPBImpl extends ProtoBase<PriorityProto> implements Priority {
public class PriorityPBImpl extends Priority {
PriorityProto proto = PriorityProto.getDefaultInstance();
PriorityProto.Builder builder = null;
boolean viaProto = false;
@ -66,11 +62,4 @@ public class PriorityPBImpl extends ProtoBase<PriorityProto> implements Priority
builder.setPriority((priority));
}
@Override
public int compareTo(Priority other) {
return this.getPriority() - other.getPriority();
}
}

View File

@ -20,19 +20,14 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProtoOrBuilder;
public class ResourceRequestPBImpl extends ProtoBase<ResourceRequestProto> implements ResourceRequest {
public class ResourceRequestPBImpl extends ResourceRequest {
ResourceRequestProto proto = ResourceRequestProto.getDefaultInstance();
ResourceRequestProto.Builder builder = null;
boolean viaProto = false;
@ -168,25 +163,4 @@ public class ResourceRequestPBImpl extends ProtoBase<ResourceRequestProto> imple
return ((ResourcePBImpl)t).getProto();
}
@Override
public int compareTo(ResourceRequest other) {
if (this.getPriority().compareTo(other.getPriority()) == 0) {
if (this.getHostName().equals(other.getHostName())) {
if (this.getCapability().equals(other.getCapability())) {
if (this.getNumContainers() == other.getNumContainers()) {
return 0;
} else {
return this.getNumContainers() - other.getNumContainers();
}
} else {
return this.getCapability().compareTo(other.getCapability());
}
} else {
return this.getHostName().compareTo(other.getHostName());
}
} else {
return this.getPriority().compareTo(other.getPriority());
}
}
}

View File

@ -104,6 +104,7 @@
<configuration>
<environmentVariables>
<JAVA_HOME>${java.home}</JAVA_HOME>
<MALLOC_ARENA_MAX>4</MALLOC_ARENA_MAX>
</environmentVariables>
</configuration>
</plugin>

View File

@ -531,6 +531,8 @@ public class Client {
// Set java executable command
LOG.info("Setting up app master command");
vargs.add("${JAVA_HOME}" + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master

View File

@ -42,6 +42,7 @@ public class TestDistributedShell {
@BeforeClass
public static void setup() throws InterruptedException, IOException {
LOG.info("Starting up YARN cluster");
conf.setInt("yarn.scheduler.fifo.minimum-allocation-mb", 128);
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName(),
1, 1, 1);
@ -74,9 +75,9 @@ public class TestDistributedShell {
"--shell_command",
"ls",
"--master_memory",
"1536",
"512",
"--container_memory",
"1536"
"128"
};
LOG.info("Initializing DS Client");

View File

@ -70,6 +70,17 @@
<goal>run</goal>
</goals>
</execution>
<execution>
<phase>pre-site</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy file="src/main/resources/yarn-default.xml" todir="src/site/resources"/>
</tasks>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
@ -50,7 +51,14 @@ public class RackResolver {
try {
Constructor<? extends DNSToSwitchMapping> dnsToSwitchMappingConstructor
= dnsToSwitchMappingClass.getConstructor();
dnsToSwitchMapping = dnsToSwitchMappingConstructor.newInstance();
DNSToSwitchMapping newInstance =
dnsToSwitchMappingConstructor.newInstance();
// Wrap around the configured class with the Cached implementation so as
// to save on repetitive lookups.
// Check if the impl is already caching, to avoid double caching.
dnsToSwitchMapping =
((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
: new CachedDNSToSwitchMapping(newInstance));
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -36,6 +36,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
import org.apache.hadoop.yarn.webapp.Router.Dest;
import org.apache.hadoop.yarn.webapp.view.ErrorPage;
import org.apache.hadoop.http.HtmlQuoting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,7 +74,8 @@ public class Dispatcher extends HttpServlet {
public void service(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
res.setCharacterEncoding("UTF-8");
String uri = req.getRequestURI();
String uri = HtmlQuoting.quoteHtmlChars(req.getRequestURI());
if (uri == null) {
uri = "/";
}

View File

@ -307,7 +307,7 @@ public class HamletImpl extends HamletSpec {
sb.setLength(0);
sb.append(' ').append(name);
if (value != null) {
sb.append("=\"").append(value).append("\"");
sb.append("=\"").append(escapeHtml(value)).append("\"");
}
out.print(sb.toString());
}
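Escaping the attribute value closes an HTML attribute injection hole: a value containing quotes or angle brackets can no longer break out of the surrounding attribute. Assuming the escapeHtml in scope is commons-lang's StringEscapeUtils.escapeHtml, an illustrative before/after:

import org.apache.commons.lang.StringEscapeUtils;

class AttrEscapeSketch {
  static String demo() {
    // "\"/><script>" becomes "&quot;/&gt;&lt;script&gt;" and stays inside the attribute.
    return StringEscapeUtils.escapeHtml("\"/><script>");
  }
}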

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.Node;
import org.junit.Assert;
import org.junit.Test;
public class TestRackResolver {
public static final class MyResolver implements DNSToSwitchMapping {
int numHost1 = 0;
@Override
public List<String> resolve(List<String> hostList) {
// Only one host at a time
Assert.assertTrue("hostList size is " + hostList.size(),
hostList.size() <= 1);
List<String> returnList = new ArrayList<String>();
if (hostList.isEmpty()) {
return returnList;
}
if (hostList.get(0).equals("host1")) {
numHost1++;
returnList.add("/rack1");
}
// This resolver should not be invoked again for the same host, since
// RackResolver is supposed to cache the result.
Assert.assertTrue(numHost1 <= 1);
return returnList;
}
}
@Test
public void testCaching() {
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
Node node = RackResolver.resolve("host1");
Assert.assertEquals("/rack1", node.getNetworkLocation());
node = RackResolver.resolve("host1");
Assert.assertEquals("/rack1", node.getNetworkLocation());
}
}

View File

@ -64,7 +64,6 @@
</goals>
<configuration>
<tasks>
<copy file="src/main/resources/yarn-default.xml" todir="src/site/resources"/>
<copy file="src/main/xsl/configuration.xsl" todir="src/site/resources"/>
</tasks>
</configuration>

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -157,6 +157,11 @@ public class ClientRMService extends AbstractService implements
super.start();
}
@Private
public InetSocketAddress getBindAddress() {
return clientBindAddress;
}
/**
* check if the calling user has the access to application information.
* @param callerUGI
@ -412,7 +417,7 @@ public class ClientRMService extends AbstractService implements
SchedulerNodeReport schedulerNodeReport =
scheduler.getNodeReport(rmNode.getNodeID());
Resource used = Resources.none();
Resource used = BuilderUtils.newResource(0);
int numContainers = 0;
if (schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();

View File

@ -110,7 +110,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ApplicationACLsManager applicationACLsManager;
protected RMDelegationTokenSecretManager rmDTSecretManager;
private WebApp webApp;
private RMContext rmContext;
protected RMContext rmContext;
private final Store store;
protected ResourceTrackerService resourceTracker;

View File

@ -98,6 +98,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public float getUsedCapacity();
/**
* Set used capacity of the queue.
* @param usedCapacity used capacity of the queue
*/
public void setUsedCapacity(float usedCapacity);
/**
* Get the currently utilized resources in the cluster
* by the queue and children (if any).
@ -114,6 +120,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public float getUtilization();
/**
* Set the current <em>utilization</em> of the queue.
* @param utilization queue utilization
*/
public void setUtilization(float utilization);
/**
* Get the current run-state of the queue
* @return current run-state

View File

@ -17,7 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
class CSQueueUtils {
@ -65,4 +67,40 @@ class CSQueueUtils {
1);
}
@Lock(CSQueue.class)
public static void updateQueueStatistics(
final CSQueue childQueue, final CSQueue parentQueue,
final Resource clusterResource, final Resource minimumAllocation) {
final int clusterMemory = clusterResource.getMemory();
final int usedMemory = childQueue.getUsedResources().getMemory();
float queueLimit = 0.0f;
float utilization = 0.0f;
float usedCapacity = 0.0f;
if (clusterMemory > 0) {
queueLimit = clusterMemory * childQueue.getAbsoluteCapacity();
final float parentAbsoluteCapacity =
(parentQueue == null) ? 1.0f : parentQueue.getAbsoluteCapacity();
utilization = (usedMemory / queueLimit);
usedCapacity = (usedMemory / (clusterMemory * parentAbsoluteCapacity));
}
childQueue.setUtilization(utilization);
childQueue.setUsedCapacity(usedCapacity);
int available =
Math.max((roundUp(minimumAllocation, (int)queueLimit) - usedMemory), 0);
childQueue.getMetrics().setAvailableResourcesToQueue(
Resources.createResource(available));
}
public static int roundUp(Resource minimumAllocation, int memory) {
int minMemory = minimumAllocation.getMemory();
return LeafQueue.divideAndCeil(memory, minMemory) * minMemory;
}
public static int roundDown(Resource minimumAllocation, int memory) {
int minMemory = minimumAllocation.getMemory();
return (memory / minMemory) * minMemory;
}
}
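To make the arithmetic concrete, a small worked example with assumed numbers (not taken from the patch): clusterMemory = 10240 MB, child queue absoluteCapacity = 0.25, parent absoluteCapacity = 0.5, usedMemory = 1024 MB, minimumAllocation = 1024 MB:

  queueLimit   = 10240 * 0.25                       = 2560
  utilization  = 1024 / 2560                        = 0.40
  usedCapacity = 1024 / (10240 * 0.5)               = 0.20
  available    = max(roundUp(1024 MB, 2560) - 1024, 0)
               = max(3072 - 1024, 0)                = 2048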

View File

@ -180,7 +180,9 @@ public class LeafQueue implements CSQueue {
Map<QueueACL, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(capacity, absoluteCapacity,
setupQueueConfigs(
cs.getClusterResources(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
maxApplications, maxApplicationsPerUser,
@ -198,6 +200,7 @@ public class LeafQueue implements CSQueue {
}
private synchronized void setupQueueConfigs(
Resource clusterResource,
float capacity, float absoluteCapacity,
float maximumCapacity, float absoluteMaxCapacity,
int userLimit, float userLimitFactor,
@ -235,6 +238,10 @@ public class LeafQueue implements CSQueue {
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
@ -386,11 +393,11 @@ public class LeafQueue implements CSQueue {
return null;
}
synchronized void setUtilization(float utilization) {
public synchronized void setUtilization(float utilization) {
this.utilization = utilization;
}
synchronized void setUsedCapacity(float usedCapacity) {
public synchronized void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity;
}
@ -534,7 +541,9 @@ public class LeafQueue implements CSQueue {
}
LeafQueue leafQueue = (LeafQueue)queue;
setupQueueConfigs(leafQueue.capacity, leafQueue.absoluteCapacity,
setupQueueConfigs(
clusterResource,
leafQueue.capacity, leafQueue.absoluteCapacity,
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
leafQueue.userLimit, leafQueue.userLimitFactor,
leafQueue.maxApplications,
@ -542,8 +551,6 @@ public class LeafQueue implements CSQueue {
leafQueue.getMaximumActiveApplications(),
leafQueue.getMaximumActiveApplicationsPerUser(),
leafQueue.state, leafQueue.acls);
updateResource(clusterResource);
}
@Override
@ -883,7 +890,8 @@ public class LeafQueue implements CSQueue {
Resource queueMaxCap = // Queue Max-Capacity
Resources.createResource(
roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory()))
CSQueueUtils.roundDown(minimumAllocation,
(int)(absoluteMaxCapacity * clusterResource.getMemory()))
);
Resource userConsumed = getUser(user).getConsumedResources();
@ -904,16 +912,6 @@ public class LeafQueue implements CSQueue {
return userLimit;
}
private int roundUp(int memory) {
int minMemory = minimumAllocation.getMemory();
return divideAndCeil(memory, minMemory) * minMemory;
}
private int roundDown(int memory) {
int minMemory = minimumAllocation.getMemory();
return (memory / minMemory) * minMemory;
}
@Lock(NoLock.class)
private Resource computeUserLimit(SchedulerApp application,
Resource clusterResource, Resource required) {
@ -927,8 +925,11 @@ public class LeafQueue implements CSQueue {
// Allow progress for queues with minuscule capacity
final int queueCapacity =
Math.max(
roundUp((int)(absoluteCapacity * clusterResource.getMemory())),
required.getMemory());
CSQueueUtils.roundUp(
minimumAllocation,
(int)(absoluteCapacity * clusterResource.getMemory())),
required.getMemory()
);
final int consumed = usedResources.getMemory();
final int currentCapacity =
@ -943,7 +944,8 @@ public class LeafQueue implements CSQueue {
final int activeUsers = activeUsersManager.getNumActiveUsers();
int limit =
roundUp(
CSQueueUtils.roundUp(
minimumAllocation,
Math.min(
Math.max(divideAndCeil(currentCapacity, activeUsers),
divideAndCeil((int)userLimit*currentCapacity, 100)),
@ -991,7 +993,7 @@ public class LeafQueue implements CSQueue {
return true;
}
private static int divideAndCeil(int a, int b) {
static int divideAndCeil(int a, int b) {
if (b == 0) {
LOG.info("divideAndCeil called with a=" + a + " b=" + b);
return 0;
@ -1325,7 +1327,8 @@ public class LeafQueue implements CSQueue {
SchedulerApp application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
updateResource(clusterResource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
++numContainers;
// Update user metrics
@ -1349,7 +1352,8 @@ public class LeafQueue implements CSQueue {
SchedulerApp application, Resource resource) {
// Update queue metrics
Resources.subtractFrom(usedResources, resource);
updateResource(clusterResource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
--numContainers;
// Update user metrics
@ -1374,6 +1378,10 @@ public class LeafQueue implements CSQueue {
CSQueueUtils.computeMaxActiveApplicationsPerUser(
maxActiveApplications, userLimit, userLimitFactor);
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
// Update application properties
for (SchedulerApp application : activeApplications) {
synchronized (application) {
@ -1383,18 +1391,6 @@ public class LeafQueue implements CSQueue {
}
}
private synchronized void updateResource(Resource clusterResource) {
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
setUtilization(usedResources.getMemory() / queueLimit);
setUsedCapacity(usedResources.getMemory()
/ (clusterResource.getMemory() * parent.getAbsoluteCapacity()));
Resource resourceLimit =
Resources.createResource(roundUp((int)queueLimit));
metrics.setAvailableResourcesToQueue(
Resources.subtractFrom(resourceLimit, usedResources));
}
@Override
public QueueMetrics getMetrics() {
return metrics;
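
The user-limit fragment above splits the queue's current capacity across active users, takes at least the configured user-limit percentage of it, and rounds the result up to a multiple of the minimum allocation; the outer Math.min bound is cut off in the hunk and omitted here. A rough sketch with illustrative numbers, not the full computeUserLimit:

// Rough sketch of the per-user limit arithmetic; values are illustrative and the
// truncated outer Math.min bound is left out.
public class UserLimitSketch {
  static int divideAndCeil(int a, int b) { return (a + b - 1) / b; }  // assumes b > 0
  static int roundUp(int minAllocMB, int memoryMB) {
    return divideAndCeil(memoryMB, minAllocMB) * minAllocMB;
  }
  public static void main(String[] args) {
    int minAllocMB = 1024;
    int currentCapacity = 8 * 1024;   // consumed + required, say 8 GB
    int activeUsers = 3;
    int userLimitPercent = 25;        // the queue's configured user limit

    int evenShare = divideAndCeil(currentCapacity, activeUsers);                  // 2731 MB
    int guaranteedShare = divideAndCeil(userLimitPercent * currentCapacity, 100); // 2048 MB
    int limit = roundUp(minAllocMB, Math.max(evenShare, guaranteedShare));        // 3072 MB

    System.out.println("per-user limit (MB) = " + limit);
  }
}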

View File

@ -97,7 +97,8 @@ public class ParentQueue implements CSQueue {
RecordFactoryProvider.getRecordFactory(null);
public ParentQueue(CapacitySchedulerContext cs,
String queueName, Comparator<CSQueue> comparator, CSQueue parent, CSQueue old) {
String queueName, Comparator<CSQueue> comparator,
CSQueue parent, CSQueue old) {
minimumAllocation = cs.getMinimumResourceCapability();
this.parent = parent;
@ -137,7 +138,8 @@ public class ParentQueue implements CSQueue {
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
setupQueueConfigs(capacity, absoluteCapacity,
setupQueueConfigs(cs.getClusterResources(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls);
this.queueComparator = comparator;
@ -149,9 +151,10 @@ public class ParentQueue implements CSQueue {
}
private synchronized void setupQueueConfigs(
float capacity, float absoluteCapacity,
float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls
Resource clusterResource,
float capacity, float absoluteCapacity,
float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls
) {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@ -174,6 +177,10 @@ public class ParentQueue implements CSQueue {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
LOG.info(queueName +
", capacity=" + capacity +
", asboluteCapacity=" + absoluteCapacity +
@ -384,12 +391,10 @@ public class ParentQueue implements CSQueue {
childQueues.addAll(currentChildQueues.values());
// Set new configs
setupQueueConfigs(parentQueue.capacity, parentQueue.absoluteCapacity,
setupQueueConfigs(clusterResource,
parentQueue.capacity, parentQueue.absoluteCapacity,
parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
parentQueue.state, parentQueue.acls);
// Update
updateResource(clusterResource);
}
Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
@ -485,11 +490,11 @@ public class ParentQueue implements CSQueue {
" #applications: " + getNumApplications());
}
synchronized void setUsedCapacity(float usedCapacity) {
public synchronized void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity;
}
synchronized void setUtilization(float utilization) {
public synchronized void setUtilization(float utilization) {
this.utilization = utilization;
}
@ -674,14 +679,16 @@ public class ParentQueue implements CSQueue {
synchronized void allocateResource(Resource clusterResource,
Resource resource) {
Resources.addTo(usedResources, resource);
updateResource(clusterResource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
++numContainers;
}
synchronized void releaseResource(Resource clusterResource,
Resource resource) {
Resources.subtractFrom(usedResources, resource);
updateResource(clusterResource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
--numContainers;
}
@ -691,22 +698,12 @@ public class ParentQueue implements CSQueue {
for (CSQueue childQueue : childQueues) {
childQueue.updateClusterResource(clusterResource);
}
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
}
private synchronized void updateResource(Resource clusterResource) {
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
float parentAbsoluteCapacity =
(rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
setUtilization(usedResources.getMemory() / queueLimit);
setUsedCapacity(usedResources.getMemory()
/ (clusterResource.getMemory() * parentAbsoluteCapacity));
Resource resourceLimit =
Resources.createResource((int)queueLimit);
metrics.setAvailableResourcesToQueue(
Resources.subtractFrom(resourceLimit, usedResources));
}
@Override
public QueueMetrics getMetrics() {
return metrics;

View File

@ -230,7 +230,7 @@ public class FifoScheduler implements ResourceScheduler {
}
// Sanity check
SchedulerUtils.normalizeRequests(ask, MINIMUM_MEMORY);
SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
// Release containers
for (ContainerId releasedContainer : release) {
@ -592,7 +592,7 @@ public class FifoScheduler implements ResourceScheduler {
minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.util.StringHelper.sjoin;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
@ -27,6 +26,7 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
@ -52,11 +52,12 @@ class AppsBlock extends HtmlBlock {
th(".user", "User").
th(".name", "Name").
th(".queue", "Queue").
th(".starttime", "StartTime").
th(".finishtime", "FinishTime").
th(".state", "State").
th(".finalstatus", "FinalStatus").
th(".progress", "Progress").
th(".ui", "Tracking UI").
th(".note", "Note")._()._().
th(".ui", "Tracking UI")._()._().
tbody();
int i = 0;
String reqState = $(APP_STATE);
@ -67,6 +68,8 @@ class AppsBlock extends HtmlBlock {
}
AppInfo appInfo = new AppInfo(app, true);
String percent = String.format("%.1f", appInfo.getProgress());
String startTime = Times.format(appInfo.getStartTime());
String finishTime = Times.format(appInfo.getFinishTime());
tbody.
tr().
td().
@ -75,6 +78,10 @@ class AppsBlock extends HtmlBlock {
td(appInfo.getUser()).
td(appInfo.getName()).
td(appInfo.getQueue()).
td().
br().$title(startTime)._()._(startTime)._().
td().
br().$title(finishTime)._()._(finishTime)._().
td(appInfo.getState()).
td(appInfo.getFinalStatus()).
td().
@ -85,8 +92,7 @@ class AppsBlock extends HtmlBlock {
$style(join("width:", percent, '%'))._()._()._().
td().
a(!appInfo.isTrackingUrlReady()?
"#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._().
td(appInfo.getNote())._();
"#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._()._();
if (list.rendering != Render.HTML && ++i >= 20) break;
}
tbody._()._();
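
The new StartTime/FinishTime cells render whatever Times.format returns for the AppInfo timestamps. A hypothetical stand-in for that utility, assuming it formats an epoch value and falls back to a placeholder such as "N/A" when an app has no finish time yet:

import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical stand-in for org.apache.hadoop.yarn.util.Times.format: render an epoch
// timestamp, falling back to a placeholder when the app has no finish time yet.
// The exact pattern and placeholder of the real utility are assumptions.
public class TimesSketch {
  public static String format(long ts) {
    return ts > 0
        ? new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy").format(new Date(ts))
        : "N/A";
  }
  public static void main(String[] args) {
    System.out.println(format(System.currentTimeMillis())); // a readable start time
    System.out.println(format(0L));                         // still-running app: placeholder
  }
}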

View File

@ -55,15 +55,19 @@ public class MetricsOverviewTable extends HtmlBlock {
//CSS in the correct spot
html.style(".metrics {margin-bottom:5px}");
ClusterMetricsInfo clusterMetrics = new ClusterMetricsInfo(this.rm, this.rmContext);
ClusterMetricsInfo clusterMetrics =
new ClusterMetricsInfo(this.rm, this.rmContext);
DIV<Hamlet> div = html.div().$class("metrics");
div.table("#metricsoverview").
div.h3("Cluster Metrics").
table("#metricsoverview").
thead().$class("ui-widget-header").
tr().
th().$class("ui-state-default")._("Apps Submitted")._().
th().$class("ui-state-default")._("Apps Pending")._().
th().$class("ui-state-default")._("Apps Running")._().
th().$class("ui-state-default")._("Apps Completed")._().
th().$class("ui-state-default")._("Containers Running")._().
th().$class("ui-state-default")._("Memory Used")._().
th().$class("ui-state-default")._("Memory Total")._().
@ -78,6 +82,14 @@ public class MetricsOverviewTable extends HtmlBlock {
tbody().$class("ui-widget-content").
tr().
td(String.valueOf(clusterMetrics.getAppsSubmitted())).
td(String.valueOf(clusterMetrics.getAppsPending())).
td(String.valueOf(clusterMetrics.getAppsRunning())).
td(
String.valueOf(
clusterMetrics.getAppsCompleted() +
clusterMetrics.getAppsFailed() + clusterMetrics.getAppsKilled()
)
).
td(String.valueOf(clusterMetrics.getContainersAllocated())).
td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)).
@ -89,26 +101,38 @@ public class MetricsOverviewTable extends HtmlBlock {
td().a(url("nodes/rebooted"),String.valueOf(clusterMetrics.getRebootedNodes()))._().
_().
_()._();
String user = request().getRemoteUser();
if (user != null) {
UserMetricsInfo userMetrics = new UserMetricsInfo(this.rm, this.rmContext, user);
if (userMetrics.metricsAvailable()) {
div.table("#usermetricsoverview").
div.h3("User Metrics for " + user).
table("#usermetricsoverview").
thead().$class("ui-widget-header").
tr().
th().$class("ui-state-default")._("Apps Submitted ("+user+")")._().
th().$class("ui-state-default")._("Containers Running ("+user+")")._().
th().$class("ui-state-default")._("Containers Pending ("+user+")")._().
th().$class("ui-state-default")._("Containers Reserved ("+user+")")._().
th().$class("ui-state-default")._("Memory Used ("+user+")")._().
th().$class("ui-state-default")._("Memory Pending ("+user+")")._().
th().$class("ui-state-default")._("Memory Reserved ("+user+")")._().
th().$class("ui-state-default")._("Apps Submitted")._().
th().$class("ui-state-default")._("Apps Pending")._().
th().$class("ui-state-default")._("Apps Running")._().
th().$class("ui-state-default")._("Apps Completed")._().
th().$class("ui-state-default")._("Containers Running")._().
th().$class("ui-state-default")._("Containers Pending")._().
th().$class("ui-state-default")._("Containers Reserved")._().
th().$class("ui-state-default")._("Memory Used")._().
th().$class("ui-state-default")._("Memory Pending")._().
th().$class("ui-state-default")._("Memory Reserved")._().
_().
_().
tbody().$class("ui-widget-content").
tr().
td(String.valueOf(userMetrics.getAppsSubmitted())).
td(String.valueOf(userMetrics.getAppsPending())).
td(String.valueOf(userMetrics.getAppsRunning())).
td(
String.valueOf(
(userMetrics.getAppsCompleted() +
userMetrics.getAppsFailed() + userMetrics.getAppsKilled())
)
).
td(String.valueOf(userMetrics.getRunningContainers())).
td(String.valueOf(userMetrics.getPendingContainers())).
td(String.valueOf(userMetrics.getReservedContainers())).
@ -117,6 +141,7 @@ public class MetricsOverviewTable extends HtmlBlock {
td(StringUtils.byteDesc(userMetrics.getReservedMB() * BYTES_IN_MB)).
_().
_()._();
}
}
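
The "Apps Completed" cells above fold the terminal states into a single count. A tiny sketch with made-up QueueMetrics values showing the value that gets rendered:

// The value rendered in the "Apps Completed" cell, with made-up QueueMetrics counts.
public class AppsCompletedCellSketch {
  public static void main(String[] args) {
    int completed = 40, failed = 3, killed = 2;
    System.out.println(String.valueOf(completed + failed + killed));   // "45"
  }
}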

View File

@ -63,10 +63,11 @@ public class RmView extends TwoColumnLayout {
private String appsTableInit() {
AppsList list = getInstance(AppsList.class);
// id, user, name, queue, state, progress, ui, note
// id, user, name, queue, starttime, finishtime, state, finalstatus, progress, ui
StringBuilder init = tableInit().
append(", aoColumns:[{sType:'title-numeric'}, null, null, null, null,").
append("null,{sType:'title-numeric', bSearchable:false}, null, null]");
append(", aoColumns:[{sType:'title-numeric'}, null, null, null, ").
append("null, null , null, ").
append("null,{sType:'title-numeric', bSearchable:false}, null]");
// Sort by id upon page load
init.append(", aaSorting: [[0, 'asc']]");

View File

@ -32,10 +32,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
public class ClusterMetricsInfo {
protected int appsSubmitted;
protected int appsCompleted;
protected int appsPending;
protected int appsRunning;
protected int appsFailed;
protected int appsKilled;
protected long reservedMB;
protected long availableMB;
protected long allocatedMB;
protected int containersAllocated;
protected int containersReserved;
protected int containersPending;
protected long totalMB;
protected int totalNodes;
protected int lostNodes;
@ -53,10 +63,20 @@ public class ClusterMetricsInfo {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
this.appsSubmitted = metrics.getAppsSubmitted();
this.appsCompleted = metrics.getAppsCompleted();
this.appsPending = metrics.getAppsPending();
this.appsRunning = metrics.getAppsRunning();
this.appsFailed = metrics.getAppsFailed();
this.appsKilled = metrics.getAppsKilled();
this.reservedMB = metrics.getReservedMB();
this.availableMB = metrics.getAvailableMB();
this.allocatedMB = metrics.getAllocatedMB();
this.containersAllocated = metrics.getAllocatedContainers();
this.containersPending = metrics.getPendingContainers();
this.containersReserved = metrics.getReservedContainers();
this.totalMB = availableMB + reservedMB + allocatedMB;
this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs();
@ -71,6 +91,26 @@ public class ClusterMetricsInfo {
return this.appsSubmitted;
}
public int getAppsCompleted() {
return appsCompleted;
}
public int getAppsPending() {
return appsPending;
}
public int getAppsRunning() {
return appsRunning;
}
public int getAppsFailed() {
return appsFailed;
}
public int getAppsKilled() {
return appsKilled;
}
public long getReservedMB() {
return this.reservedMB;
}
@ -87,6 +127,14 @@ public class ClusterMetricsInfo {
return this.containersAllocated;
}
public int getReservedContainers() {
return this.containersReserved;
}
public int getPendingContainers() {
return this.containersPending;
}
public long getTotalMB() {
return this.totalMB;
}
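
The new totalMB field is derived rather than reported: it is the sum of the three memory buckets the scheduler metrics expose. With illustrative numbers:

// The derived totalMB field: the cluster total is reconstructed from the three memory
// buckets the scheduler metrics expose. Numbers are illustrative.
public class ClusterMemorySketch {
  public static void main(String[] args) {
    long availableMB = 6 * 1024;   // free
    long reservedMB  = 1 * 1024;   // reserved against pending requests
    long allocatedMB = 3 * 1024;   // handed out to running containers
    long totalMB = availableMB + reservedMB + allocatedMB;   // 10240, mirroring the constructor above
    System.out.println("totalMB = " + totalMB);
  }
}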

View File

@ -32,6 +32,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
public class UserMetricsInfo {
protected int appsSubmitted;
protected int appsCompleted;
protected int appsPending;
protected int appsRunning;
protected int appsFailed;
protected int appsKilled;
protected int runningContainers;
protected int pendingContainers;
protected int reservedContainers;
@ -54,10 +59,18 @@ public class UserMetricsInfo {
if (userMetrics != null) {
this.userMetricsAvailable = true;
this.appsSubmitted = userMetrics.getAppsSubmitted();
this.appsCompleted = metrics.getAppsCompleted();
this.appsPending = metrics.getAppsPending();
this.appsRunning = metrics.getAppsRunning();
this.appsFailed = metrics.getAppsFailed();
this.appsKilled = metrics.getAppsKilled();
this.runningContainers = userMetrics.getAllocatedContainers();
this.pendingContainers = userMetrics.getPendingContainers();
this.reservedContainers = userMetrics.getReservedContainers();
this.reservedMB = userMetrics.getReservedMB();
this.pendingMB = userMetrics.getPendingMB();
this.allocatedMB = userMetrics.getAllocatedMB();
@ -72,6 +85,26 @@ public class UserMetricsInfo {
return this.appsSubmitted;
}
public int getAppsCompleted() {
return appsCompleted;
}
public int getAppsPending() {
return appsPending;
}
public int getAppsRunning() {
return appsRunning;
}
public int getAppsFailed() {
return appsFailed;
}
public int getAppsKilled() {
return appsKilled;
}
public long getReservedMB() {
return this.reservedMB;
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.net.InetSocketAddress;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestClientRMService {
private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
@Test
public void testGetClusterNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.rmDTSecretManager);
};
};
rm.start();
// Add a healthy node
MockNM node = rm.registerNode("host:1234", 1024);
node.nodeHeartbeat(true);
// Create a client.
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
ClientRMProtocol client =
(ClientRMProtocol) rpc
.getProxy(ClientRMProtocol.class, rmAddress, conf);
// Make call
GetClusterNodesRequest request =
Records.newRecord(GetClusterNodesRequest.class);
List<NodeReport> nodeReports =
client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size());
Assert.assertTrue("Node is expected to be healthy!", nodeReports.get(0)
.getNodeHealthStatus().getIsNodeHealthy());
// Now make the node unhealthy.
node.nodeHeartbeat(false);
// Call again
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size());
Assert.assertFalse("Node is expected to be unhealthy!", nodeReports.get(0)
.getNodeHealthStatus().getIsNodeHealthy());
}
}

View File

@ -24,38 +24,22 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private ResourceManager resourceManager = null;
@Before
public void setUp() throws Exception {
Store store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
resourceManager.init(new Configuration());
}
@After
public void tearDown() throws Exception {
}
private final int GB = 1024;
@Test
public void test() throws Exception {
@ -63,7 +47,6 @@ public class TestFifoScheduler {
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
int GB = 1024;
MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
MockNM nm2 = rm.registerNode("h2:5678", 4 * GB);
@ -146,8 +129,48 @@ public class TestFifoScheduler {
rm.stop();
}
private void testMinimumAllocation(YarnConfiguration conf)
throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(256);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
int checkAlloc =
conf.getInt("yarn.scheduler.fifo.minimum-allocation-mb", GB);
Assert.assertEquals(checkAlloc, report_nm1.getUsedResource().getMemory());
rm.stop();
}
@Test
public void testDefaultMinimumAllocation() throws Exception {
testMinimumAllocation(new YarnConfiguration());
}
@Test
public void testNonDefaultMinimumAllocation() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt("yarn.scheduler.fifo.minimum-allocation-mb", 512);
testMinimumAllocation(conf);
}
public static void main(String[] args) throws Exception {
TestFifoScheduler t = new TestFifoScheduler();
t.test();
t.testDefaultMinimumAllocation();
t.testNonDefaultMinimumAllocation();
}
}
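
Restating what the new minimum-allocation assertion checks, for both configurations the test exercises; the 1 GB figure is the fallback used in the getInt call and is assumed to match the default minimum:

// What the assertion expects: the 256 MB AM ask is normalized up to the configured
// fifo minimum, so the node reports exactly one minimum-sized container in use.
public class MinimumAllocationExpectation {
  public static void main(String[] args) {
    int amAskMB = 256;                                        // rm.submitApp(256)
    int[] minimums = { 1024 /* assumed default */, 512 /* non-default */ };
    for (int minMB : minimums) {
      int used = Math.max(amAskMB, minMB);                    // ask rounded up to the minimum
      System.out.println("minimum=" + minMB + "MB -> node reports " + used + "MB used");
    }
  }
}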

View File

@ -142,7 +142,7 @@ public class TestApplicationLimits {
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
LeafQueue queue = (LeafQueue)queues.get(A);
@ -163,6 +163,10 @@ public class TestApplicationLimits {
expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) *
queue.getUserLimitFactor()),
queue.getMaximumActiveApplicationsPerUser());
assertEquals(
(int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
queue.getMetrics().getAvailableMB()
);
// Add some nodes to the cluster & test new limits
clusterResource = Resources.createResource(120 * 16 * GB);
@ -178,6 +182,10 @@ public class TestApplicationLimits {
(int)Math.ceil(expectedMaxActiveApps *
(queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()),
queue.getMaximumActiveApplicationsPerUser());
assertEquals(
(int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
queue.getMetrics().getAvailableMB()
);
}
@Test

View File

@ -48,7 +48,7 @@ public class TestNodesPage {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
final int numberOfThInMetricsTable = 13;
final int numberOfActualTableHeaders = 10;
private Injector injector;

View File

@ -361,6 +361,7 @@ public class TestRMWebServices extends JerseyTest {
verifyClusterMetrics(
WebServicesTestUtils.getXmlInt(element, "appsSubmitted"),
WebServicesTestUtils.getXmlInt(element, "appsCompleted"),
WebServicesTestUtils.getXmlInt(element, "reservedMB"),
WebServicesTestUtils.getXmlInt(element, "availableMB"),
WebServicesTestUtils.getXmlInt(element, "allocatedMB"),
@ -379,8 +380,9 @@ public class TestRMWebServices extends JerseyTest {
Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 12, clusterinfo.length());
verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"),
assertEquals("incorrect number of elements", 19, clusterinfo.length());
verifyClusterMetrics(
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
clusterinfo.getInt("allocatedMB"),
clusterinfo.getInt("containersAllocated"),
@ -390,7 +392,8 @@ public class TestRMWebServices extends JerseyTest {
clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"));
}
public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
public void verifyClusterMetrics(int submittedApps, int completedApps,
int reservedMB, int availableMB,
int allocMB, int containersAlloc, int totalMB, int totalNodes,
int lostNodes, int unhealthyNodes, int decommissionedNodes,
int rebootedNodes, int activeNodes) throws JSONException, Exception {
@ -404,7 +407,9 @@ public class TestRMWebServices extends JerseyTest {
+ metrics.getAllocatedMB();
assertEquals("appsSubmitted doesn't match",
metrics.getAppsSubmitted(), sub);
metrics.getAppsSubmitted(), submittedApps);
assertEquals("appsCompleted doesn't match",
metrics.getAppsCompleted(), completedApps);
assertEquals("reservedMB doesn't match",
metrics.getReservedMB(), reservedMB);
assertEquals("availableMB doesn't match",

View File

@ -81,11 +81,11 @@ public class MiniYARNCluster extends CompositeService {
*/
public MiniYARNCluster(String testName, int noOfNodeManagers,
int numLocalDirs, int numLogDirs) {
super(testName);
super(testName.replace("$", ""));
this.numLocalDirs = numLocalDirs;
this.numLogDirs = numLogDirs;
this.testWorkDir = new File("target", testName);
this.testWorkDir = new File("target",
testName.replace("$", ""));
try {
FileContext.getLocalFSFileContext().delete(
new Path(testWorkDir.getAbsolutePath()), true);
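
Stripping "$" matters because callers often pass getClass().getName(), and nested or anonymous test classes carry a '$' that would otherwise end up in the service name and the target/ work directory. A small sketch with a hypothetical class name:

import java.io.File;

// Why the "$" is stripped: a nested or anonymous test class name such as
// "TestDistributedShell$1" would otherwise leak a '$' into the service name and the
// target/ work directory path. The class name below is hypothetical.
public class NameSanitizeSketch {
  public static void main(String[] args) {
    String testName = "org.example.TestDistributedShell$1";   // e.g. getClass().getName() of an anonymous subclass
    String sanitized = testName.replace("$", "");
    File workDir = new File("target", sanitized);
    System.out.println(workDir.getPath());   // target/org.example.TestDistributedShell1
  }
}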

View File

@ -476,7 +476,7 @@ Hadoop MapReduce Next Generation - Cluster Setup
designated server:
----
$ $YARN_HOME/bin/yarn start historyserver --config $HADOOP_CONF_DIR
$ $YARN_HOME/bin/mapred start historyserver --config $YARN_CONF_DIR
----
* Hadoop Shutdown
@ -519,7 +519,7 @@ Hadoop MapReduce Next Generation - Cluster Setup
designated server:
----
$ $YARN_HOME/bin/yarn stop historyserver --config $HADOOP_CONF_DIR
$ $YARN_HOME/bin/mapred stop historyserver --config $YARN_CONF_DIR
----
@ -1020,7 +1020,7 @@ KVNO Timestamp Principal
designated server as <mapred>:
----
[mapred]$ $YARN_HOME/bin/yarn start historyserver --config $HADOOP_CONF_DIR
[mapred]$ $YARN_HOME/bin/mapred start historyserver --config $YARN_CONF_DIR
----
* Hadoop Shutdown
@ -1063,7 +1063,7 @@ KVNO Timestamp Principal
designated server as <mapred>:
----
[mapred]$ $YARN_HOME/bin/yarn stop historyserver --config $HADOOP_CONF_DIR
[mapred]$ $YARN_HOME/bin/mapred stop historyserver --config $YARN_CONF_DIR
----
* {Web Interfaces}

View File

@ -90,7 +90,7 @@
<menu name="Configuration" inherit="top">
<item name="core-default.xml" href="hadoop-project-dist/hadoop-common/core-default.xml"/>
<item name="hdfs-default.xml" href="hadoop-project-dist/hadoop-hdfs/hdfs-default.xml"/>
<item name="yarn-default.xml" href="hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/yarn-default.xml"/>
<item name="yarn-default.xml" href="hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml"/>
<item name="mapred-default.xml" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml"/>
<item name="Deprecated Properties" href="hadoop-project-dist/hadoop-common/DeprecatedProperties.html"/>
</menu>