Merge branch 'trunk' into HDFS-6581

commit bfc7b7e6a0
Author: arp
Date: 2014-09-23 16:40:35 -07:00
28 changed files with 643 additions and 81 deletions

View File

@@ -126,6 +126,9 @@ Trunk (Unreleased)
    HADOOP-11041. VersionInfo specifies subversion (Tsuyoshi OZAWA via aw)

    HADOOP-11092. hadoop shell commands should print usage if not given a
    a class (aw)

  BUG FIXES

    HADOOP-9451. Fault single-layer config if node group topology is enabled.

@@ -596,6 +599,8 @@ Release 2.6.0 - UNRELEASED
    HADOOP-11112. TestKMSWithZK does not use KEY_PROVIDER_URI. (tucu via wang)

    HADOOP-11111 MiniKDC to use locale EN_US for case conversions. (stevel)

  BUG FIXES

    HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

View File

@@ -162,11 +162,11 @@ case ${COMMAND} in
  version)
    CLASS=org.apache.hadoop.util.VersionInfo
  ;;
- -*|hdfs)
-   hadoop_exit_with_usage 1
- ;;
  *)
    CLASS="${COMMAND}"
+   if ! hadoop_validate_classname "${CLASS}"; then
+     hadoop_exit_with_usage 1
+   fi
  ;;
esac

View File

@@ -279,6 +279,17 @@ function hadoop_connect_to_hosts
  fi
}

function hadoop_validate_classname
{
  local class=$1
  shift 1

  if [[ ! ${class} =~ \. ]]; then
    return 1
  fi
  return 0
}

function hadoop_add_param
{
  #

View File

@@ -70,6 +70,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

@@ -109,6 +110,11 @@
 */
public class MiniKdc {

  public static final String JAVA_SECURITY_KRB5_CONF =
      "java.security.krb5.conf";
  public static final String SUN_SECURITY_KRB5_DEBUG =
      "sun.security.krb5.debug";

  public static void main(String[] args) throws Exception {
    if (args.length < 4) {
      System.out.println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> " +

@@ -266,7 +272,8 @@ public MiniKdc(Properties conf, File workDir) throws Exception {
    }
    String orgName= conf.getProperty(ORG_NAME);
    String orgDomain = conf.getProperty(ORG_DOMAIN);
-   realm = orgName.toUpperCase() + "." + orgDomain.toUpperCase();
+   realm = orgName.toUpperCase(Locale.ENGLISH) + "."
+       + orgDomain.toUpperCase(Locale.ENGLISH);
  }

  /**

@@ -355,8 +362,8 @@ private void initDirectoryService() throws Exception {
    ds.addLast(new KeyDerivationInterceptor());

    // create one partition
-   String orgName= conf.getProperty(ORG_NAME).toLowerCase();
-   String orgDomain = conf.getProperty(ORG_DOMAIN).toLowerCase();
+   String orgName= conf.getProperty(ORG_NAME).toLowerCase(Locale.ENGLISH);
+   String orgDomain = conf.getProperty(ORG_DOMAIN).toLowerCase(Locale.ENGLISH);
    JdbmPartition partition = new JdbmPartition(ds.getSchemaManager());
    partition.setId(orgName);

@@ -387,10 +394,10 @@ private void initKDCServer() throws Exception {
    String orgDomain = conf.getProperty(ORG_DOMAIN);
    String bindAddress = conf.getProperty(KDC_BIND_ADDRESS);
    final Map<String, String> map = new HashMap<String, String>();
-   map.put("0", orgName.toLowerCase());
-   map.put("1", orgDomain.toLowerCase());
-   map.put("2", orgName.toUpperCase());
-   map.put("3", orgDomain.toUpperCase());
+   map.put("0", orgName.toLowerCase(Locale.ENGLISH));
+   map.put("1", orgDomain.toLowerCase(Locale.ENGLISH));
+   map.put("2", orgName.toUpperCase(Locale.ENGLISH));
+   map.put("3", orgDomain.toUpperCase(Locale.ENGLISH));
    map.put("4", bindAddress);

    ClassLoader cl = Thread.currentThread().getContextClassLoader();

@@ -455,9 +462,9 @@ private void initKDCServer() throws Exception {
    FileUtils.writeStringToFile(krb5conf,
        MessageFormat.format(sb.toString(), getRealm(), getHost(),
            Integer.toString(getPort()), System.getProperty("line.separator")));
-   System.setProperty("java.security.krb5.conf", krb5conf.getAbsolutePath());
-   System.setProperty("sun.security.krb5.debug", conf.getProperty(DEBUG,
+   System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5conf.getAbsolutePath());
+   System.setProperty(SUN_SECURITY_KRB5_DEBUG, conf.getProperty(DEBUG,
        "false"));

    // refresh the config

@@ -481,8 +488,8 @@ private void initKDCServer() throws Exception {
   */
  public synchronized void stop() {
    if (kdc != null) {
-     System.getProperties().remove("java.security.krb5.conf");
-     System.getProperties().remove("sun.security.krb5.debug");
+     System.getProperties().remove(JAVA_SECURITY_KRB5_CONF);
+     System.getProperties().remove(SUN_SECURITY_KRB5_DEBUG);
      kdc.stop();
      try {
        ds.shutdown();

@@ -520,8 +527,8 @@ public synchronized void createPrincipal(String principal, String password)
      throws Exception {
    String orgName= conf.getProperty(ORG_NAME);
    String orgDomain = conf.getProperty(ORG_DOMAIN);
-   String baseDn = "ou=users,dc=" + orgName.toLowerCase() + ",dc=" +
-       orgDomain.toLowerCase();
+   String baseDn = "ou=users,dc=" + orgName.toLowerCase(Locale.ENGLISH)
+       + ",dc=" + orgDomain.toLowerCase(Locale.ENGLISH);
    String content = "dn: uid=" + principal + "," + baseDn + "\n" +
        "objectClass: top\n" +
        "objectClass: person\n" +

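The Locale.ENGLISH arguments introduced above pin the case conversions that derive the KDC realm and LDAP base DN, so they no longer depend on the JVM's default locale (HADOOP-11111 in the changelog above). A minimal sketch of the failure mode this guards against; the class name and sample domain are illustrative, not part of the patch:

    import java.util.Locale;

    public class LocaleCaseDemo {
      public static void main(String[] args) {
        String orgDomain = "minikdc.example.org";
        // Under a Turkish default locale, 'i' upper-cases to the dotted capital I
        // (U+0130), so the derived realm no longer matches its ASCII spelling.
        System.out.println(orgDomain.toUpperCase(new Locale("tr", "TR")));
        // Pinning the locale keeps the conversion stable on any JVM:
        System.out.println(orgDomain.toUpperCase(Locale.ENGLISH)); // MINIKDC.EXAMPLE.ORG
      }
    }
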
View File

@@ -795,6 +795,9 @@ Release 2.6.0 - UNRELEASED
    HDFS-7001. Tests in TestTracing depends on the order of execution
    (iwasakims via cmccabe)

    HDFS-7132. hdfs namenode -metadataVersion command does not honor
    configured name dirs. (Charles Lamb via wang)

  BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS

    HDFS-6387. HDFS CLI admin tool for creating & deleting an

@@ -926,6 +929,12 @@ Release 2.6.0 - UNRELEASED
    HDFS-7115. TestEncryptionZones assumes Unix path separator for KMS key store
    path. (Xiaoyu Yao via cnauroth)

    HDFS-7115. TestEncryptionZonesWithHA assumes Unix path separator for KMS key
    store path. (Xiaoyu Yao via cnauroth)

    HDFS-7130. TestDataTransferKeepalive fails intermittently on Windows.
    (cnauroth)

Release 2.5.1 - 2014-09-05

  INCOMPATIBLE CHANGES

View File

@@ -222,11 +222,11 @@ case ${COMMAND} in
    hadoop_debug "Appending HADOOP_ZKFC_OPTS onto HADOOP_OPTS"
    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_ZKFC_OPTS}"
  ;;
- -*)
-   hadoop_exit_with_usage 1
- ;;
  *)
    CLASS="${COMMAND}"
+   if ! hadoop_validate_classname "${CLASS}"; then
+     hadoop_exit_with_usage 1
+   fi
  ;;
esac

View File

@@ -1347,6 +1347,9 @@ private static void doRecovery(StartupOption startOpt, Configuration conf)
   */
  private static boolean printMetadataVersion(Configuration conf)
      throws IOException {
    final String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    final String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    NameNode.initializeGenericKeys(conf, nsId, namenodeId);
    final FSImage fsImage = new FSImage(conf);
    final FSNamesystem fs = new FSNamesystem(conf, fsImage, false);
    return fsImage.recoverTransitionRead(

View File

@@ -105,7 +105,7 @@ public void testDatanodeRespectsKeepAliveTimeout() throws Exception {
    // Sleep for a bit longer than the keepalive timeout
    // and make sure the xceiver died.
-   Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 1);
+   Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 50);
    assertXceiverCount(0);

    // The socket is still in the cache, because we don't

@@ -149,7 +149,7 @@ public void testClientResponsesKeepAliveTimeout() throws Exception {
    assertXceiverCount(1);

    // Sleep for a bit longer than the client keepalive timeout.
-   Thread.sleep(CLIENT_EXPIRY_MS + 1);
+   Thread.sleep(CLIENT_EXPIRY_MS + 50);

    // Taking out a peer which is expired should give a null.
    Peer peer = peerCache.get(dn.getDatanodeId(), false);

View File

@@ -60,7 +60,8 @@ public void setupCluster() throws Exception {
    String testRoot = fsHelper.getTestRootDir();
    testRootDir = new File(testRoot).getAbsoluteFile();
    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
-       JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
+       JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+       new Path(testRootDir.toString(), "test.jks").toUri()
    );
    cluster = new MiniDFSCluster.Builder(conf)

View File

@@ -25,27 +25,22 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
- import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;

public class TestMetadataVersionOutput {

  private MiniDFSCluster dfsCluster = null;
  private final Configuration conf = new Configuration();

- @Before
- public void setUp() throws Exception {
-   dfsCluster = new MiniDFSCluster.Builder(conf).
-       numDataNodes(1).
-       checkExitOnShutdown(false).
-       build();
-   dfsCluster.waitClusterUp();
- }

  @After
  public void tearDown() throws Exception {
    if (dfsCluster != null) {

@@ -54,9 +49,26 @@ public void tearDown() throws Exception {
    Thread.sleep(2000);
  }

  private void initConfig() {
    conf.set(DFS_NAMESERVICE_ID, "ns1");
    conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1");
    conf.set(DFS_HA_NAMENODE_ID_KEY, "nn1");
    conf.set(DFS_NAMENODE_NAME_DIR_KEY + ".ns1.nn1", MiniDFSCluster.getBaseDirectory() + "1");
    conf.unset(DFS_NAMENODE_NAME_DIR_KEY);
  }

  @Test(timeout = 30000)
  public void testMetadataVersionOutput() throws IOException {

    initConfig();
    dfsCluster = new MiniDFSCluster.Builder(conf).
        manageNameDfsDirs(false).
        numDataNodes(1).
        checkExitOnShutdown(false).
        build();
    dfsCluster.waitClusterUp();
    dfsCluster.shutdown(false);
    initConfig();
    final PrintStream origOut = System.out;
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final PrintStream stdOut = new PrintStream(baos);

View File

@@ -118,8 +118,11 @@ case ${COMMAND} in
    hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
  ;;
- -*|*)
-   hadoop_exit_with_usage 1
+ *)
+   CLASS="${COMMAND}"
+   if ! hadoop_validate_classname "${CLASS}"; then
+     hadoop_exit_with_usage 1
+   fi
  ;;
esac

View File

@@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED
    YARN-1250. Generic history service should support application-acls. (Zhijie Shen
    via junping_du)

    YARN-2569. Added the log handling APIs for the long running services. (Xuan
    Gong via zjshen)

  IMPROVEMENTS

    YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

@@ -244,6 +247,9 @@ Release 2.6.0 - UNRELEASED
    YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5.
    (Wei Yan via kasha)

    YARN-1959. Fix headroom calculation in FairScheduler.
    (Anubhav Dhoot via kasha)

  OPTIMIZATIONS

  BUG FIXES

@@ -421,6 +427,13 @@ Release 2.6.0 - UNRELEASED
    YARN-2540. FairScheduler: Queue filters not working on scheduler page in
    RM UI. (Ashwin Shankar via kasha)

    YARN-2584. TestContainerManagerSecurity fails on trunk. (Jian He via
    junping_du)

    YARN-2252. Intermittent failure of
    TestFairScheduler.testContinuousScheduling.
    (Ratandeep Ratti and kasha via kasha)

Release 2.5.1 - 2014-09-05

  INCOMPATIBLE CHANGES

View File

@@ -154,11 +154,11 @@ case "${COMMAND}" in
    hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
    YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
  ;;
- -*)
-   hadoop_exit_with_usage 1
- ;;
  *)
    CLASS="${COMMAND}"
+   if ! hadoop_validate_classname "${CLASS}"; then
+     hadoop_exit_with_usage 1
+   fi
  ;;
esac

View File

@@ -54,6 +54,7 @@
 *     validityInterval into failure count. If failure count reaches to
 *     maxAppAttempts, the application will be failed.
 *   </li>
 *   <li>Optional, application-specific {@link LogAggregationContext}</li>
 * </ul>
 * </p>
 *
@@ -128,6 +129,21 @@ public static ApplicationSubmissionContext newInstance(
    return context;
  }

@Public
@Stable
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers, LogAggregationContext logAggregationContext) {
ApplicationSubmissionContext context =
newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, applicationType, keepContainers);
context.setLogAggregationContext(logAggregationContext);
return context;
}

  /**
   * Get the <code>ApplicationId</code> of the submitted application.
   * @return <code>ApplicationId</code> of the submitted application

@@ -381,4 +397,24 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
  @Stable
  public abstract void setAttemptFailuresValidityInterval(
      long attemptFailuresValidityInterval);

/**
* Get <code>LogAggregationContext</code> of the application
*
* @return <code>LogAggregationContext</code> of the application
*/
@Public
@Stable
public abstract LogAggregationContext getLogAggregationContext();
/**
* Set <code>LogAggregationContext</code> for the application
*
* @param logAggregationContext
* for the application
*/
@Public
@Stable
public abstract void setLogAggregationContext(
LogAggregationContext logAggregationContext);
}

View File

@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* <p><code>LogAggregationContext</code> represents all of the
* information needed by the <code>NodeManager</code> to handle
* the logs for an application.</p>
*
* <p>It includes details such as:
* <ul>
* <li>includePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
* will be uploaded. </li>
* <li>excludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
* will not be uploaded. If the log file name matches both the
* include and the exclude pattern, this file will be excluded eventually</li>
* <li>rollingIntervalSeconds. The default value is -1. By default,
* the logAggregationService only uploads container logs when
* the application is finished. This configure defines
* how often the logAggregationSerivce uploads container logs in seconds.
* By setting this configure, the logAggregationSerivce can upload container
* logs periodically when the application is running.
* </li>
* </ul>
* </p>
*
* @see ApplicationSubmissionContext
*/
@Evolving
@Public
public abstract class LogAggregationContext {
@Public
@Unstable
public static LogAggregationContext newInstance(String includePattern,
String excludePattern, long rollingIntervalSeconds) {
LogAggregationContext context = Records.newRecord(LogAggregationContext.class);
context.setIncludePattern(includePattern);
context.setExcludePattern(excludePattern);
context.setRollingIntervalSeconds(rollingIntervalSeconds);
return context;
}
/**
* Get include pattern
*
* @return include pattern
*/
@Public
@Unstable
public abstract String getIncludePattern();
/**
* Set include pattern
*
* @param includePattern
*/
@Public
@Unstable
public abstract void setIncludePattern(String includePattern);
/**
* Get exclude pattern
*
* @return exclude pattern
*/
@Public
@Unstable
public abstract String getExcludePattern();
/**
* Set exclude pattern
*
* @param excludePattern
*/
@Public
@Unstable
public abstract void setExcludePattern(String excludePattern);
/**
* Get rollingIntervalSeconds
*
* @return the rollingIntervalSeconds
*/
@Public
@Unstable
public abstract long getRollingIntervalSeconds();
/**
* Set rollingIntervalSeconds
*
* @param rollingIntervalSeconds
*/
@Public
@Unstable
public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds);
}
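
Together with the ApplicationSubmissionContext additions earlier in this commit, a client would attach this record to a submission roughly as follows; a hedged sketch that only uses the APIs shown above, with illustrative pattern strings and rolling interval:

    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.LogAggregationContext;
    import org.apache.hadoop.yarn.util.Records;

    public class LogAggregationContextExample {
      public static ApplicationSubmissionContext withLogHandling() {
        // Upload stdout/stderr-style logs, skip temp files, and roll every hour
        // while the application runs (the default of -1 uploads only at finish).
        LogAggregationContext logContext = LogAggregationContext.newInstance(
            "std.*", ".*\\.tmp", 3600L);
        ApplicationSubmissionContext context =
            Records.newRecord(ApplicationSubmissionContext.class);
        context.setLogAggregationContext(logContext);
        return context;
      }
    }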

View File

@@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto {
  optional bool keep_containers_across_application_attempts = 11 [default = false];
  repeated string applicationTags = 12;
  optional int64 attempt_failures_validity_interval = 13 [default = -1];
  optional LogAggregationContextProto log_aggregation_context = 14;
}

message LogAggregationContextProto {
  optional string include_pattern = 1 [default = ".*"];
  optional string exclude_pattern = 2 [default = ""];
  optional int64 rolling_interval_seconds = 3 [default = -1];
}

enum ApplicationAccessTypeProto {

View File

@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.api.records.impl.pb;

import com.google.common.base.CharMatcher;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

@@ -31,6 +33,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;

@@ -53,6 +56,7 @@ public class ApplicationSubmissionContextPBImpl
  private ContainerLaunchContext amContainer = null;
  private Resource resource = null;
  private Set<String> applicationTags = null;
  private LogAggregationContext logAggregationContext = null;

  public ApplicationSubmissionContextPBImpl() {
    builder = ApplicationSubmissionContextProto.newBuilder();

@@ -110,6 +114,10 @@ private void mergeLocalToBuilder() {
      builder.clearApplicationTags();
      builder.addAllApplicationTags(this.applicationTags);
    }
    if (this.logAggregationContext != null) {
      builder.setLogAggregationContext(
          convertToProtoFormat(this.logAggregationContext));
    }
  }

  private void mergeLocalToProto() {

@@ -415,4 +423,36 @@ public void setAttemptFailuresValidityInterval(
    maybeInitBuilder();
    builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
  }

private LogAggregationContextPBImpl convertFromProtoFormat(
LogAggregationContextProto p) {
return new LogAggregationContextPBImpl(p);
}
private LogAggregationContextProto convertToProtoFormat(
LogAggregationContext t) {
return ((LogAggregationContextPBImpl) t).getProto();
}
@Override
public LogAggregationContext getLogAggregationContext() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.logAggregationContext != null) {
return this.logAggregationContext;
} // Else via proto
if (!p.hasLogAggregationContext()) {
return null;
}
logAggregationContext = convertFromProtoFormat(p.getLogAggregationContext());
return logAggregationContext;
}
@Override
public void setLogAggregationContext(
LogAggregationContext logAggregationContext) {
maybeInitBuilder();
if (logAggregationContext == null)
builder.clearLogAggregationContext();
this.logAggregationContext = logAggregationContext;
}
}

View File

@@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
import com.google.protobuf.TextFormat;
public class LogAggregationContextPBImpl extends LogAggregationContext{
LogAggregationContextProto proto = LogAggregationContextProto.getDefaultInstance();
LogAggregationContextProto.Builder builder = null;
boolean viaProto = false;
public LogAggregationContextPBImpl() {
builder = LogAggregationContextProto.newBuilder();
}
public LogAggregationContextPBImpl(LogAggregationContextProto proto) {
this.proto = proto;
viaProto = true;
}
public LogAggregationContextProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = LogAggregationContextProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public String getIncludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasIncludePattern()) {
return null;
}
return p.getIncludePattern();
}
@Override
public void setIncludePattern(String includePattern) {
maybeInitBuilder();
if (includePattern == null) {
builder.clearIncludePattern();
return;
}
builder.setIncludePattern(includePattern);
}
@Override
public String getExcludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasExcludePattern()) {
return null;
}
return p.getExcludePattern();
}
@Override
public void setExcludePattern(String excludePattern) {
maybeInitBuilder();
if (excludePattern == null) {
builder.clearExcludePattern();
return;
}
builder.setExcludePattern(excludePattern);
}
@Override
public long getRollingIntervalSeconds() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRollingIntervalSeconds()) {
return -1;
}
return p.getRollingIntervalSeconds();
}
@Override
public void setRollingIntervalSeconds(long rollingIntervalSeconds) {
maybeInitBuilder();
builder.setRollingIntervalSeconds(rollingIntervalSeconds);
}
}

View File

@@ -178,6 +178,7 @@ public static void setup() throws Exception {
        "http", "localhost", 8080, "file0"));
    typeValueCache.put(SerializedException.class,
        SerializedException.newInstance(new IOException("exception for test")));
    generateByNewInstance(LogAggregationContext.class);
    generateByNewInstance(ApplicationId.class);
    generateByNewInstance(ApplicationAttemptId.class);
    generateByNewInstance(ContainerId.class);

View File

@@ -422,7 +422,7 @@ public void addCompletedContainer(ContainerId containerId) {
  @VisibleForTesting
  @Private
  public void removeCompletedContainersFromContext(
-     List<ContainerId>containerIds) throws IOException {
+     List<ContainerId> containerIds) throws IOException {
    Set<ContainerId> removedContainers = new HashSet<ContainerId>();

    // If the AM has pulled the completedContainer it can be removed

View File

@@ -171,6 +171,33 @@ private synchronized void unreserveInternal(
        + priority + "; currentReservation " + currentReservation);
  }

@Override
public synchronized Resource getHeadroom() {
final FSQueue queue = (FSQueue) this.queue;
SchedulingPolicy policy = queue.getPolicy();
Resource queueFairShare = queue.getFairShare();
Resource queueUsage = queue.getResourceUsage();
Resource clusterResource = this.scheduler.getClusterResource();
Resource clusterUsage = this.scheduler.getRootQueueMetrics()
.getAllocatedResources();
Resource clusterAvailableResource = Resources.subtract(clusterResource,
clusterUsage);
Resource headroom = policy.getHeadroom(queueFairShare,
queueUsage, clusterAvailableResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for " + this.getName() + ":" +
"Min(" +
"(queueFairShare=" + queueFairShare +
" - queueUsage=" + queueUsage + ")," +
" clusterAvailableResource=" + clusterAvailableResource +
"(clusterResource=" + clusterResource +
" - clusterUsage=" + clusterUsage + ")" +
"Headroom=" + headroom);
}
return headroom;
}

  public synchronized float getLocalityWaitFactor(
      Priority priority, int clusterNodes) {
    // Estimate: Required unique resources (i.e. hosts + racks)

View File

@@ -175,4 +175,19 @@ public abstract boolean checkIfUsageOverFairShare(
   */
  public abstract boolean checkIfAMResourceUsageOverLimit(
      Resource usage, Resource maxAMResource);

/**
* Get headroom by calculating the min of <code>clusterAvailable</code> and
* (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
* applicable to this policy. For eg if only memory then leave other
* resources such as CPU to same as clusterAvailable.
*
* @param queueFairShare fairshare in the queue
* @param queueUsage resources used in the queue
* @param clusterAvailable available resource in cluster
* @return calculated headroom
*/
public abstract Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable);
}
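
To make the contract concrete, here is the arithmetic behind getHeadroom for the three policy implementations added below, using the same numbers the new testHeadroom case feeds in (queue fair share 4096 MB / 4 vcores, queue usage 1024 MB / 1 vcore, cluster 8192 MB / 8 vcores with 6144 MB / 2 vcores already allocated); a worked sketch, not part of the patch:

    public class HeadroomExample {
      public static void main(String[] args) {
        int clusterAvailMem = 8192 - 6144;             // 2048 MB free in the cluster
        int clusterAvailCpu = 8 - 2;                   // 6 vcores free in the cluster
        int queueAvailMem = Math.max(4096 - 1024, 0);  // 3072 MB of unused fair share
        int queueAvailCpu = Math.max(4 - 1, 0);        // 3 vcores of unused fair share
        // DominantResourceFairnessPolicy caps both dimensions against the queue:
        System.out.println(Math.min(clusterAvailMem, queueAvailMem) + " MB, "
            + Math.min(clusterAvailCpu, queueAvailCpu) + " vcores"); // 2048 MB, 3 vcores
        // FairSharePolicy and FifoPolicy cap only memory; cluster CPU passes through:
        System.out.println(Math.min(clusterAvailMem, queueAvailMem) + " MB, "
            + clusterAvailCpu + " vcores");                          // 2048 MB, 6 vcores
      }
    }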

View File

@@ -88,6 +88,21 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes
    return !Resources.fitsIn(usage, maxAMResource);
  }

@Override
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
Resource clusterAvailable) {
int queueAvailableMemory =
Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
int queueAvailableCPU =
Math.max(queueFairShare.getVirtualCores() - queueUsage
.getVirtualCores(), 0);
Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
Math.min(clusterAvailable.getVirtualCores(),
queueAvailableCPU));
return headroom;
}

  @Override
  public void initialize(Resource clusterCapacity) {
    comparator.setClusterCapacity(clusterCapacity);

View File

@@ -114,6 +114,17 @@ public Comparator<Schedulable> getComparator() {
    return comparator;
  }

@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
clusterAvailable.getVirtualCores());
return headroom;
}

  @Override
  public void computeShares(Collection<? extends Schedulable> schedulables,
      Resource totalResources) {

View File

@@ -107,6 +107,18 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes
    return usage.getMemory() > maxAMResource.getMemory();
  }

@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource clusterAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
Resource headroom = Resources.createResource(
Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
clusterAvailable.getVirtualCores());
return headroom;
}

  @Override
  public byte getApplicableDepth() {
    return SchedulingPolicy.DEPTH_LEAF;

View File

@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;

import static org.junit.Assert.assertEquals;

@@ -26,7 +27,12 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -185,4 +191,61 @@ public void testLocalityLevelWithoutDelays() {
    assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
        prio, 10, -1.0, -1.0));
  }

@Test
public void testHeadroom() {
final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class);
Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
final Resource queueFairShare = Resources.createResource(4096, 4);
final Resource queueUsage = Resource.newInstance(1024, 1);
final Resource clusterResource = Resources.createResource(8192, 8);
final Resource clusterUsage = Resources.createResource(6144, 2);
final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = resourceManager.getRMContext();
FSAppAttempt schedulerApp =
new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue ,
null, rmContext);
Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
Mockito.when(mockScheduler.getClusterResource()).thenReturn
(clusterResource);
Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
(clusterUsage);
Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
(fakeRootQueueMetrics);
int minClusterAvailableMemory = 2048;
int minClusterAvailableCPU = 6;
int minQueueAvailableCPU = 3;
// Min of Memory and CPU across cluster and queue is used in
// DominantResourceFairnessPolicy
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(DominantResourceFairnessPolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory,
minQueueAvailableCPU);
// Fair and Fifo ignore CPU of queue, so use cluster available CPU
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(FairSharePolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory,
minClusterAvailableCPU);
Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
.getInstance(FifoPolicy.class));
verifyHeadroom(schedulerApp, minClusterAvailableMemory,
minClusterAvailableCPU);
}
protected void verifyHeadroom(FSAppAttempt schedulerApp,
int expectedMemory, int expectedCPU) {
Resource headroom = schedulerApp.getHeadroom();
assertEquals(expectedMemory, headroom.getMemory());
assertEquals(expectedCPU, headroom.getVirtualCores());
}
}

View File

@@ -131,8 +131,14 @@ public void setUp() throws IOException {
  @After
  public void tearDown() {
-   scheduler = null;
-   resourceManager = null;
+   if (scheduler != null) {
+     scheduler.stop();
+     scheduler = null;
+   }
+   if (resourceManager != null) {
+     resourceManager.stop();
+     resourceManager = null;
+   }
    QueueMetrics.clearQueueMetrics();
    DefaultMetricsSystem.shutdown();
  }

@@ -140,7 +146,7 @@ public void tearDown() {
  @Test (timeout = 30000)
  public void testConfValidation() throws Exception {
-   FairScheduler scheduler = new FairScheduler();
+   scheduler = new FairScheduler();
    Configuration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);

@@ -212,7 +218,7 @@ public void testLoadConfigurationOnInitialize() throws IOException {
  @Test
  public void testNonMinZeroResourcesSettings() throws IOException {
-   FairScheduler fs = new FairScheduler();
+   scheduler = new FairScheduler();
    YarnConfiguration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);

@@ -220,17 +226,17 @@ public void testNonMinZeroResourcesSettings() throws IOException {
        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
    conf.setInt(
        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
-   fs.init(conf);
-   fs.reinitialize(conf, null);
-   Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
-   Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
-   Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
-   Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
+   scheduler.init(conf);
+   scheduler.reinitialize(conf, null);
+   Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemory());
+   Assert.assertEquals(1, scheduler.getMinimumResourceCapability().getVirtualCores());
+   Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
+   Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
  }

  @Test
  public void testMinZeroResourcesSettings() throws IOException {
-   FairScheduler fs = new FairScheduler();
+   scheduler = new FairScheduler();
    YarnConfiguration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0);

@@ -238,12 +244,12 @@ public void testMinZeroResourcesSettings() throws IOException {
        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
    conf.setInt(
        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
-   fs.init(conf);
-   fs.reinitialize(conf, null);
-   Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
-   Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
-   Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
-   Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
+   scheduler.init(conf);
+   scheduler.reinitialize(conf, null);
+   Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemory());
+   Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores());
+   Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
+   Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
  }

  @Test

@@ -3293,49 +3299,49 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
  @Test (timeout = 10000)
  public void testContinuousScheduling() throws Exception {
    // set continuous scheduling enabled
-   FairScheduler fs = new FairScheduler();
+   scheduler = new FairScheduler();
    Configuration conf = createConfiguration();
    conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
        true);
-   fs.setRMContext(resourceManager.getRMContext());
-   fs.init(conf);
-   fs.start();
-   fs.reinitialize(conf, resourceManager.getRMContext());
+   scheduler.setRMContext(resourceManager.getRMContext());
+   scheduler.init(conf);
+   scheduler.start();
+   scheduler.reinitialize(conf, resourceManager.getRMContext());
    Assert.assertTrue("Continuous scheduling should be enabled.",
-       fs.isContinuousSchedulingEnabled());
+       scheduler.isContinuousSchedulingEnabled());

    // Add two nodes
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
            "127.0.0.1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-   fs.handle(nodeEvent1);
+   scheduler.handle(nodeEvent1);
    RMNode node2 =
        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
            "127.0.0.2");
    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-   fs.handle(nodeEvent2);
+   scheduler.handle(nodeEvent2);

    // available resource
-   Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
-   Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
+   Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
+   Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);

    // send application request
    ApplicationAttemptId appAttemptId =
        createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-   fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
-   fs.addApplicationAttempt(appAttemptId, false, false);
+   scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+   scheduler.addApplicationAttempt(appAttemptId, false, false);
    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
    ResourceRequest request =
        createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
    ask.add(request);
-   fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+   scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);

    // waiting for continuous_scheduler_sleep_time
    // at least one pass
-   Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
+   Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500);

-   FSAppAttempt app = fs.getSchedulerApp(appAttemptId);
+   FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
    // Wait until app gets resources.
    while (app.getCurrentConsumption().equals(Resources.none())) { }

@@ -3348,7 +3354,7 @@ public void testContinuousScheduling() throws Exception {
        createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
    ask.clear();
    ask.add(request);
-   fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+   scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);

    // Wait until app gets resources
    while (app.getCurrentConsumption()

View File

@@ -27,10 +27,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
- import java.util.List;
import java.util.LinkedList;
- import com.google.common.io.ByteArrayDataInput;
- import com.google.common.io.ByteStreams;
+ import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -52,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;

@@ -80,6 +79,9 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;

@RunWith(Parameterized.class)
public class TestContainerManagerSecurity extends KerberosSecurityTestcase {

@@ -137,7 +139,7 @@ public TestContainerManagerSecurity(Configuration conf) {
    this.conf = conf;
  }

- @Test (timeout = 1000000)
+ @Test (timeout = 120000)
  public void testContainerManager() throws Exception {
    try {
      yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class

@@ -162,7 +164,7 @@ public void testContainerManager() throws Exception {
    }
  }

- @Test (timeout = 500000)
+ @Test (timeout = 120000)
  public void testContainerManagerWithEpoch() throws Exception {
    try {
      yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class

@@ -355,14 +357,22 @@ private void testNMTokens(Configuration conf) throws Exception {
  private void waitForContainerToFinishOnNM(ContainerId containerId) {
    Context nmContet = yarnCluster.getNodeManager(0).getNMContext();
    int interval = 4 * 60; // Max time for container token to expire.
    Assert.assertNotNull(nmContet.getContainers().containsKey(containerId));
    while ((interval-- > 0)
-       && nmContet.getContainers().containsKey(containerId)) {
+       && !nmContet.getContainers().get(containerId)
+           .cloneAndGetContainerStatus().getState()
+           .equals(ContainerState.COMPLETE)) {
      try {
        LOG.info("Waiting for " + containerId + " to complete.");
        Thread.sleep(1000);
      } catch (InterruptedException e) {
      }
    }
-   Assert.assertFalse(nmContet.getContainers().containsKey(containerId));
+   // Normally, Containers will be removed from NM context after they are
+   // explicitly acked by RM. Now, manually remove it for testing.
+   yarnCluster.getNodeManager(0).getNodeStatusUpdater()
+       .addCompletedContainer(containerId);
+   nmContet.getContainers().remove(containerId);
  }

  protected void waitForNMToReceiveNMTokenKey(