Merge branch 'trunk' into HADOOP-12756

Kai Zheng 2016-09-20 08:09:29 +08:00
commit 005f4528c7
33 changed files with 490 additions and 177 deletions

View File

@@ -29,6 +29,7 @@ import java.util.ServiceLoader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
/**
* A factory to create a list of KeyProvider based on the path given in a
@@ -39,7 +40,7 @@ import org.apache.hadoop.conf.Configuration;
@InterfaceStability.Unstable
public abstract class KeyProviderFactory {
public static final String KEY_PROVIDER_PATH =
"hadoop.security.key.provider.path";
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
public abstract KeyProvider createProvider(URI providerName,
Configuration conf
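For orientation, a minimal sketch (not part of this commit) of how client code resolves providers through this key; the KMS URI and the wrapper class are illustrative:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class KeyProviderPathDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative KMS URI; any registered provider scheme works here.
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
        "kms://http@localhost:9600/kms");
    // KEY_PROVIDER_PATH now aliases the common constant, so both names
    // read the same configuration entry.
    List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
    System.out.println("Resolved " + providers.size() + " provider(s)");
  }
}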

View File

@@ -628,6 +628,14 @@ public class CommonConfigurationKeysPublic {
public static final String HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS =
"hadoop.security.impersonation.provider.class";
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String HADOOP_SECURITY_KEY_PROVIDER_PATH =
"hadoop.security.key.provider.path";
// <!-- KMSClientProvider configurations -->
/**
* @see

View File

@@ -2037,6 +2037,14 @@
</description>
</property>
<property>
<name>hadoop.security.key.provider.path</name>
<description>
The KeyProvider to use when managing zone keys, and interacting with
encryption keys when reading and writing to an encryption zone.
</description>
</property>
<property>
<name>fs.har.impl.disable.cache</name>
<value>true</value>

View File

@@ -28,6 +28,7 @@ The following table lists the configuration property names that are deprecated i
| dfs.data.dir | dfs.datanode.data.dir |
| dfs.datanode.max.xcievers | dfs.datanode.max.transfer.threads |
| dfs.df.interval | fs.df.interval |
| dfs.encryption.key.provider.uri | hadoop.security.key.provider.path |
| dfs.federation.nameservice.id | dfs.nameservice.id |
| dfs.federation.nameservices | dfs.nameservices |
| dfs.http.address | dfs.namenode.http-address |
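The new mapping can be sketched as follows; a minimal example assuming an HDFS client classpath, with an illustrative KMS URI (both names below resolve to the same entry once HdfsConfiguration registers the deprecation shown later in this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DeprecatedKeyDemo {
  public static void main(String[] args) {
    // HdfsConfiguration registers the DeprecationDelta mapping the old
    // dfs.* key to the new hadoop.security one.
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.encryption.key.provider.uri",          // deprecated name
        "kms://http@localhost:9600/kms");
    // The value is visible under the new name; a deprecation warning is logged.
    System.out.println(conf.get(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH));
  }
}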

View File

@@ -37,10 +37,10 @@ KMS Client Configuration
The KMS client `KeyProvider` uses the **kms** scheme, and the embedded URL must be the URL of the KMS. For example, for a KMS running on `http://localhost:9600/kms`, the KeyProvider URI is `kms://http@localhost:9600/kms`. And, for a KMS running on `https://localhost:9600/kms`, the KeyProvider URI is `kms://https@localhost:9600/kms`.
The following is an example to configure HDFS NameNode as a KMS client in
`hdfs-site.xml`:
`core-site.xml`:
<property>
<name>dfs.encryption.key.provider.uri</name>
<name>hadoop.security.key.provider.path</name>
<value>kms://http@localhost:9600/kms</value>
<description>
The KeyProvider to use when interacting with encryption keys used
@@ -664,15 +664,15 @@ is to use LoadBalancingKMSClientProvider. Using this approach, a KMS client
(for example, a HDFS NameNode) is aware of multiple KMS instances, and it sends
requests to them in a round-robin fashion. LoadBalancingKMSClientProvider is
implicitly used when more than one URI is specified in
`dfs.encryption.key.provider.uri`.
`hadoop.security.key.provider.path`.
The following example in `hdfs-site.xml` configures two KMS
The following example in `core-site.xml` configures two KMS
instances, `kms01.example.com` and `kms02.example.com`.
The hostnames are separated by semi-colons, and all KMS instances must run
on the same port.
<property>
<name>dfs.encryption.key.provider.uri</name>
<name>hadoop.security.key.provider.path</name>
<value>kms://https@kms01.example.com;kms02.example.com:9600/kms</value>
<description>
The KeyProvider to use when interacting with encryption keys used

View File

@@ -526,7 +526,7 @@ public class DFSUtilClient {
}
private static String keyProviderUriKeyName =
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
/**
* Set the key provider uri configuration key name for creating key providers.
@@ -616,16 +616,17 @@ }
}
/**
* Probe for HDFS Encryption being enabled; this uses the value of
* the option {@link HdfsClientConfigKeys#DFS_ENCRYPTION_KEY_PROVIDER_URI},
* returning true if that property contains a non-empty, non-whitespace
* Probe for HDFS Encryption being enabled; this uses the value of the option
* {@link CommonConfigurationKeysPublic#HADOOP_SECURITY_KEY_PROVIDER_PATH},
* returning true if that property contains a non-empty, non-whitespace
* string.
* @param conf configuration to probe
* @return true if encryption is considered enabled.
*/
public static boolean isHDFSEncryptionEnabled(Configuration conf) {
return !conf.getTrimmed(
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
return !(conf.getTrimmed(
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, "")
.isEmpty());
}
public static InetSocketAddress getNNAddress(String address) {
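A short sketch of the probe's behaviour, mirroring the TestDFSUtil changes later in this commit (the URI value is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient;

public class EncryptionProbeDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Unset, empty and whitespace-only values all mean "encryption off".
    System.out.println(DFSUtilClient.isHDFSEncryptionEnabled(conf)); // false
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
        "\n\t\n");
    System.out.println(DFSUtilClient.isHDFSEncryptionEnabled(conf)); // false
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
        "kms://http@localhost:9600/kms");
    System.out.println(DFSUtilClient.isHDFSEncryptionEnabled(conf)); // true
  }
}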

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DeprecatedKeys;
@@ -141,6 +142,8 @@ public class HdfsConfiguration extends Configuration {
HdfsClientConfigKeys.DFS_NAMESERVICES),
new DeprecationDelta("dfs.federation.nameservice.id",
DeprecatedKeys.DFS_NAMESERVICE_ID),
new DeprecationDelta("dfs.encryption.key.provider.uri",
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH),
});
}

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
@@ -86,11 +86,11 @@ public class KeyProviderCache {
private URI createKeyProviderURI(Configuration conf) {
final String providerUriStr = conf.getTrimmed(
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, "");
// No provider set in conf
if (providerUriStr.isEmpty()) {
LOG.error("Could not find uri with key ["
+ HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH
+ "] to create a keyProvider !!");
return null;
}

View File

@@ -139,7 +139,6 @@ public interface HdfsClientConfigKeys {
"dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
"dfs.encrypt.data.transfer.cipher.suites";

View File

@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
@@ -158,7 +159,8 @@ public class TestHdfsHelper extends TestDirHelper {
FileSystemTestHelper helper = new FileSystemTestHelper();
final String jceksPath = JavaKeyStoreProvider.SCHEME_NAME + "://file" +
new Path(helper.getTestRootDir(), "test.jks").toUri();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, jceksPath);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
jceksPath);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
builder.numDataNodes(2);
MiniDFSCluster miniHdfs = builder.build();

View File

@@ -30,10 +30,10 @@ import java.util.EnumSet;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -135,7 +135,7 @@ public class TestRpcProgramNfs3 {
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
config.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
config.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
ProxyUsers.refreshSuperUserGroupsConfiguration(config);

View File

@@ -804,8 +804,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms";
public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";

View File

@@ -2756,14 +2756,6 @@
</description>
</property>
<property>
<name>dfs.encryption.key.provider.uri</name>
<description>
The KeyProvider to use when interacting with encryption keys used
when reading and writing to an encryption zone.
</description>
</property>
<property>
<name>dfs.storage.policy.enabled</name>
<value>true</value>

View File

@@ -117,7 +117,7 @@ Once a KMS has been set up and the NameNode and HDFS clients have been correctly
### <a name="Configuring_the_cluster_KeyProvider"></a>Configuring the cluster KeyProvider
#### dfs.encryption.key.provider.uri
#### hadoop.security.key.provider.path
The KeyProvider to use when interacting with encryption keys used when reading and writing to an encryption zone.

View File

@@ -35,7 +35,7 @@ import org.apache.hadoop.cli.util.CommandExecutor.Result;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -66,7 +66,7 @@ public class TestCryptoAdminCLI extends CLITestHelperDFS {
tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.KMSConfiguration;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystemTestHelper;
@@ -190,7 +191,7 @@ public class TestAclsEndToEnd {
"keyadmin,hdfs,user");
conf.set(ProxyUsers.CONF_HADOOP_PROXYUSER + "." + realUser + ".hosts",
"*");
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);

View File

@@ -59,6 +59,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1031,16 +1032,19 @@ public class TestDFSUtil {
@Test
public void testEncryptionProbe() throws Throwable {
Configuration conf = new Configuration(false);
conf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
conf.unset(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
assertFalse("encryption enabled on no provider key",
DFSUtilClient.isHDFSEncryptionEnabled(conf));
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"");
assertFalse("encryption enabled on empty provider key",
DFSUtilClient.isHDFSEncryptionEnabled(conf));
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "\n\t\n");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"\n\t\n");
assertFalse("encryption enabled on whitespace provider key",
DFSUtilClient.isHDFSEncryptionEnabled(conf));
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "http://hadoop.apache.org");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"http://hadoop.apache.org");
assertTrue("encryption disabled on valid provider key",
DFSUtilClient.isHDFSEncryptionEnabled(conf));

View File

@@ -150,7 +150,8 @@ public class TestEncryptionZones {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,
@@ -845,9 +846,9 @@ public class TestEncryptionZones {
// Check KeyProvider state
// Flushing the KP on the NN, since it caches, and init a test one
cluster.getNamesystem().getProvider().flush();
KeyProvider provider = KeyProviderFactory
.get(new URI(conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI)),
conf);
KeyProvider provider = KeyProviderFactory.get(new URI(conf.getTrimmed(
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH)),
conf);
List<String> keys = provider.getKeys();
assertEquals("Expected NN to have created one key per zone", 1,
keys.size());
@@ -931,7 +932,8 @@ public class TestEncryptionZones {
public void testCreateEZWithNoProvider() throws Exception {
// Unset the key provider and make sure EZ ops don't work
final Configuration clusterConf = cluster.getConfiguration(0);
clusterConf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
clusterConf
.unset(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
cluster.restartNameNode(true);
cluster.waitActive();
final Path zone1 = new Path("/zone1");
@@ -943,8 +945,9 @@
assertExceptionContains("since no key provider is available", e);
}
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
clusterConf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
clusterConf
.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
// Try listing EZs as well
assertNumZones(0);

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
@@ -62,7 +63,7 @@ public class TestEncryptionZonesWithHA {
fsHelper = new FileSystemTestHelper();
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" +
new Path(testRootDir.toString(), "test.jks").toUri()
);

View File

@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.junit.Assert;
import org.junit.Test;
@@ -94,26 +94,26 @@ public class TestKeyProviderCache {
public void testCache() throws Exception {
KeyProviderCache kpCache = new KeyProviderCache(10000);
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"dummy://foo:bar@test_provider1");
KeyProvider keyProvider1 = kpCache.get(conf);
Assert.assertNotNull("Returned Key Provider is null !!", keyProvider1);
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"dummy://foo:bar@test_provider1");
KeyProvider keyProvider2 = kpCache.get(conf);
Assert.assertTrue("Different KeyProviders returned !!",
keyProvider1 == keyProvider2);
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"dummy://test_provider3");
KeyProvider keyProvider3 = kpCache.get(conf);
Assert.assertFalse("Same KeyProviders returned !!",
keyProvider1 == keyProvider3);
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"dummy://hello:there@test_provider1");
KeyProvider keyProvider4 = kpCache.get(conf);

View File

@@ -25,6 +25,7 @@ import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestWrapper;
import org.apache.hadoop.fs.FileStatus;
@@ -77,7 +78,7 @@ public class TestReservedRawPaths {
String testRoot = fsHelper.getTestRootDir();
File testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.KMSConfiguration;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestWrapper;
import org.apache.hadoop.fs.FileUtil;
@@ -237,8 +238,9 @@ public class TestSecureEncryptionZoneWithKMS {
@Before
public void setup() throws Exception {
// Start MiniDFS Cluster
baseConf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
getKeyProviderURI());
baseConf
.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
getKeyProviderURI());
baseConf.setBoolean(DFSConfigKeys
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

View File

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -85,7 +86,8 @@ public class TestNestedEncryptionZones {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.metrics;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FileSystemTestWrapper;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -642,7 +643,7 @@ public class TestNameNodeMetrics {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
File testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" +
new Path(testRootDir.toString(), "test.jks").toUri());
conf.setBoolean(DFSConfigKeys

View File

@@ -28,7 +28,7 @@ HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-aws' in the list.
### Features
**NOTE: `s3:` is being phased out. Use `s3n:` or `s3a:` instead.**
**NOTE: `s3:` has been phased out. Use `s3n:` or `s3a:` instead.**
1. The second-generation `s3n:` filesystem, which makes it easy to share
data between Hadoop and other applications via the S3 object store.
@@ -86,38 +86,6 @@ these instructions —and be aware that all issues related to S3 integration
in EMR can only be addressed by Amazon themselves: please raise your issues
with them.
## S3
The `s3://` filesystem is the original S3 store in the Hadoop codebase.
It implements an inode-style filesystem atop S3, and was written to
provide scalability when S3 had significant limits on the size of blobs.
It is incompatible with any other application's use of data in S3.
It is now deprecated and will be removed in Hadoop 3. Please do not use it,
and migrate any data off it.
### Dependencies
* `jets3t` jar
* `commons-codec` jar
* `commons-logging` jar
* `httpclient` jar
* `httpcore` jar
* `java-xmlbuilder` jar
### Authentication properties
<property>
<name>fs.s3.awsAccessKeyId</name>
<description>AWS access key ID</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>AWS secret key</description>
</property>
## S3N
S3N was the first S3 Filesystem client which used "native" S3 objects, hence
@@ -171,16 +139,16 @@ it should be used wherever possible.
### Other properties
<property>
<name>fs.s3.buffer.dir</name>
<name>fs.s3n.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3</value>
<description>Determines where on the local filesystem the s3:/s3n: filesystem
<description>Determines where on the local filesystem the s3n: filesystem
should store files before sending them to S3
(or after retrieving them from S3).
</description>
</property>
<property>
<name>fs.s3.maxRetries</name>
<name>fs.s3n.maxRetries</name>
<value>4</value>
<description>The maximum number of retries for reading or writing files to
S3, before we signal failure to the application.
@@ -188,7 +156,7 @@ it should be used wherever possible.
</property>
<property>
<name>fs.s3.sleepTimeSeconds</name>
<name>fs.s3n.sleepTimeSeconds</name>
<value>10</value>
<description>The number of seconds to sleep between each S3 retry.
</description>
@@ -1011,7 +979,7 @@ includes `distcp`.
### `ClassNotFoundException: org.apache.hadoop.fs.s3a.S3AFileSystem`
(or `org.apache.hadoop.fs.s3native.NativeS3FileSystem`, `org.apache.hadoop.fs.s3.S3FileSystem`).
(or `org.apache.hadoop.fs.s3native.NativeS3FileSystem`).
These are the Hadoop classes, found in the `hadoop-aws` JAR. An exception
reporting that one of these classes is missing means that this JAR is not on

View File

@@ -165,8 +165,8 @@ import static org.apache.hadoop.service.Service.STATE.STARTED;
public class ContainerManagerImpl extends CompositeService implements
ContainerManager {
private enum ReinitOp {
UPGRADE, COMMIT, ROLLBACK, LOCALIZE;
private enum ReInitOp {
RE_INIT, COMMIT, ROLLBACK, LOCALIZE;
}
/**
* Extra duration to wait for applications to be killed on shutdown.
@@ -1535,7 +1535,7 @@ public class ContainerManagerImpl extends CompositeService implements
ContainerId containerId = request.getContainerId();
Container container = preUpgradeOrLocalizeCheck(containerId,
ReinitOp.LOCALIZE);
ReInitOp.LOCALIZE);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.getResourceSet().addResources(request.getLocalResources());
@@ -1551,16 +1551,31 @@
return ResourceLocalizationResponse.newInstance();
}
public void upgradeContainer(ContainerId containerId,
ContainerLaunchContext upgradeLaunchContext) throws YarnException {
/**
* ReInitialize a container using a new Launch Context. If the
* retryFailureContext is not provided, the container is
* terminated on failure.
*
* NOTE: Auto-Commit is true by default. This also means that the rollback
* context is purged as soon as the command to start the new process
* is sent (the Container moves to the RUNNING state).
*
* @param containerId Container Id.
* @param autoCommit Auto Commit flag.
* @param reInitLaunchContext Target Launch Context.
* @throws YarnException Yarn Exception.
*/
public void reInitializeContainer(ContainerId containerId,
ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
ReinitOp.UPGRADE);
ReInitOp.RE_INIT);
ResourceSet resourceSet = new ResourceSet();
try {
resourceSet.addResources(upgradeLaunchContext.getLocalResources());
resourceSet.addResources(reInitLaunchContext.getLocalResources());
dispatcher.getEventHandler().handle(
new ContainerReInitEvent(containerId, upgradeLaunchContext,
resourceSet));
new ContainerReInitEvent(containerId, reInitLaunchContext,
resourceSet, autoCommit));
container.setIsReInitializing(true);
} catch (URISyntaxException e) {
LOG.info("Error when parsing local resource URI for upgrade of" +
@@ -1569,8 +1584,41 @@
}
}
/**
* Rollback the last reInitialization, if possible.
* @param containerId Container ID.
* @throws YarnException Yarn Exception.
*/
public void rollbackReInitialization(ContainerId containerId)
throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
ReInitOp.ROLLBACK);
if (container.canRollback()) {
dispatcher.getEventHandler().handle(
new ContainerEvent(containerId, ContainerEventType.ROLLBACK_REINIT));
} else {
throw new YarnException("Nothing to rollback to !!");
}
}
/**
* Commit last reInitialization after which no rollback will be possible.
* @param containerId Container ID.
* @throws YarnException Yarn Exception.
*/
public void commitReInitialization(ContainerId containerId)
throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
ReInitOp.COMMIT);
if (container.canRollback()) {
container.commitUpgrade();
} else {
throw new YarnException("Nothing to Commit !!");
}
}
private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
ReinitOp op) throws YarnException {
ReInitOp op) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");

View File

@@ -82,4 +82,8 @@ public interface Container extends EventHandler<ContainerEvent> {
void setIsReInitializing(boolean isReInitializing);
boolean isReInitializing();
boolean canRollback();
void commitUpgrade();
}

View File

@@ -26,6 +26,7 @@ public enum ContainerEventType {
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
REINITIALIZE_CONTAINER,
ROLLBACK_REINIT,
// DownloadManager
CONTAINER_INITED,

View File

@@ -91,14 +91,42 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerImpl implements Container {
private final static class ReInitializationContext {
private final ResourceSet resourceSet;
private static final class ReInitializationContext {
private final ContainerLaunchContext newLaunchContext;
private final ResourceSet newResourceSet;
// Rollback state
private final ContainerLaunchContext oldLaunchContext;
private final ResourceSet oldResourceSet;
private ReInitializationContext(ContainerLaunchContext newLaunchContext,
ResourceSet resourceSet) {
ResourceSet newResourceSet,
ContainerLaunchContext oldLaunchContext,
ResourceSet oldResourceSet) {
this.newLaunchContext = newLaunchContext;
this.resourceSet = resourceSet;
this.newResourceSet = newResourceSet;
this.oldLaunchContext = oldLaunchContext;
this.oldResourceSet = oldResourceSet;
}
private boolean canRollback() {
return (oldLaunchContext != null);
}
private ResourceSet mergedResourceSet() {
if (oldLaunchContext == null) {
return newResourceSet;
}
return ResourceSet.merge(oldResourceSet, newResourceSet);
}
private ReInitializationContext createContextForRollback() {
if (oldLaunchContext == null) {
return null;
} else {
return new ReInitializationContext(
oldLaunchContext, oldResourceSet, null, null);
}
}
}
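To make the bookkeeping concrete, a purely illustrative trace of the chaining (the class is private to ContainerImpl; newCtx, newRes, oldCtx and oldRes are placeholder names):

// Built by ReInitializeContainerTransition when autoCommit == false:
ReInitializationContext upgrade =
    new ReInitializationContext(newCtx, newRes, oldCtx, oldRes);
upgrade.canRollback();                 // true: the old context was retained
// Rolling back targets the old context and drops further rollback state:
ReInitializationContext rolledBack = upgrade.createContextForRollback();
rolledBack.canRollback();              // false: a rollback is one-shot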
@@ -129,7 +157,7 @@ public class ContainerImpl implements Container {
private String logDir;
private String host;
private String ips;
private ReInitializationContext reInitContext;
private volatile ReInitializationContext reInitContext;
private volatile boolean isReInitializing = false;
/** The NM-wide configuration - not specific to this container */
@@ -187,8 +215,8 @@ }
}
// Configure the Retry Context
this.containerRetryContext =
configureRetryContext(conf, launchContext, this.containerId);
this.containerRetryContext = configureRetryContext(
conf, launchContext, this.containerId);
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
stateMachine = stateMachineFactory.make(this);
this.context = context;
@@ -320,12 +348,16 @@
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RELAUNCHING,
ContainerState.LOCALIZED,
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
ContainerEventType.ROLLBACK_REINIT,
new RollbackContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
@@ -884,15 +916,15 @@
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.reInitContext = createReInitContext(event);
container.reInitContext = createReInitContext(container, event);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
pendingResources =
container.reInitContext.resourceSet.getAllResourcesByVisibility();
if (!pendingResources.isEmpty()) {
resByVisibility = container.reInitContext.newResourceSet
.getAllResourcesByVisibility();
if (!resByVisibility.isEmpty()) {
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(
container, pendingResources));
container, resByVisibility));
} else {
// We are not waiting on any resources, so...
// Kill the current container.
@@ -909,10 +941,30 @@ }
}
protected ReInitializationContext createReInitContext(
ContainerEvent event) {
ContainerReInitEvent rEvent = (ContainerReInitEvent)event;
return new ReInitializationContext(rEvent.getReInitLaunchContext(),
rEvent.getResourceSet());
ContainerImpl container, ContainerEvent event) {
ContainerReInitEvent reInitEvent = (ContainerReInitEvent)event;
return new ReInitializationContext(
reInitEvent.getReInitLaunchContext(),
reInitEvent.getResourceSet(),
// If AutoCommit is turned on, then no rollback can happen...
// So don't need to store the previous context.
(reInitEvent.isAutoCommit() ? null : container.launchContext),
(reInitEvent.isAutoCommit() ? null : container.resourceSet));
}
}
/**
* Transition to start the Rollback process.
*/
static class RollbackContainerTransition extends
ReInitializeContainerTransition {
@Override
protected ReInitializationContext createReInitContext(ContainerImpl
container, ContainerEvent event) {
LOG.warn("Container [" + container.getContainerId() + "]" +
" about to be explicitly Rolledback !!");
return container.reInitContext.createContextForRollback();
}
}
@@ -928,10 +980,10 @@
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
container.reInitContext.resourceSet.resourceLocalized(
container.reInitContext.newResourceSet.resourceLocalized(
rsrcEvent.getResource(), rsrcEvent.getLocation());
// Check if all ResourceLocalization has completed
if (container.reInitContext.resourceSet.getPendingResources()
if (container.reInitContext.newResourceSet.getPendingResources()
.isEmpty()) {
// Kill the current container.
container.dispatcher.getEventHandler().handle(
@@ -1028,10 +1080,13 @@
container.metrics.runningContainer();
container.wasLaunched = true;
if (container.reInitContext != null) {
container.setIsReInitializing(false);
// Check if this launch was due to a re-initialization.
// If autocommit == true, then wipe the re-init context. This ensures
// that any subsequent failures do not trigger a rollback.
if (container.reInitContext != null
&& !container.reInitContext.canRollback()) {
container.reInitContext = null;
// Set rollback context here..
container.setIsReInitializing(false);
}
if (container.recoveredAsKilled) {
@@ -1148,36 +1203,50 @@
+ container.getContainerId(), e);
}
}
LOG.info("Relaunching Container " + container.getContainerId()
+ ". Remaining retry attempts(after relaunch) : "
+ container.remainingRetryAttempts
+ ". Interval between retries is "
+ container.containerRetryContext.getRetryInterval() + "ms");
container.wasLaunched = false;
container.metrics.endRunningContainer();
if (container.containerRetryContext.getRetryInterval() == 0) {
container.sendRelaunchEvent();
} else {
// wait for some time, then send launch event
new Thread() {
@Override
public void run() {
try {
Thread.sleep(
container.containerRetryContext.getRetryInterval());
container.sendRelaunchEvent();
} catch (InterruptedException e) {
return;
}
}
}.start();
}
doRelaunch(container, container.remainingRetryAttempts,
container.containerRetryContext.getRetryInterval());
return ContainerState.RELAUNCHING;
} else if (container.canRollback()) {
// Rollback is possible only if the previous launch context is
// available.
container.addDiagnostics("Container Re-init Auto Rolled-Back.");
LOG.info("Rolling back Container reInitialization for [" +
container.getContainerId() + "] !!");
container.reInitContext =
container.reInitContext.createContextForRollback();
new KilledForReInitializationTransition().transition(container, event);
return ContainerState.LOCALIZED;
} else {
new ExitedWithFailureTransition(true).transition(container, event);
return ContainerState.EXITED_WITH_FAILURE;
}
}
private void doRelaunch(final ContainerImpl container,
int remainingRetryAttempts, final int retryInterval) {
LOG.info("Relaunching Container " + container.getContainerId()
+ ". Remaining retry attempts(after relaunch) : "
+ remainingRetryAttempts + ". Interval between retries is "
+ retryInterval + "ms");
container.wasLaunched = false;
container.metrics.endRunningContainer();
if (retryInterval == 0) {
container.sendRelaunchEvent();
} else {
// wait for some time, then send launch event
new Thread() {
@Override
public void run() {
try {
Thread.sleep(retryInterval);
container.sendRelaunchEvent();
} catch (InterruptedException e) {
return;
}
}
}.start();
}
}
}
@Override
@@ -1188,24 +1257,29 @@
@Override
public boolean shouldRetry(int errorCode) {
return shouldRetry(errorCode, containerRetryContext,
remainingRetryAttempts);
}
public static boolean shouldRetry(int errorCode,
ContainerRetryContext retryContext, int remainingRetryAttempts) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy();
ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& containerRetryContext.getErrorCodes() != null
&& containerRetryContext.getErrorCodes().contains(errorCode))) {
&& retryContext.getErrorCodes() != null
&& retryContext.getErrorCodes().contains(errorCode))) {
return remainingRetryAttempts > 0
|| remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
}
return false;
}
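The extracted static overload makes the retry decision testable in isolation; an illustrative fragment, with the policy and values borrowed from the tests later in this commit:

// Illustrative: is exit code 111 retryable under this policy?
ContainerRetryContext retryCtx = ContainerRetryContext.newInstance(
    ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
    new HashSet<>(Arrays.asList(111)), 4, 0);
boolean retry = ContainerImpl.shouldRetry(111, retryCtx, 4); // true: attempts remain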
/**
* Transition to EXITED_WITH_FAILURE
*/
@@ -1240,13 +1314,12 @@
// Re configure the Retry Context
container.containerRetryContext =
configureRetryContext(container.context.getConf(),
container.launchContext, container.containerId);
container.launchContext, container.containerId);
// Reset the retry attempts since its a fresh start
container.remainingRetryAttempts =
container.containerRetryContext.getMaxRetries();
container.resourceSet = ResourceSet.merge(
container.resourceSet, container.reInitContext.resourceSet);
container.resourceSet = container.reInitContext.mergedResourceSet();
container.sendLaunchEvent();
}
@@ -1589,4 +1662,15 @@
public boolean isReInitializing() {
return this.isReInitializing;
}
@Override
public boolean canRollback() {
return (this.reInitContext != null)
&& (this.reInitContext.canRollback());
}
@Override
public void commitUpgrade() {
this.reInitContext = null;
}
}

View File

@@ -30,18 +30,22 @@ public class ContainerReInitEvent extends ContainerEvent {
private final ContainerLaunchContext reInitLaunchContext;
private final ResourceSet resourceSet;
private final boolean autoCommit;
/**
* Container Re-Init Event.
* @param cID Container Id
* @param upgradeContext Upgrade context
* @param resourceSet Resource Set
* @param cID Container Id.
* @param upgradeContext Upgrade Context.
* @param resourceSet Resource Set.
* @param autoCommit Auto Commit.
*/
public ContainerReInitEvent(ContainerId cID,
ContainerLaunchContext upgradeContext, ResourceSet resourceSet){
ContainerLaunchContext upgradeContext,
ResourceSet resourceSet, boolean autoCommit){
super(cID, ContainerEventType.REINITIALIZE_CONTAINER);
this.reInitLaunchContext = upgradeContext;
this.resourceSet = resourceSet;
this.autoCommit = autoCommit;
}
/**
@@ -59,4 +63,12 @@
public ResourceSet getResourceSet() {
return resourceSet;
}
/**
* Should this re-Initialization be auto-committed?
* @return AutoCommit.
*/
public boolean isAutoCommit() {
return autoCommit;
}
}

View File

@@ -270,15 +270,15 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
}
@Override
public void testContainerUpgradeSuccess() throws IOException,
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeSuccess");
super.testContainerUpgradeSuccess();
LOG.info("Running testContainerUpgradeSuccessAutoCommit");
super.testContainerUpgradeSuccessAutoCommit();
}
@Override
@@ -293,6 +293,42 @@
super.testContainerUpgradeLocalizationFailure();
}
@Override
public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeSuccessExplicitCommit");
super.testContainerUpgradeSuccessExplicitCommit();
}
@Override
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeSuccessExplicitRollback");
super.testContainerUpgradeSuccessExplicitRollback();
}
@Override
public void testContainerUpgradeRollbackDueToFailure() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeRollbackDueToFailure");
super.testContainerUpgradeRollbackDueToFailure();
}
@Override
public void testContainerUpgradeProcessFailure() throws IOException,
InterruptedException, YarnException {

View File

@@ -369,9 +369,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
DefaultContainerExecutor.containerIsAlive(pid));
}
@Test
public void testContainerUpgradeSuccess() throws IOException,
InterruptedException, YarnException {
private String[] testContainerUpgradeSuccess(boolean autoCommit)
throws IOException, InterruptedException, YarnException {
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
@@ -381,7 +380,7 @@
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(false, false, cId, newStartFile);
prepareContainerUpgrade(autoCommit, false, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!",
@@ -407,6 +406,80 @@
// Assert that the New process is alive
Assert.assertTrue("New Process is not alive!",
DefaultContainerExecutor.containerIsAlive(newPid));
return new String[]{pid, newPid};
}
@Test
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException {
testContainerUpgradeSuccess(true);
// Should not be able to Commit (since already auto committed)
try {
containerManager.commitReInitialization(createContainerId(0));
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
}
}
@Test
public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
InterruptedException, YarnException {
testContainerUpgradeSuccess(false);
ContainerId cId = createContainerId(0);
containerManager.commitReInitialization(cId);
// Should not be able to Rollback once committed
try {
containerManager.rollbackReInitialization(cId);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Nothing to rollback to"));
}
}
@Test
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
InterruptedException, YarnException {
String[] pids = testContainerUpgradeSuccess(false);
// Delete the old start File..
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
oldStartFile.delete();
ContainerId cId = createContainerId(0);
// Explicit Rollback
containerManager.rollbackReInitialization(cId);
// Original should be dead anyway
Assert.assertFalse("Original Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pids[0]));
// Wait for upgraded process to die
int timeoutSecs = 0;
while (DefaultContainerExecutor.containerIsAlive(pids[1])
&& timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for Upgraded process to die..");
}
timeoutSecs = 0;
// Wait for the rolled-back process to recreate the old start file
while (!oldStartFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for New process start-file to be created");
}
// Now verify the contents of the file
BufferedReader reader =
new BufferedReader(new FileReader(oldStartFile));
Assert.assertEquals("Hello World!", reader.readLine());
// Get the pid of the process
String rolledBackPid = reader.readLine().trim();
// No more lines
Assert.assertEquals(null, reader.readLine());
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pids[0], rolledBackPid);
}
@Test
@@ -424,7 +497,7 @@
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(true, true, cId, newStartFile);
prepareContainerUpgrade(false, true, true, cId, newStartFile);
// Assert that the First process is STILL alive
// since upgrade was terminated..
@@ -447,22 +520,69 @@
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(true, false, cId, newStartFile);
// Since auto-commit is true, there is no rollback context, which
// implies that if the new process fails, it is terminated because
// there is nothing to roll back to.
prepareContainerUpgrade(true, true, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
}
@Test
public void testContainerUpgradeRollbackDueToFailure() throws IOException,
InterruptedException, YarnException {
if (Shell.WINDOWS) {
return;
}
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
String pid = prepareInitialContainer(cId, oldStartFile);
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(false, true, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Original Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
int timeoutSecs = 0;
// Wait for oldStartFile to be created
while (!oldStartFile.exists() && timeoutSecs++ < 20) {
System.out.println("\nFiles: " +
Arrays.toString(oldStartFile.getParentFile().list()));
Thread.sleep(1000);
LOG.info("Waiting for New process start-file to be created");
}
// Now verify the contents of the file
BufferedReader reader =
new BufferedReader(new FileReader(oldStartFile));
Assert.assertEquals("Hello World!", reader.readLine());
// Get the pid of the process
String rolledBackPid = reader.readLine().trim();
// No more lines
Assert.assertEquals(null, reader.readLine());
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pid, rolledBackPid);
}
/**
* Prepare a launch Context for container upgrade and request the
* Container Manager to re-initialize a running container using the
* new launch context.
* @param autoCommit Enable autoCommit.
* @param failCmd injects a start script that intentionally fails.
* @param failLoc injects a bad file Location that will fail localization.
*/
private void prepareContainerUpgrade(boolean failCmd, boolean failLoc,
ContainerId cId, File startFile)
private void prepareContainerUpgrade(boolean autoCommit, boolean failCmd,
boolean failLoc, ContainerId cId, File startFile)
throws FileNotFoundException, YarnException, InterruptedException {
// Re-write scriptfile and processStartFile
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
@@ -471,13 +591,15 @@
writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd);
ContainerLaunchContext containerLaunchContext =
prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc);
prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc, 0);
containerManager.upgradeContainer(cId, containerLaunchContext);
containerManager.reInitializeContainer(cId, containerLaunchContext,
autoCommit);
try {
containerManager.upgradeContainer(cId, containerLaunchContext);
containerManager.reInitializeContainer(cId, containerLaunchContext,
autoCommit);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE"));
Assert.assertTrue(e.getMessage().contains("Cannot perform RE_INIT"));
}
int timeoutSecs = 0;
int maxTimeToWait = failLoc ? 10 : 20;
@@ -501,7 +623,7 @@
writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);
ContainerLaunchContext containerLaunchContext =
prepareContainerLaunchContext(scriptFileOld, "dest_file", false);
prepareContainerLaunchContext(scriptFileOld, "dest_file", false, 4);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
@@ -562,7 +684,7 @@
}
private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
String destFName, boolean putBadFile) {
String destFName, boolean putBadFile, int numRetries) {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resourceAlpha = null;
@@ -592,7 +714,7 @@
ContainerRetryContext containerRetryContext = ContainerRetryContext
.newInstance(
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0);
new HashSet<>(Arrays.asList(Integer.valueOf(111))), numRetries, 0);
containerLaunchContext.setContainerRetryContext(containerRetryContext);
List<String> commands = Arrays.asList(
Shell.getRunScriptCommand(scriptFile));

View File

@@ -205,4 +205,14 @@ public class MockContainer implements Container {
public boolean isReInitializing() {
return false;
}
@Override
public boolean canRollback() {
return false;
}
@Override
public void commitUpgrade() {
}
}