svn merge -c 1374271 FIXES: HADOOP-7967. Need generalized multi-token filesystem support (daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1374346 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c7758d9a4
commit
a8f10869a8
|
@ -626,6 +626,8 @@ Release 0.23.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
||||||
|
HADOOP-7967. Need generalized multi-token filesystem support (daryn)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -217,6 +218,6 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
|
||||||
|
|
||||||
@Override //AbstractFileSystem
|
@Override //AbstractFileSystem
|
||||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
||||||
return fsImpl.getDelegationTokens(renewer);
|
return Arrays.asList(fsImpl.addDelegationTokens(renewer, null));
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -110,7 +110,11 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
|
||||||
fs.getRenewToken().renew(fs.getConf());
|
fs.getRenewToken().renew(fs.getConf());
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
try {
|
try {
|
||||||
fs.setDelegationToken(fs.getDelegationTokens(null).get(0));
|
Token<?>[] tokens = fs.addDelegationTokens(null, null);
|
||||||
|
if (tokens.length == 0) {
|
||||||
|
throw new IOException("addDelegationTokens returned no tokens");
|
||||||
|
}
|
||||||
|
fs.setDelegationToken(tokens[0]);
|
||||||
} catch (IOException ie2) {
|
} catch (IOException ie2) {
|
||||||
throw new IOException("Can't renew or get new delegation token ", ie);
|
throw new IOException("Can't renew or get new delegation token ", ie);
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
@ -57,6 +58,8 @@ import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/****************************************************************
|
/****************************************************************
|
||||||
* An abstract base class for a fairly generic filesystem. It
|
* An abstract base class for a fairly generic filesystem. It
|
||||||
* may be implemented as a distributed filesystem, or as a "local"
|
* may be implemented as a distributed filesystem, or as a "local"
|
||||||
|
@ -222,15 +225,25 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a canonical service name for this file system. The token cache is
|
* Get a canonical service name for this file system. The token cache is
|
||||||
* the only user of this value, and uses it to lookup this filesystem's
|
* the only user of the canonical service name, and uses it to lookup this
|
||||||
* service tokens. The token cache will not attempt to acquire tokens if the
|
* filesystem's service tokens.
|
||||||
* service is null.
|
* If file system provides a token of its own then it must have a canonical
|
||||||
|
* name, otherwise canonical name can be null.
|
||||||
|
*
|
||||||
|
* Default Impl: If the file system has child file systems
|
||||||
|
* (such as an embedded file system) then it is assumed that the fs has no
|
||||||
|
* tokens of its own and hence returns a null name; otherwise a service
|
||||||
|
* name is built using Uri and port.
|
||||||
|
*
|
||||||
* @return a service string that uniquely identifies this file system, null
|
* @return a service string that uniquely identifies this file system, null
|
||||||
* if the filesystem does not implement tokens
|
* if the filesystem does not implement tokens
|
||||||
* @see SecurityUtil#buildDTServiceName(URI, int)
|
* @see SecurityUtil#buildDTServiceName(URI, int)
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
|
||||||
public String getCanonicalServiceName() {
|
public String getCanonicalServiceName() {
|
||||||
return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
|
return (getChildFileSystems() == null)
|
||||||
|
? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
|
||||||
|
: null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @deprecated call #getUri() instead.*/
|
/** @deprecated call #getUri() instead.*/
|
||||||
|
@ -396,68 +409,95 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deprecated - use @link {@link #getDelegationTokens(String)}
|
|
||||||
* Get a new delegation token for this file system.
|
* Get a new delegation token for this file system.
|
||||||
|
* This is an internal method that should have been declared protected
|
||||||
|
* but wasn't historically.
|
||||||
|
* Callers should use {@link #addDelegationTokens(String, Credentials)}
|
||||||
|
*
|
||||||
* @param renewer the account name that is allowed to renew the token.
|
* @param renewer the account name that is allowed to renew the token.
|
||||||
* @return a new delegation token
|
* @return a new delegation token
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
@InterfaceAudience.Private()
|
||||||
@Deprecated
|
|
||||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get one or more delegation tokens associated with the filesystem. Normally
|
* Obtain all delegation tokens used by this FileSystem that are not
|
||||||
* a file system returns a single delegation token. A file system that manages
|
* already present in the given Credentials. Existing tokens will neither
|
||||||
* multiple file systems underneath, could return set of delegation tokens for
|
* be verified as valid nor having the given renewer. Missing tokens will
|
||||||
* all the file systems it manages.
|
* be acquired and added to the given Credentials.
|
||||||
*
|
*
|
||||||
* @param renewer the account name that is allowed to renew the token.
|
* Default Impl: works for simple fs with its own token
|
||||||
|
* and also for an embedded fs whose tokens are those of its
|
||||||
|
* children file system (i.e. the embedded fs has not tokens of its
|
||||||
|
* own).
|
||||||
|
*
|
||||||
|
* @param renewer the user allowed to renew the delegation tokens
|
||||||
|
* @param credentials cache in which to add new delegation tokens
|
||||||
* @return list of new delegation tokens
|
* @return list of new delegation tokens
|
||||||
* If delegation tokens not supported then return a list of size zero.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.LimitedPrivate( { "HDFS", "MapReduce" })
|
|
||||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
|
||||||
return new ArrayList<Token<?>>(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #getDelegationTokens(String)
|
|
||||||
* This is similar to getDelegationTokens, with the added restriction that if
|
|
||||||
* a token is already present in the passed Credentials object - that token
|
|
||||||
* is returned instead of a new delegation token.
|
|
||||||
*
|
|
||||||
* If the token is found to be cached in the Credentials object, this API does
|
|
||||||
* not verify the token validity or the passed in renewer.
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* @param renewer the account name that is allowed to renew the token.
|
|
||||||
* @param credentials a Credentials object containing already knowing
|
|
||||||
* delegationTokens.
|
|
||||||
* @return a list of delegation tokens.
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
|
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
|
||||||
public List<Token<?>> getDelegationTokens(String renewer,
|
public Token<?>[] addDelegationTokens(
|
||||||
Credentials credentials) throws IOException {
|
final String renewer, Credentials credentials) throws IOException {
|
||||||
List<Token<?>> allTokens = getDelegationTokens(renewer);
|
if (credentials == null) {
|
||||||
List<Token<?>> newTokens = new ArrayList<Token<?>>();
|
credentials = new Credentials();
|
||||||
if (allTokens != null) {
|
}
|
||||||
for (Token<?> token : allTokens) {
|
final List<Token<?>> tokens = new ArrayList<Token<?>>();
|
||||||
Token<?> knownToken = credentials.getToken(token.getService());
|
collectDelegationTokens(renewer, credentials, tokens);
|
||||||
if (knownToken == null) {
|
return tokens.toArray(new Token<?>[tokens.size()]);
|
||||||
newTokens.add(token);
|
}
|
||||||
} else {
|
|
||||||
newTokens.add(knownToken);
|
/**
|
||||||
|
* Recursively obtain the tokens for this FileSystem and all descended
|
||||||
|
* FileSystems as determined by getChildFileSystems().
|
||||||
|
* @param renewer the user allowed to renew the delegation tokens
|
||||||
|
* @param credentials cache in which to add the new delegation tokens
|
||||||
|
* @param tokens list in which to add acquired tokens
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void collectDelegationTokens(final String renewer,
|
||||||
|
final Credentials credentials,
|
||||||
|
final List<Token<?>> tokens)
|
||||||
|
throws IOException {
|
||||||
|
final String serviceName = getCanonicalServiceName();
|
||||||
|
// Collect token of the this filesystem and then of its embedded children
|
||||||
|
if (serviceName != null) { // fs has token, grab it
|
||||||
|
final Text service = new Text(serviceName);
|
||||||
|
Token<?> token = credentials.getToken(service);
|
||||||
|
if (token == null) {
|
||||||
|
token = getDelegationToken(renewer);
|
||||||
|
if (token != null) {
|
||||||
|
tokens.add(token);
|
||||||
|
credentials.addToken(service, token);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return newTokens;
|
// Now collect the tokens from the children
|
||||||
|
final FileSystem[] children = getChildFileSystems();
|
||||||
|
if (children != null) {
|
||||||
|
for (final FileSystem fs : children) {
|
||||||
|
fs.collectDelegationTokens(renewer, credentials, tokens);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all the immediate child FileSystems embedded in this FileSystem.
|
||||||
|
* It does not recurse and get grand children. If a FileSystem
|
||||||
|
* has multiple child FileSystems, then it should return a unique list
|
||||||
|
* of those FileSystems. Default is to return null to signify no children.
|
||||||
|
*
|
||||||
|
* @return FileSystems used by this FileSystem
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate({ "HDFS" })
|
||||||
|
@VisibleForTesting
|
||||||
|
public FileSystem[] getChildFileSystems() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/** create a file with the provided permission
|
/** create a file with the provided permission
|
||||||
* The permission of the file is set to be the provided permission as in
|
* The permission of the file is set to be the provided permission as in
|
||||||
* setPermission, not permission&~umask
|
* setPermission, not permission&~umask
|
||||||
|
|
|
@ -22,15 +22,11 @@ import java.io.*;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.security.Credentials;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
/****************************************************************
|
/****************************************************************
|
||||||
|
@ -428,25 +424,7 @@ public class FilterFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // FileSystem
|
@Override // FileSystem
|
||||||
public String getCanonicalServiceName() {
|
public FileSystem[] getChildFileSystems() {
|
||||||
return fs.getCanonicalServiceName();
|
return new FileSystem[]{fs};
|
||||||
}
|
|
||||||
|
|
||||||
@Override // FileSystem
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
|
||||||
return fs.getDelegationToken(renewer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override // FileSystem
|
|
||||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
|
||||||
return fs.getDelegationTokens(renewer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
// FileSystem
|
|
||||||
public List<Token<?>> getDelegationTokens(String renewer,
|
|
||||||
Credentials credentials) throws IOException {
|
|
||||||
return fs.getDelegationTokens(renewer, credentials);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -49,11 +49,8 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.viewfs.InodeTree.INode;
|
import org.apache.hadoop.fs.viewfs.InodeTree.INode;
|
||||||
import org.apache.hadoop.fs.viewfs.InodeTree.INodeLink;
|
import org.apache.hadoop.fs.viewfs.InodeTree.INodeLink;
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.Credentials;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
@ -235,11 +232,6 @@ public class ViewFileSystem extends FileSystem {
|
||||||
return res.isInternalDir() ? null : res.targetFileSystem.getHomeDirectory();
|
return res.isInternalDir() ? null : res.targetFileSystem.getHomeDirectory();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCanonicalServiceName() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public URI getUri() {
|
public URI getUri() {
|
||||||
return myUri;
|
return myUri;
|
||||||
|
@ -549,6 +541,18 @@ public class ViewFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileSystem[] getChildFileSystems() {
|
||||||
|
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
||||||
|
fsState.getMountPoints();
|
||||||
|
Set<FileSystem> children = new HashSet<FileSystem>();
|
||||||
|
for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
|
||||||
|
FileSystem targetFs = mountPoint.target.targetFileSystem;
|
||||||
|
children.addAll(Arrays.asList(targetFs.getChildFileSystems()));
|
||||||
|
}
|
||||||
|
return children.toArray(new FileSystem[]{});
|
||||||
|
}
|
||||||
|
|
||||||
public MountPoint[] getMountPoints() {
|
public MountPoint[] getMountPoints() {
|
||||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
||||||
fsState.getMountPoints();
|
fsState.getMountPoints();
|
||||||
|
@ -561,59 +565,6 @@ public class ViewFileSystem extends FileSystem {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
|
||||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
|
||||||
fsState.getMountPoints();
|
|
||||||
int initialListSize = 0;
|
|
||||||
for (InodeTree.MountPoint<FileSystem> im : mountPoints) {
|
|
||||||
initialListSize += im.target.targetDirLinkList.length;
|
|
||||||
}
|
|
||||||
List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
|
|
||||||
for ( int i = 0; i < mountPoints.size(); ++i ) {
|
|
||||||
List<Token<?>> tokens =
|
|
||||||
mountPoints.get(i).target.targetFileSystem.getDelegationTokens(renewer);
|
|
||||||
if (tokens != null) {
|
|
||||||
result.addAll(tokens);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Token<?>> getDelegationTokens(String renewer,
|
|
||||||
Credentials credentials) throws IOException {
|
|
||||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
|
||||||
fsState.getMountPoints();
|
|
||||||
int initialListSize = 0;
|
|
||||||
for (InodeTree.MountPoint<FileSystem> im : mountPoints) {
|
|
||||||
initialListSize += im.target.targetDirLinkList.length;
|
|
||||||
}
|
|
||||||
Set<String> seenServiceNames = new HashSet<String>();
|
|
||||||
List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
|
|
||||||
for (int i = 0; i < mountPoints.size(); ++i) {
|
|
||||||
String serviceName =
|
|
||||||
mountPoints.get(i).target.targetFileSystem.getCanonicalServiceName();
|
|
||||||
if (serviceName == null || seenServiceNames.contains(serviceName)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
seenServiceNames.add(serviceName);
|
|
||||||
Token<?> knownToken = credentials.getToken(new Text(serviceName));
|
|
||||||
if (knownToken != null) {
|
|
||||||
result.add(knownToken);
|
|
||||||
} else {
|
|
||||||
List<Token<?>> tokens =
|
|
||||||
mountPoints.get(i).target.targetFileSystem
|
|
||||||
.getDelegationTokens(renewer);
|
|
||||||
if (tokens != null) {
|
|
||||||
result.addAll(tokens);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* An instance of this class represents an internal dir of the viewFs
|
* An instance of this class represents an internal dir of the viewFs
|
||||||
* that is internal dir of the mount table.
|
* that is internal dir of the mount table.
|
||||||
|
|
|
@ -56,6 +56,20 @@ public class Credentials implements Writable {
|
||||||
private Map<Text, Token<? extends TokenIdentifier>> tokenMap =
|
private Map<Text, Token<? extends TokenIdentifier>> tokenMap =
|
||||||
new HashMap<Text, Token<? extends TokenIdentifier>>();
|
new HashMap<Text, Token<? extends TokenIdentifier>>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an empty credentials instance
|
||||||
|
*/
|
||||||
|
public Credentials() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a copy of the given credentials
|
||||||
|
* @param credentials to copy
|
||||||
|
*/
|
||||||
|
public Credentials(Credentials credentials) {
|
||||||
|
this.addAll(credentials);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the key bytes for the alias
|
* Returns the key bytes for the alias
|
||||||
* @param alias the alias for the key
|
* @param alias the alias for the key
|
||||||
|
|
|
@ -24,8 +24,10 @@ import java.util.Random;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class for unit tests.
|
* Helper class for unit tests.
|
||||||
|
@ -218,4 +220,39 @@ public final class FileSystemTestHelper {
|
||||||
}
|
}
|
||||||
Assert.assertEquals(aFs.makeQualified(new Path(path)), s.getPath());
|
Assert.assertEquals(aFs.makeQualified(new Path(path)), s.getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to enable easier mocking of a FileSystem
|
||||||
|
* Use getRawFileSystem to retrieve the mock
|
||||||
|
*/
|
||||||
|
public static class MockFileSystem extends FilterFileSystem {
|
||||||
|
public MockFileSystem() {
|
||||||
|
// it's a bit ackward to mock ourselves, but it allows the visibility
|
||||||
|
// of methods to be increased
|
||||||
|
super(mock(MockFileSystem.class));
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public MockFileSystem getRawFileSystem() {
|
||||||
|
return (MockFileSystem) super.getRawFileSystem();
|
||||||
|
|
||||||
|
}
|
||||||
|
// these basic methods need to directly propagate to the mock to be
|
||||||
|
// more transparent
|
||||||
|
@Override
|
||||||
|
public void initialize(URI uri, Configuration conf) throws IOException {
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String getCanonicalServiceName() {
|
||||||
|
return fs.getCanonicalServiceName();
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public FileSystem[] getChildFileSystems() {
|
||||||
|
return fs.getChildFileSystems();
|
||||||
|
}
|
||||||
|
@Override // publicly expose for mocking
|
||||||
|
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||||
|
return fs.getDelegationToken(renewer);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,279 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Matchers.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
public class TestFileSystemTokens {
|
||||||
|
private static String renewer = "renewer!";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithNoToken() throws Exception {
|
||||||
|
MockFileSystem fs = createFileSystemForServiceName(null);
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
|
||||||
|
fs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(fs, false);
|
||||||
|
assertEquals(0, credentials.numberOfTokens());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithToken() throws Exception {
|
||||||
|
Text service = new Text("singleTokenFs");
|
||||||
|
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
|
||||||
|
fs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(fs, true);
|
||||||
|
|
||||||
|
assertEquals(1, credentials.numberOfTokens());
|
||||||
|
assertNotNull(credentials.getToken(service));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithTokenExists() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service = new Text("singleTokenFs");
|
||||||
|
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||||
|
Token<?> token = mock(Token.class);
|
||||||
|
credentials.addToken(service, token);
|
||||||
|
|
||||||
|
fs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(fs, false);
|
||||||
|
|
||||||
|
assertEquals(1, credentials.numberOfTokens());
|
||||||
|
assertSame(token, credentials.getToken(service));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithChildTokens() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service1 = new Text("singleTokenFs1");
|
||||||
|
Text service2 = new Text("singleTokenFs2");
|
||||||
|
|
||||||
|
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||||
|
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||||
|
MockFileSystem fs3 = createFileSystemForServiceName(null);
|
||||||
|
MockFileSystem multiFs =
|
||||||
|
createFileSystemForServiceName(null, fs1, fs2, fs3);
|
||||||
|
|
||||||
|
multiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(multiFs, false); // has no tokens of own, only child tokens
|
||||||
|
verifyTokenFetch(fs1, true);
|
||||||
|
verifyTokenFetch(fs2, true);
|
||||||
|
verifyTokenFetch(fs3, false);
|
||||||
|
|
||||||
|
assertEquals(2, credentials.numberOfTokens());
|
||||||
|
assertNotNull(credentials.getToken(service1));
|
||||||
|
assertNotNull(credentials.getToken(service2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithDuplicateChildren() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service = new Text("singleTokenFs1");
|
||||||
|
|
||||||
|
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||||
|
MockFileSystem multiFs =
|
||||||
|
createFileSystemForServiceName(null, fs, new FilterFileSystem(fs));
|
||||||
|
|
||||||
|
multiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(multiFs, false);
|
||||||
|
verifyTokenFetch(fs, true);
|
||||||
|
|
||||||
|
assertEquals(1, credentials.numberOfTokens());
|
||||||
|
assertNotNull(credentials.getToken(service));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithDuplicateChildrenTokenExists() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service = new Text("singleTokenFs1");
|
||||||
|
Token<?> token = mock(Token.class);
|
||||||
|
credentials.addToken(service, token);
|
||||||
|
|
||||||
|
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||||
|
MockFileSystem multiFs =
|
||||||
|
createFileSystemForServiceName(null, fs, new FilterFileSystem(fs));
|
||||||
|
|
||||||
|
multiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(multiFs, false);
|
||||||
|
verifyTokenFetch(fs, false);
|
||||||
|
|
||||||
|
assertEquals(1, credentials.numberOfTokens());
|
||||||
|
assertSame(token, credentials.getToken(service));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithChildTokensOneExists() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service1 = new Text("singleTokenFs1");
|
||||||
|
Text service2 = new Text("singleTokenFs2");
|
||||||
|
Token<?> token = mock(Token.class);
|
||||||
|
credentials.addToken(service2, token);
|
||||||
|
|
||||||
|
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||||
|
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||||
|
MockFileSystem fs3 = createFileSystemForServiceName(null);
|
||||||
|
MockFileSystem multiFs = createFileSystemForServiceName(null, fs1, fs2, fs3);
|
||||||
|
|
||||||
|
multiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(multiFs, false);
|
||||||
|
verifyTokenFetch(fs1, true);
|
||||||
|
verifyTokenFetch(fs2, false); // we had added its token to credentials
|
||||||
|
verifyTokenFetch(fs3, false);
|
||||||
|
|
||||||
|
assertEquals(2, credentials.numberOfTokens());
|
||||||
|
assertNotNull(credentials.getToken(service1));
|
||||||
|
assertSame(token, credentials.getToken(service2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithMyOwnAndChildTokens() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service1 = new Text("singleTokenFs1");
|
||||||
|
Text service2 = new Text("singleTokenFs2");
|
||||||
|
Text myService = new Text("multiTokenFs");
|
||||||
|
Token<?> token = mock(Token.class);
|
||||||
|
credentials.addToken(service2, token);
|
||||||
|
|
||||||
|
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||||
|
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||||
|
MockFileSystem multiFs = createFileSystemForServiceName(myService, fs1, fs2);
|
||||||
|
|
||||||
|
multiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(multiFs, true); // its own token and also of its children
|
||||||
|
verifyTokenFetch(fs1, true);
|
||||||
|
verifyTokenFetch(fs2, false); // we had added its token to credentials
|
||||||
|
|
||||||
|
assertEquals(3, credentials.numberOfTokens());
|
||||||
|
assertNotNull(credentials.getToken(myService));
|
||||||
|
assertNotNull(credentials.getToken(service1));
|
||||||
|
assertNotNull(credentials.getToken(service2));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithMyOwnExistsAndChildTokens() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service1 = new Text("singleTokenFs1");
|
||||||
|
Text service2 = new Text("singleTokenFs2");
|
||||||
|
Text myService = new Text("multiTokenFs");
|
||||||
|
Token<?> token = mock(Token.class);
|
||||||
|
credentials.addToken(myService, token);
|
||||||
|
|
||||||
|
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||||
|
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||||
|
MockFileSystem multiFs = createFileSystemForServiceName(myService, fs1, fs2);
|
||||||
|
|
||||||
|
multiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(multiFs, false); // we had added its token to credentials
|
||||||
|
verifyTokenFetch(fs1, true);
|
||||||
|
verifyTokenFetch(fs2, true);
|
||||||
|
|
||||||
|
assertEquals(3, credentials.numberOfTokens());
|
||||||
|
assertSame(token, credentials.getToken(myService));
|
||||||
|
assertNotNull(credentials.getToken(service1));
|
||||||
|
assertNotNull(credentials.getToken(service2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFsWithNestedDuplicatesChildren() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Text service1 = new Text("singleTokenFs1");
|
||||||
|
Text service2 = new Text("singleTokenFs2");
|
||||||
|
Text service4 = new Text("singleTokenFs4");
|
||||||
|
Text multiService = new Text("multiTokenFs");
|
||||||
|
Token<?> token2 = mock(Token.class);
|
||||||
|
credentials.addToken(service2, token2);
|
||||||
|
|
||||||
|
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||||
|
MockFileSystem fs1B = createFileSystemForServiceName(service1);
|
||||||
|
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||||
|
MockFileSystem fs3 = createFileSystemForServiceName(null);
|
||||||
|
MockFileSystem fs4 = createFileSystemForServiceName(service4);
|
||||||
|
// now let's get dirty! ensure dup tokens aren't fetched even when
|
||||||
|
// repeated and dupped in a nested fs. fs4 is a real test of the drill
|
||||||
|
// down: multi-filter-multi-filter-filter-fs4.
|
||||||
|
MockFileSystem multiFs = createFileSystemForServiceName(multiService,
|
||||||
|
fs1, fs1B, fs2, fs2, new FilterFileSystem(fs3),
|
||||||
|
new FilterFileSystem(new FilterFileSystem(fs4)));
|
||||||
|
MockFileSystem superMultiFs = createFileSystemForServiceName(null,
|
||||||
|
fs1, fs1B, fs1, new FilterFileSystem(fs3), new FilterFileSystem(multiFs));
|
||||||
|
superMultiFs.addDelegationTokens(renewer, credentials);
|
||||||
|
verifyTokenFetch(superMultiFs, false); // does not have its own token
|
||||||
|
verifyTokenFetch(multiFs, true); // has its own token
|
||||||
|
verifyTokenFetch(fs1, true);
|
||||||
|
verifyTokenFetch(fs2, false); // we had added its token to credentials
|
||||||
|
verifyTokenFetch(fs3, false); // has no tokens
|
||||||
|
verifyTokenFetch(fs4, true);
|
||||||
|
|
||||||
|
assertEquals(4, credentials.numberOfTokens()); //fs1+fs2+fs4+multifs (fs3=0)
|
||||||
|
assertNotNull(credentials.getToken(service1));
|
||||||
|
assertNotNull(credentials.getToken(service2));
|
||||||
|
assertSame(token2, credentials.getToken(service2));
|
||||||
|
assertNotNull(credentials.getToken(multiService));
|
||||||
|
assertNotNull(credentials.getToken(service4));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static MockFileSystem createFileSystemForServiceName(
|
||||||
|
final Text service, final FileSystem... children) throws IOException {
|
||||||
|
final MockFileSystem fs = new MockFileSystem();
|
||||||
|
final MockFileSystem mockFs = fs.getRawFileSystem();
|
||||||
|
if (service != null) {
|
||||||
|
when(mockFs.getCanonicalServiceName()).thenReturn(service.toString());
|
||||||
|
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||||
|
new Answer<Token<?>>() {
|
||||||
|
@Override
|
||||||
|
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
Token<?> token = new Token<TokenIdentifier>();
|
||||||
|
token.setService(service);
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
when(mockFs.getChildFileSystems()).thenReturn(children);
|
||||||
|
return fs;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that canonical name was requested, if renewer is not null that
|
||||||
|
// a token was requested, and that child fs was invoked
|
||||||
|
private void verifyTokenFetch(MockFileSystem fs, boolean expected) throws IOException {
|
||||||
|
verify(fs.getRawFileSystem(), atLeast(1)).getCanonicalServiceName();
|
||||||
|
if (expected) {
|
||||||
|
verify(fs.getRawFileSystem()).getDelegationToken(renewer);
|
||||||
|
} else {
|
||||||
|
verify(fs.getRawFileSystem(), never()).getDelegationToken(any(String.class));
|
||||||
|
}
|
||||||
|
verify(fs.getRawFileSystem(), atLeast(1)).getChildFileSystems();
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -185,6 +186,10 @@ public class TestFilterFileSystem {
|
||||||
public boolean cancelDeleteOnExit(Path f) throws IOException {
|
public boolean cancelDeleteOnExit(Path f) throws IOException {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
|
||||||
|
throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
public String getScheme() {
|
public String getScheme() {
|
||||||
return "dontcheck";
|
return "dontcheck";
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,18 @@ import static org.junit.Assert.*;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsConstants;
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -38,6 +46,29 @@ import org.junit.Test;
|
||||||
public class TestViewFileSystemDelegationTokenSupport {
|
public class TestViewFileSystemDelegationTokenSupport {
|
||||||
|
|
||||||
private static final String MOUNT_TABLE_NAME = "vfs-cluster";
|
private static final String MOUNT_TABLE_NAME = "vfs-cluster";
|
||||||
|
static Configuration conf;
|
||||||
|
static FileSystem viewFs;
|
||||||
|
static FakeFileSystem fs1;
|
||||||
|
static FakeFileSystem fs2;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
conf = ViewFileSystemTestSetup.createConfig();
|
||||||
|
fs1 = setupFileSystem(new URI("fs1:///"), FakeFileSystem.class);
|
||||||
|
fs2 = setupFileSystem(new URI("fs2:///"), FakeFileSystem.class);
|
||||||
|
viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FakeFileSystem setupFileSystem(URI uri, Class<? extends FileSystem> clazz)
|
||||||
|
throws Exception {
|
||||||
|
String scheme = uri.getScheme();
|
||||||
|
conf.set("fs."+scheme+".impl", clazz.getName());
|
||||||
|
FakeFileSystem fs = (FakeFileSystem)FileSystem.get(uri, conf);
|
||||||
|
// mount each fs twice, will later ensure 1 token/fs
|
||||||
|
ConfigUtil.addLink(conf, "/mounts/"+scheme+"-one", fs.getUri());
|
||||||
|
ConfigUtil.addLink(conf, "/mounts/"+scheme+"-two", fs.getUri());
|
||||||
|
return fs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Regression test for HADOOP-8408.
|
* Regression test for HADOOP-8408.
|
||||||
|
@ -69,4 +100,92 @@ public class TestViewFileSystemDelegationTokenSupport {
|
||||||
assertNull(serviceName);
|
assertNull(serviceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetChildFileSystems() throws Exception {
|
||||||
|
assertNull(fs1.getChildFileSystems());
|
||||||
|
assertNull(fs2.getChildFileSystems());
|
||||||
|
List<FileSystem> children = Arrays.asList(viewFs.getChildFileSystems());
|
||||||
|
assertEquals(2, children.size());
|
||||||
|
assertTrue(children.contains(fs1));
|
||||||
|
assertTrue(children.contains(fs2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddDelegationTokens() throws Exception {
|
||||||
|
Credentials creds = new Credentials();
|
||||||
|
Token<?> fs1Tokens[] = addTokensWithCreds(fs1, creds);
|
||||||
|
assertEquals(1, fs1Tokens.length);
|
||||||
|
assertEquals(1, creds.numberOfTokens());
|
||||||
|
Token<?> fs2Tokens[] = addTokensWithCreds(fs2, creds);
|
||||||
|
assertEquals(1, fs2Tokens.length);
|
||||||
|
assertEquals(2, creds.numberOfTokens());
|
||||||
|
|
||||||
|
Credentials savedCreds = creds;
|
||||||
|
creds = new Credentials();
|
||||||
|
|
||||||
|
// should get the same set of tokens as explicitly fetched above
|
||||||
|
Token<?> viewFsTokens[] = viewFs.addDelegationTokens("me", creds);
|
||||||
|
assertEquals(2, viewFsTokens.length);
|
||||||
|
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||||
|
assertEquals(savedCreds.numberOfTokens(), creds.numberOfTokens());
|
||||||
|
// should get none, already have all tokens
|
||||||
|
viewFsTokens = viewFs.addDelegationTokens("me", creds);
|
||||||
|
assertEquals(0, viewFsTokens.length);
|
||||||
|
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||||
|
assertEquals(savedCreds.numberOfTokens(), creds.numberOfTokens());
|
||||||
|
}
|
||||||
|
|
||||||
|
Token<?>[] addTokensWithCreds(FileSystem fs, Credentials creds) throws Exception {
|
||||||
|
Credentials savedCreds;
|
||||||
|
|
||||||
|
savedCreds = new Credentials(creds);
|
||||||
|
Token<?> tokens[] = fs.addDelegationTokens("me", creds);
|
||||||
|
// test that we got the token we wanted, and that creds were modified
|
||||||
|
assertEquals(1, tokens.length);
|
||||||
|
assertEquals(fs.getCanonicalServiceName(), tokens[0].getService().toString());
|
||||||
|
assertTrue(creds.getAllTokens().contains(tokens[0]));
|
||||||
|
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||||
|
assertEquals(savedCreds.numberOfTokens()+1, creds.numberOfTokens());
|
||||||
|
|
||||||
|
// shouldn't get any new tokens since already in creds
|
||||||
|
savedCreds = new Credentials(creds);
|
||||||
|
Token<?> tokenRefetch[] = fs.addDelegationTokens("me", creds);
|
||||||
|
assertEquals(0, tokenRefetch.length);
|
||||||
|
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||||
|
assertEquals(savedCreds.numberOfTokens(), creds.numberOfTokens());
|
||||||
|
|
||||||
|
return tokens;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class FakeFileSystem extends RawLocalFileSystem {
|
||||||
|
URI uri;
|
||||||
|
|
||||||
|
public void initialize(URI name, Configuration conf) throws IOException {
|
||||||
|
this.uri = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getInitialWorkingDirectory() {
|
||||||
|
return new Path("/"); // ctor calls getUri before the uri is inited...
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI getUri() {
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCanonicalServiceName() {
|
||||||
|
return String.valueOf(this.getUri()+"/"+this.hashCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||||
|
Token<?> token = new Token<TokenIdentifier>();
|
||||||
|
token.setService(new Text(getCanonicalServiceName()));
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.fs.viewfs;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
@ -137,9 +138,9 @@ public class ViewFileSystemBaseTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testGetDelegationTokens() throws IOException {
|
public void testGetDelegationTokens() throws IOException {
|
||||||
List<Token<?>> delTokens =
|
Token<?>[] delTokens =
|
||||||
fsView.getDelegationTokens("sanjay");
|
fsView.addDelegationTokens("sanjay", new Credentials());
|
||||||
Assert.assertEquals(getExpectedDelegationTokenCount(), delTokens.size());
|
Assert.assertEquals(getExpectedDelegationTokenCount(), delTokens.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
int getExpectedDelegationTokenCount() {
|
int getExpectedDelegationTokenCount() {
|
||||||
|
@ -150,29 +151,20 @@ public class ViewFileSystemBaseTest {
|
||||||
public void testGetDelegationTokensWithCredentials() throws IOException {
|
public void testGetDelegationTokensWithCredentials() throws IOException {
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
List<Token<?>> delTokens =
|
List<Token<?>> delTokens =
|
||||||
fsView.getDelegationTokens("sanjay", credentials);
|
Arrays.asList(fsView.addDelegationTokens("sanjay", credentials));
|
||||||
|
|
||||||
int expectedTokenCount = getExpectedDelegationTokenCountWithCredentials();
|
int expectedTokenCount = getExpectedDelegationTokenCountWithCredentials();
|
||||||
|
|
||||||
Assert.assertEquals(expectedTokenCount, delTokens.size());
|
Assert.assertEquals(expectedTokenCount, delTokens.size());
|
||||||
|
Credentials newCredentials = new Credentials();
|
||||||
for (int i = 0; i < expectedTokenCount / 2; i++) {
|
for (int i = 0; i < expectedTokenCount / 2; i++) {
|
||||||
Token<?> token = delTokens.get(i);
|
Token<?> token = delTokens.get(i);
|
||||||
credentials.addToken(token.getService(), token);
|
newCredentials.addToken(token.getService(), token);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Token<?>> delTokens2 =
|
List<Token<?>> delTokens2 =
|
||||||
fsView.getDelegationTokens("sanjay", credentials);
|
Arrays.asList(fsView.addDelegationTokens("sanjay", newCredentials));
|
||||||
Assert.assertEquals(expectedTokenCount, delTokens2.size());
|
Assert.assertEquals((expectedTokenCount + 1) / 2, delTokens2.size());
|
||||||
|
|
||||||
for (int i = 0; i < delTokens2.size(); i++) {
|
|
||||||
for (int j = 0; j < delTokens.size(); j++) {
|
|
||||||
if (delTokens.get(j) == delTokens2.get(i)) {
|
|
||||||
delTokens.remove(j);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Assert.assertEquals((expectedTokenCount + 1) / 2, delTokens.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int getExpectedDelegationTokenCountWithCredentials() {
|
int getExpectedDelegationTokenCountWithCredentials() {
|
||||||
|
|
|
@ -863,7 +863,6 @@ public class HttpFSFileSystem extends FileSystem
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public Token<?> getDelegationToken(final String renewer)
|
public Token<?> getDelegationToken(final String renewer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return doAsRealUserIfNecessary(new Callable<Token<?>>() {
|
return doAsRealUserIfNecessary(new Callable<Token<?>>() {
|
||||||
|
@ -875,19 +874,6 @@ public class HttpFSFileSystem extends FileSystem
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Token<?>> getDelegationTokens(final String renewer)
|
|
||||||
throws IOException {
|
|
||||||
return doAsRealUserIfNecessary(new Callable<List<Token<?>>>() {
|
|
||||||
@Override
|
|
||||||
public List<Token<?>> call() throws Exception {
|
|
||||||
return HttpFSKerberosAuthenticator.
|
|
||||||
getDelegationTokens(uri, httpFSAddr, authToken, renewer);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public long renewDelegationToken(final Token<?> token) throws IOException {
|
public long renewDelegationToken(final Token<?> token) throws IOException {
|
||||||
return doAsRealUserIfNecessary(new Callable<Long>() {
|
return doAsRealUserIfNecessary(new Callable<Long>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -66,7 +66,6 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
|
||||||
public static final String RENEWER_PARAM = "renewer";
|
public static final String RENEWER_PARAM = "renewer";
|
||||||
public static final String TOKEN_KIND = "HTTPFS_DELEGATION_TOKEN";
|
public static final String TOKEN_KIND = "HTTPFS_DELEGATION_TOKEN";
|
||||||
public static final String DELEGATION_TOKEN_JSON = "Token";
|
public static final String DELEGATION_TOKEN_JSON = "Token";
|
||||||
public static final String DELEGATION_TOKENS_JSON = "Tokens";
|
|
||||||
public static final String DELEGATION_TOKEN_URL_STRING_JSON = "urlString";
|
public static final String DELEGATION_TOKEN_URL_STRING_JSON = "urlString";
|
||||||
public static final String RENEW_DELEGATION_TOKEN_JSON = "long";
|
public static final String RENEW_DELEGATION_TOKEN_JSON = "long";
|
||||||
|
|
||||||
|
@ -76,7 +75,6 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static enum DelegationTokenOperation {
|
public static enum DelegationTokenOperation {
|
||||||
GETDELEGATIONTOKEN(HTTP_GET, true),
|
GETDELEGATIONTOKEN(HTTP_GET, true),
|
||||||
GETDELEGATIONTOKENS(HTTP_GET, true),
|
|
||||||
RENEWDELEGATIONTOKEN(HTTP_PUT, true),
|
RENEWDELEGATIONTOKEN(HTTP_PUT, true),
|
||||||
CANCELDELEGATIONTOKEN(HTTP_PUT, false);
|
CANCELDELEGATIONTOKEN(HTTP_PUT, false);
|
||||||
|
|
||||||
|
@ -121,10 +119,11 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
|
||||||
|
|
||||||
public static final String OP_PARAM = "op";
|
public static final String OP_PARAM = "op";
|
||||||
|
|
||||||
private static List<Token<?>> getDelegationTokens(URI fsURI,
|
public static Token<?> getDelegationToken(URI fsURI,
|
||||||
InetSocketAddress httpFSAddr, DelegationTokenOperation op,
|
InetSocketAddress httpFSAddr, AuthenticatedURL.Token token,
|
||||||
AuthenticatedURL.Token token, String renewer)
|
String renewer) throws IOException {
|
||||||
throws IOException {
|
DelegationTokenOperation op =
|
||||||
|
DelegationTokenOperation.GETDELEGATIONTOKEN;
|
||||||
Map<String, String> params = new HashMap<String, String>();
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
params.put(OP_PARAM, op.toString());
|
params.put(OP_PARAM, op.toString());
|
||||||
params.put(RENEWER_PARAM,renewer);
|
params.put(RENEWER_PARAM,renewer);
|
||||||
|
@ -135,56 +134,20 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
|
||||||
HttpURLConnection conn = aUrl.openConnection(url, token);
|
HttpURLConnection conn = aUrl.openConnection(url, token);
|
||||||
conn.setRequestMethod(op.getHttpMethod());
|
conn.setRequestMethod(op.getHttpMethod());
|
||||||
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
List<String> list = new ArrayList<String>();
|
JSONObject json = (JSONObject) ((JSONObject)
|
||||||
if (op == DelegationTokenOperation.GETDELEGATIONTOKEN) {
|
HttpFSUtils.jsonParse(conn)).get(DELEGATION_TOKEN_JSON);
|
||||||
JSONObject json = (JSONObject) ((JSONObject)
|
String tokenStr = (String)
|
||||||
HttpFSUtils.jsonParse(conn)).get(DELEGATION_TOKEN_JSON);
|
json.get(DELEGATION_TOKEN_URL_STRING_JSON);
|
||||||
String tokenStr = (String)
|
Token<AbstractDelegationTokenIdentifier> dToken =
|
||||||
json.get(DELEGATION_TOKEN_URL_STRING_JSON);
|
new Token<AbstractDelegationTokenIdentifier>();
|
||||||
list.add(tokenStr);
|
dToken.decodeFromUrlString(tokenStr);
|
||||||
}
|
SecurityUtil.setTokenService(dToken, httpFSAddr);
|
||||||
else if (op == DelegationTokenOperation.GETDELEGATIONTOKENS) {
|
return dToken;
|
||||||
JSONObject json = (JSONObject) ((JSONObject)
|
|
||||||
HttpFSUtils.jsonParse(conn)).get(DELEGATION_TOKENS_JSON);
|
|
||||||
JSONArray array = (JSONArray) json.get(DELEGATION_TOKEN_JSON);
|
|
||||||
for (Object element : array) {
|
|
||||||
String tokenStr = (String)
|
|
||||||
((Map) element).get(DELEGATION_TOKEN_URL_STRING_JSON);
|
|
||||||
list.add(tokenStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("Invalid operation: " +
|
|
||||||
op.toString());
|
|
||||||
}
|
|
||||||
List<Token<?>> dTokens = new ArrayList<Token<?>>();
|
|
||||||
for (String tokenStr : list) {
|
|
||||||
Token<AbstractDelegationTokenIdentifier> dToken =
|
|
||||||
new Token<AbstractDelegationTokenIdentifier>();
|
|
||||||
dToken.decodeFromUrlString(tokenStr);
|
|
||||||
dTokens.add(dToken);
|
|
||||||
SecurityUtil.setTokenService(dToken, httpFSAddr);
|
|
||||||
}
|
|
||||||
return dTokens;
|
|
||||||
} catch (AuthenticationException ex) {
|
} catch (AuthenticationException ex) {
|
||||||
throw new IOException(ex.toString(), ex);
|
throw new IOException(ex.toString(), ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<Token<?>> getDelegationTokens(URI fsURI,
|
|
||||||
InetSocketAddress httpFSAddr, AuthenticatedURL.Token token,
|
|
||||||
String renewer) throws IOException {
|
|
||||||
return getDelegationTokens(fsURI, httpFSAddr,
|
|
||||||
DelegationTokenOperation.GETDELEGATIONTOKENS, token, renewer);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Token<?> getDelegationToken(URI fsURI,
|
|
||||||
InetSocketAddress httpFSAddr, AuthenticatedURL.Token token,
|
|
||||||
String renewer) throws IOException {
|
|
||||||
return getDelegationTokens(fsURI, httpFSAddr,
|
|
||||||
DelegationTokenOperation.GETDELEGATIONTOKENS, token, renewer).get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static long renewDelegationToken(URI fsURI,
|
public static long renewDelegationToken(URI fsURI,
|
||||||
AuthenticatedURL.Token token, Token<?> dToken) throws IOException {
|
AuthenticatedURL.Token token, Token<?> dToken) throws IOException {
|
||||||
Map<String, String> params = new HashMap<String, String>();
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
|
|
|
@ -63,8 +63,6 @@ public class HttpFSKerberosAuthenticationHandler
|
||||||
static {
|
static {
|
||||||
DELEGATION_TOKEN_OPS.add(
|
DELEGATION_TOKEN_OPS.add(
|
||||||
DelegationTokenOperation.GETDELEGATIONTOKEN.toString());
|
DelegationTokenOperation.GETDELEGATIONTOKEN.toString());
|
||||||
DELEGATION_TOKEN_OPS.add(
|
|
||||||
DelegationTokenOperation.GETDELEGATIONTOKENS.toString());
|
|
||||||
DELEGATION_TOKEN_OPS.add(
|
DELEGATION_TOKEN_OPS.add(
|
||||||
DelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
|
DelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
|
||||||
DELEGATION_TOKEN_OPS.add(
|
DELEGATION_TOKEN_OPS.add(
|
||||||
|
@ -111,7 +109,6 @@ public class HttpFSKerberosAuthenticationHandler
|
||||||
Map map = null;
|
Map map = null;
|
||||||
switch (dtOp) {
|
switch (dtOp) {
|
||||||
case GETDELEGATIONTOKEN:
|
case GETDELEGATIONTOKEN:
|
||||||
case GETDELEGATIONTOKENS:
|
|
||||||
String renewerParam =
|
String renewerParam =
|
||||||
request.getParameter(HttpFSKerberosAuthenticator.RENEWER_PARAM);
|
request.getParameter(HttpFSKerberosAuthenticator.RENEWER_PARAM);
|
||||||
if (renewerParam == null) {
|
if (renewerParam == null) {
|
||||||
|
@ -119,11 +116,7 @@ public class HttpFSKerberosAuthenticationHandler
|
||||||
}
|
}
|
||||||
Token<?> dToken = tokenManager.createToken(
|
Token<?> dToken = tokenManager.createToken(
|
||||||
UserGroupInformation.getCurrentUser(), renewerParam);
|
UserGroupInformation.getCurrentUser(), renewerParam);
|
||||||
if (dtOp == DelegationTokenOperation.GETDELEGATIONTOKEN) {
|
map = delegationTokenToJSON(dToken);
|
||||||
map = delegationTokenToJSON(dToken);
|
|
||||||
} else {
|
|
||||||
map = delegationTokensToJSON(Arrays.asList((Token)dToken));
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case RENEWDELEGATIONTOKEN:
|
case RENEWDELEGATIONTOKEN:
|
||||||
case CANCELDELEGATIONTOKEN:
|
case CANCELDELEGATIONTOKEN:
|
||||||
|
@ -191,23 +184,6 @@ public class HttpFSKerberosAuthenticationHandler
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static Map delegationTokensToJSON(List<Token> tokens)
|
|
||||||
throws IOException {
|
|
||||||
List list = new ArrayList();
|
|
||||||
for (Token token : tokens) {
|
|
||||||
Map map = new HashMap();
|
|
||||||
map.put(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON,
|
|
||||||
token.encodeToUrlString());
|
|
||||||
list.add(map);
|
|
||||||
}
|
|
||||||
Map map = new HashMap();
|
|
||||||
map.put(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON, list);
|
|
||||||
Map response = new LinkedHashMap();
|
|
||||||
response.put(HttpFSKerberosAuthenticator.DELEGATION_TOKENS_JSON, map);
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Authenticates a request looking for the <code>delegation</code>
|
* Authenticates a request looking for the <code>delegation</code>
|
||||||
* query-string parameter and verifying it is a valid token. If there is not
|
* query-string parameter and verifying it is a valid token. If there is not
|
||||||
|
|
|
@ -68,10 +68,8 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
|
||||||
|
|
||||||
testNonManagementOperation(handler);
|
testNonManagementOperation(handler);
|
||||||
testManagementOperationErrors(handler);
|
testManagementOperationErrors(handler);
|
||||||
testGetToken(handler, false, null);
|
testGetToken(handler, null);
|
||||||
testGetToken(handler, true, null);
|
testGetToken(handler, "foo");
|
||||||
testGetToken(handler, false, "foo");
|
|
||||||
testGetToken(handler, true, "foo");
|
|
||||||
testCancelToken(handler);
|
testCancelToken(handler);
|
||||||
testRenewToken(handler);
|
testRenewToken(handler);
|
||||||
|
|
||||||
|
@ -115,12 +113,9 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
|
||||||
Mockito.contains("requires SPNEGO"));
|
Mockito.contains("requires SPNEGO"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testGetToken(AuthenticationHandler handler, boolean tokens,
|
private void testGetToken(AuthenticationHandler handler, String renewer)
|
||||||
String renewer)
|
|
||||||
throws Exception {
|
throws Exception {
|
||||||
DelegationTokenOperation op =
|
DelegationTokenOperation op = DelegationTokenOperation.GETDELEGATIONTOKEN;
|
||||||
(tokens) ? DelegationTokenOperation.GETDELEGATIONTOKENS
|
|
||||||
: DelegationTokenOperation.GETDELEGATIONTOKEN;
|
|
||||||
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
|
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
|
||||||
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
|
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
|
||||||
Mockito.when(request.getParameter(HttpFSFileSystem.OP_PARAM)).
|
Mockito.when(request.getParameter(HttpFSFileSystem.OP_PARAM)).
|
||||||
|
@ -148,23 +143,13 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
|
||||||
Mockito.verify(response).setContentType(MediaType.APPLICATION_JSON);
|
Mockito.verify(response).setContentType(MediaType.APPLICATION_JSON);
|
||||||
pwriter.close();
|
pwriter.close();
|
||||||
String responseOutput = writer.toString();
|
String responseOutput = writer.toString();
|
||||||
String tokenLabel = (tokens)
|
String tokenLabel = HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON;
|
||||||
? HttpFSKerberosAuthenticator.DELEGATION_TOKENS_JSON
|
Assert.assertTrue(responseOutput.contains(tokenLabel));
|
||||||
: HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON;
|
|
||||||
if (tokens) {
|
|
||||||
Assert.assertTrue(responseOutput.contains(tokenLabel));
|
|
||||||
} else {
|
|
||||||
Assert.assertTrue(responseOutput.contains(tokenLabel));
|
|
||||||
}
|
|
||||||
Assert.assertTrue(responseOutput.contains(
|
Assert.assertTrue(responseOutput.contains(
|
||||||
HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON));
|
HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON));
|
||||||
JSONObject json = (JSONObject) new JSONParser().parse(responseOutput);
|
JSONObject json = (JSONObject) new JSONParser().parse(responseOutput);
|
||||||
json = (JSONObject) json.get(tokenLabel);
|
json = (JSONObject) json.get(tokenLabel);
|
||||||
String tokenStr;
|
String tokenStr;
|
||||||
if (tokens) {
|
|
||||||
json = (JSONObject) ((JSONArray)
|
|
||||||
json.get(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON)).get(0);
|
|
||||||
}
|
|
||||||
tokenStr = (String)
|
tokenStr = (String)
|
||||||
json.get(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON);
|
json.get(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON);
|
||||||
Token<DelegationTokenIdentifier> dt = new Token<DelegationTokenIdentifier>();
|
Token<DelegationTokenIdentifier> dt = new Token<DelegationTokenIdentifier>();
|
||||||
|
|
|
@ -222,10 +222,11 @@ public class TestHttpFSWithKerberos extends HFSTestCase {
|
||||||
URI uri = new URI( "webhdfs://" +
|
URI uri = new URI( "webhdfs://" +
|
||||||
TestJettyHelper.getJettyURL().toURI().getAuthority());
|
TestJettyHelper.getJettyURL().toURI().getAuthority());
|
||||||
FileSystem fs = FileSystem.get(uri, conf);
|
FileSystem fs = FileSystem.get(uri, conf);
|
||||||
Token<?> token = fs.getDelegationToken("foo");
|
Token<?> tokens[] = fs.addDelegationTokens("foo", null);
|
||||||
fs.close();
|
fs.close();
|
||||||
|
Assert.assertEquals(1, tokens.length);
|
||||||
fs = FileSystem.get(uri, conf);
|
fs = FileSystem.get(uri, conf);
|
||||||
((DelegationTokenRenewer.Renewable) fs).setDelegationToken(token);
|
((DelegationTokenRenewer.Renewable) fs).setDelegationToken(tokens[0]);
|
||||||
fs.listStatus(new Path("/"));
|
fs.listStatus(new Path("/"));
|
||||||
fs.close();
|
fs.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -809,14 +808,6 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
return getDelegationToken(renewer.toString());
|
return getDelegationToken(renewer.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // FileSystem
|
|
||||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
|
||||||
List<Token<?>> tokenList = new ArrayList<Token<?>>();
|
|
||||||
Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
|
|
||||||
tokenList.add(token);
|
|
||||||
return tokenList;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Renew an existing delegation token.
|
* Renew an existing delegation token.
|
||||||
*
|
*
|
||||||
|
|
|
@ -669,17 +669,6 @@ public class NamenodeWebHdfsMethods {
|
||||||
final String js = JsonUtil.toJsonString(token);
|
final String js = JsonUtil.toJsonString(token);
|
||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
}
|
}
|
||||||
case GETDELEGATIONTOKENS:
|
|
||||||
{
|
|
||||||
if (delegation.getValue() != null) {
|
|
||||||
throw new IllegalArgumentException(delegation.getName()
|
|
||||||
+ " parameter is not null.");
|
|
||||||
}
|
|
||||||
final Token<? extends TokenIdentifier>[] tokens = new Token<?>[1];
|
|
||||||
tokens[0] = generateDelegationToken(namenode, ugi, renewer.getValue());
|
|
||||||
final String js = JsonUtil.toJsonString(tokens);
|
|
||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
|
||||||
}
|
|
||||||
case GETHOMEDIRECTORY:
|
case GETHOMEDIRECTORY:
|
||||||
{
|
{
|
||||||
final String js = JsonUtil.toJsonString(
|
final String js = JsonUtil.toJsonString(
|
||||||
|
|
|
@ -188,12 +188,13 @@ public class DelegationTokenFetcher {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
FileSystem fs = FileSystem.get(conf);
|
FileSystem fs = FileSystem.get(conf);
|
||||||
Token<?> token = fs.getDelegationToken(renewer);
|
|
||||||
Credentials cred = new Credentials();
|
Credentials cred = new Credentials();
|
||||||
cred.addToken(token.getService(), token);
|
Token<?> tokens[] = fs.addDelegationTokens(renewer, cred);
|
||||||
cred.writeTokenStorageFile(tokenFile, conf);
|
cred.writeTokenStorageFile(tokenFile, conf);
|
||||||
System.out.println("Fetched token for " + token.getService()
|
for (Token<?> token : tokens) {
|
||||||
+ " into " + tokenFile);
|
System.out.println("Fetched token for " + token.getService()
|
||||||
|
+ " into " + tokenFile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -30,7 +30,6 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
|
@ -376,8 +375,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
+ Param.toSortedString("&", parameters);
|
+ Param.toSortedString("&", parameters);
|
||||||
final URL url;
|
final URL url;
|
||||||
if (op == PutOpParam.Op.RENEWDELEGATIONTOKEN
|
if (op == PutOpParam.Op.RENEWDELEGATIONTOKEN
|
||||||
|| op == GetOpParam.Op.GETDELEGATIONTOKEN
|
|| op == GetOpParam.Op.GETDELEGATIONTOKEN) {
|
||||||
|| op == GetOpParam.Op.GETDELEGATIONTOKENS) {
|
|
||||||
// Skip adding delegation token for getting or renewing delegation token,
|
// Skip adding delegation token for getting or renewing delegation token,
|
||||||
// because these operations require kerberos authentication.
|
// because these operations require kerberos authentication.
|
||||||
url = getNamenodeURL(path, query);
|
url = getNamenodeURL(path, query);
|
||||||
|
@ -840,10 +838,9 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return statuses;
|
return statuses;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Override
|
@Override
|
||||||
public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer
|
public Token<DelegationTokenIdentifier> getDelegationToken(
|
||||||
) throws IOException {
|
final String renewer) throws IOException {
|
||||||
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
||||||
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
||||||
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
||||||
|
@ -851,18 +848,6 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Token<?>> getDelegationTokens(final String renewer
|
|
||||||
) throws IOException {
|
|
||||||
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKENS;
|
|
||||||
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
|
||||||
final List<Token<?>> tokens = JsonUtil.toTokenList(m);
|
|
||||||
for(Token<?> t : tokens) {
|
|
||||||
SecurityUtil.setTokenService(t, nnAddr);
|
|
||||||
}
|
|
||||||
return tokens;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Token<?> getRenewToken() {
|
public Token<?> getRenewToken() {
|
||||||
return delegationToken;
|
return delegationToken;
|
||||||
|
|
|
@ -32,7 +32,6 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
||||||
|
|
||||||
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
|
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
|
||||||
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
|
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
|
||||||
GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK),
|
|
||||||
|
|
||||||
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
||||||
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
|
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
|
||||||
|
|
|
@ -59,7 +59,9 @@ public class TestViewFileSystemAtHdfsRoot extends ViewFileSystemBaseTest {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void clusterShutdownAtEnd() throws Exception {
|
public static void clusterShutdownAtEnd() throws Exception {
|
||||||
cluster.shutdown();
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -84,7 +86,7 @@ public class TestViewFileSystemAtHdfsRoot extends ViewFileSystemBaseTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
int getExpectedDelegationTokenCount() {
|
int getExpectedDelegationTokenCount() {
|
||||||
return 8;
|
return 1; // all point to the same fs so 1 unique token
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
int getExpectedDelegationTokenCount() {
|
int getExpectedDelegationTokenCount() {
|
||||||
return 9;
|
return 2; // Mount points to 2 unique hdfs
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -28,8 +28,6 @@ import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
@ -50,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMetho
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -154,25 +153,18 @@ public class TestDelegationToken {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenDFSApi() throws Exception {
|
public void testAddDelegationTokensDFSApi() throws Exception {
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("JobTracker");
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
||||||
final Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
|
Credentials creds = new Credentials();
|
||||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||||
byte[] tokenId = token.getIdentifier();
|
Assert.assertEquals(1, tokens.length);
|
||||||
identifier.readFields(new DataInputStream(
|
Assert.assertEquals(1, creds.numberOfTokens());
|
||||||
new ByteArrayInputStream(tokenId)));
|
checkTokenIdentifier(ugi, tokens[0]);
|
||||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
|
||||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
final Token<?> tokens2[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||||
dtSecretManager.renewToken(token, "JobTracker");
|
Assert.assertEquals(0, tokens2.length); // already have token
|
||||||
UserGroupInformation.createRemoteUser("JobTracker").doAs(
|
Assert.assertEquals(1, creds.numberOfTokens());
|
||||||
new PrivilegedExceptionAction<Object>() {
|
|
||||||
@Override
|
|
||||||
public Object run() throws Exception {
|
|
||||||
token.renew(config);
|
|
||||||
token.cancel(config);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
@ -192,51 +184,27 @@ public class TestDelegationToken {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
{ //test getDelegationToken(..)
|
{ //test addDelegationTokens(..)
|
||||||
final Token<DelegationTokenIdentifier> token = webhdfs
|
Credentials creds = new Credentials();
|
||||||
.getDelegationToken("JobTracker");
|
final Token<?> tokens[] = webhdfs.addDelegationTokens("JobTracker", creds);
|
||||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
Assert.assertEquals(1, tokens.length);
|
||||||
byte[] tokenId = token.getIdentifier();
|
Assert.assertEquals(1, creds.numberOfTokens());
|
||||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
|
Assert.assertSame(tokens[0], creds.getAllTokens().iterator().next());
|
||||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
checkTokenIdentifier(ugi, tokens[0]);
|
||||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
final Token<?> tokens2[] = webhdfs.addDelegationTokens("JobTracker", creds);
|
||||||
dtSecretManager.renewToken(token, "JobTracker");
|
Assert.assertEquals(0, tokens2.length);
|
||||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void run() throws Exception {
|
|
||||||
token.renew(config);
|
|
||||||
token.cancel(config);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
{ //test getDelegationTokens(..)
|
|
||||||
final List<Token<?>> tokenlist = webhdfs.getDelegationTokens("JobTracker");
|
|
||||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
final Token<DelegationTokenIdentifier> token = (Token<DelegationTokenIdentifier>)tokenlist.get(0);
|
|
||||||
byte[] tokenId = token.getIdentifier();
|
|
||||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
|
|
||||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
|
||||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
|
||||||
dtSecretManager.renewToken(token, "JobTracker");
|
|
||||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void run() throws Exception {
|
|
||||||
token.renew(config);
|
|
||||||
token.cancel(config);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenWithDoAs() throws Exception {
|
public void testDelegationTokenWithDoAs() throws Exception {
|
||||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
||||||
final Token<DelegationTokenIdentifier> token =
|
final Credentials creds = new Credentials();
|
||||||
dfs.getDelegationToken("JobTracker");
|
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||||
|
Assert.assertEquals(1, tokens.length);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Token<DelegationTokenIdentifier> token =
|
||||||
|
(Token<DelegationTokenIdentifier>) tokens[0];
|
||||||
final UserGroupInformation longUgi = UserGroupInformation
|
final UserGroupInformation longUgi = UserGroupInformation
|
||||||
.createRemoteUser("JobTracker/foo.com@FOO.COM");
|
.createRemoteUser("JobTracker/foo.com@FOO.COM");
|
||||||
final UserGroupInformation shortUgi = UserGroupInformation
|
final UserGroupInformation shortUgi = UserGroupInformation
|
||||||
|
@ -326,4 +294,33 @@ public class TestDelegationToken {
|
||||||
assertFalse(nn.isInSafeMode());
|
assertFalse(nn.isInSafeMode());
|
||||||
assertTrue(sm.isRunning());
|
assertTrue(sm.isRunning());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void checkTokenIdentifier(UserGroupInformation ugi, final Token<?> token)
|
||||||
|
throws Exception {
|
||||||
|
Assert.assertNotNull(token);
|
||||||
|
// should be able to use token.decodeIdentifier() but webhdfs isn't
|
||||||
|
// registered with the service loader for token decoding
|
||||||
|
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||||
|
byte[] tokenId = token.getIdentifier();
|
||||||
|
DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenId));
|
||||||
|
try {
|
||||||
|
identifier.readFields(in);
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(identifier);
|
||||||
|
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
||||||
|
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||||
|
dtSecretManager.renewToken((Token<DelegationTokenIdentifier>) token, "JobTracker");
|
||||||
|
ugi.doAs(
|
||||||
|
new PrivilegedExceptionAction<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws Exception {
|
||||||
|
token.renew(config);
|
||||||
|
token.cancel(config);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,15 +135,15 @@ public class TestDelegationTokenForProxyUser {
|
||||||
final UserGroupInformation proxyUgi = UserGroupInformation
|
final UserGroupInformation proxyUgi = UserGroupInformation
|
||||||
.createProxyUserForTesting(PROXY_USER, ugi, GROUP_NAMES);
|
.createProxyUserForTesting(PROXY_USER, ugi, GROUP_NAMES);
|
||||||
try {
|
try {
|
||||||
Token<DelegationTokenIdentifier> token = proxyUgi
|
Token<?>[] tokens = proxyUgi
|
||||||
.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
|
.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
|
||||||
@Override
|
@Override
|
||||||
public Token<DelegationTokenIdentifier> run() throws IOException {
|
public Token<?>[] run() throws IOException {
|
||||||
return cluster.getFileSystem().getDelegationToken("RenewerUser");
|
return cluster.getFileSystem().addDelegationTokens("RenewerUser", null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||||
byte[] tokenId = token.getIdentifier();
|
byte[] tokenId = tokens[0].getIdentifier();
|
||||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(
|
identifier.readFields(new DataInputStream(new ByteArrayInputStream(
|
||||||
tokenId)));
|
tokenId)));
|
||||||
Assert.assertEquals(identifier.getUser().getUserName(), PROXY_USER);
|
Assert.assertEquals(identifier.getUser().getUserName(), PROXY_USER);
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
|
@ -195,20 +194,21 @@ public class OfflineEditsViewerHelper {
|
||||||
Path pathSymlink = new Path("/file_symlink");
|
Path pathSymlink = new Path("/file_symlink");
|
||||||
fc.createSymlink(pathConcatTarget, pathSymlink, false);
|
fc.createSymlink(pathConcatTarget, pathSymlink, false);
|
||||||
// OP_GET_DELEGATION_TOKEN 18
|
// OP_GET_DELEGATION_TOKEN 18
|
||||||
final Token<DelegationTokenIdentifier> token =
|
|
||||||
dfs.getDelegationToken("JobTracker");
|
|
||||||
// OP_RENEW_DELEGATION_TOKEN 19
|
// OP_RENEW_DELEGATION_TOKEN 19
|
||||||
// OP_CANCEL_DELEGATION_TOKEN 20
|
// OP_CANCEL_DELEGATION_TOKEN 20
|
||||||
// see TestDelegationToken.java
|
// see TestDelegationToken.java
|
||||||
// fake the user to renew token for
|
// fake the user to renew token for
|
||||||
|
final Token<?>[] tokens = dfs.addDelegationTokens("JobTracker", null);
|
||||||
UserGroupInformation longUgi = UserGroupInformation.createRemoteUser(
|
UserGroupInformation longUgi = UserGroupInformation.createRemoteUser(
|
||||||
"JobTracker/foo.com@FOO.COM");
|
"JobTracker/foo.com@FOO.COM");
|
||||||
try {
|
try {
|
||||||
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws IOException, InterruptedException {
|
public Object run() throws IOException, InterruptedException {
|
||||||
token.renew(config);
|
for (Token<?> token : tokens) {
|
||||||
token.cancel(config);
|
token.renew(config);
|
||||||
|
token.cancel(config);
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -116,7 +116,8 @@ public class TestDelegationTokensWithHA {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenDFSApi() throws Exception {
|
public void testDelegationTokenDFSApi() throws Exception {
|
||||||
Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
|
final Token<DelegationTokenIdentifier> token =
|
||||||
|
getDelegationToken(fs, "JobTracker");
|
||||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||||
byte[] tokenId = token.getIdentifier();
|
byte[] tokenId = token.getIdentifier();
|
||||||
identifier.readFields(new DataInputStream(
|
identifier.readFields(new DataInputStream(
|
||||||
|
@ -157,8 +158,8 @@ public class TestDelegationTokensWithHA {
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenWithDoAs() throws Exception {
|
public void testDelegationTokenWithDoAs() throws Exception {
|
||||||
final Token<DelegationTokenIdentifier> token =
|
final Token<DelegationTokenIdentifier> token =
|
||||||
dfs.getDelegationToken("JobTracker");
|
getDelegationToken(fs, "JobTracker");
|
||||||
final UserGroupInformation longUgi = UserGroupInformation
|
final UserGroupInformation longUgi = UserGroupInformation
|
||||||
.createRemoteUser("JobTracker/foo.com@FOO.COM");
|
.createRemoteUser("JobTracker/foo.com@FOO.COM");
|
||||||
final UserGroupInformation shortUgi = UserGroupInformation
|
final UserGroupInformation shortUgi = UserGroupInformation
|
||||||
|
@ -196,8 +197,8 @@ public class TestDelegationTokensWithHA {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHAUtilClonesDelegationTokens() throws Exception {
|
public void testHAUtilClonesDelegationTokens() throws Exception {
|
||||||
final Token<DelegationTokenIdentifier> token =
|
final Token<DelegationTokenIdentifier> token =
|
||||||
dfs.getDelegationToken("test");
|
getDelegationToken(fs, "JobTracker");
|
||||||
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
|
||||||
|
|
||||||
|
@ -258,8 +259,9 @@ public class TestDelegationTokensWithHA {
|
||||||
URI hAUri = HATestUtil.getLogicalUri(cluster);
|
URI hAUri = HATestUtil.getLogicalUri(cluster);
|
||||||
String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri).toString();
|
String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri).toString();
|
||||||
assertEquals(haService, dfs.getCanonicalServiceName());
|
assertEquals(haService, dfs.getCanonicalServiceName());
|
||||||
Token<?> token = dfs.getDelegationToken(
|
final String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
final Token<DelegationTokenIdentifier> token =
|
||||||
|
getDelegationToken(dfs, renewer);
|
||||||
assertEquals(haService, token.getService().toString());
|
assertEquals(haService, token.getService().toString());
|
||||||
// make sure the logical uri is handled correctly
|
// make sure the logical uri is handled correctly
|
||||||
token.renew(dfs.getConf());
|
token.renew(dfs.getConf());
|
||||||
|
@ -281,6 +283,13 @@ public class TestDelegationTokensWithHA {
|
||||||
token.cancel(conf);
|
token.cancel(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
|
||||||
|
String renewer) throws IOException {
|
||||||
|
final Token<?> tokens[] = fs.addDelegationTokens(renewer, null);
|
||||||
|
assertEquals(1, tokens.length);
|
||||||
|
return (Token<DelegationTokenIdentifier>) tokens[0];
|
||||||
|
}
|
||||||
enum TokenTestAction {
|
enum TokenTestAction {
|
||||||
RENEW, CANCEL;
|
RENEW, CANCEL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,8 +126,8 @@ public class TestOfflineImageViewer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get delegation tokens so we log the delegation token op
|
// Get delegation tokens so we log the delegation token op
|
||||||
List<Token<?>> delegationTokens =
|
Token<?>[] delegationTokens =
|
||||||
hdfs.getDelegationTokens(TEST_RENEWER);
|
hdfs.addDelegationTokens(TEST_RENEWER, null);
|
||||||
for (Token<?> t : delegationTokens) {
|
for (Token<?> t : delegationTokens) {
|
||||||
LOG.debug("got token " + t);
|
LOG.debug("got token " + t);
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,9 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenRenewer;
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
import static org.mockito.Matchers.*;
|
||||||
|
|
||||||
public class TestDelegationTokenFetcher {
|
public class TestDelegationTokenFetcher {
|
||||||
private DistributedFileSystem dfs;
|
private DistributedFileSystem dfs;
|
||||||
|
@ -105,9 +108,17 @@ public class TestDelegationTokenFetcher {
|
||||||
|
|
||||||
// Create a token for the fetcher to fetch, wire NN to return it when asked
|
// Create a token for the fetcher to fetch, wire NN to return it when asked
|
||||||
// for this particular user.
|
// for this particular user.
|
||||||
Token<DelegationTokenIdentifier> t =
|
final Token<DelegationTokenIdentifier> t =
|
||||||
new Token<DelegationTokenIdentifier>(ident, pw, KIND, service);
|
new Token<DelegationTokenIdentifier>(ident, pw, KIND, service);
|
||||||
when(dfs.getDelegationToken(eq((String) null))).thenReturn(t);
|
when(dfs.addDelegationTokens(eq((String) null), any(Credentials.class))).thenAnswer(
|
||||||
|
new Answer<Token<?>[]>() {
|
||||||
|
@Override
|
||||||
|
public Token<?>[] answer(InvocationOnMock invocation) {
|
||||||
|
Credentials creds = (Credentials)invocation.getArguments()[1];
|
||||||
|
creds.addToken(service, t);
|
||||||
|
return new Token<?>[]{t};
|
||||||
|
}
|
||||||
|
});
|
||||||
when(dfs.renewDelegationToken(eq(t))).thenReturn(1000L);
|
when(dfs.renewDelegationToken(eq(t))).thenReturn(1000L);
|
||||||
when(dfs.getUri()).thenReturn(uri);
|
when(dfs.getUri()).thenReturn(uri);
|
||||||
FakeRenewer.reset();
|
FakeRenewer.reset();
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.security;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -110,7 +109,6 @@ public class TokenCache {
|
||||||
* @param conf
|
* @param conf
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
||||||
Credentials credentials, Configuration conf) throws IOException {
|
Credentials credentials, Configuration conf) throws IOException {
|
||||||
String delegTokenRenewer = Master.getMasterPrincipal(conf);
|
String delegTokenRenewer = Master.getMasterPrincipal(conf);
|
||||||
|
@ -120,26 +118,11 @@ public class TokenCache {
|
||||||
}
|
}
|
||||||
mergeBinaryTokens(credentials, conf);
|
mergeBinaryTokens(credentials, conf);
|
||||||
|
|
||||||
String fsName = fs.getCanonicalServiceName();
|
final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
|
||||||
if (TokenCache.getDelegationToken(credentials, fsName) == null) {
|
credentials);
|
||||||
List<Token<?>> tokens =
|
if (tokens != null) {
|
||||||
fs.getDelegationTokens(delegTokenRenewer, credentials);
|
for (Token<?> token : tokens) {
|
||||||
if (tokens != null) {
|
LOG.info("Got dt for " + fs.getUri() + "; "+token);
|
||||||
for (Token<?> token : tokens) {
|
|
||||||
credentials.addToken(token.getService(), token);
|
|
||||||
LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName +
|
|
||||||
";t.service="+token.getService());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//Call getDelegationToken as well for now - for FS implementations
|
|
||||||
// which may not have implmented getDelegationTokens (hftp)
|
|
||||||
if (tokens == null || tokens.size() == 0) {
|
|
||||||
Token<?> token = fs.getDelegationToken(delegTokenRenewer);
|
|
||||||
if (token != null) {
|
|
||||||
credentials.addToken(token.getService(), token);
|
|
||||||
LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
|
|
||||||
+ ";t.service=" + token.getService());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -173,21 +156,6 @@ public class TokenCache {
|
||||||
public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
|
public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
|
||||||
private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
|
private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param namenode
|
|
||||||
* @return delegation token
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public static Token<?> getDelegationToken(
|
|
||||||
Credentials credentials, String namenode) {
|
|
||||||
//No fs specific tokens issues by this fs. It may however issue tokens
|
|
||||||
// for other filesystems - which would be keyed by that filesystems name.
|
|
||||||
if (namenode == null)
|
|
||||||
return null;
|
|
||||||
return (Token<?>) credentials.getToken(new Text(namenode));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* load job token from a file
|
* load job token from a file
|
||||||
* @param conf
|
* @param conf
|
||||||
|
|
|
@ -18,23 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.security;
|
package org.apache.hadoop.mapreduce.security;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.mockito.Mockito.*;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.Master;
|
import org.apache.hadoop.mapred.Master;
|
||||||
|
@ -43,145 +36,42 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
public class TestTokenCache {
|
public class TestTokenCache {
|
||||||
|
private static Configuration conf;
|
||||||
|
private static String renewer;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||||
|
renewer = Master.getMasterPrincipal(conf);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
public void testObtainTokens() throws Exception {
|
||||||
public void testGetDelegationTokensNotImplemented() throws Exception {
|
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
Configuration conf = new Configuration();
|
FileSystem fs = mock(FileSystem.class);
|
||||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
|
||||||
String renewer = Master.getMasterPrincipal(conf);
|
|
||||||
|
|
||||||
FileSystem fs = setupSingleFsWithoutGetDelegationTokens();
|
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
|
||||||
assertEquals(1, credentials.getAllTokens().size());
|
verify(fs).addDelegationTokens(eq(renewer), eq(credentials));
|
||||||
|
|
||||||
verify(fs).getDelegationTokens(renewer, credentials);
|
|
||||||
verify(fs).getDelegationToken(renewer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testManagedFileSystem() throws Exception {
|
|
||||||
Credentials credentials = new Credentials();
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
|
||||||
String renewer = Master.getMasterPrincipal(conf);
|
|
||||||
|
|
||||||
FileSystem singleFs = setupSingleFs();
|
|
||||||
FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials);
|
|
||||||
|
|
||||||
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
|
||||||
assertEquals(1, credentials.getAllTokens().size());
|
|
||||||
|
|
||||||
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
|
||||||
assertEquals(1, credentials.getAllTokens().size());
|
|
||||||
|
|
||||||
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
|
||||||
assertEquals(2, credentials.getAllTokens().size());
|
|
||||||
|
|
||||||
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
|
||||||
assertEquals(2, credentials.getAllTokens().size());
|
|
||||||
|
|
||||||
verify(singleFs, times(1)).getDelegationTokens(renewer, credentials);
|
|
||||||
verify(multiFs, times(2)).getDelegationTokens(renewer, credentials);
|
|
||||||
// A call to getDelegationToken would have generated an exception.
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception {
|
|
||||||
FileSystem mockFs = mock(FileSystem.class);
|
|
||||||
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4");
|
|
||||||
when(mockFs.getUri()).thenReturn(new URI("singlefs4:///"));
|
|
||||||
|
|
||||||
final Token<?> mockToken = (Token<?>) mock(Token.class);
|
|
||||||
when(mockToken.getService()).thenReturn(new Text("singlefs4"));
|
|
||||||
|
|
||||||
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
|
||||||
new Answer<Token<?>>() {
|
|
||||||
@Override
|
|
||||||
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
return mockToken;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
|
||||||
.thenReturn(new LinkedList<Token<?>>());
|
|
||||||
|
|
||||||
return mockFs;
|
|
||||||
}
|
|
||||||
|
|
||||||
private FileSystem setupSingleFs() throws Exception {
|
|
||||||
FileSystem mockFs = mock(FileSystem.class);
|
|
||||||
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1");
|
|
||||||
when(mockFs.getUri()).thenReturn(new URI("singlefs1:///"));
|
|
||||||
|
|
||||||
List<Token<?>> tokens = new LinkedList<Token<?>>();
|
|
||||||
Token<?> mockToken = mock(Token.class);
|
|
||||||
when(mockToken.getService()).thenReturn(new Text("singlefs1"));
|
|
||||||
tokens.add(mockToken);
|
|
||||||
|
|
||||||
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
|
||||||
new RuntimeException(
|
|
||||||
"getDelegationTokens(renewer) should not be called"));
|
|
||||||
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
|
||||||
.thenReturn(tokens);
|
|
||||||
|
|
||||||
return mockFs;
|
|
||||||
}
|
|
||||||
|
|
||||||
private FileSystem setupMultiFs(final FileSystem singleFs,
|
|
||||||
final String renewer, final Credentials credentials) throws Exception {
|
|
||||||
FileSystem mockFs = mock(FileSystem.class);
|
|
||||||
when(mockFs.getCanonicalServiceName()).thenReturn(null);
|
|
||||||
when(mockFs.getUri()).thenReturn(new URI("multifs:///"));
|
|
||||||
|
|
||||||
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
|
||||||
new RuntimeException(
|
|
||||||
"getDelegationTokens(renewer) should not be called"));
|
|
||||||
when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer(
|
|
||||||
new Answer<List<Token<?>>>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Token<?>> answer(InvocationOnMock invocation)
|
|
||||||
throws Throwable {
|
|
||||||
List<Token<?>> newTokens = new LinkedList<Token<?>>();
|
|
||||||
if (credentials.getToken(new Text("singlefs1")) == null) {
|
|
||||||
newTokens.addAll(singleFs.getDelegationTokens(renewer,
|
|
||||||
credentials));
|
|
||||||
} else {
|
|
||||||
newTokens.add(credentials.getToken(new Text("singlefs1")));
|
|
||||||
}
|
|
||||||
Token<?> mockToken2 = mock(Token.class);
|
|
||||||
when(mockToken2.getService()).thenReturn(new Text("singlefs2"));
|
|
||||||
newTokens.add(mockToken2);
|
|
||||||
return newTokens;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return mockFs;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public void testBinaryCredentials() throws Exception {
|
public void testBinaryCredentials() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
|
||||||
String renewer = Master.getMasterPrincipal(conf);
|
|
||||||
|
|
||||||
Path TEST_ROOT_DIR =
|
Path TEST_ROOT_DIR =
|
||||||
new Path(System.getProperty("test.build.data","test/build/data"));
|
new Path(System.getProperty("test.build.data","test/build/data"));
|
||||||
// ick, but need fq path minus file:/
|
// ick, but need fq path minus file:/
|
||||||
String binaryTokenFile = FileSystem.getLocal(conf).makeQualified(
|
String binaryTokenFile = FileSystem.getLocal(conf).makeQualified(
|
||||||
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
|
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
|
||||||
|
|
||||||
FileSystem fs1 = createFileSystemForService("service1");
|
MockFileSystem fs1 = createFileSystemForServiceName("service1");
|
||||||
FileSystem fs2 = createFileSystemForService("service2");
|
MockFileSystem fs2 = createFileSystemForServiceName("service2");
|
||||||
FileSystem fs3 = createFileSystemForService("service3");
|
MockFileSystem fs3 = createFileSystemForServiceName("service3");
|
||||||
|
|
||||||
// get the tokens for fs1 & fs2 and write out to binary creds file
|
// get the tokens for fs1 & fs2 and write out to binary creds file
|
||||||
Credentials creds = new Credentials();
|
Credentials creds = new Credentials();
|
||||||
|
@ -196,7 +86,7 @@ public class TestTokenCache {
|
||||||
// re-init creds and add a newer token for fs1
|
// re-init creds and add a newer token for fs1
|
||||||
creds = new Credentials();
|
creds = new Credentials();
|
||||||
Token<?> newerToken1 = fs1.getDelegationToken(renewer);
|
Token<?> newerToken1 = fs1.getDelegationToken(renewer);
|
||||||
assertFalse(newerToken1.equals(token1));
|
assertNotSame(newerToken1, token1);
|
||||||
creds.addToken(newerToken1.getService(), newerToken1);
|
creds.addToken(newerToken1.getService(), newerToken1);
|
||||||
checkToken(creds, newerToken1);
|
checkToken(creds, newerToken1);
|
||||||
|
|
||||||
|
@ -230,10 +120,9 @@ public class TestTokenCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
private MockFileSystem createFileSystemForServiceName(final String service)
|
||||||
private FileSystem createFileSystemForService(final String service)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FileSystem mockFs = mock(FileSystem.class);
|
MockFileSystem mockFs = new MockFileSystem();
|
||||||
when(mockFs.getCanonicalServiceName()).thenReturn(service);
|
when(mockFs.getCanonicalServiceName()).thenReturn(service);
|
||||||
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||||
new Answer<Token<?>>() {
|
new Answer<Token<?>>() {
|
||||||
|
@ -258,7 +147,8 @@ public class TestTokenCache {
|
||||||
String renewer = Master.getMasterPrincipal(conf);
|
String renewer = Master.getMasterPrincipal(conf);
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
|
|
||||||
FileSystem mockFs = mock(FileSystem.class);
|
final MockFileSystem fs = new MockFileSystem();
|
||||||
|
final MockFileSystem mockFs = (MockFileSystem) fs.getRawFileSystem();
|
||||||
when(mockFs.getCanonicalServiceName()).thenReturn("host:0");
|
when(mockFs.getCanonicalServiceName()).thenReturn("host:0");
|
||||||
when(mockFs.getUri()).thenReturn(new URI("mockfs://host:0"));
|
when(mockFs.getUri()).thenReturn(new URI("mockfs://host:0"));
|
||||||
|
|
||||||
|
@ -266,9 +156,9 @@ public class TestTokenCache {
|
||||||
when(mockPath.getFileSystem(conf)).thenReturn(mockFs);
|
when(mockPath.getFileSystem(conf)).thenReturn(mockFs);
|
||||||
|
|
||||||
Path[] paths = new Path[]{ mockPath, mockPath };
|
Path[] paths = new Path[]{ mockPath, mockPath };
|
||||||
when(mockFs.getDelegationTokens("me", credentials)).thenReturn(null);
|
when(mockFs.addDelegationTokens("me", credentials)).thenReturn(null);
|
||||||
TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf);
|
TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf);
|
||||||
verify(mockFs, times(1)).getDelegationTokens(renewer, credentials);
|
verify(mockFs, times(1)).addDelegationTokens(renewer, credentials);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -278,5 +168,4 @@ public class TestTokenCache {
|
||||||
TokenCache.cleanUpTokenReferral(conf);
|
TokenCache.cleanUpTokenReferral(conf);
|
||||||
assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
|
assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
|
Loading…
Reference in New Issue