HDFS-9276. Failed to Update HDFS Delegation Token for long running application in HA mode. Contributed by Liangliang Gu and John Zhuge

(cherry picked from commit d9aae22fdf)
This commit is contained in:
Xiao Chen 2016-07-28 16:29:22 -07:00
parent f03ad5677a
commit 24d464a150
3 changed files with 83 additions and 8 deletions

View File

@ -91,10 +91,24 @@ public class Credentials implements Writable {
* @param t the token object * @param t the token object
*/ */
public void addToken(Text alias, Token<? extends TokenIdentifier> t) { public void addToken(Text alias, Token<? extends TokenIdentifier> t) {
if (t != null) { if (t == null) {
tokenMap.put(alias, t);
} else {
LOG.warn("Null token ignored for " + alias); LOG.warn("Null token ignored for " + alias);
} else if (tokenMap.put(alias, t) != null) {
// Update private tokens
Map<Text, Token<? extends TokenIdentifier>> tokensToAdd =
new HashMap<>();
for (Map.Entry<Text, Token<? extends TokenIdentifier>> e :
tokenMap.entrySet()) {
Token<? extends TokenIdentifier> token = e.getValue();
if (token instanceof Token.PrivateToken &&
((Token.PrivateToken) token).getPublicService().equals(alias)) {
Token<? extends TokenIdentifier> privateToken =
new Token.PrivateToken<>(t);
privateToken.setService(token.getService());
tokensToAdd.put(e.getKey(), privateToken);
}
}
tokenMap.putAll(tokensToAdd);
} }
} }
@ -319,7 +333,7 @@ public class Credentials implements Writable {
for(Map.Entry<Text, Token<?>> token: other.tokenMap.entrySet()){ for(Map.Entry<Text, Token<?>> token: other.tokenMap.entrySet()){
Text key = token.getKey(); Text key = token.getKey();
if (!tokenMap.containsKey(key) || overwrite) { if (!tokenMap.containsKey(key) || overwrite) {
tokenMap.put(key, token.getValue()); addToken(key, token.getValue());
} }
} }
} }

View File

@ -94,10 +94,10 @@ public class Token<T extends TokenIdentifier> implements Writable {
* @param other the token to clone * @param other the token to clone
*/ */
public Token(Token<T> other) { public Token(Token<T> other) {
this.identifier = other.identifier; this.identifier = other.identifier.clone();
this.password = other.password; this.password = other.password.clone();
this.kind = other.kind; this.kind = new Text(other.kind);
this.service = other.service; this.service = new Text(other.service);
} }
/** /**
@ -198,8 +198,37 @@ public class Token<T extends TokenIdentifier> implements Writable {
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public static class PrivateToken<T extends TokenIdentifier> extends Token<T> { public static class PrivateToken<T extends TokenIdentifier> extends Token<T> {
final private Text publicService;
public PrivateToken(Token<T> token) { public PrivateToken(Token<T> token) {
super(token); super(token);
publicService = new Text(token.getService());
}
public Text getPublicService() {
return publicService;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
PrivateToken<?> that = (PrivateToken<?>) o;
return publicService.equals(that.publicService);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + publicService.hashCode();
return result;
} }
} }

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -367,6 +368,37 @@ public class TestDelegationTokensWithHA {
token.cancel(conf); token.cancel(conf);
} }
@Test(timeout = 300000)
public void testCancelAndUpdateDelegationTokens() throws Exception {
// Create UGI with token1
String user = UserGroupInformation.getCurrentUser().getShortUserName();
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user);
ugi1.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
final Token<DelegationTokenIdentifier> token1 =
getDelegationToken(fs, "JobTracker");
UserGroupInformation.getCurrentUser()
.addToken(token1.getService(), token1);
FileSystem fs1 = HATestUtil.configureFailoverFs(cluster, conf);
// Cancel token1
doRenewOrCancel(token1, conf, TokenTestAction.CANCEL);
// Update UGI with token2
final Token<DelegationTokenIdentifier> token2 =
getDelegationToken(fs, "JobTracker");
UserGroupInformation.getCurrentUser()
.addToken(token2.getService(), token2);
// Check whether token2 works
fs1.listFiles(new Path("/"), false);
return null;
}
});
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs, private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
String renewer) throws IOException { String renewer) throws IOException {