Merge r1569890 through r1573042 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1573043 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
ee663fad14
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.hadoop.classification;
|
package org.apache.hadoop.classification;
|
||||||
|
|
||||||
import java.lang.annotation.Documented;
|
import java.lang.annotation.Documented;
|
||||||
|
import java.lang.annotation.Retention;
|
||||||
|
import java.lang.annotation.RetentionPolicy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Annotation to inform users of a package, class or method's intended audience.
|
* Annotation to inform users of a package, class or method's intended audience.
|
||||||
|
@ -46,20 +48,26 @@ public class InterfaceAudience {
|
||||||
/**
|
/**
|
||||||
* Intended for use by any project or application.
|
* Intended for use by any project or application.
|
||||||
*/
|
*/
|
||||||
@Documented public @interface Public {};
|
@Documented
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
public @interface Public {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Intended only for the project(s) specified in the annotation.
|
* Intended only for the project(s) specified in the annotation.
|
||||||
* For example, "Common", "HDFS", "MapReduce", "ZooKeeper", "HBase".
|
* For example, "Common", "HDFS", "MapReduce", "ZooKeeper", "HBase".
|
||||||
*/
|
*/
|
||||||
@Documented public @interface LimitedPrivate {
|
@Documented
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
public @interface LimitedPrivate {
|
||||||
String[] value();
|
String[] value();
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Intended for use only within Hadoop itself.
|
* Intended for use only within Hadoop itself.
|
||||||
*/
|
*/
|
||||||
@Documented public @interface Private {};
|
@Documented
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
public @interface Private {};
|
||||||
|
|
||||||
private InterfaceAudience() {} // Audience can't exist on its own
|
private InterfaceAudience() {} // Audience can't exist on its own
|
||||||
}
|
}
|
||||||
|
|
|
@ -346,6 +346,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
|
HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
|
||||||
Akira AJISAKA via atm)
|
Akira AJISAKA via atm)
|
||||||
|
|
||||||
|
HADOOP-10374. InterfaceAudience annotations should have
|
||||||
|
RetentionPolicy.RUNTIME (Enis Soztutar via Arpit Agarwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -367,6 +367,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-6025. Update findbugsExcludeFile.xml. (szetszwo)
|
HDFS-6025. Update findbugsExcludeFile.xml. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-6030. Remove an unused constructor in INode.java. (yzhang via
|
||||||
|
cmccabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||||
|
@ -499,6 +502,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-5936. MiniDFSCluster does not clean data left behind by
|
HDFS-5936. MiniDFSCluster does not clean data left behind by
|
||||||
SecondaryNameNode. (Binglin Chang via cnauroth)
|
SecondaryNameNode. (Binglin Chang via cnauroth)
|
||||||
|
|
||||||
|
HDFS-5339. WebHDFS URI does not accept logical nameservices when security is
|
||||||
|
enabled. (Haohui Mai via jing9)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
||||||
|
@ -642,6 +648,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-5908. Change AclFeature to capture list of ACL entries in an
|
HDFS-5908. Change AclFeature to capture list of ACL entries in an
|
||||||
ImmutableList. (cnauroth)
|
ImmutableList. (cnauroth)
|
||||||
|
|
||||||
|
HDFS-6028. Print clearer error message when user attempts to delete required
|
||||||
|
mask entry from ACL. (cnauroth)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -365,7 +365,7 @@ final class AclTransformation {
|
||||||
maskDirty.contains(scope)) {
|
maskDirty.contains(scope)) {
|
||||||
// Caller explicitly removed mask entry, but it's required.
|
// Caller explicitly removed mask entry, but it's required.
|
||||||
throw new AclException(
|
throw new AclException(
|
||||||
"Invalid ACL: mask is required, but it was deleted.");
|
"Invalid ACL: mask is required and cannot be deleted.");
|
||||||
} else if (providedMask.containsKey(scope) &&
|
} else if (providedMask.containsKey(scope) &&
|
||||||
(!scopeDirty.contains(scope) || maskDirty.contains(scope))) {
|
(!scopeDirty.contains(scope) || maskDirty.contains(scope))) {
|
||||||
// Caller explicitly provided new mask, or we are preserving the existing
|
// Caller explicitly provided new mask, or we are preserving the existing
|
||||||
|
|
|
@ -752,11 +752,6 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
|
||||||
*/
|
*/
|
||||||
private List<Block> toDeleteList;
|
private List<Block> toDeleteList;
|
||||||
|
|
||||||
public BlocksMapUpdateInfo(List<Block> toDeleteList) {
|
|
||||||
this.toDeleteList = toDeleteList == null ? new ArrayList<Block>()
|
|
||||||
: toDeleteList;
|
|
||||||
}
|
|
||||||
|
|
||||||
public BlocksMapUpdateInfo() {
|
public BlocksMapUpdateInfo() {
|
||||||
toDeleteList = new ChunkedArrayList<Block>();
|
toDeleteList = new ChunkedArrayList<Block>();
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,9 +97,10 @@ public class HftpFileSystem extends FileSystem
|
||||||
public static final String HFTP_TIMEZONE = "UTC";
|
public static final String HFTP_TIMEZONE = "UTC";
|
||||||
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
||||||
|
|
||||||
protected TokenAspect<HftpFileSystem> tokenAspect;
|
protected TokenAspect<? extends HftpFileSystem> tokenAspect;
|
||||||
private Token<?> delegationToken;
|
private Token<?> delegationToken;
|
||||||
private Token<?> renewToken;
|
private Token<?> renewToken;
|
||||||
|
protected Text tokenServiceName;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public URI getCanonicalUri() {
|
public URI getCanonicalUri() {
|
||||||
|
@ -175,9 +176,8 @@ public class HftpFileSystem extends FileSystem
|
||||||
* Initialize connectionFactory and tokenAspect. This function is intended to
|
* Initialize connectionFactory and tokenAspect. This function is intended to
|
||||||
* be overridden by HsFtpFileSystem.
|
* be overridden by HsFtpFileSystem.
|
||||||
*/
|
*/
|
||||||
protected void initTokenAspect(Configuration conf)
|
protected void initTokenAspect() {
|
||||||
throws IOException {
|
tokenAspect = new TokenAspect<HftpFileSystem>(this, tokenServiceName, TOKEN_KIND);
|
||||||
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -189,6 +189,7 @@ public class HftpFileSystem extends FileSystem
|
||||||
.newDefaultURLConnectionFactory(conf);
|
.newDefaultURLConnectionFactory(conf);
|
||||||
this.ugi = UserGroupInformation.getCurrentUser();
|
this.ugi = UserGroupInformation.getCurrentUser();
|
||||||
this.nnUri = getNamenodeUri(name);
|
this.nnUri = getNamenodeUri(name);
|
||||||
|
this.tokenServiceName = SecurityUtil.buildTokenService(nnUri);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
|
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
|
||||||
|
@ -197,7 +198,7 @@ public class HftpFileSystem extends FileSystem
|
||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
initTokenAspect(conf);
|
initTokenAspect();
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
tokenAspect.initDelegationToken(ugi);
|
tokenAspect.initDelegationToken(ugi);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
|
||||||
|
@ -60,8 +57,9 @@ public class HsftpFileSystem extends HftpFileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initTokenAspect(Configuration conf) throws IOException {
|
protected void initTokenAspect() {
|
||||||
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
|
tokenAspect = new TokenAspect<HsftpFileSystem>(this, tokenServiceName,
|
||||||
|
TOKEN_KIND);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class SWebHdfsFileSystem extends WebHdfsFileSystem {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void initializeTokenAspect() {
|
protected synchronized void initializeTokenAspect() {
|
||||||
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
|
tokenAspect = new TokenAspect<SWebHdfsFileSystem>(this, tokenServiceName, TOKEN_KIND);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.web;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -31,7 +30,6 @@ import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -93,23 +91,11 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class DTSelecorByKind extends
|
private static class DTSelecorByKind extends
|
||||||
AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||||
private static final DelegationTokenSelector selector = new DelegationTokenSelector();
|
|
||||||
|
|
||||||
public DTSelecorByKind(final Text kind) {
|
public DTSelecorByKind(final Text kind) {
|
||||||
super(kind);
|
super(kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
|
|
||||||
Collection<Token<?>> tokens, Configuration conf) {
|
|
||||||
Token<DelegationTokenIdentifier> token = selectToken(
|
|
||||||
SecurityUtil.buildTokenService(nnUri), tokens);
|
|
||||||
if (token == null) {
|
|
||||||
token = selector.selectToken(nnUri, tokens, conf);
|
|
||||||
}
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -117,9 +103,6 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
||||||
*/
|
*/
|
||||||
interface TokenManagementDelegator {
|
interface TokenManagementDelegator {
|
||||||
void cancelDelegationToken(final Token<?> token) throws IOException;
|
void cancelDelegationToken(final Token<?> token) throws IOException;
|
||||||
|
|
||||||
URI getCanonicalUri();
|
|
||||||
|
|
||||||
long renewDelegationToken(final Token<?> token) throws IOException;
|
long renewDelegationToken(final Token<?> token) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,11 +112,13 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
||||||
private final T fs;
|
private final T fs;
|
||||||
private boolean hasInitedToken;
|
private boolean hasInitedToken;
|
||||||
private final Log LOG;
|
private final Log LOG;
|
||||||
|
private final Text serviceName;
|
||||||
|
|
||||||
TokenAspect(T fs, final Text kind) {
|
TokenAspect(T fs, final Text serviceName, final Text kind) {
|
||||||
this.LOG = LogFactory.getLog(fs.getClass());
|
this.LOG = LogFactory.getLog(fs.getClass());
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.dtSelector = new DTSelecorByKind(kind);
|
this.dtSelector = new DTSelecorByKind(kind);
|
||||||
|
this.serviceName = serviceName;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void ensureTokenInitialized() throws IOException {
|
synchronized void ensureTokenInitialized() throws IOException {
|
||||||
|
@ -173,9 +158,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Token<DelegationTokenIdentifier> selectDelegationToken(
|
Token<DelegationTokenIdentifier> selectDelegationToken(
|
||||||
UserGroupInformation ugi) {
|
UserGroupInformation ugi) {
|
||||||
return dtSelector.selectToken(
|
return dtSelector.selectToken(serviceName, ugi.getTokens());
|
||||||
((TokenManagementDelegator)fs).getCanonicalUri(), ugi.getTokens(),
|
|
||||||
fs.getConf());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addRenewAction(final T webhdfs) {
|
private synchronized void addRenewAction(final T webhdfs) {
|
||||||
|
|
|
@ -27,7 +27,6 @@ import java.net.HttpURLConnection;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -119,11 +118,12 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
|
|
||||||
/** Delegation token kind */
|
/** Delegation token kind */
|
||||||
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
||||||
protected TokenAspect<WebHdfsFileSystem> tokenAspect;
|
protected TokenAspect<? extends WebHdfsFileSystem> tokenAspect;
|
||||||
|
|
||||||
private UserGroupInformation ugi;
|
private UserGroupInformation ugi;
|
||||||
private URI uri;
|
private URI uri;
|
||||||
private Token<?> delegationToken;
|
private Token<?> delegationToken;
|
||||||
|
protected Text tokenServiceName;
|
||||||
private RetryPolicy retryPolicy = null;
|
private RetryPolicy retryPolicy = null;
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
private InetSocketAddress nnAddrs[];
|
private InetSocketAddress nnAddrs[];
|
||||||
|
@ -152,7 +152,8 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
* be overridden by SWebHdfsFileSystem.
|
* be overridden by SWebHdfsFileSystem.
|
||||||
*/
|
*/
|
||||||
protected synchronized void initializeTokenAspect() {
|
protected synchronized void initializeTokenAspect() {
|
||||||
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, TOKEN_KIND);
|
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, tokenServiceName,
|
||||||
|
TOKEN_KIND);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -161,23 +162,26 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
super.initialize(uri, conf);
|
super.initialize(uri, conf);
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
/** set user pattern based on configuration file */
|
/** set user pattern based on configuration file */
|
||||||
UserParam.setUserPattern(conf.get(DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
|
UserParam.setUserPattern(conf.get(
|
||||||
|
DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
||||||
|
DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
|
||||||
|
|
||||||
connectionFactory = URLConnectionFactory
|
connectionFactory = URLConnectionFactory
|
||||||
.newDefaultURLConnectionFactory(conf);
|
.newDefaultURLConnectionFactory(conf);
|
||||||
initializeTokenAspect();
|
|
||||||
|
|
||||||
|
|
||||||
ugi = UserGroupInformation.getCurrentUser();
|
ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
||||||
|
this.nnAddrs = DFSUtil.resolveWebHdfsUri(this.uri, conf);
|
||||||
|
|
||||||
try {
|
boolean isHA = HAUtil.isLogicalUri(conf, this.uri);
|
||||||
this.uri = new URI(uri.getScheme(), uri.getAuthority(), null,
|
// In non-HA case, the code needs to call getCanonicalUri() in order to
|
||||||
null, null);
|
// handle the case where no port is specified in the URI
|
||||||
this.nnAddrs = DFSUtil.resolveWebHdfsUri(this.uri, conf);
|
this.tokenServiceName = isHA ? HAUtil.buildTokenServiceForLogicalUri(uri)
|
||||||
} catch (URISyntaxException e) {
|
: SecurityUtil.buildTokenService(getCanonicalUri());
|
||||||
throw new IllegalArgumentException(e);
|
initializeTokenAspect();
|
||||||
}
|
|
||||||
|
|
||||||
if (!HAUtil.isLogicalUri(conf, this.uri)) {
|
if (!isHA) {
|
||||||
this.retryPolicy =
|
this.retryPolicy =
|
||||||
RetryUtils.getDefaultRetryPolicy(
|
RetryUtils.getDefaultRetryPolicy(
|
||||||
conf,
|
conf,
|
||||||
|
@ -1004,19 +1008,19 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
||||||
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
||||||
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
||||||
SecurityUtil.setTokenService(token, getCurrentNNAddr());
|
token.setService(tokenServiceName);
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Token<?> getRenewToken() {
|
public synchronized Token<?> getRenewToken() {
|
||||||
return delegationToken;
|
return delegationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends TokenIdentifier> void setDelegationToken(
|
public <T extends TokenIdentifier> void setDelegationToken(
|
||||||
final Token<T> token) {
|
final Token<T> token) {
|
||||||
synchronized(this) {
|
synchronized (this) {
|
||||||
delegationToken = token;
|
delegationToken = token;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -73,14 +74,15 @@ public class TestHftpDelegationToken {
|
||||||
doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
|
doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
|
||||||
anyBoolean());
|
anyBoolean());
|
||||||
|
|
||||||
fs.initialize(new URI("hftp://127.0.0.1:8020"), conf);
|
final URI uri = new URI("hftp://127.0.0.1:8020");
|
||||||
|
fs.initialize(uri, conf);
|
||||||
fs.connectionFactory = factory;
|
fs.connectionFactory = factory;
|
||||||
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
|
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
|
||||||
new String[] { "bar" });
|
new String[] { "bar" });
|
||||||
|
|
||||||
TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(
|
TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(
|
||||||
fs, HftpFileSystem.TOKEN_KIND);
|
fs, SecurityUtil.buildTokenService(uri), HftpFileSystem.TOKEN_KIND);
|
||||||
|
|
||||||
tokenAspect.initDelegationToken(ugi);
|
tokenAspect.initDelegationToken(ugi);
|
||||||
tokenAspect.ensureTokenInitialized();
|
tokenAspect.ensureTokenInitialized();
|
||||||
|
|
|
@ -48,9 +48,8 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.SecurityUtilTestHelper;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -122,14 +121,9 @@ public class TestTokenAspect {
|
||||||
public void initialize(URI name, Configuration conf) throws IOException {
|
public void initialize(URI name, Configuration conf) throws IOException {
|
||||||
super.initialize(name, conf);
|
super.initialize(name, conf);
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
try {
|
this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
|
||||||
this.uri = new URI(name.getScheme(), name.getAuthority(), null, null,
|
tokenAspect = new TokenAspect<DummyFs>(this,
|
||||||
null);
|
SecurityUtil.buildTokenService(uri), TOKEN_KIND);
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
tokenAspect = new TokenAspect<DummyFs>(this, DummyFs.TOKEN_KIND);
|
|
||||||
if (emulateSecurityEnabled || UserGroupInformation.isSecurityEnabled()) {
|
if (emulateSecurityEnabled || UserGroupInformation.isSecurityEnabled()) {
|
||||||
tokenAspect.initDelegationToken(ugi);
|
tokenAspect.initDelegationToken(ugi);
|
||||||
}
|
}
|
||||||
|
@ -293,9 +287,10 @@ public class TestTokenAspect {
|
||||||
doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null,
|
doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null,
|
||||||
null);
|
null);
|
||||||
|
|
||||||
|
final URI uri = new URI("dummyfs://127.0.0.1:1234");
|
||||||
TokenAspect<DummyFs> tokenAspect = new TokenAspect<DummyFs>(fs,
|
TokenAspect<DummyFs> tokenAspect = new TokenAspect<DummyFs>(fs,
|
||||||
DummyFs.TOKEN_KIND);
|
SecurityUtil.buildTokenService(uri), DummyFs.TOKEN_KIND);
|
||||||
fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf);
|
fs.initialize(uri, conf);
|
||||||
tokenAspect.initDelegationToken(ugi);
|
tokenAspect.initDelegationToken(ugi);
|
||||||
|
|
||||||
// trigger token acquisition
|
// trigger token acquisition
|
||||||
|
@ -318,58 +313,4 @@ public class TestTokenAspect {
|
||||||
action = getActionFromTokenAspect(tokenAspect);
|
action = getActionFromTokenAspect(tokenAspect);
|
||||||
assertTrue(action.isValid());
|
assertTrue(action.isValid());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTokenSelectionPreferences() throws IOException,
|
|
||||||
URISyntaxException {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
DummyFs fs = spy(new DummyFs());
|
|
||||||
doReturn(null).when(fs).getDelegationToken(anyString());
|
|
||||||
fs.initialize(new URI("dummyfs://localhost:1234"), conf);
|
|
||||||
TokenAspect<DummyFs> aspect = new TokenAspect<DummyFs>(fs,
|
|
||||||
DummyFs.TOKEN_KIND);
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
|
|
||||||
new String[] { "bar" });
|
|
||||||
|
|
||||||
// use ip-based tokens
|
|
||||||
SecurityUtilTestHelper.setTokenServiceUseIp(true);
|
|
||||||
|
|
||||||
// test fallback to hdfs token
|
|
||||||
Token<TokenIdentifier> hdfsToken = new Token<TokenIdentifier>(new byte[0],
|
|
||||||
new byte[0], DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(
|
|
||||||
"127.0.0.1:8020"));
|
|
||||||
ugi.addToken(hdfsToken);
|
|
||||||
|
|
||||||
// test fallback to hdfs token
|
|
||||||
Token<?> token = aspect.selectDelegationToken(ugi);
|
|
||||||
assertEquals(hdfsToken, token);
|
|
||||||
|
|
||||||
// test dummyfs is favored over hdfs
|
|
||||||
Token<TokenIdentifier> dummyFsToken = new Token<TokenIdentifier>(
|
|
||||||
new byte[0], new byte[0], DummyFs.TOKEN_KIND,
|
|
||||||
new Text("127.0.0.1:1234"));
|
|
||||||
ugi.addToken(dummyFsToken);
|
|
||||||
token = aspect.selectDelegationToken(ugi);
|
|
||||||
assertEquals(dummyFsToken, token);
|
|
||||||
|
|
||||||
// switch to using host-based tokens, no token should match
|
|
||||||
SecurityUtilTestHelper.setTokenServiceUseIp(false);
|
|
||||||
token = aspect.selectDelegationToken(ugi);
|
|
||||||
assertNull(token);
|
|
||||||
|
|
||||||
// test fallback to hdfs token
|
|
||||||
hdfsToken = new Token<TokenIdentifier>(new byte[0], new byte[0],
|
|
||||||
DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(
|
|
||||||
"localhost:8020"));
|
|
||||||
ugi.addToken(hdfsToken);
|
|
||||||
token = aspect.selectDelegationToken(ugi);
|
|
||||||
assertEquals(hdfsToken, token);
|
|
||||||
|
|
||||||
// test dummyfs is favored over hdfs
|
|
||||||
dummyFsToken = new Token<TokenIdentifier>(new byte[0], new byte[0],
|
|
||||||
DummyFs.TOKEN_KIND, new Text("localhost:1234"));
|
|
||||||
ugi.addToken(dummyFsToken);
|
|
||||||
token = aspect.selectDelegationToken(ugi);
|
|
||||||
assertEquals(dummyFsToken, token);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -27,35 +28,34 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/** Test whether WebHDFS can connect to an HA cluster */
|
/** Test whether WebHDFS can connect to an HA cluster */
|
||||||
public class TestWebHDFSForHA {
|
public class TestWebHDFSForHA {
|
||||||
|
|
||||||
private static final String LOGICAL_NAME = "minidfs";
|
private static final String LOGICAL_NAME = "minidfs";
|
||||||
|
private static final MiniDFSNNTopology topo = new MiniDFSNNTopology()
|
||||||
|
.addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
|
||||||
|
new MiniDFSNNTopology.NNConf("nn1")).addNN(
|
||||||
|
new MiniDFSNNTopology.NNConf("nn2")));
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void testHA() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
MiniDFSNNTopology topo = new MiniDFSNNTopology()
|
|
||||||
.addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
|
|
||||||
new MiniDFSNNTopology.NNConf("nn1")).addNN(
|
|
||||||
new MiniDFSNNTopology.NNConf("nn2")));
|
|
||||||
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
|
|
||||||
.numDataNodes(3).build();
|
|
||||||
|
|
||||||
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
|
|
||||||
|
|
||||||
FileSystem fs = null;
|
FileSystem fs = null;
|
||||||
try {
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
|
||||||
|
.numDataNodes(0).build();
|
||||||
|
|
||||||
|
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
|
||||||
|
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
|
final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
|
||||||
fs = (WebHdfsFileSystem) FileSystem.get(new URI(uri), conf);
|
fs = FileSystem.get(URI.create(uri), conf);
|
||||||
cluster.transitionToActive(0);
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
final Path dir = new Path("/test");
|
final Path dir = new Path("/test");
|
||||||
|
@ -66,12 +66,50 @@ public class TestWebHDFSForHA {
|
||||||
|
|
||||||
final Path dir2 = new Path("/test2");
|
final Path dir2 = new Path("/test2");
|
||||||
Assert.assertTrue(fs.mkdirs(dir2));
|
Assert.assertTrue(fs.mkdirs(dir2));
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
if (fs != null) {
|
if (fs != null) {
|
||||||
fs.close();
|
fs.close();
|
||||||
}
|
}
|
||||||
cluster.shutdown();
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSecureHA() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
|
||||||
|
true);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
|
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
WebHdfsFileSystem fs = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
|
||||||
|
.numDataNodes(0).build();
|
||||||
|
|
||||||
|
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
|
||||||
|
fs = (WebHdfsFileSystem) FileSystem.get(URI.create(uri), conf);
|
||||||
|
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
Token<?> token = fs.getDelegationToken(null);
|
||||||
|
|
||||||
|
cluster.shutdownNameNode(0);
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
|
||||||
|
fs.renewDelegationToken(token);
|
||||||
|
fs.cancelDelegationToken(token);
|
||||||
|
} finally {
|
||||||
|
if (fs != null) {
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -153,6 +153,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
MAPREDUCE-5688. TestStagingCleanup fails intermittently with JDK7 (Mit
|
MAPREDUCE-5688. TestStagingCleanup fails intermittently with JDK7 (Mit
|
||||||
Desai via jeagles)
|
Desai via jeagles)
|
||||||
|
|
||||||
|
MAPREDUCE-5665. Add audience annotations to MiniMRYarnCluster and
|
||||||
|
MiniMRCluster. (Anubhav Dhoot via kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -199,6 +202,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
to the YARN's web-app proxy with the correct scheme prefix. (Jian He via
|
to the YARN's web-app proxy with the correct scheme prefix. (Jian He via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5768. TestMRJobs.testContainerRollingLog fails on trunk (Gera
|
||||||
|
Shegalov via jlowe)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -40,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
* instead
|
* instead
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
public class MiniMRCluster {
|
public class MiniMRCluster {
|
||||||
private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
|
private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.net.UnknownHostException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
@ -55,6 +57,8 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
* Configures and starts the MR-specific components in the YARN cluster.
|
* Configures and starts the MR-specific components in the YARN cluster.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
public class MiniMRYarnCluster extends MiniYARNCluster {
|
public class MiniMRYarnCluster extends MiniYARNCluster {
|
||||||
|
|
||||||
public static final String APPJAR = JarFinder.getJar(LocalContainerLauncher.class);
|
public static final String APPJAR = JarFinder.getJar(LocalContainerLauncher.class);
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -82,8 +81,10 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.JarFinder;
|
import org.apache.hadoop.util.JarFinder;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -492,28 +493,17 @@ public class TestMRJobs {
|
||||||
LOG.info("Checking for glob: " + absSyslogGlob);
|
LOG.info("Checking for glob: " + absSyslogGlob);
|
||||||
final FileStatus[] syslogs = localFs.globStatus(absSyslogGlob);
|
final FileStatus[] syslogs = localFs.globStatus(absSyslogGlob);
|
||||||
for (FileStatus slog : syslogs) {
|
for (FileStatus slog : syslogs) {
|
||||||
// check all syslogs for the container
|
boolean foundAppMaster = job.isUber();
|
||||||
//
|
final Path containerPathComponent = slog.getPath().getParent();
|
||||||
final FileStatus[] sysSiblings = localFs.globStatus(new Path(
|
if (!foundAppMaster) {
|
||||||
slog.getPath().getParent(), TaskLog.LogName.SYSLOG + "*"));
|
final ContainerId cid = ConverterUtils.toContainerId(
|
||||||
boolean foundAppMaster = false;
|
containerPathComponent.getName());
|
||||||
floop:
|
foundAppMaster = (cid.getId() == 1);
|
||||||
for (FileStatus f : sysSiblings) {
|
|
||||||
final BufferedReader reader = new BufferedReader(
|
|
||||||
new InputStreamReader(localFs.open(f.getPath())));
|
|
||||||
String line;
|
|
||||||
try {
|
|
||||||
while ((line = reader.readLine()) != null) {
|
|
||||||
if (line.contains(MRJobConfig.APPLICATION_MASTER_CLASS)) {
|
|
||||||
foundAppMaster = true;
|
|
||||||
break floop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
reader.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final FileStatus[] sysSiblings = localFs.globStatus(new Path(
|
||||||
|
containerPathComponent, TaskLog.LogName.SYSLOG + "*"));
|
||||||
|
|
||||||
if (foundAppMaster) {
|
if (foundAppMaster) {
|
||||||
numAppMasters++;
|
numAppMasters++;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -241,6 +241,8 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1301. Added the INFO level log of the non-empty blacklist additions
|
YARN-1301. Added the INFO level log of the non-empty blacklist additions
|
||||||
and removals inside ApplicationMasterService. (Tsuyoshi Ozawa via zjshen)
|
and removals inside ApplicationMasterService. (Tsuyoshi Ozawa via zjshen)
|
||||||
|
|
||||||
|
YARN-1528. Allow setting auth for ZK connections. (kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -335,6 +335,8 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
|
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
|
||||||
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
|
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
|
||||||
|
|
||||||
|
public static final String RM_ZK_AUTH = RM_ZK_PREFIX + "auth";
|
||||||
|
|
||||||
public static final String ZK_STATE_STORE_PREFIX =
|
public static final String ZK_STATE_STORE_PREFIX =
|
||||||
RM_PREFIX + "zk-state-store.";
|
RM_PREFIX + "zk-state-store.";
|
||||||
|
|
||||||
|
|
|
@ -31,14 +31,12 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.ZKUtil;
|
import org.apache.hadoop.util.ZKUtil;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -88,18 +86,8 @@ public class EmbeddedElectorService extends AbstractService
|
||||||
long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||||
|
|
||||||
String zkAclConf = conf.get(YarnConfiguration.RM_ZK_ACL,
|
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
|
||||||
YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
|
||||||
List<ACL> zkAcls;
|
|
||||||
try {
|
|
||||||
zkAcls = ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
|
|
||||||
} catch (ZKUtil.BadAclFormatException bafe) {
|
|
||||||
throw new YarnRuntimeException(
|
|
||||||
YarnConfiguration.RM_ZK_ACL + "has ill-formatted ACLs");
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO (YARN-1528): ZKAuthInfo to be set for rm-store and elector
|
|
||||||
List<ZKUtil.ZKAuthInfo> zkAuths = Collections.emptyList();
|
|
||||||
|
|
||||||
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
|
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
|
||||||
electionZNode, zkAcls, zkAuths, this);
|
electionZNode, zkAcls, zkAuths, this);
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.ZKUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class that provides utility methods specific to ZK operations
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RMZKUtils {
|
||||||
|
private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method to fetch the ZK ACLs from the configuration
|
||||||
|
*/
|
||||||
|
public static List<ACL> getZKAcls(Configuration conf) throws Exception {
|
||||||
|
// Parse authentication from configuration.
|
||||||
|
String zkAclConf =
|
||||||
|
conf.get(YarnConfiguration.RM_ZK_ACL,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
||||||
|
try {
|
||||||
|
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
|
||||||
|
return ZKUtil.parseACLs(zkAclConf);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method to fetch ZK auth info from the configuration
|
||||||
|
*/
|
||||||
|
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
|
||||||
|
throws Exception {
|
||||||
|
String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
|
||||||
|
try {
|
||||||
|
zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
|
||||||
|
if (zkAuthConf != null) {
|
||||||
|
return ZKUtil.parseAuth(zkAuthConf);
|
||||||
|
} else {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
@ -91,6 +92,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
private int zkSessionTimeout;
|
private int zkSessionTimeout;
|
||||||
private long zkRetryInterval;
|
private long zkRetryInterval;
|
||||||
private List<ACL> zkAcl;
|
private List<ACL> zkAcl;
|
||||||
|
private List<ZKUtil.ZKAuthInfo> zkAuths;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -200,18 +202,9 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
zkRetryInterval =
|
zkRetryInterval =
|
||||||
conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
|
conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
|
||||||
// Parse authentication from configuration.
|
|
||||||
String zkAclConf =
|
|
||||||
conf.get(YarnConfiguration.RM_ZK_ACL,
|
|
||||||
YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
|
||||||
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
|
|
||||||
|
|
||||||
try {
|
zkAcl = RMZKUtils.getZKAcls(conf);
|
||||||
zkAcl = ZKUtil.parseACLs(zkAclConf);
|
zkAuths = RMZKUtils.getZKAuths(conf);
|
||||||
} catch (ZKUtil.BadAclFormatException bafe) {
|
|
||||||
LOG.error("Invalid format for " + YarnConfiguration.RM_ZK_ACL);
|
|
||||||
throw bafe;
|
|
||||||
}
|
|
||||||
|
|
||||||
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
|
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
|
||||||
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
|
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
|
||||||
|
@ -952,6 +945,9 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
retries++) {
|
retries++) {
|
||||||
try {
|
try {
|
||||||
zkClient = getNewZooKeeper();
|
zkClient = getNewZooKeeper();
|
||||||
|
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
|
||||||
|
zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
|
||||||
|
}
|
||||||
if (useDefaultFencingScheme) {
|
if (useDefaultFencingScheme) {
|
||||||
zkClient.addAuthInfo(zkRootNodeAuthScheme,
|
zkClient.addAuthInfo(zkRootNodeAuthScheme,
|
||||||
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
|
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
|
||||||
|
|
|
@ -32,10 +32,12 @@ import org.apache.zookeeper.WatchedEvent;
|
||||||
import org.apache.zookeeper.Watcher;
|
import org.apache.zookeeper.Watcher;
|
||||||
import org.apache.zookeeper.ZooDefs;
|
import org.apache.zookeeper.ZooDefs;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@ -49,6 +51,20 @@ public class TestZKRMStateStoreZKClientConnections extends
|
||||||
private Log LOG =
|
private Log LOG =
|
||||||
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
|
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
|
||||||
|
|
||||||
|
private static final String DIGEST_USER_PASS="test-user:test-password";
|
||||||
|
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
|
||||||
|
private static final String DIGEST_USER_HASH;
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest(
|
||||||
|
DIGEST_USER_PASS);
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
|
||||||
|
|
||||||
|
|
||||||
class TestZKClient {
|
class TestZKClient {
|
||||||
|
|
||||||
ZKRMStateStore store;
|
ZKRMStateStore store;
|
||||||
|
@ -252,4 +268,16 @@ public class TestZKRMStateStoreZKClientConnections extends
|
||||||
fail(error);
|
fail(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZKAuths() throws Exception {
|
||||||
|
TestZKClient zkClientTester = new TestZKClient();
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1);
|
||||||
|
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 100);
|
||||||
|
conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL);
|
||||||
|
conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD);
|
||||||
|
|
||||||
|
zkClientTester.getRMStateStore(conf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue