HDFS-6666. Abort NameNode and DataNode startup if security is enabled but block access token is not enabled. Contributed by Vijay Bhat.

cnauroth 2015-04-14 09:59:01 -07:00
parent b5a0b24643
commit d45aa7647b
6 changed files with 115 additions and 50 deletions
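For context, a minimal sketch (not part of this commit) of the configuration combination that this change now rejects at daemon startup. The key names come from DFSConfigKeys as shown in the diffs below; the class name and values here are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Illustrative only: with Kerberos on and block access tokens off, the NameNode
// now throws IOException and the DataNode throws RuntimeException at startup
// instead of only logging an error.
public class BlockTokenConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    conf.set("hadoop.security.authentication", "kerberos");                    // security enabled
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, false);   // rejected combination
    // Supported secure setup: keep block access tokens enabled as well.
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
  }
}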

CHANGES.txt

@@ -494,6 +494,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7701. Support reporting per storage type quota and usage
     with hadoop/hdfs shell. (Peter Shi via Arpit Agarwal)
 
+    HDFS-6666. Abort NameNode and DataNode startup if security is enabled but
+    block access token is not enabled. (Vijay Bhat via cnauroth)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

BlockManager.java

@@ -359,7 +359,7 @@ public class BlockManager {
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
-      final Configuration conf) {
+      final Configuration conf) throws IOException {
     final boolean isEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
@@ -367,10 +367,11 @@ public class BlockManager {
     if (!isEnabled) {
       if (UserGroupInformation.isSecurityEnabled()) {
-        LOG.error("Security is enabled but block access tokens " +
-            "(via " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + ") " +
-            "aren't enabled. This may cause issues " +
-            "when clients attempt to talk to a DataNode.");
+        String errMessage = "Security is enabled but block access tokens " +
+            "(via " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + ") " +
+            "aren't enabled. This may cause issues " +
+            "when clients attempt to connect to a DataNode. Aborting NameNode";
+        throw new IOException(errMessage);
       }
       return null;
     }

DataNode.java

@@ -1183,6 +1183,20 @@ public class DataNode extends ReconfigurableBase
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
+
+    // Abort out of inconsistent state if Kerberos is enabled
+    // but block access tokens are not enabled.
+    boolean isEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
+    if (!isEnabled) {
+      String errMessage = "Security is enabled but block access tokens " +
+          "(via " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + ") " +
+          "aren't enabled. This may cause issues " +
+          "when clients attempt to connect to a DataNode. Aborting DataNode";
+      throw new RuntimeException(errMessage);
+    }
+
     SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
     if (resources != null && saslPropsResolver == null) {
       return;

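For reference, a standalone sketch (not part of this commit) of the same validation that the BlockManager and DataNode hunks above add. The helper class and method names are hypothetical; the configuration constants are the DFSConfigKeys fields shown in the diffs.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;

public class BlockTokenPreflightCheck {
  // Hypothetical helper mirroring the new startup check: fail fast when
  // Kerberos is enabled but block access tokens are not.
  static void validateBlockTokenConfig(Configuration conf) throws IOException {
    boolean tokensEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
    if (UserGroupInformation.isSecurityEnabled() && !tokensEnabled) {
      throw new IOException("Security is enabled but "
          + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + " is false");
    }
  }
}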
FSNamesystem.java

@@ -1242,10 +1242,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         cacheManager.stopMonitorThread();
         cacheManager.clearDirectiveStats();
       }
-      blockManager.getDatanodeManager().clearPendingCachingCommands();
-      blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
-      // Don't want to keep replication queues when not in Active.
-      blockManager.clearQueues();
+      if (blockManager != null) {
+        blockManager.getDatanodeManager().clearPendingCachingCommands();
+        blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
+        // Don't want to keep replication queues when not in Active.
+        blockManager.clearQueues();
+      }
       initializedReplQueues = false;
     } finally {
       writeUnlock();

SaslDataTransferTestCase.java

@@ -33,6 +33,7 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.util.Properties;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.http.HttpConfig;
@@ -48,10 +49,28 @@ public abstract class SaslDataTransferTestCase {
   private static File baseDir;
   private static String hdfsPrincipal;
+  private static String userPrincipal;
   private static MiniKdc kdc;
-  private static String keytab;
+  private static String hdfsKeytab;
+  private static String userKeyTab;
   private static String spnegoPrincipal;
 
+  public static String getUserKeyTab() {
+    return userKeyTab;
+  }
+
+  public static String getUserPrincipal() {
+    return userPrincipal;
+  }
+
+  public static String getHdfsPrincipal() {
+    return hdfsPrincipal;
+  }
+
+  public static String getHdfsKeytab() {
+    return hdfsKeytab;
+  }
+
   @BeforeClass
   public static void initKdc() throws Exception {
     baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
@@ -63,11 +82,17 @@ public abstract class SaslDataTransferTestCase {
     kdc = new MiniKdc(kdcConf, baseDir);
     kdc.start();
-    String userName = UserGroupInformation.getLoginUser().getShortUserName();
-    File keytabFile = new File(baseDir, userName + ".keytab");
-    keytab = keytabFile.getAbsolutePath();
-    kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");
-    hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();
+    String userName = RandomStringUtils.randomAlphabetic(8);
+    File userKeytabFile = new File(baseDir, userName + ".keytab");
+    userKeyTab = userKeytabFile.getAbsolutePath();
+    kdc.createPrincipal(userKeytabFile, userName + "/localhost");
+    userPrincipal = userName + "/localhost@" + kdc.getRealm();
+
+    String superUserName = "hdfs";
+    File hdfsKeytabFile = new File(baseDir, superUserName + ".keytab");
+    hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
+    kdc.createPrincipal(hdfsKeytabFile, superUserName + "/localhost", "HTTP/localhost");
+    hdfsPrincipal = superUserName + "/localhost@" + kdc.getRealm();
     spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
   }
@@ -91,9 +116,9 @@ public abstract class SaslDataTransferTestCase {
     HdfsConfiguration conf = new HdfsConfiguration();
     SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
     conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
-    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, hdfsKeytab);
     conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
-    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, hdfsKeytab);
     conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);

TestSecureNameNode.java

@@ -24,71 +24,63 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.TestUGIWithSecurityOn;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.junit.Assume;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
-public class TestSecureNameNode {
+public class TestSecureNameNode extends SaslDataTransferTestCase {
   final static private int NUM_OF_DATANODES = 0;
 
-  @Before
-  public void testKdcRunning() {
-    // Tests are skipped if KDC is not running
-    Assume.assumeTrue(TestUGIWithSecurityOn.isKdcRunning());
-  }
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
 
   @Test
-  public void testName() throws IOException, InterruptedException {
+  public void testName() throws Exception {
     MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = createSecureConfig(
+        "authentication,privacy");
     try {
-      String keyTabDir = System.getProperty("kdc.resource.dir") + "/keytabs";
-      String nn1KeytabPath = keyTabDir + "/nn1.keytab";
-      String user1KeyTabPath = keyTabDir + "/user1.keytab";
-      Configuration conf = new HdfsConfiguration();
-      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
-          "kerberos");
-      conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
-          "nn1/localhost@EXAMPLE.COM");
-      conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, nn1KeytabPath);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
           .build();
       final MiniDFSCluster clusterRef = cluster;
       cluster.waitActive();
-      FileSystem fsForCurrentUser = cluster.getFileSystem();
-      fsForCurrentUser.mkdirs(new Path("/tmp"));
-      fsForCurrentUser.setPermission(new Path("/tmp"), new FsPermission(
+      FileSystem fsForSuperUser = UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(getHdfsPrincipal(), getHdfsKeytab()).doAs(new PrivilegedExceptionAction<FileSystem>() {
+            @Override
+            public FileSystem run() throws Exception {
+              return clusterRef.getFileSystem();
+            }
+          });
+      fsForSuperUser.mkdirs(new Path("/tmp"));
+      fsForSuperUser.setPermission(new Path("/tmp"), new FsPermission(
           (short) 511));
       UserGroupInformation ugi = UserGroupInformation
-          .loginUserFromKeytabAndReturnUGI("user1@EXAMPLE.COM", user1KeyTabPath);
+          .loginUserFromKeytabAndReturnUGI(getUserPrincipal(), getUserKeyTab());
       FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
         @Override
         public FileSystem run() throws Exception {
           return clusterRef.getFileSystem();
         }
       });
-      try {
-        Path p = new Path("/users");
-        fs.mkdirs(p);
-        fail("user1 must not be allowed to write in /");
-      } catch (IOException expected) {
-      }
-
-      Path p = new Path("/tmp/alpha");
+      Path p = new Path("/mydir");
+      exception.expect(IOException.class);
       fs.mkdirs(p);
-      assertNotNull(fs.listStatus(p));
+
+      Path tmp = new Path("/tmp/alpha");
+      fs.mkdirs(tmp);
+      assertNotNull(fs.listStatus(tmp));
       assertEquals(AuthenticationMethod.KERBEROS,
           ugi.getAuthenticationMethod());
     } finally {
@@ -97,4 +89,32 @@ public class TestSecureNameNode {
       }
     }
   }
+
+  /**
+   * Verify the following scenario.
+   * 1. Kerberos is enabled.
+   * 2. HDFS block tokens are not enabled.
+   * 3. Start the NN.
+   * 4. NN should throw an IOException and abort.
+   * @throws Exception
+   */
+  @Test
+  public void testKerberosHdfsBlockTokenInconsistencyNNStartup() throws Exception {
+    MiniDFSCluster dfsCluster = null;
+    HdfsConfiguration conf = createSecureConfig(
+        "authentication,privacy");
+
+    try {
+      conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, false);
+      exception.expect(IOException.class);
+      exception.expectMessage("Security is enabled but block access tokens");
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      dfsCluster.waitActive();
+    } finally {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+    }
+    return;
+  }
 }