Merge branch 'trunk' into HDFS-6584

This commit is contained in:
Tsz-Wo Nicholas Sze 2014-09-11 15:23:38 +08:00
commit 70dfe9cfab
129 changed files with 3445 additions and 225 deletions

View File

@ -509,6 +509,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11057. checknative command to probe for winutils.exe on windows.
(Xiaoyu Yao via cnauroth)
HADOOP-10758. KMS: add ACLs on per key basis. (tucu)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -777,6 +779,11 @@ Release 2.6.0 - UNRELEASED
HADOOP-10925. Compilation fails in native link0 function on Windows.
(cnauroth)
HADOOP-11077. NPE if hosts not specified in ProxyUsers. (gchanan via tucu)
HADOOP-9989. Bug introduced in HADOOP-9374, which parses the -tokenCacheFile
as binary file but set it to the configuration as JSON file. (zxu via tucu)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -123,7 +123,7 @@ public void authorize(UserGroupInformation user,
MachineList MachineList = proxyHosts.get(
getProxySuperuserIpConfKey(realUser.getShortUserName()));
if(!MachineList.includes(remoteAddress)) {
if(MachineList == null || !MachineList.includes(remoteAddress)) {
throw new AuthorizationException("Unauthorized connection for super-user: "
+ realUser.getUserName() + " from IP " + remoteAddress);
}

View File

@ -332,7 +332,7 @@ private void processGeneralOptions(Configuration conf,
}
UserGroupInformation.getCurrentUser().addCredentials(
Credentials.readTokenStorageFile(p, conf));
conf.set("mapreduce.job.credentials.json", p.toString(),
conf.set("mapreduce.job.credentials.binary", p.toString(),
"from -tokenCacheFile command line option");
}

View File

@ -15,7 +15,5 @@
org.apache.hadoop.fs.LocalFileSystem
org.apache.hadoop.fs.viewfs.ViewFileSystem
org.apache.hadoop.fs.s3.S3FileSystem
org.apache.hadoop.fs.s3native.NativeS3FileSystem
org.apache.hadoop.fs.ftp.FTPFileSystem
org.apache.hadoop.fs.HarFileSystem

View File

@ -478,6 +478,21 @@ public void testProxyUsersWithCustomPrefix() throws Exception {
assertNotAuthorized(proxyUserUgi, "1.2.3.5");
}
@Test
public void testNoHostsForUsers() throws Exception {
Configuration conf = new Configuration(false);
conf.set("y." + REAL_USER_NAME + ".users",
StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME)));
ProxyUsers.refreshSuperUserGroupsConfiguration(conf, "y");
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
AUTHORIZED_PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
// IP doesn't matter
assertNotAuthorized(proxyUserUgi, "1.2.3.4");
}
private void assertNotAuthorized(UserGroupInformation proxyUgi, String host) {
try {

View File

@ -249,7 +249,7 @@ public void testTokenCacheOption() throws IOException {
creds.writeTokenStorageFile(tmpPath, conf);
new GenericOptionsParser(conf, args);
String fileName = conf.get("mapreduce.job.credentials.json");
String fileName = conf.get("mapreduce.job.credentials.binary");
assertNotNull("files is null", fileName);
assertEquals("files option does not match", tmpPath.toString(), fileName);

View File

@ -94,4 +94,42 @@
ACL for decrypt EncryptedKey CryptoExtension operations
</description>
</property>
<property>
<name>default.key.acl.MANAGEMENT</name>
<value>*</value>
<description>
default ACL for MANAGEMENT operations for all key acls that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.GENERATE_EEK</name>
<value>*</value>
<description>
default ACL for GENERATE_EEK operations for all key acls that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.DECRYPT_EEK</name>
<value>*</value>
<description>
default ACL for DECRYPT_EEK operations for all key acls that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.READ</name>
<value>*</value>
<description>
default ACL for READ operations for all key acls that are not
explicitly defined.
</description>
</property>
</configuration>

View File

@ -20,6 +20,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.server.KMS.KMSOp;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyACLs;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyOpType;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -32,6 +34,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Provides access to the <code>AccessControlList</code>s used by KMS,
@ -39,7 +42,7 @@
* are defined has been updated.
*/
@InterfaceAudience.Private
public class KMSACLs implements Runnable {
public class KMSACLs implements Runnable, KeyACLs {
private static final Logger LOG = LoggerFactory.getLogger(KMSACLs.class);
private static final String UNAUTHORIZED_MSG_WITH_KEY =
@ -67,6 +70,9 @@ public String getBlacklistConfigKey() {
private volatile Map<Type, AccessControlList> acls;
private volatile Map<Type, AccessControlList> blacklistedAcls;
private volatile Map<String, HashMap<KeyOpType, AccessControlList>> keyAcls;
private final Map<KeyOpType, AccessControlList> defaultKeyAcls =
new HashMap<KeyOpType, AccessControlList>();
private ScheduledExecutorService executorService;
private long lastReload;
@ -74,14 +80,15 @@ public String getBlacklistConfigKey() {
if (conf == null) {
conf = loadACLs();
}
setACLs(conf);
setKMSACLs(conf);
setKeyACLs(conf);
}
public KMSACLs() {
this(null);
}
private void setACLs(Configuration conf) {
private void setKMSACLs(Configuration conf) {
Map<Type, AccessControlList> tempAcls = new HashMap<Type, AccessControlList>();
Map<Type, AccessControlList> tempBlacklist = new HashMap<Type, AccessControlList>();
for (Type aclType : Type.values()) {
@ -99,14 +106,69 @@ private void setACLs(Configuration conf) {
blacklistedAcls = tempBlacklist;
}
private void setKeyACLs(Configuration conf) {
Map<String, HashMap<KeyOpType, AccessControlList>> tempKeyAcls =
new HashMap<String, HashMap<KeyOpType,AccessControlList>>();
Map<String, String> allKeyACLS =
conf.getValByRegex(Pattern.quote(KMSConfiguration.KEY_ACL_PREFIX));
for (Map.Entry<String, String> keyAcl : allKeyACLS.entrySet()) {
String k = keyAcl.getKey();
// this should be of type "key.acl.<KEY_NAME>.<OP_TYPE>"
int keyNameStarts = KMSConfiguration.KEY_ACL_PREFIX.length();
int keyNameEnds = k.lastIndexOf(".");
if (keyNameStarts >= keyNameEnds) {
LOG.warn("Invalid key name '{}'", k);
} else {
String aclStr = keyAcl.getValue();
String keyName = k.substring(keyNameStarts, keyNameEnds);
String keyOp = k.substring(keyNameEnds + 1);
KeyOpType aclType = null;
try {
aclType = KeyOpType.valueOf(keyOp);
} catch (IllegalArgumentException e) {
LOG.warn("Invalid key Operation '{}'", keyOp);
}
if (aclType != null) {
// On the assumption this will be single threaded.. else we need to
// ConcurrentHashMap
HashMap<KeyOpType,AccessControlList> aclMap =
tempKeyAcls.get(keyName);
if (aclMap == null) {
aclMap = new HashMap<KeyOpType, AccessControlList>();
tempKeyAcls.put(keyName, aclMap);
}
aclMap.put(aclType, new AccessControlList(aclStr));
LOG.info("KEY_NAME '{}' KEY_OP '{}' ACL '{}'",
keyName, aclType, aclStr);
}
}
}
keyAcls = tempKeyAcls;
for (KeyOpType keyOp : KeyOpType.values()) {
if (!defaultKeyAcls.containsKey(keyOp)) {
String confKey = KMSConfiguration.DEFAULT_KEY_ACL_PREFIX + keyOp;
String aclStr = conf.get(confKey);
if (aclStr != null) {
if (aclStr.equals("*")) {
LOG.info("Default Key ACL for KEY_OP '{}' is set to '*'", keyOp);
}
defaultKeyAcls.put(keyOp, new AccessControlList(aclStr));
}
}
}
}
@Override
public void run() {
try {
if (KMSConfiguration.isACLsFileNewer(lastReload)) {
setACLs(loadACLs());
setKMSACLs(loadACLs());
setKeyACLs(loadACLs());
}
} catch (Exception ex) {
LOG.warn("Could not reload ACLs file: " + ex.toString(), ex);
LOG.warn(
String.format("Could not reload ACLs file: '%s'", ex.toString()), ex);
}
}
@ -164,4 +226,29 @@ public void assertAccess(KMSACLs.Type aclType,
}
}
@Override
public boolean hasAccessToKey(String keyName, UserGroupInformation ugi,
KeyOpType opType) {
Map<KeyOpType, AccessControlList> keyAcl = keyAcls.get(keyName);
if (keyAcl == null) {
// Get KeyAcl map of DEFAULT KEY.
keyAcl = defaultKeyAcls;
}
// If No key acl defined for this key, check to see if
// there are key defaults configured for this operation
AccessControlList acl = keyAcl.get(opType);
if (acl == null) {
// If no acl is specified for this operation,
// deny access
return false;
} else {
return acl.isUserAllowed(ugi);
}
}
@Override
public boolean isACLPresent(String keyName, KeyOpType opType) {
return (keyAcls.containsKey(keyName) || defaultKeyAcls.containsKey(opType));
}
}

View File

@ -36,6 +36,9 @@ public class KMSConfiguration {
public static final String CONFIG_PREFIX = "hadoop.kms.";
public static final String KEY_ACL_PREFIX = "key.acl.";
public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl.";
// Property to Enable/Disable Caching
public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
"cache.enable";
@ -57,6 +60,12 @@ public class KMSConfiguration {
// 10 secs
public static final long KMS_AUDIT_AGGREGATION_DELAY_DEFAULT = 10000;
// Property to Enable/Disable per Key authorization
public static final String KEY_AUTHORIZATION_ENABLE = CONFIG_PREFIX +
"key.authorization.enable";
public static final boolean KEY_AUTHORIZATION_ENABLE_DEFAULT = true;
static Configuration getConfiguration(boolean loadHadoopDefaults,
String ... resources) {
Configuration conf = new Configuration(loadHadoopDefaults);

View File

@ -68,7 +68,7 @@ public class KMSWebApp implements ServletContextListener {
private JmxReporter jmxReporter;
private static Configuration kmsConf;
private static KMSACLs acls;
private static KMSACLs kmsAcls;
private static Meter adminCallsMeter;
private static Meter keyCallsMeter;
private static Meter unauthorizedCallsMeter;
@ -126,8 +126,8 @@ public void contextInitialized(ServletContextEvent sce) {
LOG.info(" KMS Hadoop Version: " + VersionInfo.getVersion());
LOG.info("-------------------------------------------------------------");
acls = new KMSACLs();
acls.startReloader();
kmsAcls = new KMSACLs();
kmsAcls.startReloader();
metricRegistry = new MetricRegistry();
jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
@ -188,6 +188,13 @@ public void contextInitialized(ServletContextEvent sce) {
keyProviderCryptoExtension =
new EagerKeyGeneratorKeyProviderCryptoExtension(kmsConf,
keyProviderCryptoExtension);
if (kmsConf.getBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE,
KMSConfiguration.KEY_AUTHORIZATION_ENABLE_DEFAULT)) {
keyProviderCryptoExtension =
new KeyAuthorizationKeyProvider(
keyProviderCryptoExtension, kmsAcls);
}
LOG.info("Initialized KeyProviderCryptoExtension "
+ keyProviderCryptoExtension);
final int defaultBitlength = kmsConf
@ -213,7 +220,7 @@ public void contextInitialized(ServletContextEvent sce) {
@Override
public void contextDestroyed(ServletContextEvent sce) {
kmsAudit.shutdown();
acls.stopReloader();
kmsAcls.stopReloader();
jmxReporter.stop();
jmxReporter.close();
metricRegistry = null;
@ -225,7 +232,7 @@ public static Configuration getConfiguration() {
}
public static KMSACLs getACLs() {
return acls;
return kmsAcls;
}
public static Meter getAdminCallsMeter() {

View File

@ -0,0 +1,276 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms.server;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
/**
* A {@link KeyProvider} proxy that checks whether the current user derived via
* {@link UserGroupInformation}, is authorized to perform the following
* type of operations on a Key :
* <ol>
* <li>MANAGEMENT operations : createKey, rollNewVersion, deleteKey</li>
* <li>GENERATE_EEK operations : generateEncryptedKey, warmUpEncryptedKeys</li>
* <li>DECRYPT_EEK operation : decryptEncryptedKey</li>
* <li>READ operations : getKeyVersion, getKeyVersions, getMetadata,
* getKeysMetadata, getCurrentKey</li>
* </ol>
* The read operations (getCurrentKeyVersion / getMetadata) etc are not checked.
*/
public class KeyAuthorizationKeyProvider extends KeyProviderCryptoExtension {
public static final String KEY_ACL = "key.acl.";
private static final String KEY_ACL_NAME = KEY_ACL + "name";
public enum KeyOpType {
ALL, READ, MANAGEMENT, GENERATE_EEK, DECRYPT_EEK;
}
/**
* Interface that needs to be implemented by a client of the
* <code>KeyAuthorizationKeyProvider</code>.
*/
public static interface KeyACLs {
/**
* This is called by the KeyProvider to check if the given user is
* authorized to perform the specified operation on the given acl name.
* @param aclName name of the key ACL
* @param ugi User's UserGroupInformation
* @param opType Operation Type
* @return true if user has access to the aclName and opType else false
*/
public boolean hasAccessToKey(String aclName, UserGroupInformation ugi,
KeyOpType opType);
/**
*
* @param aclName ACL name
* @param opType Operation Type
* @return true if AclName exists else false
*/
public boolean isACLPresent(String aclName, KeyOpType opType);
}
private final KeyProviderCryptoExtension provider;
private final KeyACLs acls;
/**
* The constructor takes a {@link KeyProviderCryptoExtension} and an
* implementation of <code>KeyACLs</code>. All calls are delegated to the
* provider keyProvider after authorization check (if required)
* @param keyProvider
* @param acls
*/
public KeyAuthorizationKeyProvider(KeyProviderCryptoExtension keyProvider,
KeyACLs acls) {
super(keyProvider, null);
this.provider = keyProvider;
this.acls = acls;
}
// This method first checks if "key.acl.name" attribute is present as an
// attribute in the provider Options. If yes, use the aclName for any
// subsequent access checks, else use the keyName as the aclName and set it
// as the value of the "key.acl.name" in the key's metadata.
private void authorizeCreateKey(String keyName, Options options,
UserGroupInformation ugi) throws IOException{
Preconditions.checkNotNull(ugi, "UserGroupInformation cannot be null");
Map<String, String> attributes = options.getAttributes();
String aclName = attributes.get(KEY_ACL_NAME);
boolean success = false;
if (Strings.isNullOrEmpty(aclName)) {
if (acls.isACLPresent(keyName, KeyOpType.MANAGEMENT)) {
options.setAttributes(ImmutableMap.<String, String> builder()
.putAll(attributes).put(KEY_ACL_NAME, keyName).build());
success =
acls.hasAccessToKey(keyName, ugi, KeyOpType.MANAGEMENT)
|| acls.hasAccessToKey(keyName, ugi, KeyOpType.ALL);
} else {
success = false;
}
} else {
success = acls.isACLPresent(aclName, KeyOpType.MANAGEMENT) &&
(acls.hasAccessToKey(aclName, ugi, KeyOpType.MANAGEMENT)
|| acls.hasAccessToKey(aclName, ugi, KeyOpType.ALL));
}
if (!success)
throw new AuthorizationException(String.format("User [%s] is not"
+ " authorized to create key !!", ugi.getShortUserName()));
}
private void checkAccess(String aclName, UserGroupInformation ugi,
KeyOpType opType) throws AuthorizationException {
Preconditions.checkNotNull(aclName, "Key ACL name cannot be null");
Preconditions.checkNotNull(ugi, "UserGroupInformation cannot be null");
if (acls.isACLPresent(aclName, KeyOpType.MANAGEMENT) &&
(acls.hasAccessToKey(aclName, ugi, opType)
|| acls.hasAccessToKey(aclName, ugi, KeyOpType.ALL))) {
return;
} else {
throw new AuthorizationException(String.format("User [%s] is not"
+ " authorized to perform [%s] on key with ACL name [%s]!!",
ugi.getShortUserName(), opType, aclName));
}
}
@Override
public KeyVersion createKey(String name, Options options)
throws NoSuchAlgorithmException, IOException {
authorizeCreateKey(name, options, getUser());
return provider.createKey(name, options);
}
@Override
public KeyVersion createKey(String name, byte[] material, Options options)
throws IOException {
authorizeCreateKey(name, options, getUser());
return provider.createKey(name, material, options);
}
@Override
public KeyVersion rollNewVersion(String name)
throws NoSuchAlgorithmException, IOException {
doAccessCheck(name, KeyOpType.MANAGEMENT);
return provider.rollNewVersion(name);
}
@Override
public void deleteKey(String name) throws IOException {
doAccessCheck(name, KeyOpType.MANAGEMENT);
provider.deleteKey(name);
}
@Override
public KeyVersion rollNewVersion(String name, byte[] material)
throws IOException {
doAccessCheck(name, KeyOpType.MANAGEMENT);
return provider.rollNewVersion(name, material);
}
@Override
public void warmUpEncryptedKeys(String... names) throws IOException {
for (String name : names) {
doAccessCheck(name, KeyOpType.GENERATE_EEK);
}
provider.warmUpEncryptedKeys(names);
}
@Override
public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
throws IOException, GeneralSecurityException {
doAccessCheck(encryptionKeyName, KeyOpType.GENERATE_EEK);
return provider.generateEncryptedKey(encryptionKeyName);
}
@Override
public KeyVersion decryptEncryptedKey(EncryptedKeyVersion encryptedKeyVersion)
throws IOException, GeneralSecurityException {
doAccessCheck(
encryptedKeyVersion.getEncryptionKeyName(), KeyOpType.DECRYPT_EEK);
return provider.decryptEncryptedKey(encryptedKeyVersion);
}
@Override
public KeyVersion getKeyVersion(String versionName) throws IOException {
KeyVersion keyVersion = provider.getKeyVersion(versionName);
if (keyVersion != null) {
doAccessCheck(keyVersion.getName(), KeyOpType.READ);
}
return keyVersion;
}
@Override
public List<String> getKeys() throws IOException {
return provider.getKeys();
}
@Override
public List<KeyVersion> getKeyVersions(String name) throws IOException {
doAccessCheck(name, KeyOpType.READ);
return provider.getKeyVersions(name);
}
@Override
public Metadata getMetadata(String name) throws IOException {
doAccessCheck(name, KeyOpType.READ);
return provider.getMetadata(name);
}
@Override
public Metadata[] getKeysMetadata(String... names) throws IOException {
for (String name : names) {
doAccessCheck(name, KeyOpType.READ);
}
return provider.getKeysMetadata(names);
}
@Override
public KeyVersion getCurrentKey(String name) throws IOException {
doAccessCheck(name, KeyOpType.READ);
return provider.getCurrentKey(name);
}
@Override
public void flush() throws IOException {
provider.flush();
}
@Override
public boolean isTransient() {
return provider.isTransient();
}
private void doAccessCheck(String keyName, KeyOpType opType) throws
IOException {
Metadata metadata = provider.getMetadata(keyName);
if (metadata != null) {
String aclName = metadata.getAttributes().get(KEY_ACL_NAME);
checkAccess((aclName == null) ? keyName : aclName, getUser(), opType);
}
}
private UserGroupInformation getUser() throws IOException {
return UserGroupInformation.getCurrentUser();
}
@Override
protected KeyProvider getKeyProvider() {
return this;
}
@Override
public String toString() {
return provider.toString();
}
}

View File

@ -443,6 +443,112 @@ $ keytool -genkey -alias tomcat -keyalg RSA
+---+
*** Key Access Control
KMS supports access control for all non-read operations at the Key level.
All Key Access operations are classified as :
* MANAGEMENT - createKey, deleteKey, rolloverNewVersion
* GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
* DECRYPT_EEK - decryptEncryptedKey;
* READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
getCurrentKey;
* ALL - all of the above;
These can be defined in the KMS <<<etc/hadoop/kms-acls.xml>>> as follows
For all keys for which a key access has not been explicitly configured, It
is possible to configure a default key access control for a subset of the
operation types.
If no ACL is configured for a specific key AND no default ACL is configured
for the requested operation, then access will be DENIED.
<<NOTE:>> The default ACL does not support <<<ALL>>> operation qualifier.
+---+
<property>
<name>key.acl.testKey1.MANAGEMENT</name>
<value>*</value>
<description>
ACL for create-key, deleteKey and rolloverNewVersion operations.
</description>
</property>
<property>
<name>key.acl.testKey2.GENERATE_EEK</name>
<value>*</value>
<description>
ACL for generateEncryptedKey operations.
</description>
</property>
<property>
<name>key.acl.testKey3.DECRYPT_EEK</name>
<value>*</value>
<description>
ACL for decryptEncryptedKey operations.
</description>
</property>
<property>
<name>key.acl.testKey4.READ</name>
<value>*</value>
<description>
ACL for getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
getCurrentKey operations
</description>
</property>
<property>
<name>key.acl.testKey5.ALL</name>
<value>*</value>
<description>
ACL for ALL operations.
</description>
</property>
<property>
<name>default.key.acl.MANAGEMENT</name>
<value>user1,user2</value>
<description>
default ACL for MANAGEMENT operations for all keys that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.GENERATE_EEK</name>
<value>user1,user2</value>
<description>
default ACL for GENERATE_EEK operations for all keys that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.DECRYPT_EEK</name>
<value>user1,user2</value>
<description>
default ACL for DECRYPT_EEK operations for all keys that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.READ</name>
<value>user1,user2</value>
<description>
default ACL for READ operations for all keys that are not
explicitly defined.
</description>
</property>
+---+
** KMS Delegation Token Configuration
KMS delegation token secret manager can be configured with the following

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.crypto.key.kms.server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProvider.Options;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
@ -338,6 +340,13 @@ public void testKMSProvider() throws Exception {
UserGroupInformation.setConfiguration(conf);
File confDir = getTestDir();
conf = createBaseKMSConf(confDir);
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.MANAGEMENT", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.READ", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k4.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k5.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k6.ALL", "*");
writeConf(confDir, conf);
runServer(null, null, confDir, new KMSCallable() {
@ -492,10 +501,20 @@ public Void call() throws Exception {
options = new KeyProvider.Options(conf);
options.setCipher("AES/CTR/NoPadding");
options.setBitLength(128);
kp.createKey("k2", options);
KeyVersion kVer2 = kp.createKey("k2", options);
KeyProvider.Metadata meta = kp.getMetadata("k2");
Assert.assertNull(meta.getDescription());
Assert.assertTrue(meta.getAttributes().isEmpty());
Assert.assertEquals("k2", meta.getAttributes().get("key.acl.name"));
// test key ACL.. k2 is granted only MANAGEMENT Op access
try {
kpExt =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
kpExt.generateEncryptedKey(kVer2.getName());
Assert.fail("User should not be allowed to encrypt !!");
} catch (Exception ex) {
//
}
// createKey() description, no tags
options = new KeyProvider.Options(conf);
@ -505,7 +524,7 @@ public Void call() throws Exception {
kp.createKey("k3", options);
meta = kp.getMetadata("k3");
Assert.assertEquals("d", meta.getDescription());
Assert.assertTrue(meta.getAttributes().isEmpty());
Assert.assertEquals("k3", meta.getAttributes().get("key.acl.name"));
Map<String, String> attributes = new HashMap<String, String>();
attributes.put("a", "A");
@ -514,6 +533,7 @@ public Void call() throws Exception {
options = new KeyProvider.Options(conf);
options.setCipher("AES/CTR/NoPadding");
options.setBitLength(128);
attributes.put("key.acl.name", "k4");
options.setAttributes(attributes);
kp.createKey("k4", options);
meta = kp.getMetadata("k4");
@ -525,6 +545,7 @@ public Void call() throws Exception {
options.setCipher("AES/CTR/NoPadding");
options.setBitLength(128);
options.setDescription("d");
attributes.put("key.acl.name", "k5");
options.setAttributes(attributes);
kp.createKey("k5", options);
meta = kp.getMetadata("k5");
@ -564,6 +585,201 @@ public Void call() throws Exception {
});
}
@Test
public void testKeyACLs() throws Exception {
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
final File testDir = getTestDir();
conf = createBaseKMSConf(testDir);
conf.set("hadoop.kms.authentication.type", "kerberos");
conf.set("hadoop.kms.authentication.kerberos.keytab",
keytab.getAbsolutePath());
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
for (KMSACLs.Type type : KMSACLs.Type.values()) {
conf.set(type.getAclConfigKey(), type.toString());
}
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
conf.set(KMSACLs.Type.GENERATE_EEK.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
conf.set(KMSACLs.Type.DECRYPT_EEK.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "test_key.MANAGEMENT", "CREATE");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "all_access.ALL", "GENERATE_EEK");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "all_access.DECRYPT_EEK", "ROLLOVER");
conf.set(KMSConfiguration.DEFAULT_KEY_ACL_PREFIX + "MANAGEMENT", "ROLLOVER");
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI uri = createKMSUri(getKMSUrl());
doAs("CREATE", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProvider kp = new KMSClientProvider(uri, conf);
try {
Options options = new KeyProvider.Options(conf);
Map<String, String> attributes = options.getAttributes();
HashMap<String,String> newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "test_key");
options.setAttributes(newAttribs);
KeyProvider.KeyVersion kv = kp.createKey("k0", options);
Assert.assertNull(kv.getMaterial());
KeyVersion rollVersion = kp.rollNewVersion("k0");
Assert.assertNull(rollVersion.getMaterial());
KeyProviderCryptoExtension kpce =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
try {
kpce.generateEncryptedKey("k0");
Assert.fail("User [CREATE] should not be allowed to generate_eek on k0");
} catch (Exception e) {
// Ignore
}
newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "all_access");
options.setAttributes(newAttribs);
try {
kp.createKey("kx", options);
Assert.fail("User [CREATE] should not be allowed to create kx");
} catch (Exception e) {
// Ignore
}
} catch (Exception ex) {
Assert.fail(ex.getMessage());
}
return null;
}
});
doAs("ROLLOVER", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProvider kp = new KMSClientProvider(uri, conf);
try {
Options options = new KeyProvider.Options(conf);
Map<String, String> attributes = options.getAttributes();
HashMap<String,String> newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "test_key2");
options.setAttributes(newAttribs);
KeyProvider.KeyVersion kv = kp.createKey("k1", options);
Assert.assertNull(kv.getMaterial());
KeyVersion rollVersion = kp.rollNewVersion("k1");
Assert.assertNull(rollVersion.getMaterial());
try {
kp.rollNewVersion("k0");
Assert.fail("User [ROLLOVER] should not be allowed to rollover k0");
} catch (Exception e) {
// Ignore
}
KeyProviderCryptoExtension kpce =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
try {
kpce.generateEncryptedKey("k1");
Assert.fail("User [ROLLOVER] should not be allowed to generate_eek on k1");
} catch (Exception e) {
// Ignore
}
newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "all_access");
options.setAttributes(newAttribs);
try {
kp.createKey("kx", options);
Assert.fail("User [ROLLOVER] should not be allowed to create kx");
} catch (Exception e) {
// Ignore
}
} catch (Exception ex) {
Assert.fail(ex.getMessage());
}
return null;
}
});
doAs("GET", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProvider kp = new KMSClientProvider(uri, conf);
try {
Options options = new KeyProvider.Options(conf);
Map<String, String> attributes = options.getAttributes();
HashMap<String,String> newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "test_key");
options.setAttributes(newAttribs);
try {
kp.createKey("k2", options);
Assert.fail("User [GET] should not be allowed to create key..");
} catch (Exception e) {
// Ignore
}
newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "all_access");
options.setAttributes(newAttribs);
try {
kp.createKey("kx", options);
Assert.fail("User [GET] should not be allowed to create kx");
} catch (Exception e) {
// Ignore
}
} catch (Exception ex) {
Assert.fail(ex.getMessage());
}
return null;
}
});
final EncryptedKeyVersion ekv = doAs("GENERATE_EEK", new PrivilegedExceptionAction<EncryptedKeyVersion>() {
@Override
public EncryptedKeyVersion run() throws Exception {
KeyProvider kp = new KMSClientProvider(uri, conf);
try {
Options options = new KeyProvider.Options(conf);
Map<String, String> attributes = options.getAttributes();
HashMap<String,String> newAttribs = new HashMap<String, String>(attributes);
newAttribs.put("key.acl.name", "all_access");
options.setAttributes(newAttribs);
kp.createKey("kx", options);
KeyProviderCryptoExtension kpce =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
try {
return kpce.generateEncryptedKey("kx");
} catch (Exception e) {
Assert.fail("User [GENERATE_EEK] should be allowed to generate_eek on kx");
}
} catch (Exception ex) {
Assert.fail(ex.getMessage());
}
return null;
}
});
doAs("ROLLOVER", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProvider kp = new KMSClientProvider(uri, conf);
try {
KeyProviderCryptoExtension kpce =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
kpce.decryptEncryptedKey(ekv);
} catch (Exception ex) {
Assert.fail(ex.getMessage());
}
return null;
}
});
return null;
}
});
}
@Test
public void testACLs() throws Exception {
Configuration conf = new Configuration();
@ -586,6 +802,9 @@ public void testACLs() throws Exception {
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
@ -891,6 +1110,9 @@ public void testKMSBlackList() throws Exception {
conf.set(KMSACLs.Type.DECRYPT_EEK.getAclConfigKey(), "client,hdfs,otheradmin");
conf.set(KMSACLs.Type.DECRYPT_EEK.getBlacklistConfigKey(), "hdfs,otheradmin");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "ck0.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "ck1.ALL", "*");
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
@ -973,6 +1195,7 @@ public void testServicePrincipalACLs() throws Exception {
conf.set(type.getAclConfigKey(), " ");
}
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(), "client");
conf.set(KMSConfiguration.DEFAULT_KEY_ACL_PREFIX + "MANAGEMENT", "client,client/host");
writeConf(testDir, conf);
@ -1096,6 +1319,9 @@ public void testDelegationTokenAccess() throws Exception {
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kA.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kD.ALL", "*");
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
@ -1164,6 +1390,10 @@ public void testProxyUser() throws Exception {
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
conf.set("hadoop.kms.proxyuser.client.users", "foo");
conf.set("hadoop.kms.proxyuser.client.hosts", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kAA.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kBB.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kCC.ALL", "*");
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {

View File

@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms.server;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProvider.Options;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.UserProvider;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyACLs;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyOpType;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.Test;
public class TestKeyAuthorizationKeyProvider {
private static final String CIPHER = "AES";
@Test
public void testCreateKey() throws Exception {
final Configuration conf = new Configuration();
KeyProvider kp =
new UserProvider.Factory().createProvider(new URI("user:///"), conf);
KeyACLs mock = mock(KeyACLs.class);
when(mock.isACLPresent("foo", KeyOpType.MANAGEMENT)).thenReturn(true);
UserGroupInformation u1 = UserGroupInformation.createRemoteUser("u1");
when(mock.hasAccessToKey("foo", u1, KeyOpType.MANAGEMENT)).thenReturn(true);
final KeyProviderCryptoExtension kpExt =
new KeyAuthorizationKeyProvider(
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp),
mock);
u1.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
kpExt.createKey("foo", SecureRandom.getSeed(16),
newOptions(conf));
} catch (IOException ioe) {
Assert.fail("User should be Authorized !!");
}
// "bar" key not configured
try {
kpExt.createKey("bar", SecureRandom.getSeed(16),
newOptions(conf));
Assert.fail("User should NOT be Authorized !!");
} catch (IOException ioe) {
// Ignore
}
return null;
}
}
);
// Unauthorized User
UserGroupInformation.createRemoteUser("badGuy").doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
kpExt.createKey("foo", SecureRandom.getSeed(16),
newOptions(conf));
Assert.fail("User should NOT be Authorized !!");
} catch (IOException ioe) {
// Ignore
}
return null;
}
}
);
}
@Test
public void testOpsWhenACLAttributeExists() throws Exception {
final Configuration conf = new Configuration();
KeyProvider kp =
new UserProvider.Factory().createProvider(new URI("user:///"), conf);
KeyACLs mock = mock(KeyACLs.class);
when(mock.isACLPresent("testKey", KeyOpType.MANAGEMENT)).thenReturn(true);
when(mock.isACLPresent("testKey", KeyOpType.GENERATE_EEK)).thenReturn(true);
when(mock.isACLPresent("testKey", KeyOpType.DECRYPT_EEK)).thenReturn(true);
when(mock.isACLPresent("testKey", KeyOpType.ALL)).thenReturn(true);
UserGroupInformation u1 = UserGroupInformation.createRemoteUser("u1");
UserGroupInformation u2 = UserGroupInformation.createRemoteUser("u2");
UserGroupInformation u3 = UserGroupInformation.createRemoteUser("u3");
UserGroupInformation sudo = UserGroupInformation.createRemoteUser("sudo");
when(mock.hasAccessToKey("testKey", u1, KeyOpType.MANAGEMENT)).thenReturn(true);
when(mock.hasAccessToKey("testKey", u2, KeyOpType.GENERATE_EEK)).thenReturn(true);
when(mock.hasAccessToKey("testKey", u3, KeyOpType.DECRYPT_EEK)).thenReturn(true);
when(mock.hasAccessToKey("testKey", sudo, KeyOpType.ALL)).thenReturn(true);
final KeyProviderCryptoExtension kpExt =
new KeyAuthorizationKeyProvider(
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp),
mock);
final KeyVersion barKv = u1.doAs(
new PrivilegedExceptionAction<KeyVersion>() {
@Override
public KeyVersion run() throws Exception {
Options opt = newOptions(conf);
Map<String, String> m = new HashMap<String, String>();
m.put("key.acl.name", "testKey");
opt.setAttributes(m);
try {
KeyVersion kv =
kpExt.createKey("foo", SecureRandom.getSeed(16), opt);
kpExt.rollNewVersion(kv.getName());
kpExt.rollNewVersion(kv.getName(), SecureRandom.getSeed(16));
kpExt.deleteKey(kv.getName());
} catch (IOException ioe) {
Assert.fail("User should be Authorized !!");
}
KeyVersion retkv = null;
try {
retkv = kpExt.createKey("bar", SecureRandom.getSeed(16), opt);
kpExt.generateEncryptedKey(retkv.getName());
Assert.fail("User should NOT be Authorized to generate EEK !!");
} catch (IOException ioe) {
}
Assert.assertNotNull(retkv);
return retkv;
}
}
);
final EncryptedKeyVersion barEKv =
u2.doAs(
new PrivilegedExceptionAction<EncryptedKeyVersion>() {
@Override
public EncryptedKeyVersion run() throws Exception {
try {
kpExt.deleteKey(barKv.getName());
Assert.fail("User should NOT be Authorized to "
+ "perform any other operation !!");
} catch (IOException ioe) {
}
return kpExt.generateEncryptedKey(barKv.getName());
}
});
u3.doAs(
new PrivilegedExceptionAction<KeyVersion>() {
@Override
public KeyVersion run() throws Exception {
try {
kpExt.deleteKey(barKv.getName());
Assert.fail("User should NOT be Authorized to "
+ "perform any other operation !!");
} catch (IOException ioe) {
}
return kpExt.decryptEncryptedKey(barEKv);
}
});
sudo.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
Options opt = newOptions(conf);
Map<String, String> m = new HashMap<String, String>();
m.put("key.acl.name", "testKey");
opt.setAttributes(m);
try {
KeyVersion kv =
kpExt.createKey("foo", SecureRandom.getSeed(16), opt);
kpExt.rollNewVersion(kv.getName());
kpExt.rollNewVersion(kv.getName(), SecureRandom.getSeed(16));
EncryptedKeyVersion ekv = kpExt.generateEncryptedKey(kv.getName());
kpExt.decryptEncryptedKey(ekv);
kpExt.deleteKey(kv.getName());
} catch (IOException ioe) {
Assert.fail("User should be Allowed to do everything !!");
}
return null;
}
}
);
}
private static KeyProvider.Options newOptions(Configuration conf) {
KeyProvider.Options options = new KeyProvider.Options(conf);
options.setCipher(CIPHER);
options.setBitLength(128);
return options;
}
}

View File

@ -762,6 +762,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6986. DistributedFileSystem must get delegation tokens from configured
KeyProvider. (zhz via tucu)
HDFS-6776. Using distcp to copy data between insecure and secure cluster via webdhfs
doesn't work. (yzhangal via tucu)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -402,8 +402,7 @@ public static Credentials createCredentials(final NameNode namenode,
final Token<DelegationTokenIdentifier> token = namenode.getRpcServer(
).getDelegationToken(new Text(renewer));
if (token == null) {
throw new IOException("Failed to get the token for " + renewer
+ ", user=" + ugi.getShortUserName());
return null;
}
final InetSocketAddress addr = namenode.getNameNodeAddress();

View File

@ -279,6 +279,9 @@ private Token<? extends TokenIdentifier> generateDelegationToken(
final String renewer) throws IOException {
final Credentials c = DelegationTokenSecretManager.createCredentials(
namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
if (c == null) {
return null;
}
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND
: SWebHdfsFileSystem.TOKEN_KIND;

View File

@ -41,6 +41,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
@ -102,6 +103,11 @@ public class WebHdfsFileSystem extends FileSystem
/** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
@VisibleForTesting
public static final String CANT_FALLBACK_TO_INSECURE_MSG =
"The client is configured to only allow connecting to secure cluster";
private boolean canRefreshDelegationToken;
private UserGroupInformation ugi;
@ -112,6 +118,7 @@ public class WebHdfsFileSystem extends FileSystem
private Path workingDir;
private InetSocketAddress nnAddrs[];
private int currentNNAddrIndex;
private boolean disallowFallbackToInsecureCluster;
/**
* Return the protocol scheme for the FileSystem.
@ -194,6 +201,9 @@ public synchronized void initialize(URI uri, Configuration conf
this.workingDir = getHomeDirectory();
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
this.disallowFallbackToInsecureCluster = !conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.delegationToken = null;
}
@ -1293,7 +1303,13 @@ Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
return JsonUtil.toDelegationToken(json);
}
}.run();
token.setService(tokenServiceName);
if (token != null) {
token.setService(tokenServiceName);
} else {
if (disallowFallbackToInsecureCluster) {
throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
}
}
return token;
}

View File

@ -29,6 +29,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -45,6 +46,7 @@
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@ -482,4 +484,43 @@ public void testRaceWhileNNStartup() throws Exception {
}
}
}
@Test
public void testDTInInsecureClusterWithFallback()
throws IOException, URISyntaxException {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
conf.setBoolean(CommonConfigurationKeys
.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsFileSystem.SCHEME);
Assert.assertNull(webHdfs.getDelegationToken(null));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testDTInInsecureCluster() throws Exception {
MiniDFSCluster cluster = null;
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsFileSystem.SCHEME);
webHdfs.getDelegationToken(null);
fail("No exception is thrown.");
} catch (AccessControlException ace) {
Assert.assertTrue(ace.getMessage().startsWith(
WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -276,6 +276,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6071. JobImpl#makeUberDecision doesn't log that Uber mode is
disabled because of too much CPUs (Tsuyoshi OZAWA via jlowe)
MAPREDUCE-6075. HistoryServerFileSystemStateStore can create zero-length
files (jlowe)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -189,6 +189,8 @@ public void storeTokenMasterKey(DelegationKey key) throws IOException {
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
key.write(dataStream);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanup(LOG, dataStream);
}
@ -260,6 +262,8 @@ private void writeFile(Path file, byte[] data) throws IOException {
try {
try {
out.write(data);
out.close();
out = null;
} finally {
IOUtils.cleanup(LOG, out);
}
@ -299,6 +303,8 @@ private byte[] buildTokenData(MRDelegationTokenIdentifier tokenId,
try {
tokenId.write(dataStream);
dataStream.writeLong(renewDate);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanup(LOG, dataStream);
}

View File

@ -327,6 +327,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -576,6 +582,12 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.2</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>

View File

@ -15,5 +15,361 @@
limitations under the License.
-->
<FindBugsFilter>
<Match>
<Package name="org.apache.hadoop.security.proto" />
</Match>
<Match>
<Package name="org.apache.hadoop.tools.proto" />
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<Match>
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="~.*_jsp" />
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<Match>
<Field name="_jspx_dependants" />
<Bug pattern="UWF_UNWRITTEN_FIELD" />
</Match>
<!--
Inconsistent synchronization for Client.Connection.out is
is intentional to make a connection to be closed instantly.
-->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="out" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
Further SaslException should be ignored during cleanup and
original exception should be re-thrown.
-->
<Match>
<Class name="org.apache.hadoop.security.SaslRpcClient" />
<Bug pattern="DE_MIGHT_IGNORE" />
</Match>
<!--
Ignore Cross Scripting Vulnerabilities
-->
<Match>
<Package name="~org.apache.hadoop.mapred.*" />
<Bug code="XSS" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.taskdetails_jsp" />
<Bug code="HRS" />
</Match>
<!--
Ignore warnings where child class has the same name as
super class. Classes based on Old API shadow names from
new API. Should go off after HADOOP-1.0
-->
<Match>
<Class name="~org.apache.hadoop.mapred.*" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
</Match>
<!--
Ignore warnings for usage of System.exit. This is
required and have been well thought out
-->
<Match>
<Class name="org.apache.hadoop.mapred.Child$2" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.JobTracker" />
<Method name="addHostToNodeMapping" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.Task" />
<Or>
<Method name="done" />
<Method name="commit" />
<Method name="statusUpdate" />
</Or>
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.Task$TaskReporter" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.ProgramDriver" />
<Method name="driver" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.RunJar" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<!--
We need to cast objects between old and new api objects
-->
<Match>
<Class name="org.apache.hadoop.mapred.OutputCommitter" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!--
We intentionally do the get name from the inner class
-->
<Match>
<Class name="org.apache.hadoop.mapred.TaskTracker$MapEventsFetcherThread" />
<Method name="run" />
<Bug pattern="IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match>
<!--
Ignoring this warning as resolving this would need a non-trivial change in code
-->
<Match>
<Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor" />
<Method name="configure" />
<Field name="maxNumItems" />
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
</Match>
<!--
Comes from org.apache.jasper.runtime.ResourceInjector. Cannot do much.
-->
<Match>
<Class name="org.apache.hadoop.mapred.jobqueue_005fdetails_jsp" />
<Field name="_jspx_resourceInjector" />
<Bug pattern="SE_BAD_FIELD" />
</Match>
<!--
Storing textInputFormat and then passing it as a parameter. Safe to ignore.
-->
<Match>
<Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob" />
<Method name="createValueAggregatorJob" />
<Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" />
</Match>
<!--
Can remove this after the upgrade to findbugs1.3.8
-->
<Match>
<Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat" />
<Method name="getSplits" />
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<!--
This is a spurious warning. Just ignore
-->
<Match>
<Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
<Field name="kvindex" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>
<!--
core changes
-->
<Match>
<Class name="~org.apache.hadoop.*" />
<Bug code="MS" />
</Match>
<Match>
<Class name="org.apache.hadoop.fs.FileSystem" />
<Method name="checkPath" />
<Bug pattern="ES_COMPARING_STRINGS_WITH_EQ" />
</Match>
<Match>
<Class name="org.apache.hadoop.io.Closeable" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Match>
<Match>
<Class name="org.apache.hadoop.security.AccessControlException" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.ProcfsBasedProcessTree" />
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
</Match>
<!--
Streaming, Examples
-->
<Match>
<Class name="org.apache.hadoop.streaming.StreamUtil$TaskId" />
<Bug pattern="URF_UNREAD_FIELD" />
</Match>
<Match>
<Class name="org.apache.hadoop.examples.DBCountPageView" />
<Method name="verify" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<Match>
<Class name="org.apache.hadoop.examples.ContextFactory" />
<Method name="setAttributes" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
TFile
-->
<Match>
<Class name="org.apache.hadoop.io.file.tfile.Chunk$ChunkDecoder" />
<Method name="close" />
<Bug pattern="SR_NOT_CHECKED" />
</Match>
<!--
The purpose of skip() is to drain remaining bytes of the chunk-encoded
stream (one chunk at a time). The termination condition is checked by
checkEOF().
-->
<Match>
<Class name="org.apache.hadoop.io.file.tfile.Utils" />
<Method name="writeVLong" />
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
<!--
The switch condition fall through is intentional and for performance
purposes.
-->
<Match>
<Class name="org.apache.hadoop.log.EventCounter"/>
<!-- backward compatibility -->
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
<Match>
<Class name="org.apache.hadoop.metrics.jvm.EventCounter"/>
<!-- backward compatibility -->
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.security\.proto\.SecurityProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.TestProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
</Match>
<!--
Manually checked, misses child thread manually syncing on parent's intrinsic lock.
-->
<Match>
<Class name="org.apache.hadoop.metrics2.lib.MutableQuantiles" />
<Field name="previousSnapshot" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
The method uses a generic type T that extends two other types
T1 and T2. Findbugs complains of a cast from T1 to T2.
-->
<Match>
<Class name="org.apache.hadoop.fs.DelegationTokenRenewer" />
<Method name="removeRenewAction" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- Inconsistent synchronization flagged by findbugs is not valid. -->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="in" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
The switch condition for INITIATE is expected to fallthru to RESPONSE
to process initial sasl response token included in the INITIATE
-->
<Match>
<Class name="org.apache.hadoop.ipc.Server$Connection" />
<Method name="processSaslMessage" />
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
<!-- Synchronization performed on util.concurrent instance. -->
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="stop" />
<Bug code="JLM" />
</Match>
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="waitForServiceToStop" />
<Bug code="JLM" />
</Match>
<!--
OpenStack Swift FS module -closes streams in a different method
from where they are opened.
-->
<Match>
<Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
<Method name="uploadFileAttempt"/>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
</Match>
<Match>
<Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
<Method name="uploadFilePartAttempt"/>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
</Match>
<!-- code from maven source, null value is checked at callee side. -->
<Match>
<Class name="org.apache.hadoop.util.ComparableVersion$ListItem" />
<Method name="compareTo" />
<Bug code="NP" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.HttpExceptionUtils"/>
<Method name="validateResponse"/>
<Bug pattern="REC_CATCH_EXCEPTION"/>
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.s3.S3FileSystem
org.apache.hadoop.fs.s3native.NativeS3FileSystem

View File

@ -46,15 +46,6 @@ protected void tearDown() throws Exception {
super.tearDown();
}
public void testBlockSize() throws Exception {
Path file = path("/test/hadoop/file");
long newBlockSize = fs.getDefaultBlockSize(file) * 2;
fs.getConf().setLong("fs.s3.block.size", newBlockSize);
createFile(file);
assertEquals("Double default block size", newBlockSize,
fs.getFileStatus(file).getBlockSize());
}
public void testCanonicalName() throws Exception {
assertNull("s3 doesn't support security token and shouldn't have canonical name",
fs.getCanonicalServiceName());

View File

@ -83,6 +83,12 @@
<scope>compile</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<scope>compile</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>

View File

@ -67,6 +67,12 @@ Release 2.6.0 - UNRELEASED
YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue.
(Wei Yan via kasha)
YARN-415. Capture aggregate memory allocation at the app-level for chargeback.
(Eric Payne & Andrey Klochkov via jianhe)
YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
containers to a preconfigured limit. (Varun Vasudev via vinodkv)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
@ -196,6 +202,9 @@ Release 2.6.0 - UNRELEASED
YARN-2515. Updated ConverterUtils#toContainerId to parse epoch.
(Tsuyoshi OZAWA via jianhe)
YARN-2448. Changed ApplicationMasterProtocol to expose RM-recognized resource
types to the AMs. (Varun Vasudev via vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -305,6 +314,15 @@ Release 2.6.0 - UNRELEASED
YARN-2526. SLS can deadlock when all the threads are taken by AMSimulators.
(Wei Yan via kasha)
YARN-1458. FairScheduler: Zero weight can lead to livelock.
(Zhihai Xu via kasha)
YARN-2459. RM crashes if App gets rejected for any reason
and HA is enabled. (Jian He and Mayank Bansal via xgong)
YARN-2158. Fixed TestRMWebServicesAppsModification#testSingleAppKill test
failure. (Varun Vasudev via jianhe)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -31,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.Records;
/**
@ -180,4 +182,25 @@ public abstract void setContainersFromPreviousAttempts(
@Private
@Unstable
public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
/**
* Get a set of the resource types considered by the scheduler.
*
* @return a Map of RM settings
*/
@Public
@Unstable
public abstract EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes();
/**
* Set the resource types used by the scheduler.
*
* @param types
* a set of the resource types that the scheduler considers during
* scheduling
*/
@Private
@Unstable
public abstract void setSchedulerResourceTypes(
EnumSet<SchedulerResourceTypes> types);
}

View File

@ -35,7 +35,8 @@ public abstract class ApplicationResourceUsageReport {
@Unstable
public static ApplicationResourceUsageReport newInstance(
int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources) {
Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds) {
ApplicationResourceUsageReport report =
Records.newRecord(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers);
@ -43,6 +44,8 @@ public static ApplicationResourceUsageReport newInstance(
report.setUsedResources(usedResources);
report.setReservedResources(reservedResources);
report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
return report;
}
@ -113,4 +116,40 @@ public static ApplicationResourceUsageReport newInstance(
@Private
@Unstable
public abstract void setNeededResources(Resource needed_resources);
/**
* Set the aggregated amount of memory (in megabytes) the application has
* allocated times the number of seconds the application has been running.
* @param memory_seconds the aggregated amount of memory seconds
*/
@Private
@Unstable
public abstract void setMemorySeconds(long memory_seconds);
/**
* Get the aggregated amount of memory (in megabytes) the application has
* allocated times the number of seconds the application has been running.
* @return the aggregated amount of memory seconds
*/
@Public
@Unstable
public abstract long getMemorySeconds();
/**
* Set the aggregated number of vcores that the application has allocated
* times the number of seconds the application has been running.
* @param vcore_seconds the aggregated number of vcore seconds
*/
@Private
@Unstable
public abstract void setVcoreSeconds(long vcore_seconds);
/**
* Get the aggregated number of vcores that the application has allocated
* times the number of seconds the application has been running.
* @return the aggregated number of vcore seconds
*/
@Public
@Unstable
public abstract long getVcoreSeconds();
}

View File

@ -723,6 +723,12 @@ public class YarnConfiguration extends Configuration {
/** Number of Virtual CPU Cores which can be allocated for containers.*/
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8;
/** Percentage of overall CPU which can be allocated for containers. */
public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;
/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";

View File

@ -167,6 +167,8 @@ message ApplicationResourceUsageReportProto {
optional ResourceProto used_resources = 3;
optional ResourceProto reserved_resources = 4;
optional ResourceProto needed_resources = 5;
optional int64 memory_seconds = 6;
optional int64 vcore_seconds = 7;
}
message ApplicationReportProto {

View File

@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto {
repeated ContainerProto containers_from_previous_attempts = 4;
optional string queue = 5;
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
repeated SchedulerResourceTypes scheduler_resource_types = 7;
}
message FinishApplicationMasterRequestProto {
@ -88,6 +89,11 @@ message AllocateResponseProto {
optional hadoop.common.TokenProto am_rm_token = 12;
}
enum SchedulerResourceTypes {
MEMORY = 0;
CPU = 1;
}
//////////////////////////////////////////////////////
/////// client_RM_Protocol ///////////////////////////
//////////////////////////////////////////////////////

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@ -460,6 +461,11 @@ private void printApplicationReport(String applicationId)
appReportStr.println(appReport.getRpcPort());
appReportStr.print("\tAM Host : ");
appReportStr.println(appReport.getHost());
appReportStr.print("\tAggregate Resource Allocation : ");
ApplicationResourceUsageReport usageReport = appReport.getApplicationResourceUsageReport();
appReportStr.print(usageReport.getMemorySeconds() + " MB-seconds, ");
appReportStr.println(usageReport.getVcoreSeconds() + " vcore-seconds");
appReportStr.print("\tDiagnostics : ");
appReportStr.print(appReport.getDiagnostics());
} else {

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -87,11 +88,15 @@ public void setup() {
public void testGetApplicationReport() throws Exception {
ApplicationCLI cli = createAndGetAppCLI();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
ApplicationResourceUsageReport usageReport =
ApplicationResourceUsageReport.newInstance(
2, 0, null, null, null, 123456, 4567);
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
FinalApplicationStatus.SUCCEEDED, usageReport, "N/A", 0.53789f, "YARN",
null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
int result = cli.run(new String[] { "application", "-status", applicationId.toString() });
@ -113,6 +118,7 @@ public void testGetApplicationReport() throws Exception {
pw.println("\tTracking-URL : N/A");
pw.println("\tRPC Port : 124");
pw.println("\tAM Host : host");
pw.println("\tAggregate Resource Allocation : 123456 MB-seconds, 4567 vcore-seconds");
pw.println("\tDiagnostics : diagnostics");
pw.close();
String appReportStr = baos.toString("UTF-8");

View File

@ -20,11 +20,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -43,6 +39,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
@ -61,6 +58,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
private Map<ApplicationAccessType, String> applicationACLS = null;
private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null;
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder();
@ -122,6 +120,9 @@ private void mergeLocalToBuilder() {
Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
builder.addAllNmTokensFromPreviousAttempts(iterable);
}
if(schedulerResourceTypes != null) {
addSchedulerResourceTypes();
}
}
@ -364,6 +365,73 @@ public void remove() {
};
}
@Override
public EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes() {
initSchedulerResourceTypes();
return this.schedulerResourceTypes;
}
private void initSchedulerResourceTypes() {
if (this.schedulerResourceTypes != null) {
return;
}
RegisterApplicationMasterResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<SchedulerResourceTypes> list = p.getSchedulerResourceTypesList();
if (list.isEmpty()) {
this.schedulerResourceTypes =
EnumSet.noneOf(SchedulerResourceTypes.class);
} else {
this.schedulerResourceTypes = EnumSet.copyOf(list);
}
}
private void addSchedulerResourceTypes() {
maybeInitBuilder();
builder.clearSchedulerResourceTypes();
if (schedulerResourceTypes == null) {
return;
}
Iterable<? extends SchedulerResourceTypes> values =
new Iterable<SchedulerResourceTypes>() {
@Override
public Iterator<SchedulerResourceTypes> iterator() {
return new Iterator<SchedulerResourceTypes>() {
Iterator<SchedulerResourceTypes> settingsIterator =
schedulerResourceTypes.iterator();
@Override
public boolean hasNext() {
return settingsIterator.hasNext();
}
@Override
public SchedulerResourceTypes next() {
return settingsIterator.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllSchedulerResourceTypes(values);
}
@Override
public void setSchedulerResourceTypes(EnumSet<SchedulerResourceTypes> types) {
if (types == null) {
return;
}
initSchedulerResourceTypes();
this.schedulerResourceTypes.clear();
this.schedulerResourceTypes.addAll(types);
}
private Resource convertFromProtoFormat(ResourceProto resource) {
return new ResourcePBImpl(resource);
}

View File

@ -200,6 +200,30 @@ public synchronized void setNeededResources(Resource reserved_resources) {
this.neededResources = reserved_resources;
}
@Override
public synchronized void setMemorySeconds(long memory_seconds) {
maybeInitBuilder();
builder.setMemorySeconds(memory_seconds);
}
@Override
public synchronized long getMemorySeconds() {
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemorySeconds();
}
@Override
public synchronized void setVcoreSeconds(long vcore_seconds) {
maybeInitBuilder();
builder.setVcoreSeconds(vcore_seconds);
}
@Override
public synchronized long getVcoreSeconds() {
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getVcoreSeconds());
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}

View File

@ -871,12 +871,24 @@
</property>
<property>
<description>Number of CPU cores that can be allocated
for containers.</description>
<description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
physical cores used by YARN containers.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value>
</property>
<property>
<description>Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of
CPU that YARN containers use. Currently functional only
on Linux using cgroups. The default is to use 100% of CPU.
</description>
<name>yarn.nodemanager.resource.percentage-physical-cpu-limit</name>
<value>100</value>
</property>
<property>
<description>NM Webapp address.</description>
<name>yarn.nodemanager.webapp.address</name>

View File

@ -370,7 +370,8 @@ public static ApplicationSubmissionContext newApplicationSubmissionContext(
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
int numUsedContainers, int numReservedContainers, Resource usedResources,
Resource reservedResources, Resource neededResources) {
Resource reservedResources, Resource neededResources, long memorySeconds,
long vcoreSeconds) {
ApplicationResourceUsageReport report =
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
report.setNumUsedContainers(numUsedContainers);
@ -378,6 +379,8 @@ public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
report.setUsedResources(usedResources);
report.setReservedResources(reservedResources);
report.setNeededResources(neededResources);
report.setMemorySeconds(memorySeconds);
report.setVcoreSeconds(vcoreSeconds);
return report;
}

View File

@ -33,6 +33,7 @@
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.SystemClock;
public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
@ -59,7 +61,11 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup";
private final String CONTROLLER_CPU = "cpu";
private final String CPU_PERIOD_US = "cfs_period_us";
private final String CPU_QUOTA_US = "cfs_quota_us";
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
private final int MAX_QUOTA_US = 1000 * 1000;
private final int MIN_PERIOD_US = 1000;
private final Map<String, String> controllerPaths; // Controller -> path
private long deleteCgroupTimeout;
@ -106,8 +112,15 @@ void initConfig() throws IOException {
}
public void init(LinuxContainerExecutor lce) throws IOException {
this.init(lce,
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf));
}
@VisibleForTesting
void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
throws IOException {
initConfig();
// mount cgroups if requested
if (cgroupMount && cgroupMountPath != null) {
ArrayList<String> cgroupKVs = new ArrayList<String>();
@ -117,8 +130,74 @@ public void init(LinuxContainerExecutor lce) throws IOException {
}
initializeControllerPaths();
// cap overall usage to the number of cores allocated to YARN
float yarnProcessors =
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors();
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
} else if (cpuLimitsExist()) {
LOG.info("Removing CPU constraints for YARN containers.");
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
}
}
boolean cpuLimitsExist() throws IOException {
String path = pathForCgroup(CONTROLLER_CPU, "");
File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US);
if (quotaFile.exists()) {
String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
int quotaUS = Integer.parseInt(contents.trim());
if (quotaUS != -1) {
return true;
}
}
return false;
}
@VisibleForTesting
int[] getOverallLimits(float yarnProcessors) {
int[] ret = new int[2];
if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");
}
int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
LOG
.warn("The quota calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");
quotaUS = MIN_PERIOD_US;
}
}
// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
LOG
.warn("The period calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;
}
ret[0] = periodUS;
ret[1] = quotaUS;
return ret;
}
boolean isCpuWeightEnabled() {
return this.cpuWeightEnabled;
@ -274,7 +353,7 @@ private Map<String, List<String>> parseMtab() throws IOException {
BufferedReader in = null;
try {
in = new BufferedReader(new FileReader(new File(MTAB_FILE)));
in = new BufferedReader(new FileReader(new File(getMtabFileName())));
for (String str = in.readLine(); str != null;
str = in.readLine()) {
@ -292,13 +371,13 @@ private Map<String, List<String>> parseMtab() throws IOException {
}
}
} catch (IOException e) {
throw new IOException("Error while reading " + MTAB_FILE, e);
throw new IOException("Error while reading " + getMtabFileName(), e);
} finally {
// Close the streams
try {
in.close();
} catch (IOException e2) {
LOG.warn("Error closing the stream: " + MTAB_FILE, e2);
LOG.warn("Error closing the stream: " + getMtabFileName(), e2);
}
}
@ -334,7 +413,12 @@ private void initializeControllerPaths() throws IOException {
}
} else {
throw new IOException("Not able to enforce cpu weights; cannot find "
+ "cgroup for cpu controller in " + MTAB_FILE);
+ "cgroup for cpu controller in " + getMtabFileName());
}
}
@VisibleForTesting
String getMtabFileName() {
return MTAB_FILE;
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NodeManagerHardwareUtils {
/**
*
* Returns the fraction of CPU cores that should be used for YARN containers.
* The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
*
* @param conf
* - Configuration object
* @return Fraction of CPU cores to be used for YARN containers
*/
public static float getContainersCores(Configuration conf) {
ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
return NodeManagerHardwareUtils.getContainersCores(plugin, conf);
}
/**
*
* Returns the fraction of CPU cores that should be used for YARN containers.
* The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
*
* @param plugin
* - ResourceCalculatorPlugin object to determine hardware specs
* @param conf
* - Configuration object
* @return Fraction of CPU cores to be used for YARN containers
*/
public static float getContainersCores(ResourceCalculatorPlugin plugin,
Configuration conf) {
int numProcessors = plugin.getNumProcessors();
int nodeCpuPercentage =
Math.min(conf.getInt(
YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
100);
nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
if (nodeCpuPercentage == 0) {
String message =
"Illegal value for "
+ YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+ ". Value cannot be less than or equal to 0.";
throw new IllegalArgumentException(message);
}
return (nodeCpuPercentage * numProcessors) / 100.0f;
}
}

View File

@ -17,13 +17,18 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileOutputStream;
import java.io.*;
import java.util.List;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@ -70,4 +75,142 @@ public void run() {
Assert.assertFalse(handler.deleteCgroup(file.getPath()));
}
static class MockLinuxContainerExecutor extends LinuxContainerExecutor {
@Override
public void mountCgroups(List<String> x, String y) {
}
}
static class CustomCgroupsLCEResourceHandler extends
CgroupsLCEResourcesHandler {
String mtabFile;
int[] limits = new int[2];
@Override
int[] getOverallLimits(float x) {
return limits;
}
void setMtabFile(String file) {
mtabFile = file;
}
@Override
String getMtabFileName() {
return mtabFile;
}
}
@Test
public void testInit() throws IOException {
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
CustomCgroupsLCEResourceHandler handler =
new CustomCgroupsLCEResourceHandler();
YarnConfiguration conf = new YarnConfiguration();
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
handler.setConf(conf);
handler.initConfig();
// create mock cgroup
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
// create mock mtab
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
// check values
// in this case, we're using all cpu so the files
// shouldn't exist(because init won't create them
handler.init(mockLCE, plugin);
File periodFile = new File(cgroupMountDir, "cpu.cfs_period_us");
File quotaFile = new File(cgroupMountDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// subset of cpu being used, files should be created
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
int period = readIntFromFile(periodFile);
int quota = readIntFromFile(quotaFile);
Assert.assertEquals(100 * 1000, period);
Assert.assertEquals(1000 * 1000, quota);
// set cpu back to 100, quota should be -1
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
quota = readIntFromFile(quotaFile);
Assert.assertEquals(-1, quota);
FileUtils.deleteQuietly(cgroupDir);
}
private int readIntFromFile(File targetFile) throws IOException {
Scanner scanner = new Scanner(targetFile);
if (scanner.hasNextInt()) {
return scanner.nextInt();
}
return -1;
}
@Test
public void testGetOverallLimits() {
int expectedQuota = 1000 * 1000;
CgroupsLCEResourcesHandler handler = new CgroupsLCEResourcesHandler();
int[] ret = handler.getOverallLimits(2);
Assert.assertEquals(expectedQuota / 2, ret[0]);
Assert.assertEquals(expectedQuota, ret[1]);
ret = handler.getOverallLimits(2000);
Assert.assertEquals(expectedQuota, ret[0]);
Assert.assertEquals(-1, ret[1]);
int[] params = { 0, -1 };
for (int cores : params) {
try {
handler.getOverallLimits(cores);
Assert.fail("Function call should throw error.");
} catch (IllegalArgumentException ie) {
// expected
}
}
// test minimums
ret = handler.getOverallLimits(1000 * 1000);
Assert.assertEquals(1000 * 1000, ret[0]);
Assert.assertEquals(-1, ret[1]);
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestNodeManagerHardwareUtils {
@Test
public void testGetContainerCores() {
YarnConfiguration conf = new YarnConfiguration();
float ret;
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 0);
try {
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.fail("getContainerCores should have thrown exception");
} catch (IllegalArgumentException ie) {
// expected
}
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(4, (int) ret);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(2, (int) ret);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(3, (int) ret);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 85);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(3.4, ret, 0.1);
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
110);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(4, (int) ret);
}
}

View File

@ -22,11 +22,7 @@
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -329,6 +325,10 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
}
response.setSchedulerResourceTypes(rScheduler
.getSchedulingResourceTypes());
return response;
}
}

View File

@ -401,7 +401,7 @@ private boolean isApplicationInFinalState(RMAppState rmAppState) {
}
}
private Credentials parseCredentials(ApplicationSubmissionContext application)
protected Credentials parseCredentials(ApplicationSubmissionContext application)
throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();

View File

@ -236,5 +236,5 @@ public static YarnApplicationAttemptState createApplicationAttemptState(
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
Resources.createResource(-1, -1), Resources.createResource(-1, -1),
Resources.createResource(-1, -1));
Resources.createResource(-1, -1), 0, 0);
}

View File

@ -280,7 +280,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
// assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId());

View File

@ -138,7 +138,10 @@ public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime());
attemptStateData.getStartTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
ApplicationState appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());
@ -167,7 +170,9 @@ public synchronized void updateApplicationAttemptStateInternal(
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
ApplicationState appState =
state.getApplicationState().get(

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -265,19 +266,21 @@ public static class ApplicationAttemptState {
String diagnostics;
int exitStatus = ContainerExitStatus.INVALID;
FinalApplicationStatus amUnregisteredFinalStatus;
long memorySeconds;
long vcoreSeconds;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime) {
long startTime, long memorySeconds, long vcoreSeconds) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
null, "", null, ContainerExitStatus.INVALID);
null, "", null, ContainerExitStatus.INVALID, memorySeconds, vcoreSeconds);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
int exitStatus) {
int exitStatus, long memorySeconds, long vcoreSeconds) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
@ -287,6 +290,8 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId,
this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
}
public Container getMasterContainer() {
@ -316,6 +321,12 @@ public FinalApplicationStatus getFinalApplicationStatus() {
public int getAMContainerExitStatus(){
return this.exitStatus;
}
public long getMemorySeconds() {
return memorySeconds;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
}
/**
@ -587,10 +598,13 @@ protected abstract void updateApplicationStateInternal(ApplicationId appId,
public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
AggregateAppResourceUsage resUsage =
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime());
appAttempt.getStartTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
@ -746,7 +760,7 @@ public synchronized void removeApplication(RMApp app) {
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime());
appAttempt.getStartTime(), 0, 0);
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}

View File

@ -603,7 +603,9 @@ private void loadApplicationAttemptState(ApplicationState appState,
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus());
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}

View File

@ -43,7 +43,8 @@ public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long memorySeconds, long vcoreSeconds) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@ -55,6 +56,8 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
attemptStateData.setMemorySeconds(memorySeconds);
attemptStateData.setVcoreSeconds(vcoreSeconds);
return attemptStateData;
}
@ -72,7 +75,8 @@ public static ApplicationAttemptStateData newInstance(
attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus());
attemptState.getAMContainerExitStatus(),
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
}
public abstract ApplicationAttemptStateDataProto getProto();
@ -157,4 +161,28 @@ public abstract void setFinalApplicationStatus(
public abstract int getAMContainerExitStatus();
public abstract void setAMContainerExitStatus(int exitStatus);
/**
* Get the <em>memory seconds</em> (in MB seconds) of the application.
* @return <em>memory seconds</em> (in MB seconds) of the application
*/
@Public
@Unstable
public abstract long getMemorySeconds();
@Public
@Unstable
public abstract void setMemorySeconds(long memorySeconds);
/**
* Get the <em>vcore seconds</em> of the application.
* @return <em>vcore seconds</em> of the application
*/
@Public
@Unstable
public abstract long getVcoreSeconds();
@Public
@Unstable
public abstract void setVcoreSeconds(long vcoreSeconds);
}

View File

@ -228,6 +228,30 @@ public void setStartTime(long startTime) {
builder.setStartTime(startTime);
}
@Override
public long getMemorySeconds() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemorySeconds();
}
@Override
public long getVcoreSeconds() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getVcoreSeconds();
}
@Override
public void setMemorySeconds(long memorySeconds) {
maybeInitBuilder();
builder.setMemorySeconds(memorySeconds);
}
@Override
public void setVcoreSeconds(long vcoreSeconds) {
maybeInitBuilder();
builder.setVcoreSeconds(vcoreSeconds);
}
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -150,8 +151,10 @@ RMAppEventType.START, new RMAppNewlySavingTransition())
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FAILED,
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
// Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
@ -559,6 +562,10 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
}
}
}
RMAppMetrics rmAppMetrics = getRMAppMetrics();
appUsageReport.setMemorySeconds(rmAppMetrics.getMemorySeconds());
appUsageReport.setVcoreSeconds(rmAppMetrics.getVcoreSeconds());
}
if (currentApplicationAttemptId == null) {
@ -1115,7 +1122,6 @@ public AttemptFailedTransition(RMAppState initialState) {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
@ -1197,6 +1203,8 @@ public RMAppMetrics getRMAppMetrics() {
Resource resourcePreempted = Resource.newInstance(0, 0);
int numAMContainerPreempted = 0;
int numNonAMContainerPreempted = 0;
long memorySeconds = 0;
long vcoreSeconds = 0;
for (RMAppAttempt attempt : attempts.values()) {
if (null != attempt) {
RMAppAttemptMetrics attemptMetrics =
@ -1206,10 +1214,17 @@ public RMAppMetrics getRMAppMetrics() {
numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
numNonAMContainerPreempted +=
attemptMetrics.getNumNonAMContainersPreempted();
// getAggregateAppResourceUsage() will calculate resource usage stats
// for both running and finished containers.
AggregateAppResourceUsage resUsage =
attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
memorySeconds += resUsage.getMemorySeconds();
vcoreSeconds += resUsage.getVcoreSeconds();
}
}
return new RMAppMetrics(resourcePreempted,
numNonAMContainerPreempted, numAMContainerPreempted);
numNonAMContainerPreempted, numAMContainerPreempted,
memorySeconds, vcoreSeconds);
}
}

View File

@ -24,12 +24,17 @@ public class RMAppMetrics {
final Resource resourcePreempted;
final int numNonAMContainersPreempted;
final int numAMContainersPreempted;
final long memorySeconds;
final long vcoreSeconds;
public RMAppMetrics(Resource resourcePreempted,
int numNonAMContainersPreempted, int numAMContainersPreempted) {
int numNonAMContainersPreempted, int numAMContainersPreempted,
long memorySeconds, long vcoreSeconds) {
this.resourcePreempted = resourcePreempted;
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
this.numAMContainersPreempted = numAMContainersPreempted;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
}
public Resource getResourcePreempted() {
@ -43,4 +48,12 @@ public int getNumNonAMContainersPreempted() {
public int getNumAMContainersPreempted() {
return numAMContainersPreempted;
}
public long getMemorySeconds() {
return memorySeconds;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@Private
public class AggregateAppResourceUsage {
long memorySeconds;
long vcoreSeconds;
public AggregateAppResourceUsage(long memorySeconds, long vcoreSeconds) {
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
}
/**
* @return the memorySeconds
*/
public long getMemorySeconds() {
return memorySeconds;
}
/**
* @param memorySeconds the memorySeconds to set
*/
public void setMemorySeconds(long memorySeconds) {
this.memorySeconds = memorySeconds;
}
/**
* @return the vcoreSeconds
*/
public long getVcoreSeconds() {
return vcoreSeconds;
}
/**
* @param vcoreSeconds the vcoreSeconds to set
*/
public void setVcoreSeconds(long vcoreSeconds) {
this.vcoreSeconds = vcoreSeconds;
}
}

View File

@ -85,6 +85,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@ -430,7 +431,8 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId);
this.attemptMetrics =
new RMAppAttemptMetrics(applicationAttemptId, rmContext);
}
@Override
@ -704,6 +706,10 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
if (report == null) {
report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
}
AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage();
report.setMemorySeconds(resUsage.getMemorySeconds());
report.setVcoreSeconds(resUsage.getVcoreSeconds());
return report;
} finally {
this.readLock.unlock();
@ -733,6 +739,8 @@ public void recover(RMState state) throws Exception {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
this.attemptMetrics.updateAggregateAppResourceUsage(
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
@ -1017,12 +1025,14 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
default:
break;
}
AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore();
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);

Some files were not shown because too many files have changed in this diff Show More