HDFS-5339. Merge change r1573026 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1573027 13f79535-47bb-0310-9956-ffa450edef68
parent 052e6e7fa6
commit d744a6d8fe
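What the change does: WebHdfsFileSystem and HftpFileSystem now compute the delegation token service name once in initialize(), using the logical nameservice for HA URIs and the canonical host:port otherwise, and pass that name into TokenAspect instead of deriving the service from a resolved NameNode address. Below is a minimal, self-contained sketch of that decision only; the class and helper names are made up for illustration, and the port-based isLogicalUri check is an assumption for the sketch, not how Hadoop's HAUtil/SecurityUtil actually decide.

import java.net.URI;

// Illustrative sketch only: mirrors the idea behind
// HAUtil.buildTokenServiceForLogicalUri(uri) vs.
// SecurityUtil.buildTokenService(getCanonicalUri()) in the patch below.
public class TokenServiceNameSketch {

  // Assumption for this sketch: an authority without a port is a logical nameservice.
  static boolean isLogicalUri(URI uri) {
    return uri.getPort() == -1;
  }

  static String buildTokenService(URI uri) {
    if (isLogicalUri(uri)) {
      // HA case: keep the logical name so the token stays valid across
      // NameNode failover (Hadoop prefixes it, e.g. "ha-hdfs:minidfs").
      return uri.getHost();
    }
    // Non-HA case: bind the token service to the concrete host:port.
    return uri.getHost() + ":" + uri.getPort();
  }

  public static void main(String[] args) {
    System.out.println(buildTokenService(URI.create("webhdfs://minidfs")));               // logical nameservice
    System.out.println(buildTokenService(URI.create("webhdfs://nn1.example.com:50070"))); // single namenode
  }
}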
@@ -249,6 +249,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5936. MiniDFSCluster does not clean data left behind by
     SecondaryNameNode. (Binglin Chang via cnauroth)
 
+    HDFS-5339. WebHDFS URI does not accept logical nameservices when security is
+    enabled. (Haohui Mai via jing9)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

@@ -97,9 +97,10 @@ public class HftpFileSystem extends FileSystem
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
 
-  protected TokenAspect<HftpFileSystem> tokenAspect;
+  protected TokenAspect<? extends HftpFileSystem> tokenAspect;
   private Token<?> delegationToken;
   private Token<?> renewToken;
+  protected Text tokenServiceName;
 
   @Override
   public URI getCanonicalUri() {
@@ -175,9 +176,8 @@ public class HftpFileSystem extends FileSystem
    * Initialize connectionFactory and tokenAspect. This function is intended to
    * be overridden by HsFtpFileSystem.
    */
-  protected void initTokenAspect(Configuration conf)
-      throws IOException {
-    tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
+  protected void initTokenAspect() {
+    tokenAspect = new TokenAspect<HftpFileSystem>(this, tokenServiceName, TOKEN_KIND);
   }
 
   @Override
@@ -189,6 +189,7 @@ public class HftpFileSystem extends FileSystem
         .newDefaultURLConnectionFactory(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     this.nnUri = getNamenodeUri(name);
+    this.tokenServiceName = SecurityUtil.buildTokenService(nnUri);
 
     try {
       this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
@@ -197,7 +198,7 @@ public class HftpFileSystem extends FileSystem
       throw new IllegalArgumentException(e);
     }
 
-    initTokenAspect(conf);
+    initTokenAspect();
     if (UserGroupInformation.isSecurityEnabled()) {
       tokenAspect.initDelegationToken(ugi);
     }

@@ -18,11 +18,8 @@
 
 package org.apache.hadoop.hdfs.web;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
 
@@ -60,8 +57,9 @@ public class HsftpFileSystem extends HftpFileSystem {
   }
 
   @Override
-  protected void initTokenAspect(Configuration conf) throws IOException {
-    tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
+  protected void initTokenAspect() {
+    tokenAspect = new TokenAspect<HsftpFileSystem>(this, tokenServiceName,
+        TOKEN_KIND);
   }
 
   @Override

@@ -37,7 +37,7 @@ public class SWebHdfsFileSystem extends WebHdfsFileSystem {
 
   @Override
   protected synchronized void initializeTokenAspect() {
-    tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
+    tokenAspect = new TokenAspect<SWebHdfsFileSystem>(this, tokenServiceName, TOKEN_KIND);
   }
 
   @Override

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.web
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -93,23 +91,11 @@ final class TokenAspect<T extends FileSystem & Renewable> {
     }
   }
 
-  static class DTSelecorByKind extends
+  private static class DTSelecorByKind extends
       AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
-    private static final DelegationTokenSelector selector = new DelegationTokenSelector();
-
     public DTSelecorByKind(final Text kind) {
       super(kind);
     }
-
-    Token<DelegationTokenIdentifier> selectToken(URI nnUri,
-        Collection<Token<?>> tokens, Configuration conf) {
-      Token<DelegationTokenIdentifier> token = selectToken(
-          SecurityUtil.buildTokenService(nnUri), tokens);
-      if (token == null) {
-        token = selector.selectToken(nnUri, tokens, conf);
-      }
-      return token;
-    }
   }
 
   /**
@@ -117,9 +103,6 @@ final class TokenAspect<T extends FileSystem & Renewable> {
    */
   interface TokenManagementDelegator {
     void cancelDelegationToken(final Token<?> token) throws IOException;
-
-    URI getCanonicalUri();
-
     long renewDelegationToken(final Token<?> token) throws IOException;
   }
 
@@ -129,11 +112,13 @@ final class TokenAspect<T extends FileSystem & Renewable> {
   private final T fs;
   private boolean hasInitedToken;
   private final Log LOG;
+  private final Text serviceName;
 
-  TokenAspect(T fs, final Text kind) {
+  TokenAspect(T fs, final Text serviceName, final Text kind) {
     this.LOG = LogFactory.getLog(fs.getClass());
     this.fs = fs;
     this.dtSelector = new DTSelecorByKind(kind);
+    this.serviceName = serviceName;
   }
 
   synchronized void ensureTokenInitialized() throws IOException {
@@ -173,9 +158,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
   @VisibleForTesting
   Token<DelegationTokenIdentifier> selectDelegationToken(
       UserGroupInformation ugi) {
-    return dtSelector.selectToken(
-        ((TokenManagementDelegator)fs).getCanonicalUri(), ugi.getTokens(),
-        fs.getConf());
+    return dtSelector.selectToken(serviceName, ugi.getTokens());
   }
 
   private synchronized void addRenewAction(final T webhdfs) {

@@ -27,7 +27,6 @@ import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
@@ -120,11 +119,12 @@ public class WebHdfsFileSystem extends FileSystem
 
   /** Delegation token kind */
   public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
-  protected TokenAspect<WebHdfsFileSystem> tokenAspect;
+  protected TokenAspect<? extends WebHdfsFileSystem> tokenAspect;
 
   private UserGroupInformation ugi;
   private URI uri;
   private Token<?> delegationToken;
+  protected Text tokenServiceName;
   private RetryPolicy retryPolicy = null;
   private Path workingDir;
   private InetSocketAddress nnAddrs[];
@@ -153,7 +153,8 @@ public class WebHdfsFileSystem extends FileSystem
    * be overridden by SWebHdfsFileSystem.
    */
   protected synchronized void initializeTokenAspect() {
-    tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
+    tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, tokenServiceName,
+        TOKEN_KIND);
   }
 
   @Override
@@ -162,22 +163,25 @@ public class WebHdfsFileSystem extends FileSystem
     super.initialize(uri, conf);
     setConf(conf);
     /** set user pattern based on configuration file */
-    UserParam.setUserPattern(conf.get(DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
+    UserParam.setUserPattern(conf.get(
+        DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
+        DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
 
     connectionFactory = URLConnectionFactory
         .newDefaultURLConnectionFactory(conf);
-    initializeTokenAspect();
 
     ugi = UserGroupInformation.getCurrentUser();
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.nnAddrs = DFSUtil.resolveWebHdfsUri(this.uri, conf);
 
-    try {
-      this.uri = new URI(uri.getScheme(), uri.getAuthority(), null,
-          null, null);
-      this.nnAddrs = DFSUtil.resolveWebHdfsUri(this.uri, conf);
-    } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(e);
-    }
+    boolean isHA = HAUtil.isLogicalUri(conf, this.uri);
+    // In non-HA case, the code needs to call getCanonicalUri() in order to
+    // handle the case where no port is specified in the URI
+    this.tokenServiceName = isHA ? HAUtil.buildTokenServiceForLogicalUri(uri)
+        : SecurityUtil.buildTokenService(getCanonicalUri());
+    initializeTokenAspect();
 
-    if (!HAUtil.isLogicalUri(conf, this.uri)) {
+    if (!isHA) {
       this.retryPolicy =
           RetryUtils.getDefaultRetryPolicy(
               conf,
@@ -1003,20 +1007,20 @@ public class WebHdfsFileSystem extends FileSystem
       final String renewer) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
     final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
-    final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
-    SecurityUtil.setTokenService(token, getCurrentNNAddr());
+    final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+    token.setService(tokenServiceName);
     return token;
   }
 
   @Override
-  public Token<?> getRenewToken() {
+  public synchronized Token<?> getRenewToken() {
     return delegationToken;
   }
 
   @Override
   public <T extends TokenIdentifier> void setDelegationToken(
       final Token<T> token) {
-    synchronized(this) {
+    synchronized (this) {
       delegationToken = token;
     }
   }

@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.token.Token;
@@ -73,14 +74,15 @@ public class TestHftpDelegationToken {
     doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
         anyBoolean());
 
-    fs.initialize(new URI("hftp://127.0.0.1:8020"), conf);
+    final URI uri = new URI("hftp://127.0.0.1:8020");
+    fs.initialize(uri, conf);
     fs.connectionFactory = factory;
 
     UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
         new String[] { "bar" });
 
     TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(
-        fs, HftpFileSystem.TOKEN_KIND);
+        fs, SecurityUtil.buildTokenService(uri), HftpFileSystem.TOKEN_KIND);
 
     tokenAspect.initDelegationToken(ugi);
     tokenAspect.ensureTokenInitialized();

@@ -48,9 +48,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.SecurityUtilTestHelper;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
@@ -122,14 +121,9 @@ public class TestTokenAspect {
     public void initialize(URI name, Configuration conf) throws IOException {
       super.initialize(name, conf);
       setConf(conf);
-      try {
-        this.uri = new URI(name.getScheme(), name.getAuthority(), null, null,
-            null);
-      } catch (URISyntaxException e) {
-        throw new IllegalArgumentException(e);
-      }
-
-      tokenAspect = new TokenAspect<DummyFs>(this, DummyFs.TOKEN_KIND);
+      this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+      tokenAspect = new TokenAspect<DummyFs>(this,
+          SecurityUtil.buildTokenService(uri), TOKEN_KIND);
       if (emulateSecurityEnabled || UserGroupInformation.isSecurityEnabled()) {
         tokenAspect.initDelegationToken(ugi);
       }
@@ -293,9 +287,10 @@ public class TestTokenAspect {
     doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null,
         null);
 
+    final URI uri = new URI("dummyfs://127.0.0.1:1234");
     TokenAspect<DummyFs> tokenAspect = new TokenAspect<DummyFs>(fs,
-        DummyFs.TOKEN_KIND);
-    fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf);
+        SecurityUtil.buildTokenService(uri), DummyFs.TOKEN_KIND);
+    fs.initialize(uri, conf);
     tokenAspect.initDelegationToken(ugi);
 
     // trigger token acquisition
@@ -318,58 +313,4 @@ public class TestTokenAspect {
     action = getActionFromTokenAspect(tokenAspect);
     assertTrue(action.isValid());
   }
-
-  @Test
-  public void testTokenSelectionPreferences() throws IOException,
-      URISyntaxException {
-    Configuration conf = new Configuration();
-    DummyFs fs = spy(new DummyFs());
-    doReturn(null).when(fs).getDelegationToken(anyString());
-    fs.initialize(new URI("dummyfs://localhost:1234"), conf);
-    TokenAspect<DummyFs> aspect = new TokenAspect<DummyFs>(fs,
-        DummyFs.TOKEN_KIND);
-    UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
-        new String[] { "bar" });
-
-    // use ip-based tokens
-    SecurityUtilTestHelper.setTokenServiceUseIp(true);
-
-    // test fallback to hdfs token
-    Token<TokenIdentifier> hdfsToken = new Token<TokenIdentifier>(new byte[0],
-        new byte[0], DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(
-            "127.0.0.1:8020"));
-    ugi.addToken(hdfsToken);
-
-    // test fallback to hdfs token
-    Token<?> token = aspect.selectDelegationToken(ugi);
-    assertEquals(hdfsToken, token);
-
-    // test dummyfs is favored over hdfs
-    Token<TokenIdentifier> dummyFsToken = new Token<TokenIdentifier>(
-        new byte[0], new byte[0], DummyFs.TOKEN_KIND,
-        new Text("127.0.0.1:1234"));
-    ugi.addToken(dummyFsToken);
-    token = aspect.selectDelegationToken(ugi);
-    assertEquals(dummyFsToken, token);
-
-    // switch to using host-based tokens, no token should match
-    SecurityUtilTestHelper.setTokenServiceUseIp(false);
-    token = aspect.selectDelegationToken(ugi);
-    assertNull(token);
-
-    // test fallback to hdfs token
-    hdfsToken = new Token<TokenIdentifier>(new byte[0], new byte[0],
-        DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(
-            "localhost:8020"));
-    ugi.addToken(hdfsToken);
-    token = aspect.selectDelegationToken(ugi);
-    assertEquals(hdfsToken, token);
-
-    // test dummyfs is favored over hdfs
-    dummyFsToken = new Token<TokenIdentifier>(new byte[0], new byte[0],
-        DummyFs.TOKEN_KIND, new Text("localhost:1234"));
-    ugi.addToken(dummyFsToken);
-    token = aspect.selectDelegationToken(ugi);
-    assertEquals(dummyFsToken, token);
-  }
 }

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.web;
 
+import java.io.IOException;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,35 +28,34 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Test;
 
 /** Test whether WebHDFS can connect to an HA cluster */
 public class TestWebHDFSForHA {
 
   private static final String LOGICAL_NAME = "minidfs";
+  private static final MiniDFSNNTopology topo = new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
+          new MiniDFSNNTopology.NNConf("nn1")).addNN(
+          new MiniDFSNNTopology.NNConf("nn2")));
 
   @Test
-  public void test() throws Exception {
+  public void testHA() throws IOException {
     Configuration conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
-
-    MiniDFSNNTopology topo = new MiniDFSNNTopology()
-        .addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
-            new MiniDFSNNTopology.NNConf("nn1")).addNN(
-            new MiniDFSNNTopology.NNConf("nn2")));
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
-        .numDataNodes(3).build();
-
-    HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
-
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    MiniDFSCluster cluster = null;
     FileSystem fs = null;
     try {
+      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
+          .numDataNodes(0).build();
+
+      HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
+
       cluster.waitActive();
 
       final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
-      fs = (WebHdfsFileSystem) FileSystem.get(new URI(uri), conf);
+      fs = FileSystem.get(URI.create(uri), conf);
       cluster.transitionToActive(0);
 
       final Path dir = new Path("/test");
@@ -66,12 +66,50 @@ public class TestWebHDFSForHA {
 
       final Path dir2 = new Path("/test2");
       Assert.assertTrue(fs.mkdirs(dir2));
-
     } finally {
       if (fs != null) {
         fs.close();
       }
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
-}
+
+  @Test
+  public void testSecureHA() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    MiniDFSCluster cluster = null;
+    WebHdfsFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
+          .numDataNodes(0).build();
+
+      HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
+      cluster.waitActive();
+
+      final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
+      fs = (WebHdfsFileSystem) FileSystem.get(URI.create(uri), conf);
+
+      cluster.transitionToActive(0);
+      Token<?> token = fs.getDelegationToken(null);
+
+      cluster.shutdownNameNode(0);
+      cluster.transitionToActive(1);
+
+      fs.renewDelegationToken(token);
+      fs.cancelDelegationToken(token);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
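For context, a hedged sketch of the client-side usage that testSecureHA above exercises: connecting to WebHDFS through a logical nameservice so the delegation token stays valid across NameNode failover. The nameservice name and host names are placeholders, and the configuration is assembled by hand here only for illustration; the test obtains the same settings from MiniDFSCluster and HATestUtil.setFailoverConfigurations.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: standard HDFS HA client settings plus a webhdfs:// URI that names the
// nameservice rather than a single NameNode.
public class WebHdfsLogicalUriExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", "minidfs");
    conf.set("dfs.ha.namenodes.minidfs", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.minidfs.nn1", "nn1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.minidfs.nn2", "nn2.example.com:8020");
    conf.set("dfs.namenode.http-address.minidfs.nn1", "nn1.example.com:50070");
    conf.set("dfs.namenode.http-address.minidfs.nn2", "nn2.example.com:50070");
    conf.set("dfs.client.failover.proxy.provider.minidfs",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // With this patch the token service is the logical name, so renew/cancel
    // keep working after the other NameNode becomes active.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://minidfs"), conf);
    System.out.println(fs.getFileStatus(new Path("/")));
    fs.close();
  }
}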