diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 0b3757740c7..5f4ae1ac101 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -126,6 +126,9 @@ Trunk (Unreleased)
HADOOP-11041. VersionInfo specifies subversion (Tsuyoshi OZAWA via aw)
+ HADOOP-11092. hadoop shell commands should print usage if not given
+ a class (aw)
+
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.
@@ -596,6 +599,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11112. TestKMSWithZK does not use KEY_PROVIDER_URI. (tucu via wang)
+ HADOOP-11111. MiniKDC to use locale EN_US for case conversions. (stevel)
+
BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop
index 64c67587dc6..ad6e4ee3d41 100755
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop
@@ -162,11 +162,11 @@ case ${COMMAND} in
version)
CLASS=org.apache.hadoop.util.VersionInfo
;;
- -*|hdfs)
- hadoop_exit_with_usage 1
- ;;
*)
CLASS="${COMMAND}"
+ if ! hadoop_validate_classname "${CLASS}"; then
+ hadoop_exit_with_usage 1
+ fi
;;
esac
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
index dfdb1016639..efa42f65229 100644
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
@@ -279,6 +279,17 @@ function hadoop_connect_to_hosts
fi
}
+function hadoop_validate_classname
+{
+ local class=$1
+ shift 1
+
+ if [[ ! ${class} =~ \. ]]; then
+ return 1
+ fi
+ return 0
+}
+
function hadoop_add_param
{
#
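With this helper in place, the shell front ends above fall through to treating an unknown command word as a class name only when it contains a package separator; anything else prints usage. A minimal sketch of the behavior, assuming hadoop-functions.sh has been sourced (both arguments are illustrative):

    # Contains a dot, so the wrapper will try to run it as a class.
    hadoop_validate_classname "org.apache.hadoop.util.VersionInfo" \
      && echo "accepted as a class name"
    # A typo'd subcommand has no dot, so the wrapper prints usage instead.
    hadoop_validate_classname "verison" \
      || echo "rejected; caller invokes hadoop_exit_with_usage 1"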
diff --git a/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java b/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java
index d3ea2e70cfa..7107b75aaef 100644
--- a/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java
+++ b/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java
@@ -70,6 +70,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -109,6 +110,11 @@ import java.util.UUID;
*/
public class MiniKdc {
+ public static final String JAVA_SECURITY_KRB5_CONF =
+ "java.security.krb5.conf";
+ public static final String SUN_SECURITY_KRB5_DEBUG =
+ "sun.security.krb5.debug";
+
public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.out.println("Arguments: " +
@@ -266,7 +272,8 @@ public class MiniKdc {
}
String orgName= conf.getProperty(ORG_NAME);
String orgDomain = conf.getProperty(ORG_DOMAIN);
- realm = orgName.toUpperCase() + "." + orgDomain.toUpperCase();
+ realm = orgName.toUpperCase(Locale.ENGLISH) + "."
+ + orgDomain.toUpperCase(Locale.ENGLISH);
}
/**
@@ -355,8 +362,8 @@ public class MiniKdc {
ds.addLast(new KeyDerivationInterceptor());
// create one partition
- String orgName= conf.getProperty(ORG_NAME).toLowerCase();
- String orgDomain = conf.getProperty(ORG_DOMAIN).toLowerCase();
+ String orgName= conf.getProperty(ORG_NAME).toLowerCase(Locale.ENGLISH);
+ String orgDomain = conf.getProperty(ORG_DOMAIN).toLowerCase(Locale.ENGLISH);
JdbmPartition partition = new JdbmPartition(ds.getSchemaManager());
partition.setId(orgName);
@@ -387,10 +394,10 @@ public class MiniKdc {
String orgDomain = conf.getProperty(ORG_DOMAIN);
String bindAddress = conf.getProperty(KDC_BIND_ADDRESS);
final Map<String, String> map = new HashMap<String, String>();
- map.put("0", orgName.toLowerCase());
- map.put("1", orgDomain.toLowerCase());
- map.put("2", orgName.toUpperCase());
- map.put("3", orgDomain.toUpperCase());
+ map.put("0", orgName.toLowerCase(Locale.ENGLISH));
+ map.put("1", orgDomain.toLowerCase(Locale.ENGLISH));
+ map.put("2", orgName.toUpperCase(Locale.ENGLISH));
+ map.put("3", orgDomain.toUpperCase(Locale.ENGLISH));
map.put("4", bindAddress);
ClassLoader cl = Thread.currentThread().getContextClassLoader();
@@ -455,9 +462,9 @@ public class MiniKdc {
FileUtils.writeStringToFile(krb5conf,
MessageFormat.format(sb.toString(), getRealm(), getHost(),
Integer.toString(getPort()), System.getProperty("line.separator")));
- System.setProperty("java.security.krb5.conf", krb5conf.getAbsolutePath());
+ System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5conf.getAbsolutePath());
- System.setProperty("sun.security.krb5.debug", conf.getProperty(DEBUG,
+ System.setProperty(SUN_SECURITY_KRB5_DEBUG, conf.getProperty(DEBUG,
"false"));
// refresh the config
@@ -481,8 +488,8 @@ public class MiniKdc {
*/
public synchronized void stop() {
if (kdc != null) {
- System.getProperties().remove("java.security.krb5.conf");
- System.getProperties().remove("sun.security.krb5.debug");
+ System.getProperties().remove(JAVA_SECURITY_KRB5_CONF);
+ System.getProperties().remove(SUN_SECURITY_KRB5_DEBUG);
kdc.stop();
try {
ds.shutdown();
@@ -520,8 +527,8 @@ public class MiniKdc {
throws Exception {
String orgName= conf.getProperty(ORG_NAME);
String orgDomain = conf.getProperty(ORG_DOMAIN);
- String baseDn = "ou=users,dc=" + orgName.toLowerCase() + ",dc=" +
- orgDomain.toLowerCase();
+ String baseDn = "ou=users,dc=" + orgName.toLowerCase(Locale.ENGLISH)
+ + ",dc=" + orgDomain.toLowerCase(Locale.ENGLISH);
String content = "dn: uid=" + principal + "," + baseDn + "\n" +
"objectClass: top\n" +
"objectClass: person\n" +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9389d370aae..af6c135be8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -795,6 +795,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7001. Tests in TestTracing depends on the order of execution
(iwasakims via cmccabe)
+ HDFS-7132. hdfs namenode -metadataVersion command does not honor
+ configured name dirs. (Charles Lamb via wang)
+
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
@@ -926,6 +929,12 @@ Release 2.6.0 - UNRELEASED
HDFS-7115. TestEncryptionZones assumes Unix path separator for KMS key store
path. (Xiaoyu Yao via cnauroth)
+ HDFS-7115. TestEncryptionZonesWithHA assumes Unix path separator for KMS key
+ store path. (Xiaoyu Yao via cnauroth)
+
+ HDFS-7130. TestDataTransferKeepalive fails intermittently on Windows.
+ (cnauroth)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index 22a0f0f8c0f..087c67472a9 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -222,11 +222,11 @@ case ${COMMAND} in
hadoop_debug "Appending HADOOP_ZKFC_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_ZKFC_OPTS}"
;;
- -*)
- hadoop_exit_with_usage 1
- ;;
*)
CLASS="${COMMAND}"
+ if ! hadoop_validate_classname "${CLASS}"; then
+ hadoop_exit_with_usage 1
+ fi
;;
esac
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index bcb5a8697d1..217645a689d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -1347,6 +1347,9 @@ public class NameNode implements NameNodeStatusMXBean {
*/
private static boolean printMetadataVersion(Configuration conf)
throws IOException {
+ final String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+ final String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+ NameNode.initializeGenericKeys(conf, nsId, namenodeId);
final FSImage fsImage = new FSImage(conf);
final FSNamesystem fs = new FSNamesystem(conf, fsImage, false);
return fsImage.recoverTransitionRead(
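The fix works because NameNode.initializeGenericKeys copies the nameservice- and namenode-suffixed keys onto their generic counterparts before the FSImage is constructed. A hedged sketch of that key convention (the nameservice id, namenode id, and path are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    public class GenericKeysSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // HA-style config: the name dir is only set under the suffixed key.
        conf.set("dfs.nameservices", "ns1");
        conf.set("dfs.ha.namenodes.ns1", "nn1");
        conf.set("dfs.namenode.name.dir.ns1.nn1", "/data/dfs/name");
        System.out.println(conf.get("dfs.namenode.name.dir")); // null
        NameNode.initializeGenericKeys(conf, "ns1", "nn1");
        System.out.println(conf.get("dfs.namenode.name.dir")); // /data/dfs/name
      }
    }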
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
index 003e6be273b..eae8ea7681b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
@@ -105,7 +105,7 @@ public class TestDataTransferKeepalive {
// Sleep for a bit longer than the keepalive timeout
// and make sure the xceiver died.
- Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 1);
+ Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 50);
assertXceiverCount(0);
// The socket is still in the cache, because we don't
@@ -149,7 +149,7 @@ public class TestDataTransferKeepalive {
assertXceiverCount(1);
// Sleep for a bit longer than the client keepalive timeout.
- Thread.sleep(CLIENT_EXPIRY_MS + 1);
+ Thread.sleep(CLIENT_EXPIRY_MS + 50);
// Taking out a peer which is expired should give a null.
Peer peer = peerCache.get(dn.getDatanodeId(), false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
index c74f99063ec..04977d4597d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
@@ -60,7 +60,8 @@ public class TestEncryptionZonesWithHA {
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
- JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
+ JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+ new Path(testRootDir.toString(), "test.jks").toUri()
);
cluster = new MiniDFSCluster.Builder(conf)
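The rewritten expression avoids embedding a Windows-style absolute path (drive letter plus backslashes) directly in the provider URI; Hadoop's Path normalizes the separators before toUri() is taken. A hedged sketch of the difference on Windows (the directory is illustrative):

    import org.apache.hadoop.fs.Path;

    public class JksUriSketch {
      public static void main(String[] args) {
        // On Windows, Path replaces backslashes with forward slashes,
        // so the resulting URI path is e.g. /C:/build/test/data/test.jks.
        Path jks = new Path("C:\\build\\test\\data", "test.jks");
        System.out.println(jks.toUri());
      }
    }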
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java
index 0e809cf9c66..03c75577ab9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetadataVersionOutput.java
@@ -25,27 +25,22 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+
public class TestMetadataVersionOutput {
private MiniDFSCluster dfsCluster = null;
private final Configuration conf = new Configuration();
- @Before
- public void setUp() throws Exception {
- dfsCluster = new MiniDFSCluster.Builder(conf).
- numDataNodes(1).
- checkExitOnShutdown(false).
- build();
- dfsCluster.waitClusterUp();
- }
-
@After
public void tearDown() throws Exception {
if (dfsCluster != null) {
@@ -54,9 +49,26 @@ public class TestMetadataVersionOutput {
Thread.sleep(2000);
}
+ private void initConfig() {
+ conf.set(DFS_NAMESERVICE_ID, "ns1");
+ conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1");
+ conf.set(DFS_HA_NAMENODE_ID_KEY, "nn1");
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY + ".ns1.nn1", MiniDFSCluster.getBaseDirectory() + "1");
+ conf.unset(DFS_NAMENODE_NAME_DIR_KEY);
+ }
+
@Test(timeout = 30000)
public void testMetadataVersionOutput() throws IOException {
+ initConfig();
+ dfsCluster = new MiniDFSCluster.Builder(conf).
+ manageNameDfsDirs(false).
+ numDataNodes(1).
+ checkExitOnShutdown(false).
+ build();
+ dfsCluster.waitClusterUp();
+ dfsCluster.shutdown(false);
+ initConfig();
final PrintStream origOut = System.out;
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final PrintStream stdOut = new PrintStream(baos);
diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred
index 8f3063774f8..2163cada29d 100755
--- a/hadoop-mapreduce-project/bin/mapred
+++ b/hadoop-mapreduce-project/bin/mapred
@@ -118,8 +118,11 @@ case ${COMMAND} in
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
- -*|*)
- hadoop_exit_with_usage 1
+ *)
+ CLASS="${COMMAND}"
+ if ! hadoop_validate_classname "${CLASS}"; then
+ hadoop_exit_with_usage 1
+ fi
;;
esac
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2bc118de5d7..0e4909e8af9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED
YARN-1250. Generic history service should support application-acls. (Zhijie Shen
via junping_du)
+ YARN-2569. Added the log handling APIs for the long running services. (Xuan
+ Gong via zjshen)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
@@ -244,6 +247,9 @@ Release 2.6.0 - UNRELEASED
YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5.
(Wei Yan via kasha)
+ YARN-1959. Fix headroom calculation in FairScheduler.
+ (Anubhav Dhoot via kasha)
+
OPTIMIZATIONS
BUG FIXES
@@ -421,6 +427,13 @@ Release 2.6.0 - UNRELEASED
YARN-2540. FairScheduler: Queue filters not working on scheduler page in
RM UI. (Ashwin Shankar via kasha)
+ YARN-2584. TestContainerManagerSecurity fails on trunk. (Jian He via
+ junping_du)
+
+ YARN-2252. Intermittent failure of
+ TestFairScheduler.testContinuousScheduling.
+ (Ratandeep Ratti and kasha via kasha)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn
index 12f7bb501b2..207fb4a41ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/yarn
+++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn
@@ -154,11 +154,11 @@ case "${COMMAND}" in
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"
YARN_OPTS="${YARN_OPTS} ${YARN_CLIENT_OPTS}"
;;
- -*)
- hadoop_exit_with_usage 1
- ;;
*)
CLASS="${COMMAND}"
+ if ! hadoop_validate_classname "${CLASS}"; then
+ hadoop_exit_with_usage 1
+ fi
;;
esac
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 723a2e0908e..22023807adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -54,6 +54,7 @@ import java.util.Set;
* validityInterval into failure count. If failure count reaches to
* maxAppAttempts, the application will be failed.
* </li>
+ *   <li>Optional, application-specific {@link LogAggregationContext}</li>
* </ul>
* </p>
*
@@ -128,6 +129,21 @@ public abstract class ApplicationSubmissionContext {
return context;
}
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers, LogAggregationContext logAggregationContext) {
+ ApplicationSubmissionContext context =
+ newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers);
+ context.setLogAggregationContext(logAggregationContext);
+ return context;
+ }
/**
* Get the <code>ApplicationId</code> of the submitted application.
* @return <code>ApplicationId</code> of the submitted application
@@ -381,4 +397,24 @@ public abstract class ApplicationSubmissionContext {
@Stable
public abstract void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval);
+
+ /**
+ * Get <code>LogAggregationContext</code> of the application
+ *
+ * @return <code>LogAggregationContext</code> of the application
+ */
+ @Public
+ @Stable
+ public abstract LogAggregationContext getLogAggregationContext();
+
+ /**
+ * Set <code>LogAggregationContext</code> for the application
+ *
+ * @param logAggregationContext
+ * for the application
+ */
+ @Public
+ @Stable
+ public abstract void setLogAggregationContext(
+ LogAggregationContext logAggregationContext);
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
new file mode 100644
index 00000000000..9a0a15774f5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p><code>LogAggregationContext</code> represents all of the
+ * information needed by the <code>NodeManager</code> to handle
+ * the logs for an application.</p>
+ *
+ * <p>It includes details such as:
+ *   <ul>
+ *     <li>includePattern. It uses Java Regex to filter the log files
+ *     that match the defined include pattern; those log files
+ *     will be uploaded.</li>
+ *     <li>excludePattern. It uses Java Regex to filter the log files
+ *     that match the defined exclude pattern; those log files
+ *     will not be uploaded. If a log file name matches both the
+ *     include and the exclude pattern, the file is eventually
+ *     excluded.</li>
+ *     <li>rollingIntervalSeconds. The default value is -1. By default,
+ *     the logAggregationService only uploads container logs when
+ *     the application is finished. This setting defines, in seconds,
+ *     how often the logAggregationService uploads container logs;
+ *     when set, the logAggregationService can upload container logs
+ *     periodically while the application is running.</li>
+ *   </ul>
+ * </p>
+ *
+ * @see ApplicationSubmissionContext
+ */
+
+@Evolving
+@Public
+public abstract class LogAggregationContext {
+
+ @Public
+ @Unstable
+ public static LogAggregationContext newInstance(String includePattern,
+ String excludePattern, long rollingIntervalSeconds) {
+ LogAggregationContext context = Records.newRecord(LogAggregationContext.class);
+ context.setIncludePattern(includePattern);
+ context.setExcludePattern(excludePattern);
+ context.setRollingIntervalSeconds(rollingIntervalSeconds);
+ return context;
+ }
+
+ /**
+ * Get include pattern
+ *
+ * @return include pattern
+ */
+ @Public
+ @Unstable
+ public abstract String getIncludePattern();
+
+ /**
+ * Set include pattern
+ *
+ * @param includePattern
+ */
+ @Public
+ @Unstable
+ public abstract void setIncludePattern(String includePattern);
+
+ /**
+ * Get exclude pattern
+ *
+ * @return exclude pattern
+ */
+ @Public
+ @Unstable
+ public abstract String getExcludePattern();
+
+ /**
+ * Set exclude pattern
+ *
+ * @param excludePattern
+ */
+ @Public
+ @Unstable
+ public abstract void setExcludePattern(String excludePattern);
+
+ /**
+ * Get rollingIntervalSeconds
+ *
+ * @return the rollingIntervalSeconds
+ */
+ @Public
+ @Unstable
+ public abstract long getRollingIntervalSeconds();
+
+ /**
+ * Set rollingIntervalSeconds
+ *
+ * @param rollingIntervalSeconds
+ */
+ @Public
+ @Unstable
+ public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds);
+}
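A hedged sketch of client-side use of the new record through the newInstance overload added to ApplicationSubmissionContext above; the patterns and interval here are illustrative, and an interval of -1 (the proto default) keeps the upload-only-at-finish behavior:

    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.LogAggregationContext;

    public class LogAggregationContextUsage {
      static void attach(ApplicationSubmissionContext appContext) {
        LogAggregationContext logCtx = LogAggregationContext.newInstance(
            ".*\\.log",   // includePattern: upload only *.log files
            ".*\\.tmp",   // excludePattern: wins when both patterns match
            3600L);       // roll uploads hourly while the app is running
        appContext.setLogAggregationContext(logCtx);
      }
    }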
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d8c42cc303d..b3687466cc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto {
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
+ optional LogAggregationContextProto log_aggregation_context = 14;
+}
+
+message LogAggregationContextProto {
+ optional string include_pattern = 1 [default = ".*"];
+ optional string exclude_pattern = 2 [default = ""];
+ optional int64 rolling_interval_seconds = 3 [default = -1];
}
enum ApplicationAccessTypeProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 7b49a1654f6..e4f183b9027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -53,6 +56,7 @@ extends ApplicationSubmissionContext {
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
+ private LogAggregationContext logAggregationContext = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -110,6 +114,10 @@ extends ApplicationSubmissionContext {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.logAggregationContext != null) {
+ builder.setLogAggregationContext(
+ convertToProtoFormat(this.logAggregationContext));
+ }
}
private void mergeLocalToProto() {
@@ -415,4 +423,36 @@ extends ApplicationSubmissionContext {
maybeInitBuilder();
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
+
+ private LogAggregationContextPBImpl convertFromProtoFormat(
+ LogAggregationContextProto p) {
+ return new LogAggregationContextPBImpl(p);
+ }
+
+ private LogAggregationContextProto convertToProtoFormat(
+ LogAggregationContext t) {
+ return ((LogAggregationContextPBImpl) t).getProto();
+ }
+
+ @Override
+ public LogAggregationContext getLogAggregationContext() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logAggregationContext != null) {
+ return this.logAggregationContext;
+ } // Else via proto
+ if (!p.hasLogAggregationContext()) {
+ return null;
+ }
+ logAggregationContext = convertFromProtoFormat(p.getLogAggregationContext());
+ return logAggregationContext;
+ }
+
+ @Override
+ public void setLogAggregationContext(
+ LogAggregationContext logAggregationContext) {
+ maybeInitBuilder();
+ if (logAggregationContext == null)
+ builder.clearLogAggregationContext();
+ this.logAggregationContext = logAggregationContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
new file mode 100644
index 00000000000..4406ef9fcea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
+import com.google.protobuf.TextFormat;
+
+public class LogAggregationContextPBImpl extends LogAggregationContext {
+
+ LogAggregationContextProto proto = LogAggregationContextProto.getDefaultInstance();
+ LogAggregationContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public LogAggregationContextPBImpl() {
+ builder = LogAggregationContextProto.newBuilder();
+ }
+
+ public LogAggregationContextPBImpl(LogAggregationContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogAggregationContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogAggregationContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+
+ @Override
+ public String getIncludePattern() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasIncludePattern()) {
+ return null;
+ }
+ return p.getIncludePattern();
+ }
+
+ @Override
+ public void setIncludePattern(String includePattern) {
+ maybeInitBuilder();
+ if (includePattern == null) {
+ builder.clearIncludePattern();
+ return;
+ }
+ builder.setIncludePattern(includePattern);
+ }
+
+ @Override
+ public String getExcludePattern() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasExcludePattern()) {
+ return null;
+ }
+ return p.getExcludePattern();
+ }
+
+ @Override
+ public void setExcludePattern(String excludePattern) {
+ maybeInitBuilder();
+ if (excludePattern == null) {
+ builder.clearExcludePattern();
+ return;
+ }
+ builder.setExcludePattern(excludePattern);
+ }
+
+ @Override
+ public long getRollingIntervalSeconds() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasRollingIntervalSeconds()) {
+ return -1;
+ }
+ return p.getRollingIntervalSeconds();
+ }
+
+ @Override
+ public void setRollingIntervalSeconds(long rollingIntervalSeconds) {
+ maybeInitBuilder();
+ builder.setRollingIntervalSeconds(rollingIntervalSeconds);
+ }
+}
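The implementation follows the usual YARN PBImpl pattern: setters write to a local builder, getProto() folds local state into an immutable proto, and equality delegates to the proto. A small round-trip sketch using only the classes in this patch (values are illustrative):

    import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
    import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;

    public class PBRoundTrip {
      public static void main(String[] args) {
        LogAggregationContextPBImpl a = new LogAggregationContextPBImpl();
        a.setIncludePattern(".*\\.log");
        a.setRollingIntervalSeconds(600L);
        LogAggregationContextProto proto = a.getProto(); // folds local fields in
        LogAggregationContextPBImpl b = new LogAggregationContextPBImpl(proto);
        System.out.println(a.equals(b)); // true: equality compares the protos
      }
    }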
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index c6572e9f387..c463452a0c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -178,6 +178,7 @@ public class TestPBImplRecords {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(LogAggregationContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b9feacb6e8e..b4dcf1f2d87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -422,7 +422,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting
@Private
public void removeCompletedContainersFromContext(
- List<ContainerId>containerIds) throws IOException {
+ List<ContainerId> containerIds) throws IOException {
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
// If the AM has pulled the completedContainer it can be removed
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 825c3985c77..b9966e7f551 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -171,6 +171,33 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ priority + "; currentReservation " + currentReservation);
}
+ @Override
+ public synchronized Resource getHeadroom() {
+ final FSQueue queue = (FSQueue) this.queue;
+ SchedulingPolicy policy = queue.getPolicy();
+
+ Resource queueFairShare = queue.getFairShare();
+ Resource queueUsage = queue.getResourceUsage();
+ Resource clusterResource = this.scheduler.getClusterResource();
+ Resource clusterUsage = this.scheduler.getRootQueueMetrics()
+ .getAllocatedResources();
+ Resource clusterAvailableResource = Resources.subtract(clusterResource,
+ clusterUsage);
+ Resource headroom = policy.getHeadroom(queueFairShare,
+ queueUsage, clusterAvailableResource);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Headroom calculation for " + this.getName() + ":" +
+ "Min(" +
+ "(queueFairShare=" + queueFairShare +
+ " - queueUsage=" + queueUsage + ")," +
+ " clusterAvailableResource=" + clusterAvailableResource +
+ "(clusterResource=" + clusterResource +
+ " - clusterUsage=" + clusterUsage + ")" +
+ "Headroom=" + headroom);
+ }
+ return headroom;
+ }
+
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index ca006c580ed..4f3123dffdd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -175,4 +175,19 @@ public abstract class SchedulingPolicy {
*/
public abstract boolean checkIfAMResourceUsageOverLimit(
Resource usage, Resource maxAMResource);
+
+ /**
+ * Get headroom by calculating the min of <code>clusterAvailable</code> and
+ * (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
+ * applicable to this policy. For example, if only memory is applicable,
+ * leave other resources such as CPU at the same value as
+ * <code>clusterAvailable</code>.
+ *
+ * @param queueFairShare fairshare in the queue
+ * @param queueUsage resources used in the queue
+ * @param clusterAvailable available resource in cluster
+ * @return calculated headroom
+ */
+ public abstract Resource getHeadroom(Resource queueFairShare,
+ Resource queueUsage, Resource clusterAvailable);
+
}
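Each implementation of this method clamps the queue's unused fair share against the cluster's available resources, component by component. The arithmetic can be checked against the fixture values in TestFSAppAttempt.testHeadroom later in this patch; a small worked sketch with those numbers:

    public class HeadroomArithmetic {
      public static void main(String[] args) {
        // cluster (8192 MB, 8 vcores) minus usage (6144, 2) -> available (2048, 6)
        int clusterMem = 2048, clusterCpu = 6;
        // fair share (4096 MB, 4 vcores) minus usage (1024, 1) -> available (3072, 3)
        int queueMem = 3072, queueCpu = 3;
        // DominantResourceFairnessPolicy: min of both components -> (2048, 3)
        System.out.println(Math.min(clusterMem, queueMem) + " MB, "
            + Math.min(clusterCpu, queueCpu) + " vcores");
        // FairSharePolicy / FifoPolicy consider only memory; CPU stays at
        // the cluster-available value -> (2048, 6)
        System.out.println(Math.min(clusterMem, queueMem) + " MB, "
            + clusterCpu + " vcores");
      }
    }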
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 42044bcaac1..3f6cbd19adb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -77,7 +77,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
ComputeFairShares.computeSteadyShares(queues, totalResources, type);
}
}
-
+
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare);
@@ -88,6 +88,21 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
return !Resources.fitsIn(usage, maxAMResource);
}
+ @Override
+ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+ Resource clusterAvailable) {
+ int queueAvailableMemory =
+ Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+ int queueAvailableCPU =
+ Math.max(queueFairShare.getVirtualCores() - queueUsage
+ .getVirtualCores(), 0);
+ Resource headroom = Resources.createResource(
+ Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+ Math.min(clusterAvailable.getVirtualCores(),
+ queueAvailableCPU));
+ return headroom;
+ }
+
@Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 66bb88bf16c..97669cb4e27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -114,6 +114,17 @@ public class FairSharePolicy extends SchedulingPolicy {
return comparator;
}
+ @Override
+ public Resource getHeadroom(Resource queueFairShare,
+ Resource queueUsage, Resource clusterAvailable) {
+ int queueAvailableMemory = Math.max(
+ queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+ Resource headroom = Resources.createResource(
+ Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+ clusterAvailable.getVirtualCores());
+ return headroom;
+ }
+
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 591ee4936b9..a2e17ecb0a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -107,6 +107,18 @@ public class FifoPolicy extends SchedulingPolicy {
return usage.getMemory() > maxAMResource.getMemory();
}
+ @Override
+ public Resource getHeadroom(Resource queueFairShare,
+ Resource queueUsage, Resource clusterAvailable) {
+ int queueAvailableMemory = Math.max(
+ queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+ Resource headroom = Resources.createResource(
+ Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+ clusterAvailable.getVirtualCores());
+ return headroom;
+ }
+
+
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 0ab1f70147b..f560690d935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import static org.junit.Assert.assertEquals;
@@ -26,7 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -185,4 +191,61 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}
+
+ @Test
+ public void testHeadroom() {
+ final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class);
+ Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
+
+ final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
+ final Resource queueFairShare = Resources.createResource(4096, 4);
+ final Resource queueUsage = Resource.newInstance(1024, 1);
+ final Resource clusterResource = Resources.createResource(8192, 8);
+ final Resource clusterUsage = Resources.createResource(6144, 2);
+ final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
+
+ ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+ RMContext rmContext = resourceManager.getRMContext();
+ FSAppAttempt schedulerApp =
+ new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue,
+ null, rmContext);
+
+ Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
+ Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+ Mockito.when(mockScheduler.getClusterResource()).thenReturn
+ (clusterResource);
+ Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
+ (clusterUsage);
+ Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
+ (fakeRootQueueMetrics);
+
+ int minClusterAvailableMemory = 2048;
+ int minClusterAvailableCPU = 6;
+ int minQueueAvailableCPU = 3;
+
+ // Min of Memory and CPU across cluster and queue is used in
+ // DominantResourceFairnessPolicy
+ Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+ .getInstance(DominantResourceFairnessPolicy.class));
+ verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+ minQueueAvailableCPU);
+
+ // Fair and Fifo ignore CPU of queue, so use cluster available CPU
+ Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+ .getInstance(FairSharePolicy.class));
+ verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+ minClusterAvailableCPU);
+
+ Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+ .getInstance(FifoPolicy.class));
+ verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+ minClusterAvailableCPU);
+ }
+
+ protected void verifyHeadroom(FSAppAttempt schedulerApp,
+ int expectedMemory, int expectedCPU) {
+ Resource headroom = schedulerApp.getHeadroom();
+ assertEquals(expectedMemory, headroom.getMemory());
+ assertEquals(expectedCPU, headroom.getVirtualCores());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index a6e928a4f4d..67164c6c0f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -131,8 +131,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@After
public void tearDown() {
- scheduler = null;
- resourceManager = null;
+ if (scheduler != null) {
+ scheduler.stop();
+ scheduler = null;
+ }
+ if (resourceManager != null) {
+ resourceManager.stop();
+ resourceManager = null;
+ }
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.shutdown();
}
@@ -140,7 +146,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
- FairScheduler scheduler = new FairScheduler();
+ scheduler = new FairScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
@@ -212,7 +218,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@Test
public void testNonMinZeroResourcesSettings() throws IOException {
- FairScheduler fs = new FairScheduler();
+ scheduler = new FairScheduler();
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
@@ -220,17 +226,17 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
- fs.init(conf);
- fs.reinitialize(conf, null);
- Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
- Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
- Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
- Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
+ scheduler.init(conf);
+ scheduler.reinitialize(conf, null);
+ Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemory());
+ Assert.assertEquals(1, scheduler.getMinimumResourceCapability().getVirtualCores());
+ Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
+ Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
}
@Test
public void testMinZeroResourcesSettings() throws IOException {
- FairScheduler fs = new FairScheduler();
+ scheduler = new FairScheduler();
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0);
@@ -238,12 +244,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
- fs.init(conf);
- fs.reinitialize(conf, null);
- Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
- Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
- Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
- Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
+ scheduler.init(conf);
+ scheduler.reinitialize(conf, null);
+ Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemory());
+ Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores());
+ Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
+ Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
}
@Test
@@ -3293,49 +3299,49 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@Test (timeout = 10000)
public void testContinuousScheduling() throws Exception {
// set continuous scheduling enabled
- FairScheduler fs = new FairScheduler();
+ scheduler = new FairScheduler();
Configuration conf = createConfiguration();
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
true);
- fs.setRMContext(resourceManager.getRMContext());
- fs.init(conf);
- fs.start();
- fs.reinitialize(conf, resourceManager.getRMContext());
+ scheduler.setRMContext(resourceManager.getRMContext());
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertTrue("Continuous scheduling should be enabled.",
- fs.isContinuousSchedulingEnabled());
+ scheduler.isContinuousSchedulingEnabled());
// Add two nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- fs.handle(nodeEvent1);
+ scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- fs.handle(nodeEvent2);
+ scheduler.handle(nodeEvent2);
// available resource
- Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
- Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
+ Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024);
+ Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
- fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
- fs.addApplicationAttempt(appAttemptId, false, false);
+ scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+ scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
ask.add(request);
- fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+ scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
// waiting for continuous_scheduler_sleep_time
// at least one pass
- Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
+ Thread.sleep(scheduler.getConf().getContinuousSchedulingSleepMs() + 500);
- FSAppAttempt app = fs.getSchedulerApp(appAttemptId);
+ FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
// Wait until app gets resources.
while (app.getCurrentConsumption().equals(Resources.none())) { }
@@ -3348,7 +3354,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
ask.clear();
ask.add(request);
- fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+ scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
// Wait until app gets resources
while (app.getCurrentConsumption()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
index 9bb44ca54f2..3f82d72abc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
@@ -27,10 +27,8 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import java.util.LinkedList;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteStreams;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -80,6 +79,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+
@RunWith(Parameterized.class)
public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
@@ -137,7 +139,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
this.conf = conf;
}
- @Test (timeout = 1000000)
+ @Test (timeout = 120000)
public void testContainerManager() throws Exception {
try {
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
@@ -162,7 +164,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
}
}
- @Test (timeout = 500000)
+ @Test (timeout = 120000)
public void testContainerManagerWithEpoch() throws Exception {
try {
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
@@ -311,7 +313,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
// trying to stop the container. It should not throw any exception.
testStopContainer(rpc, validAppAttemptId, validNode, validContainerId,
validNMToken, false);
-
+
// Rolling over master key twice so that we can check whether older keys
// are used for authentication.
rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
@@ -326,7 +328,7 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
sb.append(" was recently stopped on node manager");
Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
validContainerId, validNMToken, true).contains(sb.toString()));
-
+
// Now lets remove the container from nm-memory
nm.getNodeStatusUpdater().clearFinishedContainersFromCache();
@@ -355,14 +357,22 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
private void waitForContainerToFinishOnNM(ContainerId containerId) {
Context nmContet = yarnCluster.getNodeManager(0).getNMContext();
int interval = 4 * 60; // Max time for container token to expire.
+ Assert.assertTrue(nmContet.getContainers().containsKey(containerId));
while ((interval-- > 0)
- && nmContet.getContainers().containsKey(containerId)) {
+ && !nmContet.getContainers().get(containerId)
+ .cloneAndGetContainerStatus().getState()
+ .equals(ContainerState.COMPLETE)) {
try {
+ LOG.info("Waiting for " + containerId + " to complete.");
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
- Assert.assertFalse(nmContet.getContainers().containsKey(containerId));
+ // Normally, Containers will be removed from NM context after they are
+ // explicitly acked by RM. Now, manually remove it for testing.
+ yarnCluster.getNodeManager(0).getNodeStatusUpdater()
+ .addCompletedContainer(containerId);
+ nmContet.getContainers().remove(containerId);
}
protected void waitForNMToReceiveNMTokenKey(