Merging r1521566 through r1523108 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1523110 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-09-13 21:27:58 +00:00
commit 06eb46eda3
69 changed files with 1556 additions and 373 deletions

View File

@ -153,9 +153,13 @@ Building on OS/X
---------------------------------------------------------------------------------- ----------------------------------------------------------------------------------
Hadoop does not build on OS/X with Java 7. A one-time manual step is required to enable building Hadoop OS X with Java 7
every time the JDK is updated.
see: https://issues.apache.org/jira/browse/HADOOP-9350 see: https://issues.apache.org/jira/browse/HADOOP-9350
$ sudo mkdir `/usr/libexec/java_home`/Classes
$ sudo ln -s `/usr/libexec/java_home`/lib/tools.jar `/usr/libexec/java_home`/Classes/classes.jar
---------------------------------------------------------------------------------- ----------------------------------------------------------------------------------
Building on Windows Building on Windows

View File

@ -56,6 +56,21 @@
</dependency> </dependency>
</dependencies> </dependencies>
</profile> </profile>
<profile>
<id>jdk1.7</id>
<activation>
<jdk>1.7</jdk>
</activation>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7</version>
<scope>system</scope>
<systemPath>${java.home}/../lib/tools.jar</systemPath>
</dependency>
</dependencies>
</profile>
</profiles> </profiles>
</project> </project>

View File

@ -363,6 +363,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9908. Fix NPE when versioninfo properties file is missing (todd) HADOOP-9908. Fix NPE when versioninfo properties file is missing (todd)
HADOOP-9350. Hadoop not building against Java7 on OSX
(Robert Kanter via stevel)
Release 2.1.1-beta - UNRELEASED Release 2.1.1-beta - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -454,6 +457,9 @@ Release 2.1.1-beta - UNRELEASED
HADOOP-9932. Improper synchronization in RetryCache. (kihwal) HADOOP-9932. Improper synchronization in RetryCache. (kihwal)
HADOOP-9958. Add old constructor back to DelegationTokenInformation to
unbreak downstream builds. (Andrew Wang)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.security.token;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -35,6 +36,9 @@ import org.apache.hadoop.security.UserGroupInformation;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class TokenIdentifier implements Writable { public abstract class TokenIdentifier implements Writable {
private String trackingId = null;
/** /**
* Get the token kind * Get the token kind
* @return the kind of the token * @return the kind of the token
@ -62,4 +66,19 @@ public abstract class TokenIdentifier implements Writable {
} }
return Arrays.copyOf(buf.getData(), buf.getLength()); return Arrays.copyOf(buf.getData(), buf.getLength());
} }
/**
* Returns a tracking identifier that can be used to associate usages of a
* token across multiple client sessions.
*
* Currently, this function just returns an MD5 of {{@link #getBytes()}.
*
* @return tracking identifier
*/
public String getTrackingId() {
if (trackingId == null) {
trackingId = DigestUtils.md5Hex(getBytes());
}
return trackingId;
}
} }

View File

@ -86,6 +86,11 @@ extends AbstractDelegationTokenIdentifier>
private long tokenMaxLifetime; private long tokenMaxLifetime;
private long tokenRemoverScanInterval; private long tokenRemoverScanInterval;
private long tokenRenewInterval; private long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
*/
protected boolean storeTokenTrackingId;
private Thread tokenRemoverThread; private Thread tokenRemoverThread;
protected volatile boolean running; protected volatile boolean running;
@ -102,6 +107,7 @@ extends AbstractDelegationTokenIdentifier>
this.tokenMaxLifetime = delegationTokenMaxLifetime; this.tokenMaxLifetime = delegationTokenMaxLifetime;
this.tokenRenewInterval = delegationTokenRenewInterval; this.tokenRenewInterval = delegationTokenRenewInterval;
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
this.storeTokenTrackingId = false;
} }
/** should be called before this object is used */ /** should be called before this object is used */
@ -201,7 +207,7 @@ extends AbstractDelegationTokenIdentifier>
} }
if (currentTokens.get(identifier) == null) { if (currentTokens.get(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(renewDate, currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
password)); password, getTrackingIdIfEnabled(identifier)));
} else { } else {
throw new IOException( throw new IOException(
"Same delegation token being added twice."); "Same delegation token being added twice.");
@ -280,7 +286,7 @@ extends AbstractDelegationTokenIdentifier>
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
storeNewToken(identifier, now + tokenRenewInterval); storeNewToken(identifier, now + tokenRenewInterval);
currentTokens.put(identifier, new DelegationTokenInformation(now currentTokens.put(identifier, new DelegationTokenInformation(now
+ tokenRenewInterval, password)); + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
return password; return password;
} }
@ -299,6 +305,21 @@ extends AbstractDelegationTokenIdentifier>
return info.getPassword(); return info.getPassword();
} }
protected String getTrackingIdIfEnabled(TokenIdent ident) {
if (storeTokenTrackingId) {
return ident.getTrackingId();
}
return null;
}
public synchronized String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = currentTokens.get(identifier);
if (info == null) {
return null;
}
return info.getTrackingId();
}
/** /**
* Verifies that the given identifier and password are valid and match. * Verifies that the given identifier and password are valid and match.
* @param identifier Token identifier. * @param identifier Token identifier.
@ -359,8 +380,9 @@ extends AbstractDelegationTokenIdentifier>
+ " is trying to renew a token with " + "wrong password"); + " is trying to renew a token with " + "wrong password");
} }
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval); long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
String trackingId = getTrackingIdIfEnabled(id);
DelegationTokenInformation info = new DelegationTokenInformation(renewTime, DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password); password, trackingId);
if (currentTokens.get(id) == null) { if (currentTokens.get(id) == null) {
throw new InvalidToken("Renewal request for unknown token"); throw new InvalidToken("Renewal request for unknown token");
@ -420,9 +442,17 @@ extends AbstractDelegationTokenIdentifier>
public static class DelegationTokenInformation { public static class DelegationTokenInformation {
long renewDate; long renewDate;
byte[] password; byte[] password;
String trackingId;
public DelegationTokenInformation(long renewDate, byte[] password) { public DelegationTokenInformation(long renewDate, byte[] password) {
this(renewDate, password, null);
}
public DelegationTokenInformation(long renewDate, byte[] password,
String trackingId) {
this.renewDate = renewDate; this.renewDate = renewDate;
this.password = password; this.password = password;
this.trackingId = trackingId;
} }
/** returns renew date */ /** returns renew date */
public long getRenewDate() { public long getRenewDate() {
@ -432,6 +462,10 @@ extends AbstractDelegationTokenIdentifier>
byte[] getPassword() { byte[] getPassword() {
return password; return password;
} }
/** returns tracking id */
public String getTrackingId() {
return trackingId;
}
} }
/** Remove expired delegation tokens from cache */ /** Remove expired delegation tokens from cache */

View File

@ -19,10 +19,10 @@ package org.apache.hadoop.mount;
import java.util.List; import java.util.List;
import org.apache.hadoop.nfs.security.NfsExports; import org.apache.hadoop.nfs.NfsExports;
import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/** /**
* Helper class for sending MountResponse * Helper class for sending MountResponse
@ -40,8 +40,7 @@ public class MountResponse {
RpcAcceptedReply.voidReply(xdr, xid); RpcAcceptedReply.voidReply(xdr, xid);
xdr.writeInt(status); xdr.writeInt(status);
if (status == MNT_OK) { if (status == MNT_OK) {
xdr.writeInt(handle.length); xdr.writeVariableOpaque(handle);
xdr.writeFixedOpaque(handle);
// Only MountV3 returns a list of supported authFlavors // Only MountV3 returns a list of supported authFlavors
xdr.writeInt(1); xdr.writeInt(1);
xdr.writeInt(AuthFlavor.AUTH_SYS.getValue()); xdr.writeInt(AuthFlavor.AUTH_SYS.getValue());

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.nfs.security; package org.apache.hadoop.nfs;
public enum AccessPrivilege { public enum AccessPrivilege {
READ_ONLY, READ_ONLY,

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.nfs.security; package org.apache.hadoop.nfs;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -147,8 +147,10 @@ public class IdUserGroup {
synchronized public String getUserName(int uid, String unknown) { synchronized public String getUserName(int uid, String unknown) {
checkAndUpdateMaps(); checkAndUpdateMaps();
String uname = uidNameMap.get(Integer.valueOf(uid)); String uname = uidNameMap.get(uid);
if (uname == null) { if (uname == null) {
LOG.warn("Can't find user name for uid " + uid
+ ". Use default user name " + unknown);
uname = unknown; uname = unknown;
} }
return uname; return uname;
@ -156,8 +158,10 @@ public class IdUserGroup {
synchronized public String getGroupName(int gid, String unknown) { synchronized public String getGroupName(int gid, String unknown) {
checkAndUpdateMaps(); checkAndUpdateMaps();
String gname = gidNameMap.get(Integer.valueOf(gid)); String gname = gidNameMap.get(gid);
if (gname == null) { if (gname == null) {
LOG.warn("Can't find group name for gid " + gid
+ ". Use default group name " + unknown);
gname = unknown; gname = unknown;
} }
return gname; return gname;

View File

@ -205,4 +205,7 @@ public class Nfs3Constant {
public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs"; public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs";
public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump"; public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump";
public static final boolean ENABLE_FILE_DUMP_DEFAULT = true; public static final boolean ENABLE_FILE_DUMP_DEFAULT = true;
public final static String UNKNOWN_USER = "nobody";
public final static String UNKNOWN_GROUP = "nobody";
} }

View File

@ -72,19 +72,18 @@ public class Nfs3FileAttributes {
} }
public Nfs3FileAttributes() { public Nfs3FileAttributes() {
this(false, 0, (short)0, 0, 0, 0, 0, 0, 0, 0); this(NfsFileType.NFSREG, 0, (short)0, 0, 0, 0, 0, 0, 0, 0);
} }
public Nfs3FileAttributes(boolean isDir, int nlink, short mode, int uid, public Nfs3FileAttributes(NfsFileType nfsType, int nlink, short mode, int uid,
int gid, long size, long fsid, long fileid, long mtime, long atime) { int gid, long size, long fsid, long fileid, long mtime, long atime) {
this.type = isDir ? NfsFileType.NFSDIR.toValue() : NfsFileType.NFSREG this.type = nfsType.toValue();
.toValue();
this.mode = mode; this.mode = mode;
this.nlink = isDir ? (nlink + 2) : 1; this.nlink = (type == NfsFileType.NFSDIR.toValue()) ? (nlink + 2) : 1;
this.uid = uid; this.uid = uid;
this.gid = gid; this.gid = gid;
this.size = size; this.size = size;
if(isDir) { if(type == NfsFileType.NFSDIR.toValue()) {
this.size = getDirSize(nlink); this.size = getDirSize(nlink);
} }
this.used = this.size; this.used = this.size;

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.nfs.nfs3;
import java.net.InetAddress; import java.net.InetAddress;
import org.apache.hadoop.nfs.nfs3.response.NFS3Response; import org.apache.hadoop.nfs.nfs3.response.NFS3Response;
import org.apache.hadoop.oncrpc.RpcAuthSys;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
/** /**
@ -33,54 +33,70 @@ public interface Nfs3Interface {
public NFS3Response nullProcedure(); public NFS3Response nullProcedure();
/** GETATTR: Get file attributes */ /** GETATTR: Get file attributes */
public NFS3Response getattr(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response getattr(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** SETATTR: Set file attributes */ /** SETATTR: Set file attributes */
public NFS3Response setattr(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response setattr(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** LOOKUP: Lookup filename */ /** LOOKUP: Lookup filename */
public NFS3Response lookup(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response lookup(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** ACCESS: Check access permission */ /** ACCESS: Check access permission */
public NFS3Response access(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response access(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** READ: Read from file */ /** READ: Read from file */
public NFS3Response read(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response read(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** WRITE: Write to file */ /** WRITE: Write to file */
public NFS3Response write(XDR xdr, Channel channel, int xid, public NFS3Response write(XDR xdr, Channel channel, int xid,
RpcAuthSys authSys, InetAddress client); SecurityHandler securityHandler, InetAddress client);
/** CREATE: Create a file */ /** CREATE: Create a file */
public NFS3Response create(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response create(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** MKDIR: Create a directory */ /** MKDIR: Create a directory */
public NFS3Response mkdir(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response mkdir(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** REMOVE: Remove a file */ /** REMOVE: Remove a file */
public NFS3Response remove(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response remove(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** RMDIR: Remove a directory */ /** RMDIR: Remove a directory */
public NFS3Response rmdir(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response rmdir(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** RENAME: Rename a file or directory */ /** RENAME: Rename a file or directory */
public NFS3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response rename(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** SYMLINK: Create a symbolic link */ /** SYMLINK: Create a symbolic link */
public NFS3Response symlink(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response symlink(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** READDIR: Read From directory */ /** READDIR: Read From directory */
public NFS3Response readdir(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response readdir(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** FSSTAT: Get dynamic file system information */ /** FSSTAT: Get dynamic file system information */
public NFS3Response fsstat(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response fsstat(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** FSINFO: Get static file system information */ /** FSINFO: Get static file system information */
public NFS3Response fsinfo(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** PATHCONF: Retrieve POSIX information */ /** PATHCONF: Retrieve POSIX information */
public NFS3Response pathconf(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response pathconf(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
/** COMMIT: Commit cached data on a server to stable storage */ /** COMMIT: Commit cached data on a server to stable storage */
public NFS3Response commit(XDR xdr, RpcAuthSys authSys, InetAddress client); public NFS3Response commit(XDR xdr, SecurityHandler securityHandler,
InetAddress client);
} }

View File

@ -25,9 +25,9 @@ import org.apache.hadoop.oncrpc.XDR;
* SYMLINK3 Request * SYMLINK3 Request
*/ */
public class SYMLINK3Request extends RequestWithHandle { public class SYMLINK3Request extends RequestWithHandle {
private final String name; private final String name; // The name of the link
private final SetAttr3 symAttr; private final SetAttr3 symAttr;
private final String symData; private final String symData; // It contains the target
public SYMLINK3Request(XDR xdr) throws IOException { public SYMLINK3Request(XDR xdr) throws IOException {
super(xdr); super(xdr);

View File

@ -46,7 +46,7 @@ public class READLINK3Response extends NFS3Response {
out.writeBoolean(true); // Attribute follows out.writeBoolean(true); // Attribute follows
postOpSymlinkAttr.serialize(out); postOpSymlinkAttr.serialize(out);
if (getStatus() == Nfs3Status.NFS3_OK) { if (getStatus() == Nfs3Status.NFS3_OK) {
out.writeFixedOpaque(path, path.length); out.writeVariableOpaque(path);
} }
return out; return out;
} }

View File

@ -60,9 +60,9 @@ public class SYMLINK3Response extends NFS3Response {
if (this.getStatus() == Nfs3Status.NFS3_OK) { if (this.getStatus() == Nfs3Status.NFS3_OK) {
out.writeBoolean(true); out.writeBoolean(true);
objFileHandle.serialize(out); objFileHandle.serialize(out);
out.writeBoolean(true);
objPostOpAttr.serialize(out); objPostOpAttr.serialize(out);
} }
out.writeBoolean(true);
dirWcc.serialize(out); dirWcc.serialize(out);
return out; return out;

View File

@ -17,7 +17,9 @@
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/** /**
* Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details. * Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details.
@ -54,7 +56,7 @@ public class RpcAcceptedReply extends RpcReply {
public static RpcAcceptedReply read(int xid, RpcMessage.Type messageType, public static RpcAcceptedReply read(int xid, RpcMessage.Type messageType,
ReplyState replyState, XDR xdr) { ReplyState replyState, XDR xdr) {
RpcAuthInfo verifier = RpcAuthInfo.read(xdr); Verifier verifier = Verifier.readFlavorAndVerifier(xdr);
AcceptState acceptState = AcceptState.fromValue(xdr.readInt()); AcceptState acceptState = AcceptState.fromValue(xdr.readInt());
return new RpcAcceptedReply(xid, messageType, replyState, verifier, return new RpcAcceptedReply(xid, messageType, replyState, verifier,
acceptState); acceptState);

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.oncrpc;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.security.Credentials;
import org.apache.hadoop.oncrpc.security.Verifier;
/** /**
* Represents an RPC message of type RPC call as defined in RFC 1831 * Represents an RPC message of type RPC call as defined in RFC 1831
@ -30,11 +32,12 @@ public class RpcCall extends RpcMessage {
private final int program; private final int program;
private final int version; private final int version;
private final int procedure; private final int procedure;
private final RpcAuthInfo credential; private final Credentials credential;
private final RpcAuthInfo verifier; private final Verifier verifier;
protected RpcCall(int xid, RpcMessage.Type messageType, int rpcVersion, int program, protected RpcCall(int xid, RpcMessage.Type messageType, int rpcVersion,
int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) { int program, int version, int procedure, Credentials credential,
Verifier verifier) {
super(xid, messageType); super(xid, messageType);
this.rpcVersion = rpcVersion; this.rpcVersion = rpcVersion;
this.program = program; this.program = program;
@ -79,19 +82,19 @@ public class RpcCall extends RpcMessage {
return procedure; return procedure;
} }
public RpcAuthInfo getCredential() { public Credentials getCredential() {
return credential; return credential;
} }
public RpcAuthInfo getVerifier() { public Verifier getVerifier() {
return verifier; return verifier;
} }
public static RpcCall read(XDR xdr) { public static RpcCall read(XDR xdr) {
return new RpcCall(xdr.readInt(), RpcMessage.Type.fromValue(xdr.readInt()), return new RpcCall(xdr.readInt(), RpcMessage.Type.fromValue(xdr.readInt()),
xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(),
xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr), Credentials.readFlavorAndCredentials(xdr),
RpcAuthInfo.read(xdr)); Verifier.readFlavorAndVerifier(xdr));
} }
public static void write(XDR out, int xid, int program, int progVersion, public static void write(XDR out, int xid, int program, int progVersion,

View File

@ -17,7 +17,7 @@
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/** /**
* Represents RPC message MSG_DENIED reply body. See RFC 1831 for details. * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.

View File

@ -280,7 +280,7 @@ public class XDR {
public byte[] readVariableOpaque() { public byte[] readVariableOpaque() {
int size = this.readInt(); int size = this.readInt();
return size != 0 ? this.readFixedOpaque(size) : null; return size != 0 ? this.readFixedOpaque(size) : new byte[0];
} }
public void skipVariableOpaque() { public void skipVariableOpaque() {

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.XDR;
/**
* Base class for all credentials. Currently we only support 3 different types
* of auth flavors: AUTH_NONE, AUTH_SYS, and RPCSEC_GSS.
*/
public abstract class Credentials extends RpcAuthInfo {
public static final Log LOG = LogFactory.getLog(Credentials.class);
public static Credentials readFlavorAndCredentials(XDR xdr) {
AuthFlavor flavor = AuthFlavor.fromValue(xdr.readInt());
final Credentials credentials;
if(flavor == AuthFlavor.AUTH_NONE) {
credentials = new CredentialsNone();
} else if(flavor == AuthFlavor.AUTH_SYS) {
credentials = new CredentialsSys();
} else if(flavor == AuthFlavor.RPCSEC_GSS) {
credentials = new CredentialsGSS();
} else {
throw new UnsupportedOperationException("Unsupported Credentials Flavor "
+ flavor);
}
credentials.read(xdr);
return credentials;
}
protected int mCredentialsLength;
protected Credentials(AuthFlavor flavor) {
super(flavor);
}
}

View File

@ -15,37 +15,27 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc.security;
/** import org.apache.hadoop.oncrpc.XDR;
* AUTH_SYS as defined in RFC 1831
*/
public class RpcAuthSys {
private final int uid;
private final int gid;
public RpcAuthSys(int uid, int gid) { /** Credential used by RPCSEC_GSS */
this.uid = uid; public class CredentialsGSS extends Credentials {
this.gid = gid;
}
public static RpcAuthSys from(byte[] credentials) { public CredentialsGSS() {
XDR sys = new XDR(credentials); super(AuthFlavor.RPCSEC_GSS);
sys.skip(4); // Stamp
sys.skipVariableOpaque(); // Machine name
return new RpcAuthSys(sys.readInt(), sys.readInt());
}
public int getUid() {
return uid;
}
public int getGid() {
return gid;
} }
@Override @Override
public String toString() { public void read(XDR xdr) {
return "(AuthSys: uid=" + uid + " gid=" + gid + ")"; // TODO Auto-generated method stub
} }
@Override
public void write(XDR xdr) {
// TODO Auto-generated method stub
}
} }

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR;
import com.google.common.base.Preconditions;
/** Credential used by AUTH_NONE */
public class CredentialsNone extends Credentials {
public CredentialsNone() {
super(AuthFlavor.AUTH_NONE);
mCredentialsLength = 0;
}
@Override
public void read(XDR xdr) {
mCredentialsLength = xdr.readInt();
Preconditions.checkState(mCredentialsLength == 0);
}
@Override
public void write(XDR xdr) {
Preconditions.checkState(mCredentialsLength == 0);
xdr.writeInt(mCredentialsLength);
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.oncrpc.XDR;
/** Credential used by AUTH_SYS */
public class CredentialsSys extends Credentials {
private static final String HOSTNAME;
static {
try {
String s = InetAddress.getLocalHost().getHostName();
HOSTNAME = s;
if(LOG.isDebugEnabled()) {
LOG.debug("HOSTNAME = " + HOSTNAME);
}
} catch (UnknownHostException e) {
LOG.error("Error setting HOSTNAME", e);
throw new RuntimeException(e);
}
}
protected int mUID, mGID;
protected int[] mAuxGIDs;
protected String mHostName;
protected int mStamp;
public CredentialsSys() {
super(AuthFlavor.AUTH_SYS);
this.mCredentialsLength = 0;
this.mHostName = HOSTNAME;
}
public int getGID() {
return mGID;
}
public int getUID() {
return mUID;
}
public void setGID(int gid) {
this.mGID = gid;
}
public void setUID(int uid) {
this.mUID = uid;
}
public void setStamp(int stamp) {
this.mStamp = stamp;
}
@Override
public void read(XDR xdr) {
mCredentialsLength = xdr.readInt();
mStamp = xdr.readInt();
mHostName = xdr.readString();
mUID = xdr.readInt();
mGID = xdr.readInt();
int length = xdr.readInt();
mAuxGIDs = new int[length];
for (int i = 0; i < length; i++) {
mAuxGIDs[i] = xdr.readInt();
}
}
@Override
public void write(XDR xdr) {
// mStamp + mHostName.length + mHostName + mUID + mGID + mAuxGIDs.count
mCredentialsLength = 20 + mHostName.getBytes().length;
// mAuxGIDs
if (mAuxGIDs != null && mAuxGIDs.length > 0) {
mCredentialsLength += mAuxGIDs.length * 4;
}
xdr.writeInt(mCredentialsLength);
xdr.writeInt(mStamp);
xdr.writeString(mHostName);
xdr.writeInt(mUID);
xdr.writeInt(mGID);
if((mAuxGIDs == null) || (mAuxGIDs.length == 0)) {
xdr.writeInt(0);
} else {
xdr.writeInt(mAuxGIDs.length);
for (int i = 0; i < mAuxGIDs.length; i++) {
xdr.writeInt(mAuxGIDs[i]);
}
}
}
}

View File

@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc.security;
import java.util.Arrays; import org.apache.hadoop.oncrpc.XDR;
/** /**
* Authentication Info as defined in RFC 1831 * Authentication Info. Base class of Verifier and Credential.
*/ */
public class RpcAuthInfo { public abstract class RpcAuthInfo {
/** Different types of authentication as defined in RFC 1831 */ /** Different types of authentication as defined in RFC 1831 */
public enum AuthFlavor { public enum AuthFlavor {
AUTH_NONE(0), AUTH_NONE(0),
@ -52,28 +52,21 @@ public class RpcAuthInfo {
} }
private final AuthFlavor flavor; private final AuthFlavor flavor;
private final byte[] body;
protected RpcAuthInfo(AuthFlavor flavor, byte[] body) { protected RpcAuthInfo(AuthFlavor flavor) {
this.flavor = flavor; this.flavor = flavor;
this.body = body;
} }
public static RpcAuthInfo read(XDR xdr) { /** Load auth info */
int type = xdr.readInt(); public abstract void read(XDR xdr);
AuthFlavor flavor = AuthFlavor.fromValue(type);
byte[] body = xdr.readVariableOpaque(); /** Write auth info */
return new RpcAuthInfo(flavor, body); public abstract void write(XDR xdr);
}
public AuthFlavor getFlavor() { public AuthFlavor getFlavor() {
return flavor; return flavor;
} }
public byte[] getBody() {
return Arrays.copyOf(body, body.length);
}
@Override @Override
public String toString() { public String toString() {
return "(AuthFlavor:" + flavor + ")"; return "(AuthFlavor:" + flavor + ")";

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
public abstract class SecurityHandler {
public static final Log LOG = LogFactory.getLog(SecurityHandler.class);
public abstract String getUser();
public abstract boolean shouldSilentlyDrop(RpcCall request);
public abstract Verifier getVerifer(RpcCall request) throws IOException;
public boolean isUnwrapRequired() {
return false;
}
public boolean isWrapRequired() {
return false;
}
/** Used by GSS */
public XDR unwrap(RpcCall request, byte[] data ) throws IOException {
throw new UnsupportedOperationException();
}
/** Used by GSS */
public byte[] wrap(RpcCall request, XDR response) throws IOException {
throw new UnsupportedOperationException();
}
/** Used by AUTH_SYS */
public int getUid() {
throw new UnsupportedOperationException();
}
/** Used by AUTH_SYS */
public int getGid() {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.oncrpc.RpcCall;
public class SysSecurityHandler extends SecurityHandler {
private final IdUserGroup iug;
private final CredentialsSys mCredentialsSys;
public SysSecurityHandler(CredentialsSys credentialsSys,
IdUserGroup iug) {
this.mCredentialsSys = credentialsSys;
this.iug = iug;
}
@Override
public String getUser() {
return iug.getUserName(mCredentialsSys.getUID(), Nfs3Constant.UNKNOWN_USER);
}
@Override
public boolean shouldSilentlyDrop(RpcCall request) {
return false;
}
@Override
public VerifierNone getVerifer(RpcCall request) {
return new VerifierNone();
}
@Override
public int getUid() {
return mCredentialsSys.getUID();
}
@Override
public int getGid() {
return mCredentialsSys.getGID();
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/**
* Base class for verifier. Currently we only support 3 types of auth flavors:
* {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS},
* and {@link AuthFlavor#RPCSEC_GSS}.
*/
public abstract class Verifier extends RpcAuthInfo {
protected Verifier(AuthFlavor flavor) {
super(flavor);
}
public static Verifier readFlavorAndVerifier(XDR xdr) {
AuthFlavor flavor = AuthFlavor.fromValue(xdr.readInt());
final Verifier verifer;
if(flavor == AuthFlavor.AUTH_NONE) {
verifer = new VerifierNone();
} else if(flavor == AuthFlavor.RPCSEC_GSS) {
verifer = new VerifierGSS();
} else {
throw new UnsupportedOperationException("Unsupported verifier flavor"
+ flavor);
}
verifer.read(xdr);
return verifer;
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR;
/** Verifier mapped to RPCSEC_GSS. */
public class VerifierGSS extends Verifier {
public VerifierGSS() {
super(AuthFlavor.RPCSEC_GSS);
}
@Override
public void read(XDR xdr) {
// TODO Auto-generated method stub
}
@Override
public void write(XDR xdr) {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR;
import com.google.common.base.Preconditions;
/** Verifier used by AUTH_NONE. */
public class VerifierNone extends Verifier {
public VerifierNone() {
super(AuthFlavor.AUTH_NONE);
}
@Override
public void read(XDR xdr) {
int length = xdr.readInt();
Preconditions.checkState(length == 0);
}
@Override
public void write(XDR xdr) {
xdr.writeInt(0);
}
}

View File

@ -17,10 +17,14 @@
*/ */
package org.apache.hadoop.portmap; package org.apache.hadoop.portmap;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.Credentials;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.portmap.PortmapInterface.Procedure; import org.apache.hadoop.portmap.PortmapInterface.Procedure;
/** /**
@ -38,9 +42,11 @@ public class PortmapRequest {
RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
Procedure.PMAPPROC_SET.getValue()); Procedure.PMAPPROC_SET.getValue());
request.writeInt(AuthFlavor.AUTH_NONE.getValue()); request.writeInt(AuthFlavor.AUTH_NONE.getValue());
request.writeInt(0); Credentials credential = new CredentialsNone();
request.writeInt(0); credential.write(request);
request.writeInt(0); request.writeInt(AuthFlavor.AUTH_NONE.getValue());
Verifier verifier = new VerifierNone();
verifier.write(request);
return mapping.serialize(request); return mapping.serialize(request);
} }
} }

View File

@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.nfs.security; package org.apache.hadoop.nfs;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.nfs.AccessPrivilege;
import org.apache.hadoop.nfs.NfsExports;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant; import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.junit.Test; import org.junit.Test;

View File

@ -20,8 +20,9 @@ package org.apache.hadoop.oncrpc;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcReply.ReplyState; import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.Test; import org.junit.Test;
/** /**
@ -45,7 +46,7 @@ public class TestRpcAcceptedReply {
@Test @Test
public void testConstructor() { public void testConstructor() {
RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]); Verifier verifier = new VerifierNone();
RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.Type.RPC_REPLY, RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.Type.RPC_REPLY,
ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS); ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS);
assertEquals(0, reply.getXid()); assertEquals(0, reply.getXid());

View File

@ -17,8 +17,12 @@
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.Credentials;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.Test; import org.junit.Test;
/** /**
@ -28,8 +32,8 @@ public class TestRpcCall {
@Test @Test
public void testConstructor() { public void testConstructor() {
RpcAuthInfo credential = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]); Credentials credential = new CredentialsNone();
RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]); Verifier verifier = new VerifierNone();
int rpcVersion = RpcCall.RPC_VERSION; int rpcVersion = RpcCall.RPC_VERSION;
int program = 2; int program = 2;
int version = 3; int version = 3;

View File

@ -15,31 +15,32 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc.security;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsSys;
import org.junit.Test; import org.junit.Test;
/** /**
* Test for {@link RpcAuthSys} * Test for {@link CredentialsSys}
*/ */
public class TestRpcAuthSys { public class TestCredentialsSys {
@Test
public void testConstructor() {
RpcAuthSys auth = new RpcAuthSys(0, 1);
assertEquals(0, auth.getUid());
assertEquals(1, auth.getGid());
}
@Test @Test
public void testRead() { public void testReadWrite() {
byte[] bytes = {0, 1, 2, 3}; // 4 bytes Stamp CredentialsSys credential = new CredentialsSys();
bytes = XDR.append(bytes, XDR.getVariableOpque(new byte[0])); credential.setUID(0);
bytes = XDR.append(bytes, XDR.toBytes(0)); // gid credential.setGID(1);
bytes = XDR.append(bytes, XDR.toBytes(1)); // uid
RpcAuthSys auth = RpcAuthSys.from(bytes); XDR xdr = new XDR();
assertEquals(0, auth.getUid()); credential.write(xdr);
assertEquals(1, auth.getGid());
CredentialsSys newCredential = new CredentialsSys();
newCredential.read(xdr);
assertEquals(0, newCredential.getUID());
assertEquals(1, newCredential.getGID());
} }
} }

View File

@ -15,14 +15,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc.security;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays; import org.apache.hadoop.oncrpc.security.RpcAuthInfo;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.junit.Test; import org.junit.Test;
/** /**
@ -42,12 +40,4 @@ public class TestRpcAuthInfo {
public void testInvalidAuthFlavor() { public void testInvalidAuthFlavor() {
assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(4)); assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(4));
} }
@Test
public void testConsturctor() {
byte[] body = new byte[0];
RpcAuthInfo auth = new RpcAuthInfo(AuthFlavor.AUTH_NONE, body);
assertEquals(AuthFlavor.AUTH_NONE, auth.getFlavor());
assertTrue(Arrays.equals(body, auth.getBody()));
}
} }

View File

@ -32,10 +32,10 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mount.MountEntry; import org.apache.hadoop.mount.MountEntry;
import org.apache.hadoop.mount.MountInterface; import org.apache.hadoop.mount.MountInterface;
import org.apache.hadoop.mount.MountResponse; import org.apache.hadoop.mount.MountResponse;
import org.apache.hadoop.nfs.AccessPrivilege;
import org.apache.hadoop.nfs.NfsExports;
import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.security.AccessPrivilege;
import org.apache.hadoop.nfs.security.NfsExports;
import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;

View File

@ -49,7 +49,7 @@ public class Nfs3Utils {
public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath) public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
throws IOException { throws IOException {
return client.getFileInfo(fileIdPath); return client.getFileLinkInfo(fileIdPath);
} }
public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus( public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
@ -59,7 +59,10 @@ public class Nfs3Utils {
* client takes only the lower 32bit of the fileId and treats it as signed * client takes only the lower 32bit of the fileId and treats it as signed
* int. When the 32th bit is 1, the client considers it invalid. * int. When the 32th bit is 1, the client considers it invalid.
*/ */
return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs NfsFileType fileType = fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG;
fileType = fs.isSymlink() ? NfsFileType.NFSLNK : fileType;
return new Nfs3FileAttributes(fileType, fs.getChildrenNum(), fs
.getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()), .getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */, iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
fs.getFileId(), fs.getModificationTime(), fs.getAccessTime()); fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -42,6 +43,9 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.nfs.AccessPrivilege;
import org.apache.hadoop.nfs.NfsExports;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.NfsTime; import org.apache.hadoop.nfs.NfsTime;
import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup; import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@ -63,10 +67,12 @@ import org.apache.hadoop.nfs.nfs3.request.PATHCONF3Request;
import org.apache.hadoop.nfs.nfs3.request.READ3Request; import org.apache.hadoop.nfs.nfs3.request.READ3Request;
import org.apache.hadoop.nfs.nfs3.request.READDIR3Request; import org.apache.hadoop.nfs.nfs3.request.READDIR3Request;
import org.apache.hadoop.nfs.nfs3.request.READDIRPLUS3Request; import org.apache.hadoop.nfs.nfs3.request.READDIRPLUS3Request;
import org.apache.hadoop.nfs.nfs3.request.READLINK3Request;
import org.apache.hadoop.nfs.nfs3.request.REMOVE3Request; import org.apache.hadoop.nfs.nfs3.request.REMOVE3Request;
import org.apache.hadoop.nfs.nfs3.request.RENAME3Request; import org.apache.hadoop.nfs.nfs3.request.RENAME3Request;
import org.apache.hadoop.nfs.nfs3.request.RMDIR3Request; import org.apache.hadoop.nfs.nfs3.request.RMDIR3Request;
import org.apache.hadoop.nfs.nfs3.request.SETATTR3Request; import org.apache.hadoop.nfs.nfs3.request.SETATTR3Request;
import org.apache.hadoop.nfs.nfs3.request.SYMLINK3Request;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3; import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField; import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@ -96,16 +102,18 @@ import org.apache.hadoop.nfs.nfs3.response.VoidResponse;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response; import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr; import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.nfs.security.AccessPrivilege;
import org.apache.hadoop.nfs.security.NfsExports;
import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcAuthSys;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcDeniedReply; import org.apache.hadoop.oncrpc.RpcDeniedReply;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcReply; import org.apache.hadoop.oncrpc.RpcReply;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsSys;
import org.apache.hadoop.oncrpc.security.Credentials;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
@ -205,8 +213,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public GETATTR3Response getattr(XDR xdr, RpcAuthSys authSys, public GETATTR3Response getattr(XDR xdr,
InetAddress client) { SecurityHandler securityHandler, InetAddress client) {
GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK); GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -214,8 +222,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -268,9 +275,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
if (updateFields.contains(SetAttrField.UID) if (updateFields.contains(SetAttrField.UID)
|| updateFields.contains(SetAttrField.GID)) { || updateFields.contains(SetAttrField.GID)) {
String uname = updateFields.contains(SetAttrField.UID) ? iug.getUserName( String uname = updateFields.contains(SetAttrField.UID) ? iug.getUserName(
newAttr.getUid(), UNKNOWN_USER) : null; newAttr.getUid(), Nfs3Constant.UNKNOWN_USER) : null;
String gname = updateFields.contains(SetAttrField.GID) ? iug String gname = updateFields.contains(SetAttrField.GID) ? iug
.getGroupName(newAttr.getGid(), UNKNOWN_GROUP) : null; .getGroupName(newAttr.getGid(), Nfs3Constant.UNKNOWN_GROUP) : null;
dfsClient.setOwner(fileIdPath, uname, gname); dfsClient.setOwner(fileIdPath, uname, gname);
} }
@ -287,11 +294,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public SETATTR3Response setattr(XDR xdr, RpcAuthSys authSys, public SETATTR3Response setattr(XDR xdr,
InetAddress client) { SecurityHandler securityHandler, InetAddress client) {
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -364,7 +370,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public LOOKUP3Response lookup(XDR xdr, RpcAuthSys authSys, InetAddress client) { public LOOKUP3Response lookup(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK); LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -372,8 +379,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -426,7 +432,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public ACCESS3Response access(XDR xdr, RpcAuthSys authSys, InetAddress client) { public ACCESS3Response access(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK); ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -434,8 +441,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -464,8 +470,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
LOG.error("Can't get path for fileId:" + handle.getFileId()); LOG.error("Can't get path for fileId:" + handle.getFileId());
return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE); return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
} }
int access = Nfs3Utils.getAccessRightsForUserGroup(authSys.getUid(), int access = Nfs3Utils.getAccessRightsForUserGroup(
authSys.getGid(), attrs); securityHandler.getUid(), securityHandler.getGid(), attrs);
return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access); return new ACCESS3Response(Nfs3Status.NFS3_OK, attrs, access);
} catch (IOException e) { } catch (IOException e) {
@ -474,13 +480,75 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
} }
public READLINK3Response readlink(XDR xdr, RpcAuthSys authSys, public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
InetAddress client) { InetAddress client) {
return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP); READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
READLINK3Request request = null;
try {
request = new READLINK3Request(xdr);
} catch (IOException e) {
LOG.error("Invalid READLINK request");
return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
}
FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("NFS READLINK fileId: " + handle.getFileId());
}
String fileIdPath = Nfs3Utils.getFileIdPath(handle);
try {
String target = dfsClient.getLinkTarget(fileIdPath);
Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient,
fileIdPath, iug);
if (postOpAttr == null) {
LOG.info("Can't get path for fileId:" + handle.getFileId());
return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
}
if (postOpAttr.getType() != NfsFileType.NFSLNK.toValue()) {
LOG.error("Not a symlink, fileId:" + handle.getFileId());
return new READLINK3Response(Nfs3Status.NFS3ERR_INVAL);
}
if (target == null) {
LOG.error("Symlink target should not be null, fileId:"
+ handle.getFileId());
return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
}
if (MAX_READ_TRANSFER_SIZE < target.getBytes().length) {
return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr, null);
}
return new READLINK3Response(Nfs3Status.NFS3_OK, postOpAttr,
target.getBytes());
} catch (IOException e) {
LOG.warn("Readlink error: " + e.getClass(), e);
if (e instanceof FileNotFoundException) {
return new READLINK3Response(Nfs3Status.NFS3ERR_STALE);
} else if (e instanceof AccessControlException) {
return new READLINK3Response(Nfs3Status.NFS3ERR_ACCES);
}
return new READLINK3Response(Nfs3Status.NFS3ERR_IO);
}
} }
@Override @Override
public READ3Response read(XDR xdr, RpcAuthSys authSys, InetAddress client) { public READ3Response read(XDR xdr, SecurityHandler securityHandler,
InetAddress client) {
READ3Response response = new READ3Response(Nfs3Status.NFS3_OK); READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -488,8 +556,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -534,8 +601,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
return new READ3Response(Nfs3Status.NFS3ERR_NOENT); return new READ3Response(Nfs3Status.NFS3ERR_NOENT);
} }
int access = Nfs3Utils.getAccessRightsForUserGroup(authSys.getUid(), int access = Nfs3Utils.getAccessRightsForUserGroup(
authSys.getGid(), attrs); securityHandler.getUid(), securityHandler.getGid(), attrs);
if ((access & Nfs3Constant.ACCESS3_READ) != 0) { if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
eof = offset < attrs.getSize() ? false : true; eof = offset < attrs.getSize() ? false : true;
return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof, return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
@ -578,10 +645,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
@Override @Override
public WRITE3Response write(XDR xdr, Channel channel, int xid, public WRITE3Response write(XDR xdr, Channel channel, int xid,
RpcAuthSys authSys, InetAddress client) { SecurityHandler securityHandler, InetAddress client) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -653,10 +720,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public CREATE3Response create(XDR xdr, RpcAuthSys authSys, InetAddress client) { public CREATE3Response create(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -725,7 +792,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
// Set group if it's not specified in the request. // Set group if it's not specified in the request.
if (!setAttr3.getUpdateFields().contains(SetAttrField.GID)) { if (!setAttr3.getUpdateFields().contains(SetAttrField.GID)) {
setAttr3.getUpdateFields().add(SetAttrField.GID); setAttr3.getUpdateFields().add(SetAttrField.GID);
setAttr3.setGid(authSys.getGid()); setAttr3.setGid(securityHandler.getGid());
} }
setattrInternal(dfsClient, fileIdPath, setAttr3, false); setattrInternal(dfsClient, fileIdPath, setAttr3, false);
} }
@ -776,10 +843,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public MKDIR3Response mkdir(XDR xdr, RpcAuthSys authSys, InetAddress client) { public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
InetAddress client) {
MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK); MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -834,7 +901,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
// Set group if it's not specified in the request. // Set group if it's not specified in the request.
if (!setAttr3.getUpdateFields().contains(SetAttrField.GID)) { if (!setAttr3.getUpdateFields().contains(SetAttrField.GID)) {
setAttr3.getUpdateFields().add(SetAttrField.GID); setAttr3.getUpdateFields().add(SetAttrField.GID);
setAttr3.setGid(authSys.getGid()); setAttr3.setGid(securityHandler.getGid());
} }
setattrInternal(dfsClient, fileIdPath, setAttr3, false); setattrInternal(dfsClient, fileIdPath, setAttr3, false);
@ -866,15 +933,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
} }
public READDIR3Response mknod(XDR xdr, RpcAuthSys authSys, InetAddress client) { public READDIR3Response mknod(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP); return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
} }
@Override @Override
public REMOVE3Response remove(XDR xdr, RpcAuthSys authSys, InetAddress client) { public REMOVE3Response remove(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK); REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -947,10 +1015,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public RMDIR3Response rmdir(XDR xdr, RpcAuthSys authSys, InetAddress client) { public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
InetAddress client) {
RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK); RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1030,10 +1098,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) { public RENAME3Response rename(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1118,18 +1186,72 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys, public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
InetAddress client) { InetAddress client) {
return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP); SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
} }
public READDIR3Response link(XDR xdr, RpcAuthSys authSys, InetAddress client) { DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response;
}
SYMLINK3Request request = null;
try {
request = new SYMLINK3Request(xdr);
} catch (IOException e) {
LOG.error("Invalid SYMLINK request");
response.setStatus(Nfs3Status.NFS3ERR_INVAL);
return response;
}
FileHandle dirHandle = request.getHandle();
String name = request.getName();
String symData = request.getSymData();
String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle);
// Don't do any name check to source path, just leave it to HDFS
String linkIdPath = linkDirIdPath + "/" + name;
if (LOG.isDebugEnabled()) {
LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath);
}
try {
WccData dirWcc = response.getDirWcc();
WccAttr preOpAttr = Nfs3Utils.getWccAttr(dfsClient, linkDirIdPath);
dirWcc.setPreOpAttr(preOpAttr);
dfsClient.createSymlink(symData, linkIdPath, false);
// Set symlink attr is considered as to change the attr of the target
// file. So no need to set symlink attr here after it's created.
HdfsFileStatus linkstat = dfsClient.getFileLinkInfo(linkIdPath);
Nfs3FileAttributes objAttr = Nfs3Utils.getNfs3FileAttrFromFileStatus(
linkstat, iug);
dirWcc
.setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug));
return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle(
objAttr.getFileid()), objAttr, dirWcc);
} catch (IOException e) {
LOG.warn("Exception:" + e);
response.setStatus(Nfs3Status.NFS3ERR_IO);
return response;
}
}
public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP); return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
} }
@Override @Override
public READDIR3Response readdir(XDR xdr, RpcAuthSys authSys, public READDIR3Response readdir(XDR xdr,
InetAddress client) { SecurityHandler securityHandler, InetAddress client) {
READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK); READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1137,8 +1259,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1269,14 +1390,13 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
dirStatus.getModificationTime(), dirList); dirStatus.getModificationTime(), dirList);
} }
public READDIRPLUS3Response readdirplus(XDR xdr, RpcAuthSys authSys, public READDIRPLUS3Response readdirplus(XDR xdr,
InetAddress client) { SecurityHandler securityHandler, InetAddress client) {
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES); return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
} }
@ -1420,7 +1540,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public FSSTAT3Response fsstat(XDR xdr, RpcAuthSys authSys, InetAddress client) { public FSSTAT3Response fsstat(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK); FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1428,8 +1549,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1478,7 +1598,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public FSINFO3Response fsinfo(XDR xdr, RpcAuthSys authSys, InetAddress client) { public FSINFO3Response fsinfo(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK); FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1486,8 +1607,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1530,8 +1650,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public PATHCONF3Response pathconf(XDR xdr, RpcAuthSys authSys, public PATHCONF3Response pathconf(XDR xdr,
InetAddress client) { SecurityHandler securityHandler, InetAddress client) {
PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK); PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1539,8 +1659,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return response; return response;
} }
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1578,10 +1697,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public COMMIT3Response commit(XDR xdr, RpcAuthSys authSys, InetAddress client) { public COMMIT3Response commit(XDR xdr,
SecurityHandler securityHandler, InetAddress client) {
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) { if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
return response; return response;
@ -1646,11 +1765,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
} }
private final static String UNKNOWN_USER = "nobody"; private SecurityHandler getSecurityHandler(Credentials credentials,
private final static String UNKNOWN_GROUP = "nobody"; Verifier verifier) {
if (credentials instanceof CredentialsSys) {
private String authSysCheck(RpcAuthSys authSys) { return new SysSecurityHandler((CredentialsSys) credentials, iug);
return iug.getUserName(authSys.getUid(), UNKNOWN_USER); } else {
// TODO: support GSS and handle other cases
return null;
}
} }
@Override @Override
@ -1658,67 +1780,71 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
InetAddress client, Channel channel) { InetAddress client, Channel channel) {
final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure()); final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid(); int xid = rpcCall.getXid();
RpcAuthSys authSys = null;
Credentials credentials = rpcCall.getCredential();
// Ignore auth only for NFSPROC3_NULL, especially for Linux clients. // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
if (nfsproc3 != NFSPROC3.NULL) { if (nfsproc3 != NFSPROC3.NULL) {
if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS) { if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
&& rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
LOG.info("Wrong RPC AUTH flavor, " LOG.info("Wrong RPC AUTH flavor, "
+ rpcCall.getCredential().getFlavor() + " is not AUTH_SYS."); + rpcCall.getCredential().getFlavor()
+ " is not AUTH_SYS or RPCSEC_GSS.");
XDR reply = new XDR(); XDR reply = new XDR();
reply = RpcDeniedReply.voidReply(reply, xid, reply = RpcDeniedReply.voidReply(reply, xid,
RpcReply.ReplyState.MSG_ACCEPTED, RpcReply.ReplyState.MSG_ACCEPTED,
RpcDeniedReply.RejectState.AUTH_ERROR); RpcDeniedReply.RejectState.AUTH_ERROR);
return reply; return reply;
} }
authSys = RpcAuthSys.from(rpcCall.getCredential().getBody());
} }
SecurityHandler securityHandler = getSecurityHandler(credentials,
rpcCall.getVerifier());
NFS3Response response = null; NFS3Response response = null;
if (nfsproc3 == NFSPROC3.NULL) { if (nfsproc3 == NFSPROC3.NULL) {
response = nullProcedure(); response = nullProcedure();
} else if (nfsproc3 == NFSPROC3.GETATTR) { } else if (nfsproc3 == NFSPROC3.GETATTR) {
response = getattr(xdr, authSys, client); response = getattr(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.SETATTR) { } else if (nfsproc3 == NFSPROC3.SETATTR) {
response = setattr(xdr, authSys, client); response = setattr(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.LOOKUP) { } else if (nfsproc3 == NFSPROC3.LOOKUP) {
response = lookup(xdr, authSys, client); response = lookup(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.ACCESS) { } else if (nfsproc3 == NFSPROC3.ACCESS) {
response = access(xdr, authSys, client); response = access(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.READLINK) { } else if (nfsproc3 == NFSPROC3.READLINK) {
response = readlink(xdr, authSys, client); response = readlink(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.READ) { } else if (nfsproc3 == NFSPROC3.READ) {
response = read(xdr, authSys, client); response = read(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.WRITE) { } else if (nfsproc3 == NFSPROC3.WRITE) {
response = write(xdr, channel, xid, authSys, client); response = write(xdr, channel, xid, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.CREATE) { } else if (nfsproc3 == NFSPROC3.CREATE) {
response = create(xdr, authSys, client); response = create(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.MKDIR) { } else if (nfsproc3 == NFSPROC3.MKDIR) {
response = mkdir(xdr, authSys, client); response = mkdir(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.SYMLINK) { } else if (nfsproc3 == NFSPROC3.SYMLINK) {
response = symlink(xdr, authSys, client); response = symlink(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.MKNOD) { } else if (nfsproc3 == NFSPROC3.MKNOD) {
response = mknod(xdr, authSys, client); response = mknod(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.REMOVE) { } else if (nfsproc3 == NFSPROC3.REMOVE) {
response = remove(xdr, authSys, client); response = remove(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.RMDIR) { } else if (nfsproc3 == NFSPROC3.RMDIR) {
response = rmdir(xdr, authSys, client); response = rmdir(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.RENAME) { } else if (nfsproc3 == NFSPROC3.RENAME) {
response = rename(xdr, authSys, client); response = rename(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.LINK) { } else if (nfsproc3 == NFSPROC3.LINK) {
response = link(xdr, authSys, client); response = link(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.READDIR) { } else if (nfsproc3 == NFSPROC3.READDIR) {
response = readdir(xdr, authSys, client); response = readdir(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.READDIRPLUS) { } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
response = readdirplus(xdr, authSys, client); response = readdirplus(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.FSSTAT) { } else if (nfsproc3 == NFSPROC3.FSSTAT) {
response = fsstat(xdr, authSys, client); response = fsstat(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.FSINFO) { } else if (nfsproc3 == NFSPROC3.FSINFO) {
response = fsinfo(xdr, authSys, client); response = fsinfo(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.PATHCONF) { } else if (nfsproc3 == NFSPROC3.PATHCONF) {
response = pathconf(xdr, authSys, client); response = pathconf(xdr, securityHandler, client);
} else if (nfsproc3 == NFSPROC3.COMMIT) { } else if (nfsproc3 == NFSPROC3.COMMIT) {
response = commit(xdr, authSys, client); response = commit(xdr, securityHandler, client);
} else { } else {
// Invalid procedure // Invalid procedure
RpcAcceptedReply.voidReply(out, xid, RpcAcceptedReply.voidReply(out, xid,

View File

@ -42,7 +42,7 @@ public class TestMountd {
// Start minicluster // Start minicluster
Configuration config = new Configuration(); Configuration config = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
.manageNameDfsDirs(false).build(); .build();
cluster.waitActive(); cluster.waitActive();
// Start nfs // Start nfs

View File

@ -272,6 +272,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4879. Add "blocked ArrayList" collection to avoid CMS full GCs HDFS-4879. Add "blocked ArrayList" collection to avoid CMS full GCs
(Todd Lipcon via Colin Patrick McCabe) (Todd Lipcon via Colin Patrick McCabe)
HDFS-4096. Add snapshot information to namenode WebUI. (Haohui Mai via
jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -330,6 +333,11 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5118. Provide testing support for DFSClient to drop RPC responses. HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
(jing9) (jing9)
HDFS-5085. Refactor o.a.h.nfs to support different types of
authentications. (jing9)
HDFS-5067 Support symlink operations in NFS gateway. (brandonli)
IMPROVEMENTS IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
@ -364,6 +372,8 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5150. Allow per NN SPN for internal SPNEGO. (kihwal) HDFS-5150. Allow per NN SPN for internal SPNEGO. (kihwal)
HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -432,6 +442,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5159. Secondary NameNode fails to checkpoint if error occurs HDFS-5159. Secondary NameNode fails to checkpoint if error occurs
downloading edits on first checkpoint. (atm) downloading edits on first checkpoint. (atm)
HDFS-5192. NameNode may fail to start when
dfs.client.test.drop.namenode.response.number is set. (jing9)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -484,14 +484,17 @@ public class DFSClient implements java.io.Closeable {
int numResponseToDrop = conf.getInt( int numResponseToDrop = conf.getInt(
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
if (numResponseToDrop > 0) { if (numResponseToDrop > 0) {
// This case is used for testing. // This case is used for testing.
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ " is set to " + numResponseToDrop + " is set to " + numResponseToDrop
+ ", this hacked client will proactively drop responses"); + ", this hacked client will proactively drop responses");
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
.createProxyWithLossyRetryHandler(conf, nameNodeUri, nameNodeUri, ClientProtocol.class, numResponseToDrop);
ClientProtocol.class, numResponseToDrop); }
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService(); this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy(); this.namenode = proxyInfo.getProxy();
} else if (rpcNamenode != null) { } else if (rpcNamenode != null) {
@ -502,9 +505,8 @@ public class DFSClient implements java.io.Closeable {
} else { } else {
Preconditions.checkArgument(nameNodeUri != null, Preconditions.checkArgument(nameNodeUri != null,
"null URI"); "null URI");
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class); ClientProtocol.class);
this.dtService = proxyInfo.getDelegationTokenService(); this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy(); this.namenode = proxyInfo.getProxy();
} }

View File

@ -267,6 +267,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces"; public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers"; public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default"; public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
// Much code in hdfs is not yet updated to use these keys. // Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries"; public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

View File

@ -158,8 +158,8 @@ public class NameNodeProxies {
* Generate a dummy namenode proxy instance that utilizes our hacked * Generate a dummy namenode proxy instance that utilizes our hacked
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
* method will proactively drop RPC responses. Currently this method only * method will proactively drop RPC responses. Currently this method only
* support HA setup. IllegalStateException will be thrown if the given * support HA setup. null will be returned if the given configuration is not
* configuration is not for HA. * for HA.
* *
* @param config the configuration containing the required IPC * @param config the configuration containing the required IPC
* properties, client failover configurations, etc. * properties, client failover configurations, etc.
@ -168,7 +168,8 @@ public class NameNodeProxies {
* @param xface the IPC interface which should be created * @param xface the IPC interface which should be created
* @param numResponseToDrop The number of responses to drop for each RPC call * @param numResponseToDrop The number of responses to drop for each RPC call
* @return an object containing both the proxy and the associated * @return an object containing both the proxy and the associated
* delegation token service it corresponds to * delegation token service it corresponds to. Will return null of the
* given configuration does not support HA.
* @throws IOException if there is an error creating the proxy * @throws IOException if there is an error creating the proxy
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -204,8 +205,9 @@ public class NameNodeProxies {
Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
return new ProxyAndInfo<T>(proxy, dtService); return new ProxyAndInfo<T>(proxy, dtService);
} else { } else {
throw new IllegalStateException("Currently creating proxy using " + LOG.warn("Currently creating proxy using " +
"LossyRetryInvocationHandler requires NN HA setup"); "LossyRetryInvocationHandler requires NN HA setup");
return null;
} }
} }

View File

@ -58,6 +58,15 @@ public class DelegationTokenSecretManager
.getLog(DelegationTokenSecretManager.class); .getLog(DelegationTokenSecretManager.class);
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval, false,
namesystem);
}
/** /**
* Create a secret manager * Create a secret manager
* @param delegationKeyUpdateInterval the number of seconds for rolling new * @param delegationKeyUpdateInterval the number of seconds for rolling new
@ -67,13 +76,16 @@ public class DelegationTokenSecretManager
* @param delegationTokenRenewInterval how often the tokens must be renewed * @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned * @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens * for expired tokens
* @param storeTokenTrackingId whether to store the token's tracking id
*/ */
public DelegationTokenSecretManager(long delegationKeyUpdateInterval, public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, FSNamesystem namesystem) { long delegationTokenRemoverScanInterval, boolean storeTokenTrackingId,
FSNamesystem namesystem) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval); delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.namesystem = namesystem; this.namesystem = namesystem;
this.storeTokenTrackingId = storeTokenTrackingId;
} }
@Override //SecretManager @Override //SecretManager
@ -184,7 +196,7 @@ public class DelegationTokenSecretManager
} }
if (currentTokens.get(identifier) == null) { if (currentTokens.get(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
password)); password, getTrackingIdIfEnabled(identifier)));
} else { } else {
throw new IOException( throw new IOException(
"Same delegation token being added twice; invalid entry in fsimage or editlogs"); "Same delegation token being added twice; invalid entry in fsimage or editlogs");
@ -223,7 +235,7 @@ public class DelegationTokenSecretManager
byte[] password = createPassword(identifier.getBytes(), allKeys byte[] password = createPassword(identifier.getBytes(), allKeys
.get(keyId).getKey()); .get(keyId).getKey());
currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
password)); password, getTrackingIdIfEnabled(identifier)));
} }
} }

View File

@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
@ -227,6 +229,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -318,10 +322,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
stat.getGroup(), symlink, path); stat.getGroup(), symlink, path);
} }
for (AuditLogger logger : auditLoggers) { for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
status, ugi, dtSecretManager);
} else {
logger.logAuditEvent(succeeded, ugi.toString(), addr, logger.logAuditEvent(succeeded, ugi.toString(), addr,
cmd, src, dst, status); cmd, src, dst, status);
} }
} }
}
/** /**
* Logger for audit events, noting successful FSNamesystem operations. Emits * Logger for audit events, noting successful FSNamesystem operations. Emits
@ -4209,6 +4219,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return this.snapshotManager.getNumSnapshots(); return this.snapshotManager.getNumSnapshots();
} }
@Override
public String getSnapshotStats() {
Map<String, Object> info = new HashMap<String, Object>();
info.put("SnapshottableDirectories", this.getNumSnapshottableDirs());
info.put("Snapshots", this.getNumSnapshots());
return JSON.toString(info);
}
int getNumberOfDatanodes(DatanodeReportType type) { int getNumberOfDatanodes(DatanodeReportType type) {
readLock(); readLock();
try { try {
@ -5921,7 +5940,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT), DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT), DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this); DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL,
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT),
this);
} }
/** /**
@ -6832,17 +6854,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* defined in the config file. It can also be explicitly listed in the * defined in the config file. It can also be explicitly listed in the
* config file. * config file.
*/ */
private static class DefaultAuditLogger implements AuditLogger { private static class DefaultAuditLogger extends HdfsAuditLogger {
private boolean logTokenTrackingId;
@Override @Override
public void initialize(Configuration conf) { public void initialize(Configuration conf) {
// Nothing to do. logTokenTrackingId = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
} }
@Override @Override
public void logAuditEvent(boolean succeeded, String userName, public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst, InetAddress addr, String cmd, String src, String dst,
FileStatus status) { FileStatus status, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isInfoEnabled()) { if (auditLog.isInfoEnabled()) {
final StringBuilder sb = auditBuffer.get(); final StringBuilder sb = auditBuffer.get();
sb.setLength(0); sb.setLength(0);
@ -6860,6 +6887,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
sb.append(status.getGroup()).append(":"); sb.append(status.getGroup()).append(":");
sb.append(status.getPermission()); sb.append(status.getPermission());
} }
if (logTokenTrackingId) {
sb.append("\t").append("trackingId=");
String trackingId = null;
if (ugi != null && dtSecretManager != null
&& ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
if (tid instanceof DelegationTokenIdentifier) {
DelegationTokenIdentifier dtid =
(DelegationTokenIdentifier)tid;
trackingId = dtSecretManager.getTokenTrackingId(dtid);
break;
}
}
}
sb.append(trackingId);
}
auditLog.info(sb); auditLog.info(sb);
} }
} }

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Extension of {@link AuditLogger}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HdfsAuditLogger implements AuditLogger {
@Override
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus status) {
logAuditEvent(succeeded, userName, addr, cmd, src, dst, status, null,
null);
}
/**
* Same as
* {@link #logAuditEvent(boolean, String, InetAddress, String, String, String, FileStatus)}
* with additional parameters related to logging delegation token tracking
* IDs.
*
* @param succeeded Whether authorization succeeded.
* @param userName Name of the user executing the request.
* @param addr Remote address of the request.
* @param cmd The requested command.
* @param src Path of affected source file.
* @param dst Path of affected destination file (if any).
* @param stat File information for operations that change the file's metadata
* (permissions, owner, times, etc).
* @param ugi UserGroupInformation of the current user, or null if not logging
* token tracking information
* @param dtSecretManager The token secret manager, or null if not logging
* token tracking information
*/
public abstract void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus stat, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager);
}

View File

@ -204,6 +204,17 @@ class NamenodeJspHelper {
return ""; return "";
} }
static void generateSnapshotReport(JspWriter out, FSNamesystem fsn)
throws IOException {
out.println("<div id=\"snapshotstats\"><div class=\"dfstable\">"
+ "<table class=\"storage\" title=\"Snapshot Summary\">\n"
+ "<thead><tr><td><b>Snapshottable directories</b></td>"
+ "<td><b>Snapshotted directories</b></td></tr></thead>");
out.println(String.format("<td>%d</td><td>%d</td>", fsn.getNumSnapshottableDirs(), fsn.getNumSnapshots()));
out.println("</table></div></div>");
}
static class HealthJsp { static class HealthJsp {
private int rowNum = 0; private int rowNum = 0;
private int colNum = 0; private int colNum = 0;

View File

@ -130,4 +130,9 @@ public interface FSNamesystemMBean {
* @return number of decommissioned dead data nodes * @return number of decommissioned dead data nodes
*/ */
public int getNumDecomDeadDataNodes(); public int getNumDecomDeadDataNodes();
/**
* The statistics of snapshots
*/
public String getSnapshotStats();
} }

View File

@ -73,7 +73,10 @@
<% healthjsp.generateJournalReport(out, nn, request); %> <% healthjsp.generateJournalReport(out, nn, request); %>
<hr/> <hr/>
<% healthjsp.generateConfReport(out, nn, request); %> <% healthjsp.generateConfReport(out, nn, request); %>
<hr> <hr/>
<h3>Snapshot Summary</h3>
<% NamenodeJspHelper.generateSnapshotReport(out, fsn); %>
<hr/>
<h3>Startup Progress</h3> <h3>Startup Progress</h3>
<% healthjsp.generateStartupProgress(out, nn.getStartupProgress()); %> <% healthjsp.generateStartupProgress(out, nn.getStartupProgress()); %>
<% <%

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
/**
* Class for testing {@link NameNodeMXBean} implementation
*/
public class TestFSNamesystemMBean {
@Test
public void test() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
FSNamesystem fsn = cluster.getNameNode().namesystem;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service=NameNode,name=FSNamesystemState");
String snapshotStats = (String) (mbs.getAttribute(mxbeanName,
"SnapshotStats"));
@SuppressWarnings("unchecked")
Map<String, Object> stat = (Map<String, Object>) JSON
.parse(snapshotStats);
assertTrue(stat.containsKey("SnapshottableDirectories")
&& (Long) stat.get("SnapshottableDirectories") == fsn
.getNumSnapshottableDirs());
assertTrue(stat.containsKey("Snapshots")
&& (Long) stat.get("Snapshots") == fsn.getNumSnapshots());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.junit.Test;
/**
* This test makes sure that when
* {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} is set,
* DFSClient instances can still be created within NN/DN (e.g., the fs instance
* used by the trash emptier thread in NN)
*/
public class TestLossyRetryInvocationHandler {
@Test
public void testStartNNWithTrashEmptier() throws Exception {
MiniDFSCluster cluster = null;
Configuration conf = new HdfsConfiguration();
// enable both trash emptier and dropping response
conf.setLong("fs.trash.interval", 360);
conf.setInt(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 2);
try {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0)
.build();
cluster.waitActive();
cluster.transitionToActive(0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -186,6 +186,10 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit
subclass (Sandy Ryza) subclass (Sandy Ryza)
MAPREDUCE-5497. Changed MRAppMaster to sleep only after doing everything else
but just before ClientService to avoid race conditions during RM restart.
(Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
@ -256,6 +260,11 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5414. TestTaskAttempt fails in JDK7 with NPE (Nemon Lou via MAPREDUCE-5414. TestTaskAttempt fails in JDK7 with NPE (Nemon Lou via
devaraj) devaraj)
MAPREDUCE-5020. Compile failure with JDK8 (Trevor Robinson via tgraves)
MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS
(Nemon Lou via devaraj)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -61,8 +61,10 @@ esac
if [ "$COMMAND" = "job" ] ; then if [ "$COMMAND" = "job" ] ; then
CLASS=org.apache.hadoop.mapred.JobClient CLASS=org.apache.hadoop.mapred.JobClient
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "queue" ] ; then elif [ "$COMMAND" = "queue" ] ; then
CLASS=org.apache.hadoop.mapred.JobQueueClient CLASS=org.apache.hadoop.mapred.JobQueueClient
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "pipes" ] ; then elif [ "$COMMAND" = "pipes" ] ; then
CLASS=org.apache.hadoop.mapred.pipes.Submitter CLASS=org.apache.hadoop.mapred.pipes.Submitter
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

View File

@ -103,10 +103,12 @@ goto :eof
:job :job
set CLASS=org.apache.hadoop.mapred.JobClient set CLASS=org.apache.hadoop.mapred.JobClient
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_CLIENT_OPTS%
goto :eof goto :eof
:queue :queue
set CLASS=org.apache.hadoop.mapred.JobQueueClient set CLASS=org.apache.hadoop.mapred.JobQueueClient
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_CLIENT_OPTS%
goto :eof goto :eof
:sampler :sampler

View File

@ -362,7 +362,10 @@ public class MRAppMaster extends CompositeService {
//service to handle requests from JobClient //service to handle requests from JobClient
clientService = createClientService(context); clientService = createClientService(context);
addIfService(clientService); // Init ClientService separately so that we stop it separately, since this
// service needs to wait some time before it stops so clients can know the
// final states
clientService.init(conf);
containerAllocator = createContainerAllocator(clientService, context); containerAllocator = createContainerAllocator(clientService, context);
@ -425,7 +428,6 @@ public class MRAppMaster extends CompositeService {
// queued inside the JobHistoryEventHandler // queued inside the JobHistoryEventHandler
addIfService(historyService); addIfService(historyService);
} }
super.serviceInit(conf); super.serviceInit(conf);
} // end of init() } // end of init()
@ -534,14 +536,6 @@ public class MRAppMaster extends CompositeService {
} }
} }
// TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try { try {
//if isLastAMRetry comes as true, should never set it to false //if isLastAMRetry comes as true, should never set it to false
if ( !isLastAMRetry){ if ( !isLastAMRetry){
@ -556,6 +550,14 @@ public class MRAppMaster extends CompositeService {
LOG.info("Calling stop for all the services"); LOG.info("Calling stop for all the services");
MRAppMaster.this.stop(); MRAppMaster.this.stop();
// TODO: Stop ClientService last, since only ClientService should wait for
// some time so clients can know the final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
clientService.stop();
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Graceful stop failed ", t); LOG.warn("Graceful stop failed ", t);
} }
@ -1019,8 +1021,10 @@ public class MRAppMaster extends CompositeService {
LOG.info("MRAppMaster launching normal, non-uberized, multi-container " LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + "."); + "job " + job.getID() + ".");
} }
// Start ClientService here, since it's not initialized if
// errorHappenedShutDown is true
clientService.start();
} }
//start all the components //start all the components
super.serviceStart(); super.serviceStart();

View File

@ -1,28 +1,30 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.mapreduce.v2.app.client; package org.apache.hadoop.mapreduce.v2.app.client;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
public interface ClientService { import org.apache.hadoop.service.Service;
InetSocketAddress getBindAddress(); public interface ClientService extends Service {
int getHttpPort(); public abstract InetSocketAddress getBindAddress();
public abstract int getHttpPort();
} }

View File

@ -94,8 +94,7 @@ import org.apache.hadoop.yarn.webapp.WebApps;
* jobclient (user facing). * jobclient (user facing).
* *
*/ */
public class MRClientService extends AbstractService public class MRClientService extends AbstractService implements ClientService {
implements ClientService {
static final Log LOG = LogFactory.getLog(MRClientService.class); static final Log LOG = LogFactory.getLog(MRClientService.class);
@ -106,7 +105,7 @@ public class MRClientService extends AbstractService
private AppContext appContext; private AppContext appContext;
public MRClientService(AppContext appContext) { public MRClientService(AppContext appContext) {
super("MRClientService"); super(MRClientService.class.getName());
this.appContext = appContext; this.appContext = appContext;
this.protocolHandler = new MRClientProtocolHandler(); this.protocolHandler = new MRClientProtocolHandler();
} }

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -603,7 +604,7 @@ public class MRApp extends MRAppMaster {
@Override @Override
protected ClientService createClientService(AppContext context) { protected ClientService createClientService(AppContext context) {
return new ClientService(){ return new MRClientService(context) {
@Override @Override
public InetSocketAddress getBindAddress() { public InetSocketAddress getBindAddress() {
return NetUtils.createSocketAddr("localhost:9876"); return NetUtils.createSocketAddr("localhost:9876");

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Test;
public class TestMRAppComponentDependencies {
@Test(timeout = 20000)
public void testComponentStopOrder() throws Exception {
@SuppressWarnings("resource")
TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true);
JobImpl job = (JobImpl) app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
int waitTime = 20 * 1000;
while (waitTime > 0 && app.numStops < 2) {
Thread.sleep(100);
waitTime -= 100;
}
// assert JobHistoryEventHandlerStopped and then clientServiceStopped
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
Assert.assertEquals(2, app.clientServiceStopped);
}
private final class TestMRApp extends MRApp {
int JobHistoryEventHandlerStopped;
int clientServiceStopped;
int numStops;
public TestMRApp(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
JobHistoryEventHandlerStopped = 0;
clientServiceStopped = 0;
numStops = 0;
}
@Override
protected Job createJob(Configuration conf, JobStateInternal forcedState,
String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
Job newJob =
new TestJob(getJobId(), getAttemptID(), conf, getDispatcher()
.getEventHandler(), getTaskAttemptListener(), getContext()
.getClock(), getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext(), forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
@Override
protected ClientService createClientService(AppContext context) {
return new MRClientService(context) {
@Override
public void serviceStop() throws Exception {
numStops++;
clientServiceStopped = numStops;
super.serviceStop();
}
};
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new JobHistoryEventHandler(context, getStartCount()) {
@Override
public void serviceStop() throws Exception {
numStops++;
JobHistoryEventHandlerStopped = numStops;
super.serviceStop();
}
};
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@ -284,14 +285,12 @@ import org.junit.Test;
private final class MRAppTestCleanup extends MRApp { private final class MRAppTestCleanup extends MRApp {
int stagingDirCleanedup; int stagingDirCleanedup;
int ContainerAllocatorStopped; int ContainerAllocatorStopped;
int JobHistoryEventHandlerStopped;
int numStops; int numStops;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) { String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart); super(maps, reduces, autoComplete, testName, cleanOnStart);
stagingDirCleanedup = 0; stagingDirCleanedup = 0;
ContainerAllocatorStopped = 0; ContainerAllocatorStopped = 0;
JobHistoryEventHandlerStopped = 0;
numStops = 0; numStops = 0;
} }
@ -318,26 +317,6 @@ import org.junit.Test;
return newJob; return newJob;
} }
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new TestJobHistoryEventHandler(context, getStartCount());
}
private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
public TestJobHistoryEventHandler(AppContext context, int startCount) {
super(context, startCount);
}
@Override
public void serviceStop() throws Exception {
numStops++;
JobHistoryEventHandlerStopped = numStops;
super.serviceStop();
}
}
@Override @Override
protected ContainerAllocator createContainerAllocator( protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) { ClientService clientService, AppContext context) {
@ -405,15 +384,13 @@ import org.junit.Test;
app.verifyCompleted(); app.verifyCompleted();
int waitTime = 20 * 1000; int waitTime = 20 * 1000;
while (waitTime > 0 && app.numStops < 3 ) { while (waitTime > 0 && app.numStops < 2) {
Thread.sleep(100); Thread.sleep(100);
waitTime -= 100; waitTime -= 100;
} }
// assert JobHistoryEventHandlerStopped first, then // assert ContainerAllocatorStopped and then tagingDirCleanedup
// ContainerAllocatorStopped, and then stagingDirCleanedup Assert.assertEquals(1, app.ContainerAllocatorStopped);
Assert.assertEquals(1, app.JobHistoryEventHandlerStopped); Assert.assertEquals(2, app.stagingDirCleanedup);
Assert.assertEquals(2, app.ContainerAllocatorStopped);
Assert.assertEquals(3, app.stagingDirCleanedup);
} }
} }

View File

@ -317,7 +317,7 @@ public class InputSampler<K,V> extends Configured implements Tool {
final InputFormat inf = final InputFormat inf =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf); ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks(); int numPartitions = job.getNumReduceTasks();
K[] samples = sampler.getSample(inf, job); K[] samples = (K[])sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples"); LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator = RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator(); (RawComparator<K>) job.getSortComparator();

View File

@ -182,6 +182,18 @@ Release 2.1.1-beta - UNRELEASED
data structures thread safe to avoid RM crashing with data structures thread safe to avoid RM crashing with
ArrayIndexOutOfBoundsException. (Zhijie Shen via vinodkv) ArrayIndexOutOfBoundsException. (Zhijie Shen via vinodkv)
YARN-1025. ResourceManager and NodeManager do not load native libraries on
Windows. (cnauroth)
YARN-1176. RM web services ClusterMetricsInfo total nodes doesn't include
unhealthy nodes (Jonathan Eagles via tgraves)
YARN-1078. TestNodeManagerResync, TestNodeManagerShutdown, and
TestNodeStatusUpdater fail on Windows. (Chuan Liu via cnauroth)
YARN-1194. TestContainerLogsPage fails with native builds (Roman Shaposhnik
via jlowe)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -1294,6 +1306,9 @@ Release 0.23.10 - UNRELEASED
YARN-985. Nodemanager should log where a resource was localized (Ravi YARN-985. Nodemanager should log where a resource was localized (Ravi
Prakash via jeagles) Prakash via jeagles)
YARN-1119. Add ClusterMetrics checks to tho TestRMNodeTransitions tests
(Mit Desai via jeagles)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -1303,6 +1318,9 @@ Release 0.23.10 - UNRELEASED
YARN-1101. Active nodes can be decremented below 0 (Robert Parker YARN-1101. Active nodes can be decremented below 0 (Robert Parker
via tgraves) via tgraves)
YARN-1176. RM web services ClusterMetricsInfo total nodes doesn't include
unhealthy nodes (Jonathan Eagles via tgraves)
Release 0.23.9 - 2013-07-08 Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -135,6 +135,10 @@ if "%1" == "--config" (
call :%yarn-command% %yarn-command-arguments% call :%yarn-command% %yarn-command-arguments%
if defined JAVA_LIBRARY_PATH (
set YARN_OPTS=%YARN_OPTS% -Djava.library.path=%JAVA_LIBRARY_PATH%
)
set java_arguments=%JAVA_HEAP_MAX% %YARN_OPTS% -classpath %CLASSPATH% %CLASS% %yarn-command-arguments% set java_arguments=%JAVA_HEAP_MAX% %YARN_OPTS% -classpath %CLASSPATH% %CLASS% %yarn-command-arguments%
call %JAVA% %java_arguments% call %JAVA% %java_arguments%

View File

@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
@ -163,7 +164,8 @@ public class TestNodeManagerShutdown {
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
NodeId nodeId = BuilderUtils.newNodeId("localhost", 12345); NodeId nodeId = BuilderUtils.newNodeId(InetAddress.getByName("localhost")
.getCanonicalHostName(), 12345);
URL localResourceUri = URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS ConverterUtils.getYarnUrlFromPath(localFS

View File

@ -23,7 +23,9 @@ import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -219,11 +221,11 @@ public class TestNodeStatusUpdater {
Resource resource = BuilderUtils.newResource(2, 1); Resource resource = BuilderUtils.newResource(2, 1);
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
String user = "testUser"; String user = "testUser";
ContainerTokenIdentifier containerToken = ContainerTokenIdentifier containerToken = BuilderUtils
BuilderUtils.newContainerTokenIdentifier(BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
.newContainerToken(firstContainerID, "localhost", 1234, user, firstContainerID, InetAddress.getByName("localhost")
resource, currentTime + 10000, 123, "password".getBytes(), .getCanonicalHostName(), 1234, user, resource,
currentTime)); currentTime + 10000, 123, "password".getBytes(), currentTime));
Container container = Container container =
new ContainerImpl(conf, mockDispatcher, launchContext, null, new ContainerImpl(conf, mockDispatcher, launchContext, null,
mockMetrics, containerToken); mockMetrics, containerToken);
@ -250,11 +252,11 @@ public class TestNodeStatusUpdater {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
String user = "testUser"; String user = "testUser";
Resource resource = BuilderUtils.newResource(3, 1); Resource resource = BuilderUtils.newResource(3, 1);
ContainerTokenIdentifier containerToken = ContainerTokenIdentifier containerToken = BuilderUtils
BuilderUtils.newContainerTokenIdentifier(BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
.newContainerToken(secondContainerID, "localhost", 1234, user, secondContainerID, InetAddress.getByName("localhost")
resource, currentTime + 10000, 123, .getCanonicalHostName(), 1234, user, resource,
"password".getBytes(), currentTime)); currentTime + 10000, 123, "password".getBytes(), currentTime));
Container container = Container container =
new ContainerImpl(conf, mockDispatcher, launchContext, null, new ContainerImpl(conf, mockDispatcher, launchContext, null,
mockMetrics, containerToken); mockMetrics, containerToken);
@ -1290,9 +1292,15 @@ public class TestNodeStatusUpdater {
private YarnConfiguration createNMConfig() { private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
String localhostAddress = null;
try {
localhostAddress = InetAddress.getByName("localhost").getCanonicalHostName();
} catch (UnknownHostException e) {
Assert.fail("Unable to get localhost address: " + e.getMessage());
}
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345"); conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346"); conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath()); remoteLogsDir.getAbsolutePath());

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock; import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.apache.hadoop.yarn.webapp.YarnWebParams;
@ -151,8 +153,15 @@ public class TestContainerLogsPage {
new ConcurrentHashMap<ApplicationId, Application>(); new ConcurrentHashMap<ApplicationId, Application>();
appMap.put(appId, app); appMap.put(appId, app);
when(context.getApplications()).thenReturn(appMap); when(context.getApplications()).thenReturn(appMap);
when(context.getContainers()).thenReturn( ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>()); new ConcurrentHashMap<ContainerId, Container>();
when(context.getContainers()).thenReturn(containers);
when(context.getLocalDirsHandler()).thenReturn(dirsHandler);
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), conf, user, appId, 1);
container.setState(ContainerState.RUNNING);
context.getContainers().put(container1, container);
ContainersLogsBlock cLogsBlock = ContainersLogsBlock cLogsBlock =
new ContainersLogsBlock(context); new ContainersLogsBlock(context);

View File

@ -84,7 +84,7 @@ public class ClusterMetricsInfo {
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs(); this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes this.totalNodes = activeNodes + lostNodes + decommissionedNodes
+ rebootedNodes; + rebootedNodes + unhealthyNodes;
} }
public int getAppsSubmitted() { public int getAppsSubmitted() {

View File

@ -260,7 +260,21 @@ public class TestRMNodeTransitions {
@Test @Test
public void testRunningExpire() { public void testRunningExpire() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE)); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.LOST, node.getState()); Assert.assertEquals(NodeState.LOST, node.getState());
} }
@ -297,8 +311,22 @@ public class TestRMNodeTransitions {
@Test @Test
public void testRunningDecommission() { public void testRunningDecommission() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(), node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.DECOMMISSION)); RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
} }
@ -327,8 +355,22 @@ public class TestRMNodeTransitions {
@Test @Test
public void testRunningRebooting() { public void testRunningRebooting() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(), node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.REBOOTING)); RMNodeEventType.REBOOTING));
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted + 1, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.REBOOTED, node.getState()); Assert.assertEquals(NodeState.REBOOTED, node.getState());
} }

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Element; import org.w3c.dom.Element;
@ -109,6 +110,16 @@ public class TestRMWebServices extends JerseyTest {
.contextPath("jersey-guice-filter").servletPath("/").build()); .contextPath("jersey-guice-filter").servletPath("/").build());
} }
@BeforeClass
public static void initClusterMetrics() {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
clusterMetrics.incrDecommisionedNMs();
clusterMetrics.incrNumActiveNodes();
clusterMetrics.incrNumLostNMs();
clusterMetrics.incrNumRebootedNMs();
clusterMetrics.incrNumUnhealthyNMs();
}
@Test @Test
public void testInfoXML() throws JSONException, Exception { public void testInfoXML() throws JSONException, Exception {
WebResource r = resource(); WebResource r = resource();
@ -426,7 +437,8 @@ public class TestRMWebServices extends JerseyTest {
"totalNodes doesn't match", "totalNodes doesn't match",
clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs() clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs()
+ clusterMetrics.getNumDecommisionedNMs() + clusterMetrics.getNumDecommisionedNMs()
+ clusterMetrics.getNumRebootedNMs(), totalNodes); + clusterMetrics.getNumRebootedNMs()
+ clusterMetrics.getUnhealthyNMs(), totalNodes);
assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(), assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(),
lostNodes); lostNodes);
assertEquals("unhealthyNodes doesn't match", assertEquals("unhealthyNodes doesn't match",