HBASE-24403 FsDelegationToken Should Cache Token After Acquired A New One (#1743)
Signed-off-by: Guanghao Zhang <zghao@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
9ef17c2784
commit
7b396e9b8c
|
@ -50,6 +50,8 @@ public class FsDelegationToken {
|
||||||
private boolean hasForwardedToken = false;
|
private boolean hasForwardedToken = false;
|
||||||
private Token<?> userToken = null;
|
private Token<?> userToken = null;
|
||||||
private FileSystem fs = null;
|
private FileSystem fs = null;
|
||||||
|
private long tokenExpireTime = -1L;
|
||||||
|
private long renewAheadTime = Long.MAX_VALUE;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @param renewer the account name that is allowed to renew the token.
|
* @param renewer the account name that is allowed to renew the token.
|
||||||
|
@ -59,6 +61,17 @@ public class FsDelegationToken {
|
||||||
this.renewer = renewer;
|
this.renewer = renewer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param renewer the account name that is allowed to renew the token.
|
||||||
|
* @param renewAheadTime how long in millis
|
||||||
|
*/
|
||||||
|
public FsDelegationToken(final UserProvider userProvider, final String renewer,
|
||||||
|
long renewAheadTime) {
|
||||||
|
this.userProvider = userProvider;
|
||||||
|
this.renewer = renewer;
|
||||||
|
this.renewAheadTime = renewAheadTime;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquire the delegation token for the specified filesystem.
|
* Acquire the delegation token for the specified filesystem.
|
||||||
* Before requesting a new delegation token, tries to find one already available.
|
* Before requesting a new delegation token, tries to find one already available.
|
||||||
|
@ -100,13 +113,20 @@ public class FsDelegationToken {
|
||||||
if (userProvider.isHadoopSecurityEnabled()) {
|
if (userProvider.isHadoopSecurityEnabled()) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
userToken = userProvider.getCurrent().getToken(tokenKind, fs.getCanonicalServiceName());
|
userToken = userProvider.getCurrent().getToken(tokenKind, fs.getCanonicalServiceName());
|
||||||
if (userToken == null) {
|
//We should acquire token when never acquired before or token is expiring or already expired
|
||||||
|
if (userToken == null || tokenExpireTime <= 0
|
||||||
|
|| System.currentTimeMillis() > tokenExpireTime - renewAheadTime) {
|
||||||
hasForwardedToken = false;
|
hasForwardedToken = false;
|
||||||
try {
|
try {
|
||||||
userToken = fs.getDelegationToken(renewer);
|
userToken = fs.getDelegationToken(renewer);
|
||||||
} catch (NullPointerException npe) {
|
//After acquired the new token,we quickly renew it to get the token expiration
|
||||||
|
//time to confirm to renew it before expiration
|
||||||
|
tokenExpireTime = userToken.renew(fs.getConf());
|
||||||
|
LOG.debug("Acquired new token " + userToken + ". Expiration time: " + tokenExpireTime);
|
||||||
|
userProvider.getCurrent().addToken(userToken);
|
||||||
|
} catch (InterruptedException | NullPointerException e) {
|
||||||
// we need to handle NullPointerException in case HADOOP-10009 is missing
|
// we need to handle NullPointerException in case HADOOP-10009 is missing
|
||||||
LOG.error("Failed to get token for " + renewer);
|
LOG.error("Failed to get token for " + renewer, e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
hasForwardedToken = true;
|
hasForwardedToken = true;
|
||||||
|
|
|
@ -125,6 +125,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
*/
|
*/
|
||||||
public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
|
public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
|
||||||
|
|
||||||
|
//HDFS DelegationToken is cached and should be renewed before token expiration
|
||||||
|
public static final String BULK_LOAD_RENEW_TOKEN_TIME_BUFFER
|
||||||
|
= "hbase.bulkload.renew.token.time.buffer";
|
||||||
|
|
||||||
// We use a '.' prefix which is ignored when walking directory trees
|
// We use a '.' prefix which is ignored when walking directory trees
|
||||||
// above. It is invalid family name.
|
// above. It is invalid family name.
|
||||||
static final String TMP_DIR = ".tmp";
|
static final String TMP_DIR = ".tmp";
|
||||||
|
@ -142,6 +146,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
|
|
||||||
private List<String> clusterIds = new ArrayList<>();
|
private List<String> clusterIds = new ArrayList<>();
|
||||||
private boolean replicate = true;
|
private boolean replicate = true;
|
||||||
|
private final long retryAheadTime;
|
||||||
|
|
||||||
public BulkLoadHFilesTool(Configuration conf) {
|
public BulkLoadHFilesTool(Configuration conf) {
|
||||||
// make a copy, just to be sure we're not overriding someone else's config
|
// make a copy, just to be sure we're not overriding someone else's config
|
||||||
|
@ -149,7 +154,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
// disable blockcache for tool invocation, see HBASE-10500
|
// disable blockcache for tool invocation, see HBASE-10500
|
||||||
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
|
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
|
||||||
userProvider = UserProvider.instantiate(conf);
|
userProvider = UserProvider.instantiate(conf);
|
||||||
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
|
retryAheadTime = conf.getLong(BULK_LOAD_RENEW_TOKEN_TIME_BUFFER, 60000L);
|
||||||
|
fsDelegationToken = new FsDelegationToken(userProvider, "renewer", retryAheadTime);
|
||||||
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
|
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
|
||||||
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
|
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
|
||||||
nrThreads = conf.getInt("hbase.loadincremental.threads.max",
|
nrThreads = conf.getInt("hbase.loadincremental.threads.max",
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenId
|
||||||
import static org.apache.hadoop.hdfs.web.WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
|
import static org.apache.hadoop.hdfs.web.WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
|
||||||
import static org.apache.hadoop.hdfs.web.WebHdfsConstants.WEBHDFS_TOKEN_KIND;
|
import static org.apache.hadoop.hdfs.web.WebHdfsConstants.WEBHDFS_TOKEN_KIND;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -81,6 +82,9 @@ public class TestFsDelegationToken {
|
||||||
when(hdfsToken.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
|
when(hdfsToken.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
|
||||||
when(webhdfsToken.getKind()).thenReturn(WEBHDFS_TOKEN_KIND);
|
when(webhdfsToken.getKind()).thenReturn(WEBHDFS_TOKEN_KIND);
|
||||||
when(swebhdfsToken.getKind()).thenReturn(SWEBHDFS_TOKEN_KIND);
|
when(swebhdfsToken.getKind()).thenReturn(SWEBHDFS_TOKEN_KIND);
|
||||||
|
when(fileSystem.getDelegationToken("Renewer")).thenReturn(hdfsToken);
|
||||||
|
when(webHdfsFileSystem.getDelegationToken("Renewer")).thenReturn(webhdfsToken);
|
||||||
|
when(swebHdfsFileSystem.getDelegationToken("Renewer")).thenReturn(swebhdfsToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -88,6 +92,10 @@ public class TestFsDelegationToken {
|
||||||
fsDelegationToken.acquireDelegationToken(fileSystem);
|
fsDelegationToken.acquireDelegationToken(fileSystem);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
fsDelegationToken.getUserToken().getKind(), HDFS_DELEGATION_KIND);
|
fsDelegationToken.getUserToken().getKind(), HDFS_DELEGATION_KIND);
|
||||||
|
assertNotNull(
|
||||||
|
"HDFS Token should exist in cache after acquired",
|
||||||
|
userProvider.getCurrent()
|
||||||
|
.getToken(HDFS_DELEGATION_KIND.toString(), fileSystem.getCanonicalServiceName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -95,6 +103,10 @@ public class TestFsDelegationToken {
|
||||||
fsDelegationToken.acquireDelegationToken(webHdfsFileSystem);
|
fsDelegationToken.acquireDelegationToken(webHdfsFileSystem);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND);
|
fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND);
|
||||||
|
assertNotNull(
|
||||||
|
"Webhdfs token should exist in cache after acquired",
|
||||||
|
userProvider.getCurrent()
|
||||||
|
.getToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem.getCanonicalServiceName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -102,6 +114,10 @@ public class TestFsDelegationToken {
|
||||||
fsDelegationToken.acquireDelegationToken(swebHdfsFileSystem);
|
fsDelegationToken.acquireDelegationToken(swebHdfsFileSystem);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND);
|
fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND);
|
||||||
|
assertNotNull(
|
||||||
|
"Swebhdfs token should exist in cache after acquired",
|
||||||
|
userProvider.getCurrent()
|
||||||
|
.getToken(SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem.getCanonicalServiceName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = NullPointerException.class)
|
@Test(expected = NullPointerException.class)
|
||||||
|
@ -114,6 +130,10 @@ public class TestFsDelegationToken {
|
||||||
fsDelegationToken
|
fsDelegationToken
|
||||||
.acquireDelegationToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem);
|
.acquireDelegationToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem);
|
||||||
assertEquals(fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND);
|
assertEquals(fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND);
|
||||||
|
assertNotNull(
|
||||||
|
"Webhdfs token should exist in cache after acquired",
|
||||||
|
userProvider.getCurrent()
|
||||||
|
.getToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem.getCanonicalServiceName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -122,5 +142,9 @@ public class TestFsDelegationToken {
|
||||||
.acquireDelegationToken(
|
.acquireDelegationToken(
|
||||||
SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem);
|
SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem);
|
||||||
assertEquals(fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND);
|
assertEquals(fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND);
|
||||||
|
assertNotNull(
|
||||||
|
"Swebhdfs token should exist in cache after acquired",
|
||||||
|
userProvider.getCurrent()
|
||||||
|
.getToken(SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem.getCanonicalServiceName()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue