diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index bace31168b4..7bdfd17b5dd 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -183,6 +183,9 @@ Trunk (Unreleased)
HADOOP-10842. CryptoExtension generateEncryptedKey method should
receive the key name. (asuresh via tucu)
+ HADOOP-10750. KMSKeyProviderCache should be in hadoop-common.
+ (asuresh via tucu)
+
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.
@@ -432,6 +435,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10610. Upgrade S3n s3.fs.buffer.dir to support multi directories.
(Ted Malaska via atm)
+ HADOOP-10817. ProxyUsers configuration should support configurable
+ prefixes. (tucu)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java
new file mode 100644
index 00000000000..057df33a61d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * A KeyProviderExtension
implementation providing a short lived
+ * cache for KeyVersions
and Metadata
to avoid burst
+ * of requests to hit the underlying KeyProvider
.
+ */
+public class CachingKeyProvider extends
+ KeyProviderExtension {
+
+ static class CacheExtension implements KeyProviderExtension.Extension {
+ private final KeyProvider provider;
+ private LoadingCache keyVersionCache;
+ private LoadingCache currentKeyCache;
+ private LoadingCache keyMetadataCache;
+
+ CacheExtension(KeyProvider prov, long keyTimeoutMillis,
+ long currKeyTimeoutMillis) {
+ this.provider = prov;
+ keyVersionCache =
+ CacheBuilder.newBuilder().expireAfterAccess(keyTimeoutMillis,
+ TimeUnit.MILLISECONDS)
+ .build(new CacheLoader() {
+ @Override
+ public KeyVersion load(String key) throws Exception {
+ KeyVersion kv = provider.getKeyVersion(key);
+ if (kv == null) {
+ throw new KeyNotFoundException();
+ }
+ return kv;
+ }
+ });
+ keyMetadataCache =
+ CacheBuilder.newBuilder().expireAfterAccess(keyTimeoutMillis,
+ TimeUnit.MILLISECONDS)
+ .build(new CacheLoader() {
+ @Override
+ public Metadata load(String key) throws Exception {
+ Metadata meta = provider.getMetadata(key);
+ if (meta == null) {
+ throw new KeyNotFoundException();
+ }
+ return meta;
+ }
+ });
+ currentKeyCache =
+ CacheBuilder.newBuilder().expireAfterWrite(currKeyTimeoutMillis,
+ TimeUnit.MILLISECONDS)
+ .build(new CacheLoader() {
+ @Override
+ public KeyVersion load(String key) throws Exception {
+ KeyVersion kv = provider.getCurrentKey(key);
+ if (kv == null) {
+ throw new KeyNotFoundException();
+ }
+ return kv;
+ }
+ });
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class KeyNotFoundException extends Exception { }
+
+ public CachingKeyProvider(KeyProvider keyProvider, long keyTimeoutMillis,
+ long currKeyTimeoutMillis) {
+ super(keyProvider, new CacheExtension(keyProvider, keyTimeoutMillis,
+ currKeyTimeoutMillis));
+ }
+
+ @Override
+ public KeyVersion getCurrentKey(String name) throws IOException {
+ try {
+ return getExtension().currentKeyCache.get(name);
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause();
+ if (cause instanceof KeyNotFoundException) {
+ return null;
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new IOException(cause);
+ }
+ }
+ }
+
+ @Override
+ public KeyVersion getKeyVersion(String versionName)
+ throws IOException {
+ try {
+ return getExtension().keyVersionCache.get(versionName);
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause();
+ if (cause instanceof KeyNotFoundException) {
+ return null;
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new IOException(cause);
+ }
+ }
+ }
+
+ @Override
+ public void deleteKey(String name) throws IOException {
+ getKeyProvider().deleteKey(name);
+ getExtension().currentKeyCache.invalidate(name);
+ getExtension().keyMetadataCache.invalidate(name);
+ // invalidating all key versions as we don't know
+ // which ones belonged to the deleted key
+ getExtension().keyVersionCache.invalidateAll();
+ }
+
+ @Override
+ public KeyVersion rollNewVersion(String name, byte[] material)
+ throws IOException {
+ KeyVersion key = getKeyProvider().rollNewVersion(name, material);
+ getExtension().currentKeyCache.invalidate(name);
+ getExtension().keyMetadataCache.invalidate(name);
+ return key;
+ }
+
+ @Override
+ public KeyVersion rollNewVersion(String name)
+ throws NoSuchAlgorithmException, IOException {
+ KeyVersion key = getKeyProvider().rollNewVersion(name);
+ getExtension().currentKeyCache.invalidate(name);
+ getExtension().keyMetadataCache.invalidate(name);
+ return key;
+ }
+
+ @Override
+ public Metadata getMetadata(String name) throws IOException {
+ try {
+ return getExtension().keyMetadataCache.get(name);
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause();
+ if (cause instanceof KeyNotFoundException) {
+ return null;
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new IOException(cause);
+ }
+ }
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java
index afa46582d32..ab1c390f464 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java
@@ -24,37 +24,64 @@
import java.util.Map.Entry;
import java.util.regex.Pattern;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.MachineList;
import com.google.common.annotations.VisibleForTesting;
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
public class DefaultImpersonationProvider implements ImpersonationProvider {
private static final String CONF_HOSTS = ".hosts";
private static final String CONF_USERS = ".users";
private static final String CONF_GROUPS = ".groups";
- private static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser.";
- private static final String CONF_HADOOP_PROXYUSER_RE = "hadoop\\.proxyuser\\.";
- private static final String CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS =
- CONF_HADOOP_PROXYUSER_RE+"[^.]*(" + Pattern.quote(CONF_USERS) +
- "|" + Pattern.quote(CONF_GROUPS) + ")";
- private static final String CONF_HADOOP_PROXYUSER_RE_HOSTS =
- CONF_HADOOP_PROXYUSER_RE+"[^.]*"+ Pattern.quote(CONF_HOSTS);
// acl and list of hosts per proxyuser
private Map proxyUserAcl =
new HashMap();
- private static Map proxyHosts =
+ private Map proxyHosts =
new HashMap();
private Configuration conf;
+
+ private static DefaultImpersonationProvider testProvider;
+
+ public static synchronized DefaultImpersonationProvider getTestProvider() {
+ if (testProvider == null) {
+ testProvider = new DefaultImpersonationProvider();
+ testProvider.setConf(new Configuration());
+ testProvider.init(ProxyUsers.CONF_HADOOP_PROXYUSER);
+ }
+ return testProvider;
+ }
+
@Override
public void setConf(Configuration conf) {
this.conf = conf;
+ }
- // get list of users and groups per proxyuser
+ private String configPrefix;
+
+ @Override
+ public void init(String configurationPrefix) {
+ configPrefix = configurationPrefix +
+ (configurationPrefix.endsWith(".") ? "" : ".");
+
+ // constructing regex to match the following patterns:
+ // $configPrefix.[ANY].users
+ // $configPrefix.[ANY].groups
+ // $configPrefix.[ANY].hosts
+ //
+ String prefixRegEx = configPrefix.replace(".", "\\.");
+ String usersGroupsRegEx = prefixRegEx + "[^.]*(" +
+ Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + ")";
+ String hostsRegEx = prefixRegEx + "[^.]*" + Pattern.quote(CONF_HOSTS);
+
+ // get list of users and groups per proxyuser
Map allMatchKeys =
- conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS);
+ conf.getValByRegex(usersGroupsRegEx);
for(Entry entry : allMatchKeys.entrySet()) {
String aclKey = getAclKey(entry.getKey());
if (!proxyUserAcl.containsKey(aclKey)) {
@@ -65,7 +92,7 @@ public void setConf(Configuration conf) {
}
// get hosts per proxyuser
- allMatchKeys = conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_HOSTS);
+ allMatchKeys = conf.getValByRegex(hostsRegEx);
for(Entry entry : allMatchKeys.entrySet()) {
proxyHosts.put(entry.getKey(),
new MachineList(entry.getValue()));
@@ -86,8 +113,8 @@ public void authorize(UserGroupInformation user,
return;
}
- AccessControlList acl = proxyUserAcl.get(
- CONF_HADOOP_PROXYUSER+realUser.getShortUserName());
+ AccessControlList acl = proxyUserAcl.get(configPrefix +
+ realUser.getShortUserName());
if (acl == null || !acl.isUserAllowed(user)) {
throw new AuthorizationException("User: " + realUser.getUserName()
+ " is not allowed to impersonate " + user.getUserName());
@@ -116,8 +143,8 @@ private String getAclKey(String key) {
* @param userName name of the superuser
* @return configuration key for superuser usergroups
*/
- public static String getProxySuperuserUserConfKey(String userName) {
- return CONF_HADOOP_PROXYUSER+userName+CONF_USERS;
+ public String getProxySuperuserUserConfKey(String userName) {
+ return configPrefix + userName + CONF_USERS;
}
/**
@@ -126,8 +153,8 @@ public static String getProxySuperuserUserConfKey(String userName) {
* @param userName name of the superuser
* @return configuration key for superuser groups
*/
- public static String getProxySuperuserGroupConfKey(String userName) {
- return CONF_HADOOP_PROXYUSER+userName+CONF_GROUPS;
+ public String getProxySuperuserGroupConfKey(String userName) {
+ return configPrefix + userName + CONF_GROUPS;
}
/**
@@ -136,8 +163,8 @@ public static String getProxySuperuserGroupConfKey(String userName) {
* @param userName name of the superuser
* @return configuration key for superuser ip-addresses
*/
- public static String getProxySuperuserIpConfKey(String userName) {
- return CONF_HADOOP_PROXYUSER+userName+CONF_HOSTS;
+ public String getProxySuperuserIpConfKey(String userName) {
+ return configPrefix + userName + CONF_HOSTS;
}
@VisibleForTesting
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java
index 6e7a39565df..8b483f0336f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java
@@ -18,10 +18,25 @@
package org.apache.hadoop.security.authorize;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.security.UserGroupInformation;
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
public interface ImpersonationProvider extends Configurable {
+
+
+ /**
+ * Specifies the configuration prefix for the proxy user properties and
+ * initializes the provider.
+ *
+ * @param configurationPrefix the configuration prefix for the proxy user
+ * properties
+ */
+ public void init(String configurationPrefix);
+
/**
* Authorize the superuser which is doing doAs
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java
index e221ae390f2..60d82cbdcf7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.security.authorize;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;
@@ -26,9 +28,12 @@
import com.google.common.annotations.VisibleForTesting;
+@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "HBase", "Hive"})
public class ProxyUsers {
+ public static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser";
+
private static volatile ImpersonationProvider sip ;
/**
@@ -54,15 +59,31 @@ public static void refreshSuperUserGroupsConfiguration() {
}
/**
- * refresh configuration
- * @param conf
+ * Refreshes configuration using the specified Proxy user prefix for
+ * properties.
+ *
+ * @param conf configuration
+ * @param proxyUserPrefix proxy user configuration prefix
*/
- public static void refreshSuperUserGroupsConfiguration(Configuration conf) {
+ public static void refreshSuperUserGroupsConfiguration(Configuration conf,
+ String proxyUserPrefix) {
+ Preconditions.checkArgument(proxyUserPrefix != null &&
+ !proxyUserPrefix.isEmpty(), "prefix cannot be NULL or empty");
// sip is volatile. Any assignment to it as well as the object's state
// will be visible to all the other threads.
- sip = getInstance(conf);
+ ImpersonationProvider ip = getInstance(conf);
+ ip.init(proxyUserPrefix);
+ sip = ip;
ProxyServers.refresh(conf);
}
+
+ /**
+ * Refreshes configuration using the default Proxy user prefix for properties.
+ * @param conf configuration
+ */
+ public static void refreshSuperUserGroupsConfiguration(Configuration conf) {
+ refreshSuperUserGroupsConfiguration(conf, CONF_HADOOP_PROXYUSER);
+ }
/**
* Authorize the superuser which is doing doAs
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java
similarity index 64%
rename from hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java
rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java
index 72b219191bc..2eff6991c3d 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java
@@ -15,17 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.crypto.key.kms.server;
+package org.apache.hadoop.crypto.key;
+
+import java.util.Date;
-import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.Date;
-
-public class TestKMSCacheKeyProvider {
+public class TestCachingKeyProvider {
@Test
public void testCurrentKey() throws Exception {
@@ -33,7 +32,7 @@ public void testCurrentKey() throws Exception {
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k2"))).thenReturn(null);
- KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+ KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
// asserting caching
Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
@@ -45,7 +44,7 @@ public void testCurrentKey() throws Exception {
Mockito.verify(mockProv, Mockito.times(2)).getCurrentKey(Mockito.eq("k1"));
// asserting no caching when key is not known
- cache = new KMSCacheKeyProvider(mockProv, 100);
+ cache = new CachingKeyProvider(mockProv, 100, 100);
Assert.assertEquals(null, cache.getCurrentKey("k2"));
Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k2"));
Assert.assertEquals(null, cache.getCurrentKey("k2"));
@@ -56,25 +55,56 @@ public void testCurrentKey() throws Exception {
public void testKeyVersion() throws Exception {
KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
- Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))).thenReturn(mockKey);
+ Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0")))
+ .thenReturn(mockKey);
Mockito.when(mockProv.getKeyVersion(Mockito.eq("k2@0"))).thenReturn(null);
- KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+ KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
// asserting caching
Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
- Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0"));
+ Mockito.verify(mockProv, Mockito.times(1))
+ .getKeyVersion(Mockito.eq("k1@0"));
Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
- Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0"));
+ Mockito.verify(mockProv, Mockito.times(1))
+ .getKeyVersion(Mockito.eq("k1@0"));
Thread.sleep(200);
Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
- Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k1@0"));
+ Mockito.verify(mockProv, Mockito.times(2))
+ .getKeyVersion(Mockito.eq("k1@0"));
// asserting no caching when key is not known
- cache = new KMSCacheKeyProvider(mockProv, 100);
+ cache = new CachingKeyProvider(mockProv, 100, 100);
Assert.assertEquals(null, cache.getKeyVersion("k2@0"));
- Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k2@0"));
+ Mockito.verify(mockProv, Mockito.times(1))
+ .getKeyVersion(Mockito.eq("k2@0"));
Assert.assertEquals(null, cache.getKeyVersion("k2@0"));
- Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k2@0"));
+ Mockito.verify(mockProv, Mockito.times(2))
+ .getKeyVersion(Mockito.eq("k2@0"));
+ }
+
+ @Test
+ public void testMetadata() throws Exception {
+ KeyProvider.Metadata mockMeta = Mockito.mock(KeyProvider.Metadata.class);
+ KeyProvider mockProv = Mockito.mock(KeyProvider.class);
+ Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(mockMeta);
+ Mockito.when(mockProv.getMetadata(Mockito.eq("k2"))).thenReturn(null);
+ KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
+
+ // asserting caching
+ Assert.assertEquals(mockMeta, cache.getMetadata("k1"));
+ Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k1"));
+ Assert.assertEquals(mockMeta, cache.getMetadata("k1"));
+ Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k1"));
+ Thread.sleep(200);
+ Assert.assertEquals(mockMeta, cache.getMetadata("k1"));
+ Mockito.verify(mockProv, Mockito.times(2)).getMetadata(Mockito.eq("k1"));
+
+ // asserting no caching when key is not known
+ cache = new CachingKeyProvider(mockProv, 100, 100);
+ Assert.assertEquals(null, cache.getMetadata("k2"));
+ Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k2"));
+ Assert.assertEquals(null, cache.getMetadata("k2"));
+ Mockito.verify(mockProv, Mockito.times(2)).getMetadata(Mockito.eq("k2"));
}
@Test
@@ -82,7 +112,7 @@ public void testRollNewVersion() throws Exception {
KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
- KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+ KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1"));
cache.rollNewVersion("k1");
@@ -100,21 +130,23 @@ public void testDeleteKey() throws Exception {
KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
- Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))).thenReturn(mockKey);
+ Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0")))
+ .thenReturn(mockKey);
Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(
new KMSClientProvider.KMSMetadata("c", 0, "l", null, new Date(), 1));
- KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+ KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1"));
Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
- Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0"));
+ Mockito.verify(mockProv, Mockito.times(1))
+ .getKeyVersion(Mockito.eq("k1@0"));
cache.deleteKey("k1");
// asserting the cache is purged
Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
Mockito.verify(mockProv, Mockito.times(2)).getCurrentKey(Mockito.eq("k1"));
Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
- Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k1@0"));
+ Mockito.verify(mockProv, Mockito.times(2))
+ .getKeyVersion(Mockito.eq("k1@0"));
}
-
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java
index 0bc0b047dad..cdbd5570f1b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java
@@ -327,8 +327,8 @@ long runMiniBenchmarkWithDelegationToken(Configuration conf,
String shortUserName =
UserGroupInformation.createRemoteUser(user).getShortUserName();
try {
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(shortUserName),
- GROUP_NAME_1);
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(shortUserName), GROUP_NAME_1);
configureSuperUserIPAddresses(conf, shortUserName);
// start the server
miniServer = new MiniServer(conf, user, keytabFile);
@@ -411,7 +411,7 @@ private void configureSuperUserIPAddresses(Configuration conf,
}
builder.append("127.0.1.1,");
builder.append(InetAddress.getLocalHost().getCanonicalHostName());
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
- builder.toString());
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(superUserShortName), builder.toString());
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
index 42e7881d3a9..b44fa8b85af 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
@@ -101,7 +101,8 @@ private void configureSuperUserIPAddresses(Configuration conf,
builder.append("127.0.1.1,");
builder.append(InetAddress.getLocalHost().getCanonicalHostName());
LOG.info("Local Ip addresses: "+builder.toString());
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(superUserShortName),
builder.toString());
}
@@ -181,8 +182,8 @@ public Void run() throws IOException {
@Test(timeout=4000)
public void testRealUserSetup() throws IOException {
final Configuration conf = new Configuration();
- conf.setStrings(DefaultImpersonationProvider
- .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -214,7 +215,8 @@ public void testRealUserSetup() throws IOException {
public void testRealUserAuthorizationSuccess() throws IOException {
final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -248,9 +250,11 @@ public void testRealUserAuthorizationSuccess() throws IOException {
@Test
public void testRealUserIPAuthorizationFailure() throws IOException {
final Configuration conf = new Configuration();
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_SHORT_NAME),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_SHORT_NAME),
"20.20.20.20"); //Authorized IP address
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -293,8 +297,8 @@ public String run() throws IOException {
@Test
public void testRealUserIPNotSpecified() throws IOException {
final Configuration conf = new Configuration();
- conf.setStrings(DefaultImpersonationProvider
- .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
@@ -377,7 +381,8 @@ public String run() throws IOException {
public void testRealUserGroupAuthorizationFailure() throws IOException {
final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group3");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java
index 3823d70391a..dbcac676fab 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java
@@ -111,10 +111,12 @@ public void testNetgroups () throws IOException{
groupMappingClassName);
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(NETGROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -135,10 +137,12 @@ public void testNetgroups () throws IOException{
public void testProxyUsers() throws Exception {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -168,10 +172,12 @@ public void testProxyUsers() throws Exception {
public void testProxyUsersWithUserConf() throws Exception {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserUserConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -202,10 +208,12 @@ public void testProxyUsersWithUserConf() throws Exception {
public void testWildcardGroup() {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
"*");
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -236,10 +244,12 @@ public void testWildcardGroup() {
public void testWildcardUser() {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserUserConfKey(REAL_USER_NAME),
"*");
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -270,10 +280,12 @@ public void testWildcardUser() {
public void testWildcardIP() {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
"*");
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -301,10 +313,12 @@ public void testWildcardIP() {
public void testIPRange() {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
"*");
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP_RANGE);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -324,16 +338,19 @@ public void testIPRange() {
public void testWithDuplicateProxyGroups() throws Exception {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES,GROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
Collection groupsToBeProxied =
ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME));
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME));
assertEquals (1,groupsToBeProxied.size());
}
@@ -342,16 +359,19 @@ public void testWithDuplicateProxyGroups() throws Exception {
public void testWithDuplicateProxyHosts() throws Exception {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider()
+ .getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(PROXY_IP,PROXY_IP)));
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
Collection hosts =
ProxyUsers.getDefaultImpersonationProvider().getProxyHosts().get(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME));
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME));
assertEquals (1,hosts.size());
}
@@ -391,26 +411,73 @@ public void testProxyUsersWithProviderOverride() throws Exception {
public void testWithProxyGroupsAndUsersWithSpaces() throws Exception {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserUserConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(PROXY_USER_NAME + " ",AUTHORIZED_PROXY_USER_NAME, "ONEMORE")));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
PROXY_IP);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
Collection groupsToBeProxied =
ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME));
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME));
assertEquals (GROUP_NAMES.length, groupsToBeProxied.size());
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testProxyUsersWithNullPrefix() throws Exception {
+ ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration(false),
+ null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testProxyUsersWithEmptyPrefix() throws Exception {
+ ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration(false),
+ "");
+ }
+
+ @Test
+ public void testProxyUsersWithCustomPrefix() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set("x." + REAL_USER_NAME + ".users",
+ StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME)));
+ conf.set("x." + REAL_USER_NAME+ ".hosts", PROXY_IP);
+ ProxyUsers.refreshSuperUserGroupsConfiguration(conf, "x");
+
+
+ // First try proxying a user that's allowed
+ UserGroupInformation realUserUgi = UserGroupInformation
+ .createRemoteUser(REAL_USER_NAME);
+ UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
+ AUTHORIZED_PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
+
+ // From good IP
+ assertAuthorized(proxyUserUgi, "1.2.3.4");
+ // From bad IP
+ assertNotAuthorized(proxyUserUgi, "1.2.3.5");
+
+ // Now try proxying a user that's not allowed
+ realUserUgi = UserGroupInformation.createRemoteUser(REAL_USER_NAME);
+ proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
+ PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
+
+ // From good IP
+ assertNotAuthorized(proxyUserUgi, "1.2.3.4");
+ // From bad IP
+ assertNotAuthorized(proxyUserUgi, "1.2.3.5");
+ }
+
private void assertNotAuthorized(UserGroupInformation proxyUgi, String host) {
try {
@@ -430,6 +497,11 @@ private void assertAuthorized(UserGroupInformation proxyUgi, String host) {
}
static class TestDummyImpersonationProvider implements ImpersonationProvider {
+
+ @Override
+ public void init(String configurationPrefix) {
+ }
+
/**
* Authorize a user (superuser) to impersonate another user (user1) if the
* superuser belongs to the group "sudo_user1" .
@@ -460,11 +532,13 @@ public Configuration getConf() {
public static void loadTest(String ipString, int testRange) {
Configuration conf = new Configuration();
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER_NAME),
StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
conf.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(REAL_USER_NAME),
ipString
);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java
deleted file mode 100644
index e453c16980d..00000000000
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.crypto.key.kms.server;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.hadoop.crypto.key.KeyProvider;
-
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A KeyProvider
proxy implementation providing a short lived
- * cache for KeyVersions
to avoid burst of requests to hit the
- * underlying KeyProvider
.
- */
-public class KMSCacheKeyProvider extends KeyProvider {
- private final KeyProvider provider;
- private LoadingCache keyVersionCache;
- private LoadingCache currentKeyCache;
-
- private static class KeyNotFoundException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- public KMSCacheKeyProvider(KeyProvider prov, long timeoutMillis) {
- this.provider = prov;
- keyVersionCache = CacheBuilder.newBuilder().expireAfterAccess(timeoutMillis,
- TimeUnit.MILLISECONDS).build(new CacheLoader() {
- @Override
- public KeyVersion load(String key) throws Exception {
- KeyVersion kv = provider.getKeyVersion(key);
- if (kv == null) {
- throw new KeyNotFoundException();
- }
- return kv;
- }
- });
- // for current key we don't want to go stale for more than 1 sec
- currentKeyCache = CacheBuilder.newBuilder().expireAfterWrite(1000,
- TimeUnit.MILLISECONDS).build(new CacheLoader() {
- @Override
- public KeyVersion load(String key) throws Exception {
- KeyVersion kv = provider.getCurrentKey(key);
- if (kv == null) {
- throw new KeyNotFoundException();
- }
- return kv;
- }
- });
- }
-
- @Override
- public KeyVersion getCurrentKey(String name) throws IOException {
- try {
- return currentKeyCache.get(name);
- } catch (ExecutionException ex) {
- Throwable cause = ex.getCause();
- if (cause instanceof KeyNotFoundException) {
- return null;
- } else if (cause instanceof IOException) {
- throw (IOException) cause;
- } else {
- throw new IOException(cause);
- }
- }
- }
-
- @Override
- public KeyVersion getKeyVersion(String versionName)
- throws IOException {
- try {
- return keyVersionCache.get(versionName);
- } catch (ExecutionException ex) {
- Throwable cause = ex.getCause();
- if (cause instanceof KeyNotFoundException) {
- return null;
- } else if (cause instanceof IOException) {
- throw (IOException) cause;
- } else {
- throw new IOException(cause);
- }
- }
- }
-
- @Override
- public List getKeys() throws IOException {
- return provider.getKeys();
- }
-
- @Override
- public List getKeyVersions(String name)
- throws IOException {
- return provider.getKeyVersions(name);
- }
-
- @Override
- public Metadata getMetadata(String name) throws IOException {
- return provider.getMetadata(name);
- }
-
- @Override
- public KeyVersion createKey(String name, byte[] material,
- Options options) throws IOException {
- return provider.createKey(name, material, options);
- }
-
- @Override
- public KeyVersion createKey(String name,
- Options options)
- throws NoSuchAlgorithmException, IOException {
- return provider.createKey(name, options);
- }
-
- @Override
- public void deleteKey(String name) throws IOException {
- provider.deleteKey(name);
- currentKeyCache.invalidate(name);
- // invalidating all key versions as we don't know which ones belonged to the
- // deleted key
- keyVersionCache.invalidateAll();
- }
-
- @Override
- public KeyVersion rollNewVersion(String name, byte[] material)
- throws IOException {
- KeyVersion key = provider.rollNewVersion(name, material);
- currentKeyCache.invalidate(name);
- return key;
- }
-
- @Override
- public KeyVersion rollNewVersion(String name)
- throws NoSuchAlgorithmException, IOException {
- KeyVersion key = provider.rollNewVersion(name);
- currentKeyCache.invalidate(name);
- return key;
- }
-
- @Override
- public void flush() throws IOException {
- provider.flush();
- }
-
- @Override
- public Metadata[] getKeysMetadata(String ... keyNames)
- throws IOException {
- return provider.getKeysMetadata(keyNames);
- }
-
- @Override
- public boolean isTransient() {
- return provider.isTransient();
- }
-
-}
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
index b2209d4cc77..e2b8fc4c093 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
@@ -34,9 +34,21 @@ public class KMSConfiguration {
public static final String CONFIG_PREFIX = "hadoop.kms.";
+ // Property to Enable/Disable Caching
+ public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
+ "cache.enable";
+ // Timeout for the Key and Metadata Cache
public static final String KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
"cache.timeout.ms";
- public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 1000; // 10 secs
+ // TImeout for the Current Key cache
+ public static final String CURR_KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
+ "current.key.cache.timeout.ms";
+
+ public static final boolean KEY_CACHE_ENABLE_DEFAULT = true;
+ // 10 mins
+ public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 60 * 1000;
+ // 30 secs
+ public static final long CURR_KEY_CACHE_TIMEOUT_DEFAULT = 30 * 1000;
static Configuration getConfiguration(boolean loadHadoopDefaults,
String ... resources) {
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
index 046753e55be..88ea8c4fa42 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
@@ -22,6 +22,7 @@
import com.codahale.metrics.MetricRegistry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.CachingKeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.http.HttpServer2;
@@ -150,10 +151,17 @@ public void contextInitialized(ServletContextEvent sce) {
kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
}
keyProvider = providers.get(0);
- long timeOutMillis =
- kmsConf.getLong(KMSConfiguration.KEY_CACHE_TIMEOUT_KEY,
- KMSConfiguration.KEY_CACHE_TIMEOUT_DEFAULT);
- keyProvider = new KMSCacheKeyProvider(keyProvider, timeOutMillis);
+ if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
+ KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
+ long keyTimeOutMillis =
+ kmsConf.getLong(KMSConfiguration.KEY_CACHE_TIMEOUT_KEY,
+ KMSConfiguration.KEY_CACHE_TIMEOUT_DEFAULT);
+ long currKeyTimeOutMillis =
+ kmsConf.getLong(KMSConfiguration.CURR_KEY_CACHE_TIMEOUT_KEY,
+ KMSConfiguration.CURR_KEY_CACHE_TIMEOUT_DEFAULT);
+ keyProvider = new CachingKeyProvider(keyProvider, keyTimeOutMillis,
+ currKeyTimeOutMillis);
+ }
LOG.info("KMS Started");
} catch (Throwable ex) {
diff --git a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
index 329fd516e30..297d0325d01 100644
--- a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
+++ b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
@@ -72,22 +72,35 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
KMS caches keys for short period of time to avoid excessive hits to the
underlying key provider.
- The cache is used with the following 2 methods only, <<>>
- and <<>>.
+ The Cache is enabled by default (can be dissabled by setting the
+ <<>> boolean property to false)
+
+ The cache is used with the following 3 methods only, <<>>
+ and <<>> and <<>>.
For the <<>> method, cached entries are kept for a maximum
- of 1000 millisecond regardless the number of times the key is being access
+ of 30000 millisecond regardless the number of times the key is being access
(to avoid stale keys to be considered current).
For the <<>> method, cached entries are kept with a default
- inactivity timeout of 10000 milliseconds. This time out is configurable via
- the following property in the <<>> configuration
- file:
+ inactivity timeout of 600000 milliseconds (10 mins). This time out is
+ configurable via the following property in the <<>>
+ configuration file:
+---+
+
+ hadoop.kms.cache.enable
+ true
+
+
hadoop.kms.cache.timeout.ms
- 10000
+ 600000
+
+
+
+ hadoop.kms.current.key.cache.timeout.ms
+ 30000
+---+
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java
index 617c31d8005..05ba2db05a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java
@@ -72,11 +72,11 @@ public class TestReaddir {
public static void setup() throws Exception {
String currentUser = System.getProperty("user.name");
config.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(currentUser),
- "*");
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(currentUser), "*");
config.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(currentUser),
- "*");
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(currentUser), "*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index 3945b298f5e..32f9b2bcddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -312,10 +312,12 @@ public void testWriteStableHow() throws IOException, InterruptedException {
System.getProperty("user.name"));
String currentUser = System.getProperty("user.name");
config.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(currentUser),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(currentUser),
"*");
config.set(
- DefaultImpersonationProvider.getProxySuperuserIpConfKey(currentUser),
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(currentUser),
"*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 12f10c549e1..3ee4c827b54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -321,6 +321,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6700. BlockPlacementPolicy shoud choose storage but not datanode for
deletion. (szetszwo)
+ HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
+ will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -612,6 +615,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
+ HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
+
BUG FIXES
HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
index cb0f081b99f..9fcada734ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
@@ -339,7 +339,7 @@ public String getDatanodeReport() {
buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
-
+ buffer.append("Xceivers: "+getXceiverCount()+"\n");
buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
return buffer.toString();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e5536a5795d..445dd159f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1456,10 +1456,10 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) {
/** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
- DatanodeDescriptor clientnode, long blocksize) {
+ DatanodeDescriptor clientnode, Set excludes, long blocksize) {
return blockplacement.chooseTarget(src, 1, clientnode,
- Collections.emptyList(), false, null, blocksize,
- storagePolicySuite.getDefaultPolicy());
+ Collections.emptyList(), false, excludes,
+ blocksize, storagePolicySuite.getDefaultPolicy());
}
/** Choose target for getting additional datanodes for an existing pipeline. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 6345a2c7c48..406f065929c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -642,15 +642,11 @@ private boolean isGoodTarget(DatanodeStorageInfo storage,
// check the communication traffic of the target machine
if (considerLoad) {
- double avgLoad = 0;
- if (stats != null) {
- int size = stats.getNumDatanodesInService();
- if (size != 0) {
- avgLoad = (double)stats.getTotalLoad()/size;
- }
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logNodeIsNotChosen(storage, "the node is too busy ");
+ final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+ final int nodeLoad = node.getXceiverCount();
+ if (nodeLoad > maxLoad) {
+ logNodeIsNotChosen(storage,
+ "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
return false;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index aea34ecbc90..791c6dff5be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -820,7 +820,9 @@ boolean checkDecommissionState(DatanodeDescriptor node) {
}
/** Start decommissioning the specified datanode. */
- private void startDecommission(DatanodeDescriptor node) {
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Start Decommissioning " + node + " " + storage
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
index 9ccc5b1ff4a..c9bc3e5ea67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
/** @return the xceiver count */
public int getXceiverCount();
+ /** @return average xceiver count for non-decommission(ing|ed) nodes */
+ public int getInServiceXceiverCount();
+
+ /** @return number of non-decommission(ing|ed) nodes */
+ public int getNumDatanodesInService();
+
/**
* @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index e7017a30a93..901f7e3653d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -150,6 +150,16 @@ public synchronized int getXceiverCount() {
return stats.xceiverCount;
}
+ @Override
+ public synchronized int getInServiceXceiverCount() {
+ return stats.nodesInServiceXceiverCount;
+ }
+
+ @Override
+ public synchronized int getNumDatanodesInService() {
+ return stats.nodesInService;
+ }
+
@Override
public synchronized long getCacheCapacity() {
return stats.cacheCapacity;
@@ -178,7 +188,7 @@ public synchronized int getExpiredHeartbeats() {
}
synchronized void register(final DatanodeDescriptor d) {
- if (!datanodes.contains(d)) {
+ if (!d.isAlive) {
addDatanode(d);
//update its timestamp
@@ -191,6 +201,8 @@ synchronized DatanodeDescriptor[] getDatanodes() {
}
synchronized void addDatanode(final DatanodeDescriptor d) {
+ // update in-service node count
+ stats.add(d);
datanodes.add(d);
d.isAlive = true;
}
@@ -323,6 +335,9 @@ private static class Stats {
private long cacheCapacity = 0L;
private long cacheUsed = 0L;
+ private int nodesInService = 0;
+ private int nodesInServiceXceiverCount = 0;
+
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ private void add(final DatanodeDescriptor node) {
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService++;
+ nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
@@ -344,6 +361,8 @@ private void subtract(final DatanodeDescriptor node) {
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService--;
+ nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
index 676aa0826c0..1a859a7b93e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
@@ -48,6 +48,15 @@ public interface FSClusterStats {
* @return Number of datanodes that are both alive and not decommissioned.
*/
public int getNumDatanodesInService();
+
+ /**
+ * an indication of the average load of non-decommission(ing|ed) nodes
+ * eligible for block placement
+ *
+ * @return average of the in service number of block transfers and block
+ * writes that are currently occurring on the cluster.
+ */
+ public double getInServiceXceiverAverage();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 53f8a18ec57..1aa308cebe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7323,7 +7323,18 @@ public boolean isAvoidingStaleDataNodesForWrite() {
@Override // FSClusterStats
public int getNumDatanodesInService() {
- return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+ return datanodeStatistics.getNumDatanodesInService();
+ }
+
+ @Override // for block placement strategy
+ public double getInServiceXceiverAverage() {
+ double avgLoad = 0;
+ final int nodes = getNumDatanodesInService();
+ if (nodes != 0) {
+ final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+ avgLoad = (double)xceivers/nodes;
+ }
+ return avgLoad;
}
public SnapshotManager getSnapshotManager() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 3ecc3ad5d57..858ce6e761b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -27,6 +27,7 @@
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import javax.servlet.ServletContext;
@@ -82,6 +83,7 @@
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
+import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -111,11 +113,13 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
@@ -188,12 +192,26 @@ private static NamenodeProtocols getRPCServer(NameNode namenode)
}
return np;
}
-
+
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize) throws IOException {
+ final long blocksize, final String excludeDatanodes) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
+
+ HashSet excludes = new HashSet();
+ if (excludeDatanodes != null) {
+ for (String host : StringUtils
+ .getTrimmedStringCollection(excludeDatanodes)) {
+ int idx = host.indexOf(":");
+ if (idx != -1) {
+ excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
+ host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
+ } else {
+ excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
+ }
+ }
+ }
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
@@ -201,7 +219,7 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
).getDatanodeByHost(getRemoteAddress());
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
- path, clientNode, blocksize);
+ path, clientNode, excludes, blocksize);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}
@@ -228,7 +246,7 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
- return bestNode(locations.get(0).getLocations());
+ return bestNode(locations.get(0).getLocations(), excludes);
}
}
}
@@ -242,11 +260,14 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
* sorted based on availability and network distances, thus it is sufficient
* to return the first element of the node here.
*/
- private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
- if (nodes.length == 0 || nodes[0].isDecommissioned()) {
- throw new IOException("No active nodes contain this block");
+ private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
+ HashSet excludes) throws IOException {
+ for (DatanodeInfo dn: nodes) {
+ if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
+ return dn;
+ }
}
- return nodes[0];
+ throw new IOException("No active nodes contain this block");
}
private Token extends TokenIdentifier> generateDelegationToken(
@@ -265,11 +286,12 @@ private URI redirectURI(final NameNode namenode,
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize,
+ final long blocksize, final String excludeDatanodes,
final Param, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
try {
- dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
+ dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+ excludeDatanodes);
} catch (InvalidTopologyException ite) {
throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
}
@@ -356,13 +378,15 @@ public Response putRoot(
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
- final OldSnapshotNameParam oldSnapshotName
- )throws IOException, InterruptedException {
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
+ ) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
- xattrSetFlag, snapshotName, oldSnapshotName);
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
}
/** Handle HTTP PUT request. */
@@ -418,14 +442,16 @@ public Response put(
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
- final OldSnapshotNameParam oldSnapshotName
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
- oldSnapshotName);
+ oldSnapshotName, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction() {
@Override
@@ -436,7 +462,7 @@ public Response run() throws IOException, URISyntaxException {
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
- xattrSetFlag, snapshotName, oldSnapshotName);
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
} finally {
reset();
}
@@ -469,7 +495,8 @@ private Response put(
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
- final OldSnapshotNameParam oldSnapshotName
+ final OldSnapshotNameParam oldSnapshotName,
+ final ExcludeDatanodesParam exclDatanodes
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -479,9 +506,10 @@ private Response put(
switch(op.getValue()) {
case CREATE:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, blockSize.getValue(conf),
- permission, overwrite, bufferSize, replication, blockSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
+ exclDatanodes.getValue(), permission, overwrite, bufferSize,
+ replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:
@@ -614,9 +642,12 @@ public Response postRoot(
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
+ return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
+ bufferSize, excludeDatanodes);
}
/** Handle HTTP POST request. */
@@ -638,17 +669,21 @@ public Response post(
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
+ init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
+ excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return post(ugi, delegation, username, doAsUser,
- path.getAbsolutePath(), op, concatSrcs, bufferSize);
+ path.getAbsolutePath(), op, concatSrcs, bufferSize,
+ excludeDatanodes);
} finally {
reset();
}
@@ -664,15 +699,17 @@ private Response post(
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
switch(op.getValue()) {
case APPEND:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, -1L,
+ excludeDatanodes.getValue(), bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CONCAT:
@@ -710,10 +747,12 @@ public Response getRoot(
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
- renewer, bufferSize, xattrNames, xattrEncoding);
+ renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes);
}
/** Handle HTTP GET request. */
@@ -742,11 +781,13 @@ public Response get(
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
- renewer, bufferSize, xattrEncoding);
+ renewer, bufferSize, xattrEncoding, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction() {
@Override
@@ -754,7 +795,7 @@ public Response run() throws IOException, URISyntaxException {
try {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
- xattrNames, xattrEncoding);
+ xattrNames, xattrEncoding, excludeDatanodes);
} finally {
reset();
}
@@ -774,7 +815,8 @@ private Response get(
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List xattrNames,
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
@@ -782,8 +824,9 @@ private Response get(
switch(op.getValue()) {
case OPEN:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
+ excludeDatanodes.getValue(), offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@@ -819,7 +862,7 @@ private Response get(
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L);
+ fullpath, op.getValue(), -1L, -1L, null);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 6eb09f61340..78062ad0b5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -448,6 +448,7 @@ abstract class AbstractRunner {
protected final HttpOpParam.Op op;
private final boolean redirected;
+ protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
private boolean checkRetry;
@@ -499,6 +500,10 @@ public T run() throws IOException {
* a DN such as open and checksum
*/
private HttpURLConnection connect(URL url) throws IOException {
+ //redirect hostname and port
+ String redirectHost = null;
+
+
// resolve redirects for a DN operation unless already resolved
if (op.getRedirect() && !redirected) {
final HttpOpParam.Op redirectOp =
@@ -511,11 +516,24 @@ private HttpURLConnection connect(URL url) throws IOException {
try {
validateResponse(redirectOp, conn, false);
url = new URL(conn.getHeaderField("Location"));
+ redirectHost = url.getHost() + ":" + url.getPort();
} finally {
conn.disconnect();
}
}
- return connect(op, url);
+ try {
+ return connect(op, url);
+ } catch (IOException ioe) {
+ if (redirectHost != null) {
+ if (excludeDatanodes.getValue() != null) {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ + excludeDatanodes.getValue());
+ } else {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+ }
+ }
+ throw ioe;
+ }
}
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
@@ -652,7 +670,14 @@ abstract class AbstractFsPathRunner extends AbstractRunner {
@Override
protected URL getUrl() throws IOException {
- return toUrl(op, fspath, parameters);
+ if (excludeDatanodes.getValue() != null) {
+ Param, ?>[] tmpParam = new Param, ?>[parameters.length + 1];
+ System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+ tmpParam[parameters.length] = excludeDatanodes;
+ return toUrl(op, fspath, tmpParam);
+ } else {
+ return toUrl(op, fspath, parameters);
+ }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java
new file mode 100644
index 00000000000..3f44fae948a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+
+/** Exclude datanodes param */
+public class ExcludeDatanodesParam extends StringParam {
+ /** Parameter name. */
+ public static final String NAME = "excludedatanodes";
+ /** Default parameter value. */
+ public static final String DEFAULT = "";
+
+ private static final Domain DOMAIN = new Domain(NAME, null);
+
+ /**
+ * Constructor.
+ * @param str a string representation of the parameter value.
+ */
+ public ExcludeDatanodesParam(final String str) {
+ super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
index fa29b4b03f2..e6493a24ff3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
@@ -89,7 +89,8 @@ private static void configureSuperUserIPAddresses(Configuration conf,
builder.append("127.0.1.1,");
builder.append(InetAddress.getLocalHost().getCanonicalHostName());
LOG.info("Local Ip addresses: " + builder.toString());
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+ conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(superUserShortName),
builder.toString());
}
@@ -101,7 +102,8 @@ public static void setUp() throws Exception {
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
config.setLong(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
- config.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER),
+ config.setStrings(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(REAL_USER),
"group1");
config.setBoolean(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index 30380c998ff..0273da081f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.PathUtils;
@@ -101,6 +102,7 @@ public static void setupCluster() throws IOException {
}
}
+ private final double EPSILON = 0.0001;
/**
* Tests that chooseTarget with considerLoad set to true correctly calculates
* load with decommissioned nodes.
@@ -109,14 +111,6 @@ public static void setupCluster() throws IOException {
public void testChooseTargetWithDecomNodes() throws IOException {
namenode.getNamesystem().writeLock();
try {
- // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
- // returns false
- for (int i = 0; i < 3; i++) {
- DatanodeInfo d = dnManager.getDatanodeByXferAddr(
- dnrList.get(i).getIpAddr(),
- dnrList.get(i).getXferPort());
- d.setDecommissioned();
- }
String blockPoolId = namenode.getNamesystem().getBlockPoolId();
dnManager.handleHeartbeat(dnrList.get(3),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
@@ -133,6 +127,20 @@ public void testChooseTargetWithDecomNodes() throws IOException {
blockPoolId, dataNodes[5].getCacheCapacity(),
dataNodes[5].getCacheRemaining(),
4, 0, 0);
+ // value in the above heartbeats
+ final int load = 2 + 4 + 4;
+
+ FSNamesystem fsn = namenode.getNamesystem();
+ assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+
+ // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
+ // returns false
+ for (int i = 0; i < 3; i++) {
+ DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
+ dnManager.startDecommission(d);
+ d.setDecommissioned();
+ }
+ assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
// Call chooseTarget()
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
index 827a33aa5ba..d0d8d3e3189 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
@@ -285,8 +285,10 @@ public void testGetProxyUgi() throws IOException {
String user = "TheNurse";
conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- conf.set(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(realUser), "*");
- conf.set(DefaultImpersonationProvider.getProxySuperuserIpConfKey(realUser), "*");
+ conf.set(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(realUser), "*");
+ conf.set(DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(realUser), "*");
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index 2883f99f692..48a4b3c6c76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.hdfs.server.namenode;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.junit.Assert.*;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,12 +30,21 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.Test;
@@ -153,4 +164,177 @@ public void testVolumeSize() throws Exception {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ private static final float EPSILON = 0.0001f;
+ @Test
+ public void testXceiverCount() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ // don't waste time retrying if close fails
+ conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
+ MiniDFSCluster cluster = null;
+
+ final int nodes = 8;
+ final int fileCount = 5;
+ final short fileRepl = 3;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
+ cluster.waitActive();
+
+ final FSNamesystem namesystem = cluster.getNamesystem();
+ final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+ List datanodes = cluster.getDataNodes();
+ final DistributedFileSystem fs = cluster.getFileSystem();
+
+ // trigger heartbeats in case not already sent
+ triggerHeartbeats(datanodes);
+
+ // check that all nodes are live and in service
+ int expectedTotalLoad = nodes; // xceiver server adds 1 to load
+ int expectedInServiceNodes = nodes;
+ int expectedInServiceLoad = nodes;
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+
+ // shutdown half the nodes and force a heartbeat check to ensure
+ // counts are accurate
+ for (int i=0; i < nodes/2; i++) {
+ DataNode dn = datanodes.get(i);
+ DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
+ dn.shutdown();
+ dnd.setLastUpdate(0L);
+ BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+ expectedInServiceNodes--;
+ assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ }
+
+ // restart the nodes to verify that counts are correct after
+ // node re-registration
+ cluster.restartDataNodes();
+ cluster.waitActive();
+ datanodes = cluster.getDataNodes();
+ expectedInServiceNodes = nodes;
+ assertEquals(nodes, datanodes.size());
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+
+ // create streams and hsync to force datastreamers to start
+ DFSOutputStream[] streams = new DFSOutputStream[fileCount];
+ for (int i=0; i < fileCount; i++) {
+ streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
+ .getWrappedStream();
+ streams[i].write("1".getBytes());
+ streams[i].hsync();
+ // the load for writers is 2 because both the write xceiver & packet
+ // responder threads are counted in the load
+ expectedTotalLoad += 2*fileRepl;
+ expectedInServiceLoad += 2*fileRepl;
+ }
+ // force nodes to send load update
+ triggerHeartbeats(datanodes);
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes,
+ namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+
+ // decomm a few nodes, substract their load from the expected load,
+ // trigger heartbeat to force load update
+ for (int i=0; i < fileRepl; i++) {
+ expectedInServiceNodes--;
+ DatanodeDescriptor dnd =
+ dnm.getDatanode(datanodes.get(i).getDatanodeId());
+ expectedInServiceLoad -= dnd.getXceiverCount();
+ dnm.startDecommission(dnd);
+ DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+ Thread.sleep(100);
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes,
+ namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+ }
+
+ // check expected load while closing each stream. recalc expected
+ // load based on whether the nodes in the pipeline are decomm
+ for (int i=0; i < fileCount; i++) {
+ int decomm = 0;
+ for (DatanodeInfo dni : streams[i].getPipeline()) {
+ DatanodeDescriptor dnd = dnm.getDatanode(dni);
+ expectedTotalLoad -= 2;
+ if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
+ decomm++;
+ } else {
+ expectedInServiceLoad -= 2;
+ }
+ }
+ try {
+ streams[i].close();
+ } catch (IOException ioe) {
+ // nodes will go decommissioned even if there's a UC block whose
+ // other locations are decommissioned too. we'll ignore that
+ // bug for now
+ if (decomm < fileRepl) {
+ throw ioe;
+ }
+ }
+ triggerHeartbeats(datanodes);
+ // verify node count and loads
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes,
+ namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+ }
+
+ // shutdown each node, verify node counts based on decomm state
+ for (int i=0; i < nodes; i++) {
+ DataNode dn = datanodes.get(i);
+ dn.shutdown();
+ // force it to appear dead so live count decreases
+ DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
+ dnDesc.setLastUpdate(0L);
+ BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+ assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
+ // first few nodes are already out of service
+ if (i >= fileRepl) {
+ expectedInServiceNodes--;
+ }
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+
+ // live nodes always report load of 1. no nodes is load 0
+ double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
+ assertEquals((double)expectedXceiverAvg,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+ }
+
+ // final sanity check
+ assertEquals(0, namesystem.getNumLiveDataNodes());
+ assertEquals(0, namesystem.getNumDatanodesInService());
+ assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
+ assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void triggerHeartbeats(List datanodes)
+ throws IOException, InterruptedException {
+ for (DataNode dn : datanodes) {
+ DataNodeTestUtils.triggerHeartbeat(dn);
+ }
+ Thread.sleep(100);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
index 9fe3deab1bd..c1169dc3b19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
@@ -92,7 +92,7 @@ public void testDataLocality() throws Exception {
//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
+ namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
Assert.assertEquals(ipAddr, chosen.getIpAddr());
}
}
@@ -117,23 +117,104 @@ public void testDataLocality() throws Exception {
{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
+ namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
Assert.assertEquals(expected, chosen);
}
{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
+ namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
Assert.assertEquals(expected, chosen);
}
{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
- namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
+ namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
Assert.assertEquals(expected, chosen);
}
} finally {
cluster.shutdown();
}
}
+
+ @Test
+ public void testExcludeDataNodes() throws Exception {
+ final Configuration conf = WebHdfsTestUtil.createConf();
+ final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
+ final String[] hosts = {"DataNode1", "DataNode2", "DataNode3","DataNode4","DataNode5","DataNode6"};
+ final int nDataNodes = hosts.length;
+ LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks)
+ + ", hosts=" + Arrays.asList(hosts));
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .hosts(hosts).numDataNodes(nDataNodes).racks(racks).build();
+
+ try {
+ cluster.waitActive();
+
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final NameNode namenode = cluster.getNameNode();
+ final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
+ ).getDatanodeManager();
+ LOG.info("dm=" + dm);
+
+ final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+ final String f = "/foo";
+
+ //create a file with three replica.
+ final Path p = new Path(f);
+ final FSDataOutputStream out = dfs.create(p, (short)3);
+ out.write(1);
+ out.close();
+
+ //get replica location.
+ final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
+ namenode, f, 0, 1);
+ final List lb = locatedblocks.getLocatedBlocks();
+ Assert.assertEquals(1, lb.size());
+ final DatanodeInfo[] locations = lb.get(0).getLocations();
+ Assert.assertEquals(3, locations.length);
+
+
+ //For GETFILECHECKSUM, OPEN and APPEND,
+ //the chosen datanode must be different with exclude nodes.
+
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < 2; i++) {
+ sb.append(locations[i].getXferAddr());
+ { // test GETFILECHECKSUM
+ final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+ namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
+ sb.toString());
+ for (int j = 0; j <= i; j++) {
+ Assert.assertNotEquals(locations[j].getHostName(),
+ chosen.getHostName());
+ }
+ }
+
+ { // test OPEN
+ final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+ namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
+ for (int j = 0; j <= i; j++) {
+ Assert.assertNotEquals(locations[j].getHostName(),
+ chosen.getHostName());
+ }
+ }
+
+ { // test APPEND
+ final DatanodeInfo chosen = NamenodeWebHdfsMethods
+ .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
+ blocksize, sb.toString());
+ for (int j = 0; j <= i; j++) {
+ Assert.assertNotEquals(locations[j].getHostName(),
+ chosen.getHostName());
+ }
+ }
+
+ sb.append(",");
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java
index aadc6b0a417..72776e03ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java
@@ -151,8 +151,10 @@ public void testRefreshSuperUserGroupsConfiguration() throws Exception {
final String [] GROUP_NAMES2 = new String [] {"gr3" , "gr4"};
//keys in conf
- String userKeyGroups = DefaultImpersonationProvider.getProxySuperuserGroupConfKey(SUPER_USER);
- String userKeyHosts = DefaultImpersonationProvider.getProxySuperuserIpConfKey (SUPER_USER);
+ String userKeyGroups = DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(SUPER_USER);
+ String userKeyHosts = DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey (SUPER_USER);
config.set(userKeyGroups, "gr3,gr4,gr5"); // superuser can proxy for this group
config.set(userKeyHosts,"127.0.0.1");
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4665703b286..e1fa4445d55 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -49,6 +49,11 @@ Release 2.6.0 - UNRELEASED
YARN-1341. Recover NMTokens upon nodemanager restart. (Jason Lowe via
junping_du)
+ YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. (xgong)
+
+ YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo
+ via Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
@@ -69,6 +74,9 @@ Release 2.6.0 - UNRELEASED
after RM recovery but before scheduler learns about apps and app-attempts.
(Jian He via vinodkv)
+ YARN-2244. FairScheduler missing handling of containers for unknown
+ application attempts. (Anubhav Dhoot via kasha)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
index 99495d7ad0c..bc2d7c5624a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
@@ -44,6 +44,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
private ApplicationAttemptId applicationAttemptId;
+ private int keyId = Integer.MIN_VALUE;
public AMRMTokenIdentifier() {
}
@@ -53,6 +54,13 @@ public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId) {
this.applicationAttemptId = appAttemptId;
}
+ public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId,
+ int masterKeyId) {
+ this();
+ this.applicationAttemptId = appAttemptId;
+ this.keyId = masterKeyId;
+ }
+
@Private
public ApplicationAttemptId getApplicationAttemptId() {
return this.applicationAttemptId;
@@ -64,6 +72,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(appId.getClusterTimestamp());
out.writeInt(appId.getId());
out.writeInt(this.applicationAttemptId.getAttemptId());
+ out.writeInt(this.keyId);
}
@Override
@@ -75,6 +84,7 @@ public void readFields(DataInput in) throws IOException {
ApplicationId.newInstance(clusterTimeStamp, appId);
this.applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, attemptId);
+ this.keyId = in.readInt();
}
@Override
@@ -92,6 +102,10 @@ public UserGroupInformation getUser() {
.toString());
}
+ public int getKeyId() {
+ return this.keyId;
+ }
+
// TODO: Needed?
@InterfaceAudience.Private
public static class Renewer extends Token.TrivialRenewer {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index dbcf64fc391..64657ad7ccf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -774,11 +774,9 @@ public void transition(RMAppAttemptImpl appAttempt,
}
// create AMRMToken
- AMRMTokenIdentifier id =
- new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
appAttempt.amrmToken =
- new Token(id,
- appAttempt.rmContext.getAMRMTokenSecretManager());
+ appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttempt.applicationAttemptId);
// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index b3e835a54d3..5764c8c7a65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -123,6 +123,23 @@ public Resource getMaximumResourceCapability() {
return maximumAllocation;
}
+ protected void containerLaunchedOnNode(ContainerId containerId,
+ SchedulerNode node) {
+ // Get the application for the finished container
+ SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+ (containerId);
+ if (application == null) {
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ return;
+ }
+
+ application.containerLaunchedOnNode(containerId, node.getNodeID());
+ }
+
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app =
applications.get(applicationAttemptId.getApplicationId());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6d26519ff98..26812388aaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -69,7 +69,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -866,21 +865,6 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
}
- private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
- // Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
@Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3a847ce7589..18ccf9d8a93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -928,22 +928,6 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
}
}
- /**
- * Process a container which has launched on a node, as reported by the node.
- */
- private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
- // Get the application for the finished container
- FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
/**
* Process a heartbeat update from a node.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 5976cea5230..c51852fa9d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -65,6 +65,7 @@ public String getName() {
private static class FairShareComparator implements Comparator,
Serializable {
private static final long serialVersionUID = 5564969375856699313L;
+ private static final Resource ONE = Resources.createResource(1);
@Override
public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public int compare(Schedulable s1, Schedulable s2) {
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
- Resource one = Resources.createResource(1);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
- / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+ / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
- / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+ / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 571d0558c04..518a8d9d3b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -66,7 +66,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -831,23 +830,6 @@ public void handle(SchedulerEvent event) {
}
}
- private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
- // Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- // Some unknown container sneaked into the system. Kill it.
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
@Lock(FifoScheduler.class)
private synchronized void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
index 5d21ec08885..c498b529bf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
@@ -19,22 +19,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +55,66 @@ public class AMRMTokenSecretManager extends
private static final Log LOG = LogFactory
.getLog(AMRMTokenSecretManager.class);
- private SecretKey masterKey;
+ private int serialNo = new SecureRandom().nextInt();
+ private MasterKeyData nextMasterKey;
+ private MasterKeyData currentMasterKey;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
private final Timer timer;
private final long rollingInterval;
+ private final long activationDelay;
- private final Map passwords =
- new HashMap();
+ private final Set appAttemptSet =
+ new HashSet();
/**
* Create an {@link AMRMTokenSecretManager}
*/
public AMRMTokenSecretManager(Configuration conf) {
- rollMasterKey();
this.timer = new Timer();
this.rollingInterval =
conf
.getLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+ // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+ // the updated shared-key.
+ this.activationDelay =
+ (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+ LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+ + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+ if (rollingInterval <= activationDelay * 2) {
+ throw new IllegalArgumentException(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+ + " should be more than 2 X "
+ + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+ }
}
public void start() {
- this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+ if (this.currentMasterKey == null) {
+ this.currentMasterKey = createNewMasterKey();
+ }
+ this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+ rollingInterval);
}
public void stop() {
this.timer.cancel();
}
- public synchronized void applicationMasterFinished(
- ApplicationAttemptId appAttemptId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Application finished, removing password for " + appAttemptId);
+ public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Application finished, removing password for " + appAttemptId);
+ this.appAttemptSet.remove(appAttemptId);
+ } finally {
+ this.writeLock.unlock();
}
- this.passwords.remove(appAttemptId);
}
private class MasterKeyRoller extends TimerTask {
@@ -93,49 +125,89 @@ public void run() {
}
@Private
- public synchronized void setMasterKey(SecretKey masterKey) {
- this.masterKey = masterKey;
- }
-
- @Private
- public synchronized SecretKey getMasterKey() {
- return this.masterKey;
- }
-
- @Private
- synchronized void rollMasterKey() {
- LOG.info("Rolling master-key for amrm-tokens");
- this.masterKey = generateSecret();
- }
-
- /**
- * Create a password for a given {@link AMRMTokenIdentifier}. Used to
- * send to the AppicationAttempt which can give it back during authentication.
- */
- @Override
- public synchronized byte[] createPassword(
- AMRMTokenIdentifier identifier) {
- ApplicationAttemptId applicationAttemptId =
- identifier.getApplicationAttemptId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating password for " + applicationAttemptId);
+ void rollMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Rolling master-key for amrm-tokens");
+ this.nextMasterKey = createNewMasterKey();
+ this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private class NextKeyActivator extends TimerTask {
+ @Override
+ public void run() {
+ activateNextMasterKey();
+ }
+ }
+
+ public void activateNextMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Activating next master key with id: "
+ + this.nextMasterKey.getMasterKey().getKeyId());
+ this.currentMasterKey = this.nextMasterKey;
+ this.nextMasterKey = null;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData createNewMasterKey() {
+ this.writeLock.lock();
+ try {
+ return new MasterKeyData(serialNo++, generateSecret());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public Token createAndGetAMRMToken(
+ ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+ AMRMTokenIdentifier identifier =
+ new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+ .getKeyId());
+ byte[] password = this.createPassword(identifier);
+ appAttemptSet.add(appAttemptId);
+ return new Token(identifier.getBytes(), password,
+ identifier.getKind(), new Text());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ // If nextMasterKey is not Null, then return nextMasterKey
+ // otherwise return currentMasterKey
+ @VisibleForTesting
+ public MasterKeyData getMasterKey() {
+ this.readLock.lock();
+ try {
+ return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+ } finally {
+ this.readLock.unlock();
}
- byte[] password = createPassword(identifier.getBytes(), masterKey);
- this.passwords.put(applicationAttemptId, password);
- return password;
}
/**
* Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
*/
- public synchronized void
- addPersistedPassword(Token token) throws IOException {
- AMRMTokenIdentifier identifier = token.decodeIdentifier();
- if (LOG.isDebugEnabled()) {
+ public void addPersistedPassword(Token token)
+ throws IOException {
+ this.writeLock.lock();
+ try {
+ AMRMTokenIdentifier identifier = token.decodeIdentifier();
LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+ appAttemptSet.add(identifier.getApplicationAttemptId());
+ } finally {
+ this.writeLock.unlock();
}
- this.passwords.put(identifier.getApplicationAttemptId(),
- token.getPassword());
}
/**
@@ -143,19 +215,35 @@ public synchronized byte[] createPassword(
* Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
*/
@Override
- public synchronized byte[] retrievePassword(
- AMRMTokenIdentifier identifier) throws InvalidToken {
- ApplicationAttemptId applicationAttemptId =
- identifier.getApplicationAttemptId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+ public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+ throws InvalidToken {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+ }
+ if (!appAttemptSet.contains(applicationAttemptId)) {
+ throw new InvalidToken("Password not found for ApplicationAttempt "
+ + applicationAttemptId);
+ }
+ if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.currentMasterKey.getSecretKey());
+ } else if (nextMasterKey != null
+ && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.nextMasterKey.getSecretKey());
+ }
+ throw new InvalidToken("Given AMRMToken for application : "
+ + applicationAttemptId.toString()
+ + " seems to have been generated illegally.");
+ } finally {
+ this.readLock.unlock();
}
- byte[] password = this.passwords.get(applicationAttemptId);
- if (password == null) {
- throw new InvalidToken("Password not found for ApplicationAttempt "
- + applicationAttemptId);
- }
- return password;
}
/**
@@ -167,4 +255,40 @@ public AMRMTokenIdentifier createIdentifier() {
return new AMRMTokenIdentifier();
}
+ @Private
+ @VisibleForTesting
+ public MasterKeyData getCurrnetMasterKeyData() {
+ this.readLock.lock();
+ try {
+ return this.currentMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData getNextMasterKeyData() {
+ this.readLock.lock();
+ try {
+ return this.nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ @Private
+ protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ LOG.info("Creating password for " + applicationAttemptId);
+ return createPassword(identifier.getBytes(), getMasterKey()
+ .getSecretKey());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 45ccd1c3016..e88ebd24f3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -232,20 +232,7 @@ protected Dispatcher createDispatcher() {
containerStatuses.put(app.getApplicationId(), containerStatusList);
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
- dispatcher.await();
- List contsToClean = resp.getContainersToCleanup();
- int cleanedConts = contsToClean.size();
- waitCount = 0;
- while (cleanedConts < 1 && waitCount++ < 200) {
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
- Thread.sleep(100);
- resp = nm1.nodeHeartbeat(true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts += contsToClean.size();
- }
- LOG.info("Got cleanup for " + contsToClean.get(0));
- Assert.assertEquals(1, cleanedConts);
+ waitForContainerCleanup(dispatcher, nm1, resp);
// Now to test the case when RM already gave cleanup, and NM suddenly
// realizes that the container is running.
@@ -258,26 +245,36 @@ protected Dispatcher createDispatcher() {
containerStatuses.put(app.getApplicationId(), containerStatusList);
resp = nm1.nodeHeartbeat(containerStatuses, true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts = contsToClean.size();
// The cleanup list won't be instantaneous as it is given out by scheduler
// and not RMNodeImpl.
- waitCount = 0;
- while (cleanedConts < 1 && waitCount++ < 200) {
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
- Thread.sleep(100);
- resp = nm1.nodeHeartbeat(true);
- dispatcher.await();
- contsToClean = resp.getContainersToCleanup();
- cleanedConts += contsToClean.size();
- }
- LOG.info("Got cleanup for " + contsToClean.get(0));
- Assert.assertEquals(1, cleanedConts);
+ waitForContainerCleanup(dispatcher, nm1, resp);
rm.stop();
}
-
+
+ protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+ NodeHeartbeatResponse resp) throws Exception {
+ int waitCount = 0, cleanedConts = 0;
+ List contsToClean;
+ do {
+ dispatcher.await();
+ contsToClean = resp.getContainersToCleanup();
+ cleanedConts += contsToClean.size();
+ if (cleanedConts >= 1) {
+ break;
+ }
+ Thread.sleep(100);
+ resp = nm.nodeHeartbeat(true);
+ } while(waitCount++ < 200);
+
+ if (contsToClean.isEmpty()) {
+ LOG.error("Failed to get any containers to cleanup");
+ } else {
+ LOG.info("Got cleanup for " + contsToClean.get(0));
+ }
+ Assert.assertEquals(1, cleanedConts);
+ }
+
private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
throws Exception {
while (true) {
@@ -400,6 +397,58 @@ public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
rm2.stop();
}
+ @SuppressWarnings("resource")
+ @Test (timeout = 60000)
+ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+ Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ MockRM rm1 = new MockRM(conf, memStore) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create app and launch the AM
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = launchAM(app0, rm1, nm1);
+ nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+ rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+ // start new RM
+ final DrainDispatcher dispatcher2 = new DrainDispatcher();
+ MockRM rm2 = new MockRM(conf, memStore) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher2;
+ }
+ };
+ rm2.start();
+
+ // nm1 register to rm2, and do a heartbeat
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+ rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Add unknown container for application unknown to scheduler
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+ .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+ waitForContainerCleanup(dispatcher2, nm1, response);
+
+ rm1.stop();
+ rm2.stop();
+ }
+
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index dc3e9f18178..8966af7e2a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1250,10 +1250,11 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
.getEncoded());
// assert AMRMTokenSecretManager also knows about the AMRMToken password
- Token amrmToken = loadedAttempt1.getAMRMToken();
- Assert.assertArrayEquals(amrmToken.getPassword(),
- rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
- amrmToken.decodeIdentifier()));
+ // TODO: fix this on YARN-2211
+// Token amrmToken = loadedAttempt1.getAMRMToken();
+// Assert.assertArrayEquals(amrmToken.getPassword(),
+// rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
+// amrmToken.decodeIdentifier()));
rm1.stop();
rm2.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 6ac23a2ad57..04f034818c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -25,6 +25,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,7 +35,6 @@
import javax.crypto.SecretKey;
import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -175,8 +176,11 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
- AMRMTokenSecretManager appTokenMgr =
- new AMRMTokenSecretManager(conf);
+ AMRMTokenSecretManager appTokenMgr = spy(
+ new AMRMTokenSecretManager(conf));
+ MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+ when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
@@ -455,10 +459,8 @@ public void testRMDTSecretManagerStateStore(
private Token generateAMRMToken(
ApplicationAttemptId attemptId,
AMRMTokenSecretManager appTokenMgr) {
- AMRMTokenIdentifier appTokenId =
- new AMRMTokenIdentifier(attemptId);
Token appToken =
- new Token(appTokenId, appTokenMgr);
+ appTokenMgr.createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service"));
return appToken;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index c99987d7add..ca0fc3960a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -97,6 +97,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -224,6 +225,8 @@ public void setUp() throws Exception {
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
amFinishingMonitor = mock(AMLivelinessMonitor.class);
writer = mock(RMApplicationHistoryWriter.class);
+ MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey();
+ when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData);
rmContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
index 64602bd888e..b11aadd7912 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
@@ -23,13 +23,12 @@
import java.util.Arrays;
import java.util.Collection;
-import javax.crypto.SecretKey;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -41,7 +40,9 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -50,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@@ -65,6 +67,8 @@ public class TestAMRMTokens {
private final Configuration conf;
private static final int maxWaitAttempts = 50;
+ private static final int rolling_interval_sec = 13;
+ private static final long am_expire_ms = 4000;
@Parameters
public static Collection