Merge branch 'trunk' into HDFS-6581
commit 849ccfa699

@@ -530,7 +530,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-10922. User documentation for CredentialShell. (Larry McCay via wang)
HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)
HADOOP-11016. KMS should support signing cookies with zookeeper secret
manager. (tucu)
OPTIMIZATIONS

@@ -723,11 +724,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11056. OsSecureRandom.setConf() might leak file descriptors (yzhang
via cmccabe)
HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
(cmccabe)
HDFS-7075. hadoop-fuse-dfs fails because it cannot find
JavaKeyStoreProvider$Factory (cmccabe)
HADOOP-11040. Return value of read(ByteBuffer buf) in CryptoInputStream is
incorrect in some cases. (Yi Liu via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS

@@ -471,7 +471,16 @@ public class CryptoInputStream extends FilterInputStream implements
streamOffset += n; // Read n bytes
decrypt(buf, n, pos);
}
return n;
if (n >= 0) {
return unread + n;
} else {
if (unread == 0) {
return -1;
} else {
return unread;
}
}
}
throw new UnsupportedOperationException("ByteBuffer read unsupported " +

@@ -469,6 +469,7 @@ public abstract class CryptoStreamsTestBase {
int bufPos) throws Exception {
buf.position(bufPos);
int n = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(bufPos + n, buf.position());
byte[] readData = new byte[n];
buf.rewind();
buf.position(bufPos);

@@ -568,6 +569,7 @@ public abstract class CryptoStreamsTestBase {
// Read forward len1
ByteBuffer buf = ByteBuffer.allocate(len1);
int nRead = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(nRead, buf.position());
readData = new byte[nRead];
buf.rewind();
buf.get(readData);

@@ -575,9 +577,10 @@ public abstract class CryptoStreamsTestBase {
System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData);
// Pos should be len1 + 2 * len2 + nRead
long lastPos = pos;
// Pos should be lastPos + nRead
pos = ((Seekable) in).getPos();
Assert.assertEquals(len1 + 2 * len2 + nRead, pos);
Assert.assertEquals(lastPos + nRead, pos);
// Pos: 1/3 dataLen
positionedReadCheck(in , dataLen / 3);

@@ -589,13 +592,15 @@ public abstract class CryptoStreamsTestBase {
System.arraycopy(data, (int)pos, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
// Pos should be 2 * len1 + 2 * len2 + nRead
lastPos = pos;
// Pos should be lastPos + len1
pos = ((Seekable) in).getPos();
Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos);
Assert.assertEquals(lastPos + len1, pos);
// Read forward len1
buf = ByteBuffer.allocate(len1);
nRead = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(nRead, buf.position());
readData = new byte[nRead];
buf.rewind();
buf.get(readData);

@@ -603,6 +608,11 @@ public abstract class CryptoStreamsTestBase {
System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData);
lastPos = pos;
// Pos should be lastPos + nRead
pos = ((Seekable) in).getPos();
Assert.assertEquals(lastPos + nRead, pos);
// ByteBuffer read after EOF
((Seekable) in).seek(dataLen);
buf.clear();

@@ -187,6 +187,11 @@
<artifactId>metrics-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>

@@ -16,7 +16,7 @@
<!-- KMS Backend KeyProvider -->
<property>
<name>hadoop.security.key.provider.path</name>
<name>hadoop.kms.key.provider.uri</name>
<value>jceks://file@/${user.home}/kms.keystore</value>
<description>
</description>

@@ -68,4 +68,61 @@
</description>
</property>

<!-- Authentication cookie signature source -->

<property>
<name>hadoop.kms.authentication.signer.secret.provider</name>
<value>random</value>
<description>
Indicates how the secret to sign the authentication cookies will be
stored. Options are 'random' (default), 'string' and 'zookeeper'.
If using a setup with multiple KMS instances, 'zookeeper' should be used.
</description>
</property>

<!-- Configuration for 'zookeeper' authentication cookie signature source -->

<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.path</name>
<value>/hadoop-kms/hadoop-auth-signature-secret</value>
<description>
The Zookeeper ZNode path where the KMS instances will store and retrieve
the secret from.
</description>
</property>

<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string</name>
<value>#HOSTNAME#:#PORT#,...</value>
<description>
The Zookeeper connection string, a list of hostnames and port comma
separated.
</description>
</property>

<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type</name>
<value>kerberos</value>
<description>
The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
</description>
</property>

<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab</name>
<value>/etc/hadoop/conf/kms.keytab</value>
<description>
The absolute path for the Kerberos keytab with the credentials to
connect to Zookeeper.
</description>
</property>

<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal</name>
<value>kms/#HOSTNAME#</value>
<description>
The Kerberos service principal used to connect to Zookeeper.
</description>
</property>

</configuration>

@@ -46,7 +46,8 @@ import java.util.Properties;
@InterfaceAudience.Private
public class KMSAuthenticationFilter
extends DelegationTokenAuthenticationFilter {
private static final String CONF_PREFIX = KMSConfiguration.CONFIG_PREFIX +
public static final String CONFIG_PREFIX = KMSConfiguration.CONFIG_PREFIX +
"authentication.";
@Override

@@ -56,9 +57,9 @@ public class KMSAuthenticationFilter
Configuration conf = KMSWebApp.getConfiguration();
for (Map.Entry<String, String> entry : conf) {
String name = entry.getKey();
if (name.startsWith(CONF_PREFIX)) {
if (name.startsWith(CONFIG_PREFIX)) {
String value = conf.get(name);
name = name.substring(CONF_PREFIX.length());
name = name.substring(CONFIG_PREFIX.length());
props.setProperty(name, value);
}
}

@@ -40,6 +40,10 @@ public class KMSConfiguration {
public static final String KEY_ACL_PREFIX = "key.acl.";
public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl.";
// Property to set the backing KeyProvider
public static final String KEY_PROVIDER_URI = CONFIG_PREFIX +
"key.provider.uri";
// Property to Enable/Disable Caching
public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
"cache.enable";

@@ -39,6 +39,7 @@ import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.List;

@@ -159,17 +160,12 @@ public class KMSWebApp implements ServletContextListener {
new AccessControlList(AccessControlList.WILDCARD_ACL_VALUE));
// intializing the KeyProvider
List<KeyProvider> providers = KeyProviderFactory.getProviders(kmsConf);
if (providers.isEmpty()) {
String providerString = kmsConf.get(KMSConfiguration.KEY_PROVIDER_URI);
if (providerString == null) {
throw new IllegalStateException("No KeyProvider has been defined");
}
if (providers.size() > 1) {
LOG.warn("There is more than one KeyProvider configured '{}', using " +
"the first provider",
kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
}
KeyProvider keyProvider = providers.get(0);
KeyProvider keyProvider =
KeyProviderFactory.get(new URI(providerString), kmsConf);
if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
long keyTimeOutMillis =

@@ -51,7 +51,7 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
+---+
<property>
<name>hadoop.security.key.provider.path</name>
<name>hadoop.kms.key.provider.uri</name>
<value>jceks://file@/${user.home}/kms.keystore</value>
</property>

@@ -448,16 +448,16 @@ $ keytool -genkey -alias tomcat -keyalg RSA
KMS supports access control for all non-read operations at the Key level.
All Key Access operations are classified as :

* MANAGEMENT - createKey, deleteKey, rolloverNewVersion
* MANAGEMENT - createKey, deleteKey, rolloverNewVersion

* GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
* GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys

* DECRYPT_EEK - decryptEncryptedKey;
* DECRYPT_EEK - decryptEncryptedKey

* READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
getCurrentKey;
* READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
getCurrentKey

* ALL - all of the above;
* ALL - all of the above

These can be defined in the KMS <<<etc/hadoop/kms-acls.xml>>> as follows

@@ -554,41 +554,124 @@ $ keytool -genkey -alias tomcat -keyalg RSA
KMS delegation token secret manager can be configured with the following
properties:

+---+
<property>
<name>hadoop.kms.authentication.delegation-token.update-interval.sec</name>
<value>86400</value>
<description>
How often the master key is rotated, in seconds. Default value 1 day.
</description>
</property>
+---+
<property>
<name>hadoop.kms.authentication.delegation-token.update-interval.sec</name>
<value>86400</value>
<description>
How often the master key is rotated, in seconds. Default value 1 day.
</description>
</property>

<property>
<name>hadoop.kms.authentication.delegation-token.max-lifetime.sec</name>
<value>604800</value>
<description>
Maximum lifetime of a delagation token, in seconds. Default value 7 days.
</description>
</property>
<property>
<name>hadoop.kms.authentication.delegation-token.max-lifetime.sec</name>
<value>604800</value>
<description>
Maximum lifetime of a delagation token, in seconds. Default value 7 days.
</description>
</property>

<property>
<name>hadoop.kms.authentication.delegation-token.renew-interval.sec</name>
<value>86400</value>
<description>
Renewal interval of a delagation token, in seconds. Default value 1 day.
</description>
</property>
<property>
<name>hadoop.kms.authentication.delegation-token.renew-interval.sec</name>
<value>86400</value>
<description>
Renewal interval of a delagation token, in seconds. Default value 1 day.
</description>
</property>

<property>
<name>hadoop.kms.authentication.delegation-token.removal-scan-interval.sec</name>
<value>3600</value>
<description>
Scan interval to remove expired delegation tokens.
</description>
</property>
+---+
<property>
<name>hadoop.kms.authentication.delegation-token.removal-scan-interval.sec</name>
<value>3600</value>
<description>
Scan interval to remove expired delegation tokens.
</description>
</property>
+---+

** Using Multiple Instances of KMS Behind a Load-Balancer or VIP

KMS supports multiple KMS instances behind a load-balancer or VIP for
scalability and for HA purposes.

When using multiple KMS instances behind a load-balancer or VIP, requests from
the same user may be handled by different KMS instances.

KMS instances behind a load-balancer or VIP must be specially configured to
work properly as a single logical service.

*** HTTP Kerberos Principals Configuration

TBD

*** HTTP Authentication Signature

KMS uses Hadoop Authentication for HTTP authentication. Hadoop Authentication
issues a signed HTTP Cookie once the client has authenticated successfully.
This HTTP Cookie has an expiration time, after which it will trigger a new
authentication sequence. This is done to avoid triggering the authentication
on every HTTP request of a client.

A KMS instance must verify the HTTP Cookie signatures signed by other KMS
instances. To do this all KMS instances must share the signing secret.

This secret sharing can be done using a Zookeeper service which is configured
in KMS with the following properties in the <<<kms-site.xml>>>:

+---+
<property>
<name>hadoop.kms.authentication.signer.secret.provider</name>
<value>zookeeper</value>
<description>
Indicates how the secret to sign the authentication cookies will be
stored. Options are 'random' (default), 'string' and 'zookeeper'.
If using a setup with multiple KMS instances, 'zookeeper' should be used.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.path</name>
<value>/hadoop-kms/hadoop-auth-signature-secret</value>
<description>
The Zookeeper ZNode path where the KMS instances will store and retrieve
the secret from.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string</name>
<value>#HOSTNAME#:#PORT#,...</value>
<description>
The Zookeeper connection string, a list of hostnames and port comma
separated.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type</name>
<value>kerberos</value>
<description>
The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab</name>
<value>/etc/hadoop/conf/kms.keytab</value>
<description>
The absolute path for the Kerberos keytab with the credentials to
connect to Zookeeper.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal</name>
<value>kms/#HOSTNAME#</value>
<description>
The Kerberos service principal used to connect to Zookeeper.
</description>
</property>
+---+

*** Delegation Tokens

TBD

** KMS HTTP REST API

*** Create a Key

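A programmatic equivalent of the XML above appears in the new TestKMSWithZK test later in this commit, which drives the same signer settings through a Configuration; a condensed sketch follows (the connect string and the "/secret" znode path are illustrative values, not taken from the commit):

    // Sketch, assuming the constants shown in this commit's KMSAuthenticationFilter
    // and TestKMSWithZK hunks: route KMS cookie signing to a ZooKeeper-backed provider.
    Configuration conf = new Configuration(false);
    conf.set(KMSAuthenticationFilter.CONFIG_PREFIX
        + AuthenticationFilter.SIGNER_SECRET_PROVIDER, "zookeeper");
    conf.set(KMSAuthenticationFilter.CONFIG_PREFIX
        + ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING, "zk-host:2181");
    conf.set(KMSAuthenticationFilter.CONFIG_PREFIX
        + ZKSignerSecretProvider.ZOOKEEPER_PATH, "/secret");
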
@@ -166,7 +166,7 @@ public class MiniKMS {
File kmsFile = new File(kmsConfDir, "kms-site.xml");
if (!kmsFile.exists()) {
Configuration kms = new Configuration(false);
kms.set("hadoop.security.key.provider.path",
kms.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(kmsConfDir, "kms.keystore").toUri());
kms.set("hadoop.kms.authentication.type", "simple");
kms.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);

@@ -117,13 +117,14 @@ public class TestKMS {
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
Configuration conf = new Configuration(false);
conf.set("hadoop.security.key.provider.path",
conf.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(keyStoreDir.getAbsolutePath(), "kms.keystore").toUri());
conf.set("hadoop.kms.authentication.type", "simple");
return conf;
}
protected void writeConf(File confDir, Configuration conf) throws Exception {
public static void writeConf(File confDir, Configuration conf)
throws Exception {
Writer writer = new FileWriter(new File(confDir,
KMSConfiguration.KMS_SITE_XML));
conf.writeXml(writer);

@@ -139,7 +140,7 @@ public class TestKMS {
writer.close();
}
protected URI createKMSUri(URL kmsUrl) throws Exception {
public static URI createKMSUri(URL kmsUrl) throws Exception {
String str = kmsUrl.toString();
str = str.replaceFirst("://", "@");
return new URI("kms://" + str);

@@ -0,0 +1,179 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.crypto.key.kms.server;

import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProvider.Options;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginContext;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;

public class TestKMSWithZK {

protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
Configuration conf = new Configuration(false);
conf.set("hadoop.security.key.provider.path",
"jceks://file@" + new Path(keyStoreDir.getAbsolutePath(),
"kms.keystore").toUri());
conf.set("hadoop.kms.authentication.type", "simple");
conf.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
conf.set(KMSACLs.Type.GET_KEYS.getAclConfigKey(), "foo");
return conf;
}

@Test
public void testMultipleKMSInstancesWithZKSigner() throws Exception {
final File testDir = TestKMS.getTestDir();
Configuration conf = createBaseKMSConf(testDir);

TestingServer zkServer = new TestingServer();
zkServer.start();

MiniKMS kms1 = null;
MiniKMS kms2 = null;

conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
AuthenticationFilter.SIGNER_SECRET_PROVIDER, "zookeeper");
conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
zkServer.getConnectString());
conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
ZKSignerSecretProvider.ZOOKEEPER_PATH, "/secret");
TestKMS.writeConf(testDir, conf);

try {
kms1 = new MiniKMS.Builder()
.setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
kms1.start();

kms2 = new MiniKMS.Builder()
.setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
kms2.start();

final URL url1 = new URL(kms1.getKMSUrl().toExternalForm() +
KMSRESTConstants.SERVICE_VERSION + "/" +
KMSRESTConstants.KEYS_NAMES_RESOURCE);
final URL url2 = new URL(kms2.getKMSUrl().toExternalForm() +
KMSRESTConstants.SERVICE_VERSION + "/" +
KMSRESTConstants.KEYS_NAMES_RESOURCE);

final DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
final DelegationTokenAuthenticatedURL aUrl =
new DelegationTokenAuthenticatedURL();

UserGroupInformation ugiFoo = UserGroupInformation.createUserForTesting(
"foo", new String[]{"gfoo"});
UserGroupInformation ugiBar = UserGroupInformation.createUserForTesting(
"bar", new String[]{"gBar"});

ugiFoo.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
HttpURLConnection conn = aUrl.openConnection(url1, token);
Assert.assertEquals(HttpURLConnection.HTTP_OK,
conn.getResponseCode());
return null;
}
});

ugiBar.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
HttpURLConnection conn = aUrl.openConnection(url2, token);
Assert.assertEquals(HttpURLConnection.HTTP_OK,
conn.getResponseCode());
return null;
}
});

ugiBar.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final DelegationTokenAuthenticatedURL.Token emptyToken =
new DelegationTokenAuthenticatedURL.Token();
HttpURLConnection conn = aUrl.openConnection(url2, emptyToken);
Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN,
conn.getResponseCode());
return null;
}
});

} finally {
if (kms2 != null) {
kms2.stop();
}
if (kms1 != null) {
kms1.stop();
}
zkServer.stop();
}

}

}

@@ -471,6 +471,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6705. Create an XAttr that disallows the HDFS admin from accessing a
file. (clamb via wang)
HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)
HDFS-7004. Update KeyProvider instantiation to create by URI. (wang)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

@@ -670,6 +674,14 @@ Release 2.6.0 - UNRELEASED
and TestDFSClientFailover.testDoesntDnsResolveLogicalURI failing on jdk7.
(Akira Ajisaka via wang)
HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
(cmccabe)
HDFS-7075. hadoop-fuse-dfs fails because it cannot find
JavaKeyStoreProvider$Factory (cmccabe)
HDFS-7078. Fix listEZs to work correctly with snapshots. (wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

@@ -594,6 +594,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
// Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

@@ -1794,34 +1794,37 @@ public class DFSUtil {
 * Creates a new KeyProviderCryptoExtension by wrapping the
 * KeyProvider specified in the given Configuration.
 *
 * @param conf Configuration specifying a single, non-transient KeyProvider.
 * @param conf Configuration
 * @return new KeyProviderCryptoExtension, or null if no provider was found.
 * @throws IOException if the KeyProvider is improperly specified in
 * the Configuration
 */
public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
final Configuration conf) throws IOException {
final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
if (providers == null || providers.size() == 0) {
final String providerUriStr =
conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, null);
// No provider set in conf
if (providerUriStr == null) {
return null;
}
if (providers.size() > 1) {
StringBuilder builder = new StringBuilder();
builder.append("Found multiple KeyProviders but only one is permitted [");
String prefix = " ";
for (KeyProvider kp: providers) {
builder.append(prefix + kp.toString());
prefix = ", ";
}
builder.append("]");
throw new IOException(builder.toString());
final URI providerUri;
try {
providerUri = new URI(providerUriStr);
} catch (URISyntaxException e) {
throw new IOException(e);
}
KeyProviderCryptoExtension provider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(providers.get(0));
if (provider.isTransient()) {
throw new IOException("KeyProvider " + provider.toString()
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
if (keyProvider == null) {
throw new IOException("Could not instantiate KeyProvider from " +
DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" +
providerUriStr +"'");
}
if (keyProvider.isTransient()) {
throw new IOException("KeyProvider " + keyProvider.toString()
+ " was found but it is a transient provider.");
}
return provider;
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider;
}
}

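For orientation, a minimal client-side sketch of the URI-based flow this hunk introduces; the kms:// provider URI below is an illustrative value, not something this commit configures:

    // Sketch, assuming the DFSConfigKeys constant and DFSUtil method shown above.
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
        "kms://http@kms-host:16000/kms");  // example provider URI
    // Returns null when no provider URI is set; throws IOException for a
    // malformed URI or a transient provider, as the code above does.
    KeyProviderCryptoExtension provider =
        DFSUtil.createKeyProviderCryptoExtension(conf);
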
@@ -312,7 +312,22 @@ public class EncryptionZoneManager {
int count = 0;
for (EncryptionZoneInt ezi : tailMap.values()) {
zones.add(new EncryptionZone(getFullPathName(ezi),
/*
Skip EZs that are only present in snapshots. Re-resolve the path to
see if the path's current inode ID matches EZ map's INode ID.

INode#getFullPathName simply calls getParent recursively, so will return
the INode's parents at the time it was snapshotted. It will not
contain a reference INode.
*/
final String pathName = getFullPathName(ezi);
INodesInPath iip = dir.getINodesInPath(pathName, false);
INode lastINode = iip.getLastINode();
if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
continue;
}
// Add the EZ to the result list
zones.add(new EncryptionZone(pathName,
ezi.getKeyName(), ezi.getINodeId()));
count++;
if (count >= numResponses) {

@@ -2147,4 +2147,12 @@
</description>
</property>

<property>
<name>dfs.encryption.key.provider.uri</name>
<description>
The KeyProvider to use when interacting with encryption keys used
when reading and writing to an encryption zone.
</description>
</property>

</configuration>

@@ -85,6 +85,12 @@ Transparent Encryption in HDFS
A necessary prerequisite is an instance of the KMS, as well as a backing key store for the KMS.
See the {{{../../hadoop-kms/index.html}KMS documentation}} for more information.

** Configuring the cluster KeyProvider

*** dfs.encryption.key.provider.uri

The KeyProvider to use when interacting with encryption keys used when reading and writing to an encryption zone.

** Selecting an encryption algorithm and codec

*** hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE

@@ -66,7 +66,7 @@ public class TestCryptoAdminCLI extends CLITestHelperDFS {
tmpDir = new File(System.getProperty("test.build.data", "target"),
UUID.randomUUID().toString()).getAbsoluteFile();
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

@@ -25,7 +25,9 @@ import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

@@ -125,7 +127,7 @@ public class TestEncryptionZones {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, getKeyProviderURI());
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,

@@ -670,7 +672,8 @@ public class TestEncryptionZones {
// Check KeyProvider state
// Flushing the KP on the NN, since it caches, and init a test one
cluster.getNamesystem().getProvider().flush();
KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
KeyProvider provider = KeyProviderFactory
.get(new URI(conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI)), conf);
List<String> keys = provider.getKeys();
assertEquals("Expected NN to have created one key per zone", 1,
keys.size());

@@ -694,7 +697,7 @@ public class TestEncryptionZones {
public void testCreateEZWithNoProvider() throws Exception {
// Unset the key provider and make sure EZ ops don't work
final Configuration clusterConf = cluster.getConfiguration(0);
clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, "");
clusterConf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
cluster.restartNameNode(true);
cluster.waitActive();
final Path zone1 = new Path("/zone1");

@@ -706,7 +709,7 @@ public class TestEncryptionZones {
assertExceptionContains("since no key provider is available", e);
}
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
clusterConf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
// Try listing EZs as well

@@ -1028,6 +1031,9 @@ public class TestEncryptionZones {
 */
@Test(timeout = 60000)
public void testSnapshotsOnEncryptionZones() throws Exception {
final String TEST_KEY2 = "testkey2";
DFSTestUtil.createKey(TEST_KEY2, cluster, conf);
final int len = 8196;
final Path zoneParent = new Path("/zones");
final Path zone = new Path(zoneParent, "zone");

@@ -1042,7 +1048,8 @@ public class TestEncryptionZones {
assertEquals("Got unexpected ez path", zone.toString(),
dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString());
// Now delete the encryption zone, recreate the dir, and take another snapshot
// Now delete the encryption zone, recreate the dir, and take another
// snapshot
fsWrapper.delete(zone, true);
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
final Path snap2 = fs.createSnapshot(zoneParent);

@@ -1051,11 +1058,35 @@ public class TestEncryptionZones {
dfsAdmin.getEncryptionZoneForPath(snap2Zone));
// Create the encryption zone again
dfsAdmin.createEncryptionZone(zone, TEST_KEY);
dfsAdmin.createEncryptionZone(zone, TEST_KEY2);
final Path snap3 = fs.createSnapshot(zoneParent);
final Path snap3Zone = new Path(snap3, zone.getName());
// Check that snap3's EZ has the correct settings
EncryptionZone ezSnap3 = dfsAdmin.getEncryptionZoneForPath(snap3Zone);
assertEquals("Got unexpected ez path", zone.toString(),
dfsAdmin.getEncryptionZoneForPath(snap3Zone).getPath().toString());
ezSnap3.getPath().toString());
assertEquals("Unexpected ez key", TEST_KEY2, ezSnap3.getKeyName());
// Check that older snapshots still have the old EZ settings
EncryptionZone ezSnap1 = dfsAdmin.getEncryptionZoneForPath(snap1Zone);
assertEquals("Got unexpected ez path", zone.toString(),
ezSnap1.getPath().toString());
assertEquals("Unexpected ez key", TEST_KEY, ezSnap1.getKeyName());
// Check that listEZs only shows the current filesystem state
ArrayList<EncryptionZone> listZones = Lists.newArrayList();
RemoteIterator<EncryptionZone> it = dfsAdmin.listEncryptionZones();
while (it.hasNext()) {
listZones.add(it.next());
}
for (EncryptionZone z: listZones) {
System.out.println(z);
}
assertEquals("Did not expect additional encryption zones!", 1,
listZones.size());
EncryptionZone listZone = listZones.get(0);
assertEquals("Got unexpected ez path", zone.toString(),
listZone.getPath().toString());
assertEquals("Unexpected ez key", TEST_KEY2, listZone.getKeyName());
// Verify contents of the snapshotted file
final Path snapshottedZoneFile = new Path(

@@ -1063,7 +1094,8 @@ public class TestEncryptionZones {
assertEquals("Contents of snapshotted file have changed unexpectedly",
contents, DFSTestUtil.readFile(fs, snapshottedZoneFile));
// Now delete the snapshots out of order and verify the zones are still correct
// Now delete the snapshots out of order and verify the zones are still
// correct
fs.deleteSnapshot(zoneParent, snap2.getName());
assertEquals("Got unexpected ez path", zone.toString(),
dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString());

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;

@@ -60,7 +59,7 @@ public class TestEncryptionZonesWithHA {
fsHelper = new FileSystemTestHelper();
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
);

@@ -24,7 +24,6 @@ import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestWrapper;
import org.apache.hadoop.fs.FileStatus;

@@ -70,7 +69,7 @@ public class TestReservedRawPaths {
String testRoot = fsHelper.getTestRootDir();
File testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

@@ -235,6 +235,9 @@ Release 2.6.0 - UNRELEASED
YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into
DistributedShell (xgong)
YARN-2001. Added a time threshold for RM to wait before starting container
allocations after restart/failover. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

@@ -380,6 +383,12 @@ Release 2.6.0 - UNRELEASED
YARN-2558. Updated ContainerTokenIdentifier#read/write to use
ContainerId#getContainerId. (Tsuyoshi OZAWA via jianhe)
YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving
FinalApplicationStatus. (Zhijie Shen via jianhe)
YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across
ResourceManager work-preserving-restart or failover. (Jian He via vinodkv)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

@@ -353,6 +353,11 @@ public class YarnConfiguration extends Configuration {
public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
false;
public static final String RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
RM_PREFIX + "work-preserving-recovery.scheduling-wait-ms";
public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
10000;
/** Zookeeper interaction configs */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";

@@ -53,7 +53,7 @@ public class TestUnmanagedAMLauncher {
.getLog(TestUnmanagedAMLauncher.class);
protected static MiniYARNCluster yarnCluster = null;
protected static Configuration conf = new Configuration();
protected static Configuration conf = new YarnConfiguration();
@BeforeClass
public static void setup() throws InterruptedException, IOException {

@@ -756,6 +756,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
.getIdentifier().array(), token.getPassword().array(), new Text(
token.getKind()), new Text(token.getService()));
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) {
currentUGI = UserGroupInformation.getLoginUser();

@@ -57,7 +57,7 @@ public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
Token<AMRMTokenIdentifier> appToken =
this.cluster.getResourceManager().getRMContext()
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service"));
appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser()
.getUserName()));

@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMTokenCache;

@@ -196,6 +197,7 @@ public class TestAMRMClient {
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
}

@@ -22,11 +22,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;

@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@InterfaceAudience.Public

@@ -70,23 +72,17 @@ public class ClientRMProxy<T> extends RMProxy<T> {
return createRMProxy(configuration, protocol, INSTANCE);
}
private static void setupTokens(InetSocketAddress resourceManagerAddress)
private static void setAMRMTokenService(final Configuration conf)
throws IOException {
// It is assumed for now that the only AMRMToken in AM's UGI is for this
// cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
// default service-address, see YARN-1779.
for (Token<? extends TokenIdentifier> token : UserGroupInformation
.getCurrentUser().getTokens()) {
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
// This token needs to be directly provided to the AMs, so set the
// appropriate service-name. We'll need more infrastructure when we
// need to set it in HA case.
SecurityUtil.setTokenService(token, resourceManagerAddress);
token.setService(getAMRMTokenService(conf));
}
}
}
@InterfaceAudience.Private
@Private
@Override
protected InetSocketAddress getRMAddress(YarnConfiguration conf,
Class<?> protocol) throws IOException {

@@ -100,12 +96,10 @@ public class ClientRMProxy<T> extends RMProxy<T> {
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
} else if (protocol == ApplicationMasterProtocol.class) {
InetSocketAddress serviceAddr =
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
setupTokens(serviceAddr);
return serviceAddr;
setAMRMTokenService(conf);
return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to ResourceManager: " +

@@ -115,7 +109,7 @@ public class ClientRMProxy<T> extends RMProxy<T> {
}
}
@InterfaceAudience.Private
@Private
@Override
protected void checkAllowedProtocols(Class<?> protocol) {
Preconditions.checkArgument(

@@ -132,8 +126,23 @@ public class ClientRMProxy<T> extends RMProxy<T> {
 * RMDelegationToken for
 * @return - Service name for RMDelegationToken
 */
@InterfaceStability.Unstable
@Unstable
public static Text getRMDelegationTokenService(Configuration conf) {
return getTokenService(conf, YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
@Unstable
public static Text getAMRMTokenService(Configuration conf) {
return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
}
@Unstable
public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();

@@ -142,17 +151,14 @@ public class ClientRMProxy<T> extends RMProxy<T> {
// Set RM_ID to get the corresponding RM_ADDRESS
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
services.add(SecurityUtil.buildTokenService(
yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT)).toString());
yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
.toString());
}
return new Text(Joiner.on(',').join(services));
}
// Non-HA case - no need to set RM_ID
return SecurityUtil.buildTokenService(
conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT));
return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
defaultAddr, defaultPort));
}
}

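A short usage sketch of the new helper, mirroring the AMRMClientImpl and TestApplicationMasterServiceOnHA hunks above (the surrounding token construction is elided):

    // Derive the AMRMToken service name from the scheduler address(es) in the
    // configuration; with RM HA enabled this is a comma-separated list covering
    // every ResourceManager, so the same token matches whichever RM is active.
    Text service = ClientRMProxy.getAMRMTokenService(conf);
    amrmToken.setService(service);
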
@@ -48,11 +48,18 @@ public class AMRMTokenSelector implements
LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService());
if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) {
&& checkService(service, token)) {
return (Token<AMRMTokenIdentifier>) token;
}
}
return null;
}
private boolean checkService(Text service,
Token<? extends TokenIdentifier> token) {
if (service == null || token.getService() == null) {
return false;
}
return token.getService().toString().contains(service.toString());
}
}

@@ -297,6 +297,16 @@
<value>false</value>
</property>

<property>
<description>Set the amount of time RM waits before allocating new
containers on work-preserving-recovery. Such wait period gives RM a chance
to settle down resyncing with NMs in the cluster on recovery, before assigning
new containers to applications.
</description>
<name>yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms</name>
<value>10000</value>
</property>

<property>
<description>The class to use as the persistent store.

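The same knob in code form, for illustration; the 15000 ms value here is an arbitrary example rather than the shipped default of 10000:

    // Sketch using the YarnConfiguration constant added earlier in this commit.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 15000L);
    // The schedulers below consult RMContext#isSchedulerReadyForAllocatingContainers()
    // and skip container allocation until this wait has elapsed after recovery starts.
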
@@ -56,4 +56,34 @@ public class TestClientRMProxy {
service.contains(defaultRMAddress));
}
}

@Test
public void testGetAMRMTokenService() {
String defaultRMAddress = YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS;
YarnConfiguration conf = new YarnConfiguration();

// HA is not enabled
Text tokenService = ClientRMProxy.getAMRMTokenService(conf);
String[] services = tokenService.toString().split(",");
assertEquals(1, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}

// HA is enabled
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
"0.0.0.0");
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
"0.0.0.0");
tokenService = ClientRMProxy.getAMRMTokenService(conf);
services = tokenService.toString().split(",");
assertEquals(2, services.length);
for (String service : services) {
assertTrue("Incorrect token service name",
service.contains(defaultRMAddress));
}
}
}

@@ -108,4 +108,6 @@ public interface RMContext {
boolean isWorkPreservingRecoveryEnabled();
long getEpoch();
}
boolean isSchedulerReadyForAllocatingContainers();
}

@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;

@@ -44,6 +47,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;

@@ -85,6 +90,13 @@ public class RMContextImpl implements RMContext {
private SystemMetricsPublisher systemMetricsPublisher;
private ConfigurationProvider configurationProvider;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
/**
 * Default constructor. To be used in conjunction with setter methods for

@@ -379,7 +391,34 @@ public class RMContextImpl implements RMContext {
return this.epoch;
}
void setEpoch(long epoch) {
void setEpoch(long epoch) {
this.epoch = epoch;
}
}
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
}
public boolean isSchedulerReadyForAllocatingContainers() {
if (isSchedulerReady) {
return isSchedulerReady;
}
isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
> schedulerRecoveryWaitTime;
if (!isSchedulerReady && printLog) {
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
printLog = false;
}
if (isSchedulerReady) {
LOG.info("Scheduler recovery is done. Start allocating new containers.");
}
return isSchedulerReady;
}
@Private
@VisibleForTesting
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
}

@@ -1131,6 +1131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
// recover applications
rmAppManager.recover(state);
setSchedulerRecoveryStartAndWaitTime(state, conf);
}
public static void main(String argv[]) {

@@ -1178,6 +1180,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setDispatcher(rmDispatcher);
}
private void setSchedulerRecoveryStartAndWaitTime(RMState state,
Configuration conf) {
if (!state.getApplicationState().isEmpty()) {
long waitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}
}
/**
 * Retrieve RM bind address from configuration
 *

@@ -160,7 +160,7 @@ public class SystemMetricsPublisher extends CompositeService {

  @SuppressWarnings("unchecked")
  public void appAttemptFinished(RMAppAttempt appAttempt,
      RMAppAttemptState state, long finishedTime) {
      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
    if (publishSystemMetrics) {
      dispatcher.getEventHandler().handle(
        new AppAttemptFinishedEvent(

@@ -168,8 +168,10 @@ public class SystemMetricsPublisher extends CompositeService {
            appAttempt.getTrackingUrl(),
            appAttempt.getOriginalTrackingUrl(),
            appAttempt.getDiagnostics(),
            appAttempt.getFinalApplicationStatus(),
            RMServerUtils.createApplicationAttemptState(state),
            // app will get the final status from app attempt, or create one
            // based on app state if it doesn't exist
            app.getFinalApplicationStatus(),
            RMServerUtils.createApplicationAttemptState(appAttemtpState),
            finishedTime));
    }
  }
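Existing callers of appAttemptFinished() must now supply the owning RMApp as well, and the published final status is taken from the application rather than from the attempt (with the fallback described by the in-code comment above). A hedged sketch of adapting a caller in a test, mirroring the TestSystemMetricsPublisher change further down; publisher and appAttempt stand in for a SystemMetricsPublisher and RMAppAttempt already in scope:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;

// Sketch of the new call shape; the mock setup follows the updated test below.
RMApp app = mock(RMApp.class);
when(app.getFinalApplicationStatus())
    .thenReturn(FinalApplicationStatus.SUCCEEDED);
publisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app,
    System.currentTimeMillis());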
@@ -1159,8 +1159,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
      appAttempt.rmContext.getRMApplicationHistoryWriter()
          .applicationAttemptFinished(appAttempt, finalAttemptState);
      appAttempt.rmContext.getSystemMetricsPublisher()
          .appAttemptFinished(
              appAttempt, finalAttemptState, System.currentTimeMillis());
          .appAttemptFinished(appAttempt, finalAttemptState,
              appAttempt.rmContext.getRMApps().get(
                  appAttempt.applicationAttemptId.getApplicationId()),
              System.currentTimeMillis());
    }
  }

@@ -902,6 +902,10 @@ public class CapacityScheduler extends
  }

  private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    // Assign new containers...
    // 1. Check for reserved applications
@@ -1015,6 +1015,11 @@ public class FairScheduler extends
  }

  private synchronized void attemptScheduling(FSSchedulerNode node) {
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    // Assign new containers...
    // 1. Check for reserved applications
    // 2. Schedule if there are no reservations
@@ -702,6 +702,12 @@ public class FifoScheduler extends
          completedContainer, RMContainerEventType.FINISHED);
    }

    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
        node.getAvailableResource(), minimumAllocation)) {
      LOG.debug("Node heartbeat " + rmNode.getNodeID() +

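CapacityScheduler, FairScheduler and FifoScheduler all add the identical two-part guard before assigning new containers on a node update. Completed-container processing above the guard (explicit in the FifoScheduler hunk) still runs during the wait, so the RM keeps absorbing node status; only new assignment is skipped. The repeated condition can be read as a single helper, sketched here as a hypothetical utility that does not exist in the patch:

import org.apache.hadoop.yarn.server.resourcemanager.RMContext;

// Hypothetical helper equivalent to the guard repeated inline in
// CapacityScheduler, FairScheduler and FifoScheduler; not part of the patch.
final class RecoveryGuard {
  private RecoveryGuard() {}

  static boolean skipNewAllocations(RMContext rmContext) {
    return rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers();
  }
}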
@@ -37,10 +37,12 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;

@@ -62,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

@@ -479,6 +483,7 @@ public class TestWorkPreservingRMRestart {
  @Test(timeout = 20000)
  public void testAMfailedBetweenRMRestart() throws Exception {
    MemoryRMStateStore memStore = new MemoryRMStateStore();
    conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
    memStore.init(conf);
    rm1 = new MockRM(conf, memStore);
    rm1.start();
@@ -762,4 +767,55 @@ public class TestWorkPreservingRMRestart {
      Thread.sleep(200);
    }
  }

  @Test (timeout = 20000)
  public void testNewContainersNotAllocatedDuringSchedulerRecovery()
      throws Exception {
    conf.setLong(
      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000);
    MemoryRMStateStore memStore = new MemoryRMStateStore();
    memStore.init(conf);
    rm1 = new MockRM(conf, memStore);
    rm1.start();
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
    nm1.registerNode();
    RMApp app1 = rm1.submitApp(200);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // Restart RM
    rm2 = new MockRM(conf, memStore);
    rm2.start();
    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
    nm1.registerNode();
    ControlledClock clock = new ControlledClock(new SystemClock());
    long startTime = System.currentTimeMillis();
    ((RMContextImpl) rm2.getRMContext()).setSystemClock(clock);
    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
    am1.registerAppAttempt(true);
    rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);

    // AM request for new containers
    am1.allocate("127.0.0.1", 1000, 1, new ArrayList<ContainerId>());

    List<Container> containers = new ArrayList<Container>();
    clock.setTime(startTime + 2000);
    nm1.nodeHeartbeat(true);

    // sleep some time as allocation happens asynchronously.
    Thread.sleep(3000);
    containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
      new ArrayList<ContainerId>()).getAllocatedContainers());
    // container is not allocated during scheduling recovery.
    Assert.assertTrue(containers.isEmpty());

    clock.setTime(startTime + 8000);
    nm1.nodeHeartbeat(true);
    // Container is created after recovery is done.
    while (containers.isEmpty()) {
      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
        new ArrayList<ContainerId>()).getAllocatedContainers());
      Thread.sleep(500);
    }
  }
}

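The test makes the gate deterministic by swapping the recovered RM's clock for a ControlledClock: at startTime + 2000 the 4000 ms wait has not elapsed, so the heartbeat must not produce containers, while at startTime + 8000 it has, so allocation resumes. The Thread.sleep calls only give the asynchronous allocation a chance to run; they never advance the scheduler's view of time. A tiny sketch of the clock behaviour the test relies on, under the assumption that ControlledClock.getTime() simply returns the last value passed to setTime():

import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;

// Assumed ControlledClock behaviour: getTime() returns the last value passed
// to setTime(), independent of the wall clock, so sleeping in the test never
// opens the recovery gate on its own.
public class ControlledClockSketch {
  public static void main(String[] args) {
    ControlledClock clock = new ControlledClock(new SystemClock());
    clock.setTime(1000L);
    System.out.println(clock.getTime()); // 1000, and stays 1000...
    clock.setTime(9000L);
    System.out.println(clock.getTime()); // ...until setTime() moves it to 9000
  }
}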
@@ -174,7 +174,9 @@ public class TestSystemMetricsPublisher {
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
    RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
    metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
    metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
    RMApp app = mock(RMApp.class);
    when(app.getFinalApplicationStatus()).thenReturn(FinalApplicationStatus.UNDEFINED);
    metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app,
        Integer.MAX_VALUE + 2L);
    TimelineEntity entity = null;
    do {

@@ -222,7 +224,7 @@ public class TestSystemMetricsPublisher {
        event.getEventInfo().get(
            AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO));
    Assert.assertEquals(
        appAttempt.getFinalApplicationStatus().toString(),
        FinalApplicationStatus.UNDEFINED.toString(),
        event.getEventInfo().get(
            AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO));
    Assert.assertEquals(

@@ -340,8 +342,6 @@ public class TestSystemMetricsPublisher {
    when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
    when(appAttempt.getOriginalTrackingUrl()).thenReturn(
        "test original tracking url");
    when(appAttempt.getFinalApplicationStatus()).thenReturn(
        FinalApplicationStatus.UNDEFINED);
    return appAttempt;
  }

@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;

@@ -92,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;

@@ -289,7 +289,6 @@ public class TestRMAppAttemptTransitions {
    Mockito.doReturn(resourceScheduler).when(spyRMContext).getScheduler();

    final String user = MockApps.newUserName();
    final String queue = MockApps.newQueue();
    submissionContext = mock(ApplicationSubmissionContext.class);
    when(submissionContext.getQueue()).thenReturn(queue);

@@ -1385,7 +1384,7 @@ public class TestRMAppAttemptTransitions {
    finalState =
        ArgumentCaptor.forClass(RMAppAttemptState.class);
    verify(publisher).appAttemptFinished(any(RMAppAttempt.class), finalState.capture(),
        anyLong());
        any(RMApp.class), anyLong());
    Assert.assertEquals(state, finalState.getValue());
  }
