HDFS-6776. Using distcp to copy data between insecure and secure cluster via webdhfs doesn't work. (yzhangal via tucu)
parent
6dae4b430c
commit
bbff44cb03
|
@ -739,6 +739,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-6986. DistributedFileSystem must get delegation tokens from configured
|
HDFS-6986. DistributedFileSystem must get delegation tokens from configured
|
||||||
KeyProvider. (zhz via tucu)
|
KeyProvider. (zhz via tucu)
|
||||||
|
|
||||||
|
HDFS-6776. Using distcp to copy data between insecure and secure cluster via webdhfs
|
||||||
|
doesn't work. (yzhangal via tucu)
|
||||||
|
|
||||||
Release 2.5.1 - UNRELEASED
|
Release 2.5.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -402,8 +402,7 @@ public class DelegationTokenSecretManager
|
||||||
final Token<DelegationTokenIdentifier> token = namenode.getRpcServer(
|
final Token<DelegationTokenIdentifier> token = namenode.getRpcServer(
|
||||||
).getDelegationToken(new Text(renewer));
|
).getDelegationToken(new Text(renewer));
|
||||||
if (token == null) {
|
if (token == null) {
|
||||||
throw new IOException("Failed to get the token for " + renewer
|
return null;
|
||||||
+ ", user=" + ugi.getShortUserName());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final InetSocketAddress addr = namenode.getNameNodeAddress();
|
final InetSocketAddress addr = namenode.getNameNodeAddress();
|
||||||
|
|
|
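Review bot: `createCredentials` now returns `null` from the `if (token == null)` branch instead of throwing, but nothing in the hunk documents that contract or records why no token was issued. The same undocumented null is passed on by the `if (c == null) { return null; }` lines added to `NamenodeWebHdfsMethods`, so any other caller of either method (none are visible here) needs checking for a dereference of the result. I would add a Javadoc line such as `@return the credentials, or null if the NameNode issued no delegation token`. I would also keep the dropped context in a debug log, e.g. `LOG.debug("No delegation token issued for renewer=" + renewer + ", user=" + ugi.getShortUserName());`, assuming the class logger is in scope. The method's signature and the rest of its body lie outside the hunk.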
@ -283,6 +283,9 @@ public class NamenodeWebHdfsMethods {
|
||||||
final String renewer) throws IOException {
|
final String renewer) throws IOException {
|
||||||
final Credentials c = DelegationTokenSecretManager.createCredentials(
|
final Credentials c = DelegationTokenSecretManager.createCredentials(
|
||||||
namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
|
namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
|
||||||
|
if (c == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
||||||
Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND
|
Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND
|
||||||
: SWebHdfsFileSystem.TOKEN_KIND;
|
: SWebHdfsFileSystem.TOKEN_KIND;
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -102,6 +103,11 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
|
|
||||||
/** Delegation token kind */
|
/** Delegation token kind */
|
||||||
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static final String CANT_FALLBACK_TO_INSECURE_MSG =
|
||||||
|
"The client is configured to only allow connecting to secure cluster";
|
||||||
|
|
||||||
private boolean canRefreshDelegationToken;
|
private boolean canRefreshDelegationToken;
|
||||||
|
|
||||||
private UserGroupInformation ugi;
|
private UserGroupInformation ugi;
|
||||||
|
@ -112,6 +118,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
private InetSocketAddress nnAddrs[];
|
private InetSocketAddress nnAddrs[];
|
||||||
private int currentNNAddrIndex;
|
private int currentNNAddrIndex;
|
||||||
|
private boolean disallowFallbackToInsecureCluster;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the protocol scheme for the FileSystem.
|
* Return the protocol scheme for the FileSystem.
|
||||||
|
@ -194,6 +201,9 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
|
|
||||||
this.workingDir = getHomeDirectory();
|
this.workingDir = getHomeDirectory();
|
||||||
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
|
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
|
||||||
|
this.disallowFallbackToInsecureCluster = !conf.getBoolean(
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
||||||
this.delegationToken = null;
|
this.delegationToken = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1293,7 +1303,13 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return JsonUtil.toDelegationToken(json);
|
return JsonUtil.toDelegationToken(json);
|
||||||
}
|
}
|
||||||
}.run();
|
}.run();
|
||||||
token.setService(tokenServiceName);
|
if (token != null) {
|
||||||
|
token.setService(tokenServiceName);
|
||||||
|
} else {
|
||||||
|
if (disallowFallbackToInsecureCluster) {
|
||||||
|
throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
|
||||||
|
}
|
||||||
|
}
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
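Review bot: In the last hunk, the `else` branch returns a null token silently when fallback is allowed, so a client that ends up talking to a cluster without security leaves no trace of the downgrade. A warning there gives operators something to alert on, e.g. `LOG.warn("No delegation token from " + tokenServiceName + "; continuing without one because fallback to simple auth is allowed");`, assuming the class logger is in scope. The refusal would also be easier to act on if the thrown message named the setting behind `IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY` and the target service; appending to `CANT_FALLBACK_TO_INSECURE_MSG` keeps the test's `startsWith` check valid. The nested `else { if (...) }` can be flattened to `else if (disallowFallbackToInsecureCluster)`. The enclosing method starts outside the hunk.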
@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -482,4 +484,43 @@ public class TestWebHDFS {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDTInInsecureClusterWithFallback()
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||||
|
conf.setBoolean(CommonConfigurationKeys
|
||||||
|
.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||||
|
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||||
|
WebHdfsFileSystem.SCHEME);
|
||||||
|
Assert.assertNull(webHdfs.getDelegationToken(null));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDTInInsecureCluster() throws Exception {
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||||
|
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||||
|
WebHdfsFileSystem.SCHEME);
|
||||||
|
webHdfs.getDelegationToken(null);
|
||||||
|
fail("No exception is thrown.");
|
||||||
|
} catch (AccessControlException ace) {
|
||||||
|
Assert.assertTrue(ace.getMessage().startsWith(
|
||||||
|
WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
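Review bot: `testDTInInsecureCluster` relies on `IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY` defaulting to false, while only `testDTInInsecureClusterWithFallback` sets the key explicitly. If `WebHdfsTestUtil.createConf()` or a site file on the test classpath ever enables fallback, the refusal path stops being exercised and the test fails on `fail("No exception is thrown.")` with no hint about the cause. Pin the precondition with `conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, false);` before building the cluster. A one-line comment on each test stating the expected outcome (null token versus `AccessControlException`) would make the pair easier to read.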