HDFS-15545 - Allow WebHdfsFileSystem to read a new delegation token f… (#2255)
Co-authored-by: Issac Buenrostro <ibuenros@linkedin.com>
This commit is contained in:
parent
5ce18101cb
commit
f5e6be337b
|
@ -368,8 +368,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
Token<?> token = tokenSelector.selectToken(
|
||||
new Text(getCanonicalServiceName()), ugi.getTokens());
|
||||
// ugi tokens are usually indicative of a task which can't
|
||||
// refetch tokens. even if ugi has credentials, don't attempt
|
||||
// to get another token to match hdfs/rpc behavior
|
||||
// refetch tokens. Don't attempt to fetch tokens from the
|
||||
// namenode in this situation.
|
||||
if (token != null) {
|
||||
LOG.debug("Using UGI token: {}", token);
|
||||
canRefreshDelegationToken = false;
|
||||
|
@ -392,6 +392,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
@VisibleForTesting
|
||||
synchronized boolean replaceExpiredDelegationToken() throws IOException {
|
||||
boolean replaced = false;
|
||||
if (attemptReplaceDelegationTokenFromUGI()) {
|
||||
return true;
|
||||
}
|
||||
if (canRefreshDelegationToken) {
|
||||
Token<?> token = getDelegationToken(null);
|
||||
LOG.debug("Replaced expired token: {}", token);
|
||||
|
@ -401,6 +404,17 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
return replaced;
|
||||
}
|
||||
|
||||
private synchronized boolean attemptReplaceDelegationTokenFromUGI() {
|
||||
Token<?> token = tokenSelector.selectToken(
|
||||
new Text(getCanonicalServiceName()), ugi.getTokens());
|
||||
if (token != null && !token.equals(delegationToken)) {
|
||||
LOG.debug("Replaced expired token with new UGI token: {}", token);
|
||||
setDelegationToken(token);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getDefaultPort() {
|
||||
return HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
|
||||
|
|
|
@ -522,7 +522,8 @@ public class TestWebHdfsTokens {
|
|||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify an expired ugi token is NOT replaced with a new token
|
||||
// verify an expired ugi token is not replaced with a new token if the
|
||||
// ugi does not contain a new token
|
||||
fs.cancelDelegationToken(token);
|
||||
for (int i=0; i<2; i++) {
|
||||
try {
|
||||
|
@ -543,6 +544,22 @@ public class TestWebHdfsTokens {
|
|||
reset(fs);
|
||||
}
|
||||
|
||||
// verify an expired ugi token is replaced with a new token
|
||||
// if one is available in UGI
|
||||
token = fs.getDelegationToken(null);
|
||||
ugi.addToken(token);
|
||||
reset(fs);
|
||||
fs.getFileStatus(new Path("/"));
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, times(1)).setDelegationToken(eq(token));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify fs close does NOT cancel the ugi token
|
||||
fs.close();
|
||||
verify(fs, never()).getDelegationToken();
|
||||
|
|
Loading…
Reference in New Issue