HADOOP-11113. Namenode not able to reconnect to KMS after KMS restart. (Arun Suresh via wang)

(cherry picked from commit e25a25c5343c889d8c9e45b65082ddb55cf36d52)
This commit is contained in:
Andrew Wang 2014-09-30 16:46:58 -07:00
parent 823f02725f
commit 489b4008df
4 changed files with 132 additions and 18 deletions

View File

@ -442,6 +442,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11130. NFS updateMaps OS check is reversed (brandonli)
HADOOP-11113. Namenode not able to reconnect to KMS after KMS restart.
(Arun Suresh via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HADOOP-10734. Implement high-performance secure random number sources.

View File

@ -415,7 +415,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return conn;
}
private static <T> T call(HttpURLConnection conn, Map jsonOutput,
private <T> T call(HttpURLConnection conn, Map jsonOutput,
int expectedResponse, Class<T> klass)
throws IOException {
T ret = null;
@ -427,6 +427,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
conn.getInputStream().close();
throw ex;
}
if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
// Ideally, this should happen only when there is an Authentication
// failure. Unfortunately, the AuthenticationFilter returns 403 when it
// cannot authenticate (Since a 401 requires Server to send
// WWW-Authenticate header as well)..
KMSClientProvider.this.authToken =
new DelegationTokenAuthenticatedURL.Token();
}
HttpExceptionUtils.validateResponse(conn, expectedResponse);
if (APPLICATION_JSON_MIME.equalsIgnoreCase(conn.getContentType())
&& klass != null) {

View File

@ -43,12 +43,12 @@ import java.util.UUID;
public class MiniKMS {
private static Server createJettyServer(String keyStore, String password) {
private static Server createJettyServer(String keyStore, String password, int inPort) {
try {
boolean ssl = keyStore != null;
InetAddress localhost = InetAddress.getByName("localhost");
String host = "localhost";
ServerSocket ss = new ServerSocket(0, 50, localhost);
ServerSocket ss = new ServerSocket((inPort < 0) ? 0 : inPort, 50, localhost);
int port = ss.getLocalPort();
ss.close();
Server server = new Server(0);
@ -91,6 +91,7 @@ public class MiniKMS {
private String log4jConfFile;
private File keyStoreFile;
private String keyStorePassword;
private int inPort = -1;
public Builder() {
kmsConfDir = new File("target/test-classes").getAbsoluteFile();
@ -111,6 +112,12 @@ public class MiniKMS {
return this;
}
public Builder setPort(int port) {
Preconditions.checkArgument(port > 0, "input port must be greater than 0");
this.inPort = port;
return this;
}
public Builder setSslConf(File keyStoreFile, String keyStorePassword) {
Preconditions.checkNotNull(keyStoreFile, "keystore file is NULL");
Preconditions.checkNotNull(keyStorePassword, "keystore password is NULL");
@ -126,7 +133,7 @@ public class MiniKMS {
"KMS conf dir does not exist");
return new MiniKMS(kmsConfDir.getAbsolutePath(), log4jConfFile,
(keyStoreFile != null) ? keyStoreFile.getAbsolutePath() : null,
keyStorePassword);
keyStorePassword, inPort);
}
}
@ -135,14 +142,16 @@ public class MiniKMS {
private String keyStore;
private String keyStorePassword;
private Server jetty;
private int inPort;
private URL kmsURL;
public MiniKMS(String kmsConfDir, String log4ConfFile, String keyStore,
String password) {
String password, int inPort) {
this.kmsConfDir = kmsConfDir;
this.log4jConfFile = log4ConfFile;
this.keyStore = keyStore;
this.keyStorePassword = password;
this.inPort = inPort;
}
public void start() throws Exception {
@ -174,7 +183,7 @@ public class MiniKMS {
writer.close();
}
System.setProperty("log4j.configuration", log4jConfFile);
jetty = createJettyServer(keyStore, keyStorePassword);
jetty = createJettyServer(keyStore, keyStorePassword, inPort);
// we need to do a special handling for MiniKMS to work when in a dir and
// when in a JAR in the classpath thanks to Jetty way of handling of webapps

View File

@ -89,7 +89,7 @@ public class TestKMS {
return file;
}
public static abstract class KMSCallable implements Callable<Void> {
public static abstract class KMSCallable<T> implements Callable<T> {
private URL kmsUrl;
protected URL getKMSUrl() {
@ -97,19 +97,27 @@ public class TestKMS {
}
}
protected void runServer(String keystore, String password, File confDir,
KMSCallable callable) throws Exception {
protected <T> T runServer(String keystore, String password, File confDir,
KMSCallable<T> callable) throws Exception {
return runServer(-1, keystore, password, confDir, callable);
}
protected <T> T runServer(int port, String keystore, String password, File confDir,
KMSCallable<T> callable) throws Exception {
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
.setLog4jConfFile("log4j.properties");
if (keystore != null) {
miniKMSBuilder.setSslConf(new File(keystore), password);
}
if (port > 0) {
miniKMSBuilder.setPort(port);
}
MiniKMS miniKMS = miniKMSBuilder.build();
miniKMS.start();
try {
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
callable.kmsUrl = miniKMS.getKMSUrl();
callable.call();
return callable.call();
} finally {
miniKMS.stop();
}
@ -284,7 +292,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(keystore, password, testDir, new KMSCallable() {
runServer(keystore, password, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
@ -351,7 +359,7 @@ public class TestKMS {
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k6.ALL", "*");
writeConf(confDir, conf);
runServer(null, null, confDir, new KMSCallable() {
runServer(null, null, confDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
Date started = new Date();
@ -616,7 +624,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
runServer(null, null, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
@ -782,6 +790,92 @@ public class TestKMS {
});
}
@Test
public void testKMSRestart() throws Exception {
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
final File testDir = getTestDir();
conf = createBaseKMSConf(testDir);
conf.set("hadoop.kms.authentication.kerberos.keytab",
keytab.getAbsolutePath());
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
for (KMSACLs.Type type : KMSACLs.Type.values()) {
conf.set(type.getAclConfigKey(), type.toString());
}
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),
KMSACLs.Type.CREATE.toString() + ",SET_KEY_MATERIAL");
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.ALL", "*");
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
writeConf(testDir, conf);
KMSCallable<KeyProvider> c =
new KMSCallable<KeyProvider>() {
@Override
public KeyProvider call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI uri = createKMSUri(getKMSUrl());
final KeyProvider kp =
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<KeyProvider>() {
@Override
public KeyProvider run() throws Exception {
KMSClientProvider kp = new KMSClientProvider(uri, conf);
kp.createKey("k1", new byte[16],
new KeyProvider.Options(conf));
return kp;
}
});
return kp;
}
};
final KeyProvider retKp =
runServer(null, null, testDir, c);
// Restart server (using the same port)
runServer(c.getKMSUrl().getPort(), null, null, testDir,
new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
retKp.createKey("k2", new byte[16],
new KeyProvider.Options(conf));
Assert.fail("Should fail first time !!");
} catch (IOException e) {
String message = e.getMessage();
Assert.assertTrue("Should be a 403 error : " + message,
message.contains("403"));
}
retKp.createKey("k2", new byte[16],
new KeyProvider.Options(conf));
retKp.createKey("k3", new byte[16],
new KeyProvider.Options(conf));
return null;
}
});
return null;
}
});
}
@Test
public void testACLs() throws Exception {
Configuration conf = new Configuration();
@ -809,7 +903,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
runServer(null, null, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
@ -1117,7 +1211,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
runServer(null, null, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
@ -1201,7 +1295,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
runServer(null, null, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
@ -1326,7 +1420,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
runServer(null, null, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
@ -1398,7 +1492,7 @@ public class TestKMS {
writeConf(testDir, conf);
runServer(null, null, testDir, new KMSCallable() {
runServer(null, null, testDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();