HDFS-9276. Failed to Update HDFS Delegation Token for long running application in HA mode. Contributed by Liangliang Gu and John Zhuge
(cherry picked from commit d9aae22fdf2ab22ae8ce4a9d32ac71b3dde084d3) (cherry picked from commit 24d464a15015a7bf6d02d259568fedab1cb97b84)
This commit is contained in:
parent
e216c15625
commit
7ad5b278a7
@ -91,10 +91,24 @@ public Token<? extends TokenIdentifier> getToken(Text alias) {
|
||||
* @param t the token object
|
||||
*/
|
||||
public void addToken(Text alias, Token<? extends TokenIdentifier> t) {
|
||||
if (t != null) {
|
||||
tokenMap.put(alias, t);
|
||||
} else {
|
||||
if (t == null) {
|
||||
LOG.warn("Null token ignored for " + alias);
|
||||
} else if (tokenMap.put(alias, t) != null) {
|
||||
// Update private tokens
|
||||
Map<Text, Token<? extends TokenIdentifier>> tokensToAdd =
|
||||
new HashMap<>();
|
||||
for (Map.Entry<Text, Token<? extends TokenIdentifier>> e :
|
||||
tokenMap.entrySet()) {
|
||||
Token<? extends TokenIdentifier> token = e.getValue();
|
||||
if (token instanceof Token.PrivateToken &&
|
||||
((Token.PrivateToken) token).getPublicService().equals(alias)) {
|
||||
Token<? extends TokenIdentifier> privateToken =
|
||||
new Token.PrivateToken<>(t);
|
||||
privateToken.setService(token.getService());
|
||||
tokensToAdd.put(e.getKey(), privateToken);
|
||||
}
|
||||
}
|
||||
tokenMap.putAll(tokensToAdd);
|
||||
}
|
||||
}
|
||||
|
||||
@ -319,7 +333,7 @@ private void addAll(Credentials other, boolean overwrite) {
|
||||
for(Map.Entry<Text, Token<?>> token: other.tokenMap.entrySet()){
|
||||
Text key = token.getKey();
|
||||
if (!tokenMap.containsKey(key) || overwrite) {
|
||||
tokenMap.put(key, token.getValue());
|
||||
addToken(key, token.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -94,10 +94,10 @@ public Token() {
|
||||
* @param other the token to clone
|
||||
*/
|
||||
public Token(Token<T> other) {
|
||||
this.identifier = other.identifier;
|
||||
this.password = other.password;
|
||||
this.kind = other.kind;
|
||||
this.service = other.service;
|
||||
this.identifier = other.identifier.clone();
|
||||
this.password = other.password.clone();
|
||||
this.kind = new Text(other.kind);
|
||||
this.service = new Text(other.service);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -198,8 +198,37 @@ public void setService(Text newService) {
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public static class PrivateToken<T extends TokenIdentifier> extends Token<T> {
|
||||
final private Text publicService;
|
||||
|
||||
public PrivateToken(Token<T> token) {
|
||||
super(token);
|
||||
publicService = new Text(token.getService());
|
||||
}
|
||||
|
||||
public Text getPublicService() {
|
||||
return publicService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
PrivateToken<?> that = (PrivateToken<?>) o;
|
||||
return publicService.equals(that.publicService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = super.hashCode();
|
||||
result = 31 * result + publicService.hashCode();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
@ -367,6 +368,37 @@ public void testHdfsGetCanonicalServiceName() throws Exception {
|
||||
token.cancel(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCancelAndUpdateDelegationTokens() throws Exception {
|
||||
// Create UGI with token1
|
||||
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user);
|
||||
|
||||
ugi1.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
public Void run() throws Exception {
|
||||
final Token<DelegationTokenIdentifier> token1 =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
UserGroupInformation.getCurrentUser()
|
||||
.addToken(token1.getService(), token1);
|
||||
|
||||
FileSystem fs1 = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
|
||||
// Cancel token1
|
||||
doRenewOrCancel(token1, conf, TokenTestAction.CANCEL);
|
||||
|
||||
// Update UGI with token2
|
||||
final Token<DelegationTokenIdentifier> token2 =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
UserGroupInformation.getCurrentUser()
|
||||
.addToken(token2.getService(), token2);
|
||||
|
||||
// Check whether token2 works
|
||||
fs1.listFiles(new Path("/"), false);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
|
||||
String renewer) throws IOException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user