HDFS-4947. Merging change r1517040 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1517049 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-08-23 21:26:23 +00:00
parent 773cab510f
commit 80ead68bfc
10 changed files with 884 additions and 143 deletions

View File

@ -189,4 +189,16 @@ public class Nfs3Constant {
public final static int CREATE_UNCHECKED = 0;
public final static int CREATE_GUARDED = 1;
public final static int CREATE_EXCLUSIVE = 2;
public static final String EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
/** Allowed hosts for nfs exports */
public static final String EXPORTS_ALLOWED_HOSTS_KEY = "hdfs.nfs.exports.allowed.hosts";
public static final String EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw";
/** Size for nfs exports cache */
public static final String EXPORTS_CACHE_SIZE_KEY = "hdfs.nfs.exports.cache.size";
public static final int EXPORTS_CACHE_SIZE_DEFAULT = 512;
/** Expiration time for nfs exports cache entry */
public static final String EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY = "hdfs.nfs.exports.cache.expirytime.millis";
public static final long EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 15 * 60 * 1000; // 15 min
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.nfs.nfs3;
import java.net.InetAddress;
import org.apache.hadoop.nfs.nfs3.response.NFS3Response;
import org.apache.hadoop.oncrpc.RpcAuthSys;
import org.apache.hadoop.oncrpc.XDR;
@ -31,53 +33,54 @@ public interface Nfs3Interface {
public NFS3Response nullProcedure();
/** GETATTR: Get file attributes */
public NFS3Response getattr(XDR xdr, RpcAuthSys authSys);
public NFS3Response getattr(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** SETATTR: Set file attributes */
public NFS3Response setattr(XDR xdr, RpcAuthSys authSys);
public NFS3Response setattr(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** LOOKUP: Lookup filename */
public NFS3Response lookup(XDR xdr, RpcAuthSys authSys);
public NFS3Response lookup(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** ACCESS: Check access permission */
public NFS3Response access(XDR xdr, RpcAuthSys authSys);
public NFS3Response access(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** READ: Read from file */
public NFS3Response read(XDR xdr, RpcAuthSys authSys);
public NFS3Response read(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** WRITE: Write to file */
public NFS3Response write(XDR xdr, Channel channel, int xid, RpcAuthSys authSys);
public NFS3Response write(XDR xdr, Channel channel, int xid,
RpcAuthSys authSys, InetAddress client);
/** CREATE: Create a file */
public NFS3Response create(XDR xdr, RpcAuthSys authSys);
public NFS3Response create(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** MKDIR: Create a directory */
public NFS3Response mkdir(XDR xdr, RpcAuthSys authSys);
public NFS3Response mkdir(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** REMOVE: Remove a file */
public NFS3Response remove(XDR xdr, RpcAuthSys authSys);
public NFS3Response remove(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** RMDIR: Remove a directory */
public NFS3Response rmdir(XDR xdr, RpcAuthSys authSys);
public NFS3Response rmdir(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** RENAME: Rename a file or directory */
public NFS3Response rename(XDR xdr, RpcAuthSys authSys);
public NFS3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** SYMLINK: Create a symbolic link */
public NFS3Response symlink(XDR xdr, RpcAuthSys authSys);
public NFS3Response symlink(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** READDIR: Read From directory */
public NFS3Response readdir(XDR xdr, RpcAuthSys authSys);
public NFS3Response readdir(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** FSSTAT: Get dynamic file system information */
public NFS3Response fsstat(XDR xdr, RpcAuthSys authSys);
public NFS3Response fsstat(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** FSINFO: Get static file system information */
public NFS3Response fsinfo(XDR xdr, RpcAuthSys authSys);
public NFS3Response fsinfo(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** PATHCONF: Retrieve POSIX information */
public NFS3Response pathconf(XDR xdr, RpcAuthSys authSys);
public NFS3Response pathconf(XDR xdr, RpcAuthSys authSys, InetAddress client);
/** COMMIT: Commit cached data on a server to stable storage */
public NFS3Response commit(XDR xdr, RpcAuthSys authSys);
public NFS3Response commit(XDR xdr, RpcAuthSys authSys, InetAddress client);
}

View File

@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
import org.apache.hadoop.hdfs.nfs.security.NfsExports;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mount.MountEntry;
@ -59,6 +61,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
/** List that is unmodifiable */
private final List<String> exports;
private final NfsExports hostsMatcher;
public RpcProgramMountd() throws IOException {
this(new ArrayList<String>(0));
@ -72,19 +76,29 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
this.hostsMatcher = NfsExports.getInstance(config);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
this.exports = Collections.unmodifiableList(exports);
this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
}
@Override
public XDR nullOp(XDR out, int xid, InetAddress client) {
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT NULLOP : " + " client: " + client);
}
return RpcAcceptedReply.voidReply(out, xid);
return RpcAcceptedReply.voidReply(out, xid);
}
@Override
public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
AccessPrivilege accessPrivilege = hostsMatcher.getAccessPrivilege(client);
if (accessPrivilege == AccessPrivilege.NONE) {
return MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out, xid,
null);
}
String path = xdr.readString();
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT MNT path: " + path + " client: " + client);
@ -121,6 +135,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
return out;
}
@Override
public XDR dump(XDR out, int xid, InetAddress client) {
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT NULLOP : " + " client: " + client);
@ -131,6 +146,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
return out;
}
@Override
public XDR umnt(XDR xdr, XDR out, int xid, InetAddress client) {
String path = xdr.readString();
if (LOG.isDebugEnabled()) {
@ -143,6 +159,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
return out;
}
@Override
public XDR umntall(XDR out, int xid, InetAddress client) {
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT UMNTALL : " + " client: " + client);

View File

@ -32,12 +32,17 @@ import org.apache.hadoop.util.StringUtils;
* Only TCP server is supported and UDP is not supported.
*/
public class Nfs3 extends Nfs3Base {
static {
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
public Nfs3(List<String> exports) throws IOException {
super(new Mountd(exports), new RpcProgramNfs3(exports));
super(new Mountd(exports), new RpcProgramNfs3());
}
public Nfs3(List<String> exports, Configuration config) throws IOException {
super(new Mountd(exports, config), new RpcProgramNfs3(exports, config));
super(new Mountd(exports, config), new RpcProgramNfs3(config));
}
public static void main(String[] args) throws IOException {

View File

@ -88,6 +88,7 @@ public class Nfs3Utils {
return new WccAttr(attr.getSize(), attr.getMtime(), attr.getCtime());
}
// TODO: maybe not efficient
public static WccData createWccData(final WccAttr preOpAttr,
DFSClient dfsClient, final String fileIdPath, final IdUserGroup iug)
throws IOException {

View File

@ -22,22 +22,23 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
import org.apache.hadoop.hdfs.nfs.security.NfsExports;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -125,6 +126,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
private final IdUserGroup iug;// = new IdUserGroup();
private final DFSClientCache clientCache;
private final NfsExports exports;
/**
* superUserClient should always impersonate HDFS file system owner to send
* requests which requires supergroup privilege. This requires the same user
@ -138,17 +141,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
private Statistics statistics;
private String writeDumpDir; // The dir save dump files
public RpcProgramNfs3(List<String> exports) throws IOException {
this(exports, new Configuration());
public RpcProgramNfs3() throws IOException {
this(new Configuration());
}
public RpcProgramNfs3(List<String> exports, Configuration config)
public RpcProgramNfs3(Configuration config)
throws IOException {
super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
config.set(FsPermission.UMASK_LABEL, "000");
iug = new IdUserGroup();
exports = NfsExports.getInstance(config);
writeManager = new WriteManager(iug, config);
clientCache = new DFSClientCache(config);
superUserClient = new DFSClient(NameNode.getAddress(config), config);
@ -185,7 +190,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
/******************************************************
* RPC call handlers
******************************************************/
@Override
public NFS3Response nullProcedure() {
if (LOG.isDebugEnabled()) {
LOG.debug("NFS NULL");
@ -193,8 +199,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return new VoidResponse(Nfs3Status.NFS3_OK);
}
public GETATTR3Response getattr(XDR xdr, RpcAuthSys authSys) {
@Override
public GETATTR3Response getattr(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -267,7 +281,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public SETATTR3Response setattr(XDR xdr, RpcAuthSys authSys) {
@Override
public SETATTR3Response setattr(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -298,34 +314,39 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
String fileIdPath = Nfs3Utils.getFileIdPath(handle);
WccAttr preOpAttr = null;
Nfs3FileAttributes preOpAttr = null;
try {
preOpAttr = Nfs3Utils.getWccAttr(dfsClient, fileIdPath);
preOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
if (preOpAttr == null) {
LOG.info("Can't get path for fileId:" + handle.getFileId());
response.setStatus(Nfs3Status.NFS3ERR_STALE);
return response;
}
WccAttr preOpWcc = Nfs3Utils.getWccAttr(preOpAttr);
if (request.isCheck()) {
if (!preOpAttr.getCtime().equals(request.getCtime())) {
WccData wccData = Nfs3Utils.createWccData(preOpAttr, dfsClient,
fileIdPath, iug);
WccData wccData = new WccData(preOpWcc, preOpAttr);
return new SETATTR3Response(Nfs3Status.NFS3ERR_NOT_SYNC, wccData);
}
}
// check the write access privilege
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
preOpWcc, preOpAttr));
}
setattrInternal(dfsClient, fileIdPath, request.getAttr(), true);
Nfs3FileAttributes postOpAttr = Nfs3Utils.getFileAttr(dfsClient,
fileIdPath, iug);
WccData wccData = new WccData(preOpAttr, postOpAttr);
WccData wccData = new WccData(preOpWcc, postOpAttr);
return new SETATTR3Response(Nfs3Status.NFS3_OK, wccData);
} catch (IOException e) {
LOG.warn("Exception ", e);
WccData wccData = null;
try {
wccData = Nfs3Utils
.createWccData(preOpAttr, dfsClient, fileIdPath, iug);
wccData = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpAttr),
dfsClient, fileIdPath, iug);
} catch (IOException e1) {
LOG.info("Can't get postOpAttr for fileIdPath: " + fileIdPath);
}
@ -337,8 +358,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public LOOKUP3Response lookup(XDR xdr, RpcAuthSys authSys) {
@Override
public LOOKUP3Response lookup(XDR xdr, RpcAuthSys authSys, InetAddress client) {
LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -392,8 +420,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public ACCESS3Response access(XDR xdr, RpcAuthSys authSys) {
@Override
public ACCESS3Response access(XDR xdr, RpcAuthSys authSys, InetAddress client) {
ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -434,12 +469,20 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public READLINK3Response readlink(XDR xdr, RpcAuthSys authSys) {
public READLINK3Response readlink(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
return new READLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
public READ3Response read(XDR xdr, RpcAuthSys authSys) {
@Override
public READ3Response read(XDR xdr, RpcAuthSys authSys, InetAddress client) {
READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -528,8 +571,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
@Override
public WRITE3Response write(XDR xdr, Channel channel, int xid,
RpcAuthSys authSys) {
RpcAuthSys authSys, InetAddress client) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -570,6 +614,13 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
LOG.error("Can't get path for fileId:" + handle.getFileId());
return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
}
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
Nfs3Constant.WRITE_COMMIT_VERF);
}
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current filesize="
+ preOpAttr.getSize());
@ -596,7 +647,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return null;
}
public CREATE3Response create(XDR xdr, RpcAuthSys authSys) {
@Override
public CREATE3Response create(XDR xdr, RpcAuthSys authSys, InetAddress client) {
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -631,16 +683,22 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
HdfsDataOutputStream fos = null;
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
WccAttr preOpDirAttr = null;
Nfs3FileAttributes preOpDirAttr = null;
Nfs3FileAttributes postOpObjAttr = null;
FileHandle fileHandle = null;
WccData dirWcc = null;
try {
preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (preOpDirAttr == null) {
LOG.error("Can't get path for dirHandle:" + dirHandle);
return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
}
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr));
}
String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
SetAttr3 setAttr3 = request.getObjAttr();
@ -649,9 +707,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
SetAttrField.MODE) ? new FsPermission((short) setAttr3.getMode())
: FsPermission.getDefault().applyUMask(umask);
EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ? EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet
.of(CreateFlag.CREATE);
EnumSet<CreateFlag> flag = (createMode != Nfs3Constant.CREATE_EXCLUSIVE) ?
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) :
EnumSet.of(CreateFlag.CREATE);
fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
flag, false, replication, blockSize, null, bufferSize, null),
@ -668,8 +726,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient, dirFileIdPath,
iug);
dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
dfsClient, dirFileIdPath, iug);
} catch (IOException e) {
LOG.error("Exception", e);
if (fos != null) {
@ -682,8 +740,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
if (dirWcc == null) {
try {
dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
dfsClient, dirFileIdPath, iug);
} catch (IOException e1) {
LOG.error("Can't get postOpDirAttr for dirFileId:"
+ dirHandle.getFileId());
@ -712,7 +770,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
dirWcc);
}
public MKDIR3Response mkdir(XDR xdr, RpcAuthSys authSys) {
@Override
public MKDIR3Response mkdir(XDR xdr, RpcAuthSys authSys, InetAddress client) {
MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -739,17 +798,22 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
WccAttr preOpDirAttr = null;
Nfs3FileAttributes preOpDirAttr = null;
Nfs3FileAttributes postOpDirAttr = null;
Nfs3FileAttributes postOpObjAttr = null;
FileHandle objFileHandle = null;
try {
preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (preOpDirAttr == null) {
LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
}
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
}
final String fileIdPath = dirFileIdPath + "/" + fileName;
SetAttr3 setAttr3 = request.getObjAttr();
FsPermission permission = setAttr3.getUpdateFields().contains(
@ -757,8 +821,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
: FsPermission.getDefault().applyUMask(umask);
if (!dfsClient.mkdirs(fileIdPath, permission, false)) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
WccData dirWcc = Nfs3Utils.createWccData(
Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
return new MKDIR3Response(Nfs3Status.NFS3ERR_IO, null, null, dirWcc);
}
@ -771,8 +835,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
objFileHandle = new FileHandle(postOpObjAttr.getFileId());
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
WccData dirWcc = Nfs3Utils.createWccData(
Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
return new MKDIR3Response(Nfs3Status.NFS3_OK, new FileHandle(
postOpObjAttr.getFileId()), postOpObjAttr, dirWcc);
} catch (IOException e) {
@ -785,7 +849,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
LOG.info("Can't get postOpDirAttr for " + dirFileIdPath);
}
}
WccData dirWcc = new WccData(preOpDirAttr, postOpDirAttr);
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpDirAttr);
if (e instanceof AccessControlException) {
return new MKDIR3Response(Nfs3Status.NFS3ERR_PERM, objFileHandle,
postOpObjAttr, dirWcc);
@ -796,12 +861,12 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public READDIR3Response mknod(XDR xdr, RpcAuthSys authSys) {
public READDIR3Response mknod(XDR xdr, RpcAuthSys authSys, InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
public REMOVE3Response remove(XDR xdr, RpcAuthSys authSys) {
@Override
public REMOVE3Response remove(XDR xdr, RpcAuthSys authSys, InetAddress client) {
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -825,10 +890,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
WccAttr preOpDirAttr = null;
Nfs3FileAttributes preOpDirAttr = null;
Nfs3FileAttributes postOpDirAttr = null;
try {
preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (preOpDirAttr == null) {
LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE);
@ -838,24 +903,23 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
fileIdPath);
if (fstat == null) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
}
if (fstat.isDir()) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc);
}
if (dfsClient.delete(fileIdPath, false) == false) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
boolean result = dfsClient.delete(fileIdPath, false);
WccData dirWcc = Nfs3Utils.createWccData(
Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
if (!result) {
return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, dirWcc);
}
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
return new REMOVE3Response(Nfs3Status.NFS3_OK, dirWcc);
} catch (IOException e) {
LOG.warn("Exception ", e);
@ -867,7 +931,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
LOG.info("Can't get postOpDirAttr for " + dirFileIdPath);
}
}
WccData dirWcc = new WccData(preOpDirAttr, postOpDirAttr);
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpDirAttr);
if (e instanceof AccessControlException) {
return new REMOVE3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
} else {
@ -876,7 +941,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public RMDIR3Response rmdir(XDR xdr, RpcAuthSys authSys) {
@Override
public RMDIR3Response rmdir(XDR xdr, RpcAuthSys authSys, InetAddress client) {
RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -901,45 +967,43 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
WccAttr preOpDirAttr = null;
Nfs3FileAttributes preOpDirAttr = null;
Nfs3FileAttributes postOpDirAttr = null;
try {
preOpDirAttr = Nfs3Utils.getWccAttr(dfsClient, dirFileIdPath);
preOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
if (preOpDirAttr == null) {
LOG.info("Can't get path for dir fileId:" + dirHandle.getFileId());
return new RMDIR3Response(Nfs3Status.NFS3ERR_STALE);
}
WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc);
}
String fileIdPath = dirFileIdPath + "/" + fileName;
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
fileIdPath);
if (fstat == null) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
}
if (!fstat.isDir()) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, dirWcc);
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTDIR, errWcc);
}
if (fstat.getChildrenNum() > 0) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, dirWcc);
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOTEMPTY, errWcc);
}
if (dfsClient.delete(fileIdPath, false) == false) {
WccData dirWcc = Nfs3Utils.createWccData(preOpDirAttr, dfsClient,
dirFileIdPath, iug);
boolean result = dfsClient.delete(fileIdPath, false);
WccData dirWcc = Nfs3Utils.createWccData(
Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
if (!result) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, dirWcc);
}
postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
WccData wccData = new WccData(preOpDirAttr, postOpDirAttr);
return new RMDIR3Response(Nfs3Status.NFS3_OK, wccData);
return new RMDIR3Response(Nfs3Status.NFS3_OK, dirWcc);
} catch (IOException e) {
LOG.warn("Exception ", e);
// Try to return correct WccData
@ -950,7 +1014,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
LOG.info("Can't get postOpDirAttr for " + dirFileIdPath);
}
}
WccData dirWcc = new WccData(preOpDirAttr, postOpDirAttr);
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
postOpDirAttr);
if (e instanceof AccessControlException) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_PERM, dirWcc);
} else {
@ -959,7 +1024,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public RENAME3Response rename(XDR xdr, RpcAuthSys authSys) {
@Override
public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) {
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -987,23 +1053,31 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
String fromDirFileIdPath = Nfs3Utils.getFileIdPath(fromHandle);
String toDirFileIdPath = Nfs3Utils.getFileIdPath(toHandle);
WccAttr fromPreOpAttr = null;
WccAttr toPreOpAttr = null;
Nfs3FileAttributes fromPreOpAttr = null;
Nfs3FileAttributes toPreOpAttr = null;
WccData fromDirWcc = null;
WccData toDirWcc = null;
try {
fromPreOpAttr = Nfs3Utils.getWccAttr(dfsClient, fromDirFileIdPath);
fromPreOpAttr = Nfs3Utils.getFileAttr(dfsClient, fromDirFileIdPath, iug);
if (fromPreOpAttr == null) {
LOG.info("Can't get path for fromHandle fileId:"
+ fromHandle.getFileId());
return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
}
toPreOpAttr = Nfs3Utils.getWccAttr(dfsClient, toDirFileIdPath);
toPreOpAttr = Nfs3Utils.getFileAttr(dfsClient, toDirFileIdPath, iug);
if (toPreOpAttr == null) {
LOG.info("Can't get path for toHandle fileId:" + toHandle.getFileId());
return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
}
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
fromPreOpAttr);
WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
toPreOpAttr);
return new RENAME3Response(Nfs3Status.NFS3ERR_ACCES, fromWcc, toWcc);
}
String src = fromDirFileIdPath + "/" + fromName;
String dst = toDirFileIdPath + "/" + toName;
@ -1011,20 +1085,20 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
dfsClient.rename(src, dst, Options.Rename.NONE);
// Assemble the reply
fromDirWcc = Nfs3Utils.createWccData(fromPreOpAttr, dfsClient,
fromDirFileIdPath, iug);
toDirWcc = Nfs3Utils.createWccData(toPreOpAttr, dfsClient,
toDirFileIdPath, iug);
fromDirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
dfsClient, fromDirFileIdPath, iug);
toDirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(toPreOpAttr),
dfsClient, toDirFileIdPath, iug);
return new RENAME3Response(Nfs3Status.NFS3_OK, fromDirWcc, toDirWcc);
} catch (IOException e) {
LOG.warn("Exception ", e);
// Try to return correct WccData
try {
fromDirWcc = Nfs3Utils.createWccData(fromPreOpAttr, dfsClient,
fromDirFileIdPath, iug);
toDirWcc = Nfs3Utils.createWccData(toPreOpAttr, dfsClient,
toDirFileIdPath, iug);
fromDirWcc = Nfs3Utils.createWccData(
Nfs3Utils.getWccAttr(fromPreOpAttr), dfsClient, fromDirFileIdPath,
iug);
toDirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(toPreOpAttr),
dfsClient, toDirFileIdPath, iug);
} catch (IOException e1) {
LOG.info("Can't get postOpDirAttr for " + fromDirFileIdPath + " or"
+ toDirFileIdPath);
@ -1038,16 +1112,25 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys) {
public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
public READDIR3Response link(XDR xdr, RpcAuthSys authSys) {
public READDIR3Response link(XDR xdr, RpcAuthSys authSys, InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
}
public READDIR3Response readdir(XDR xdr, RpcAuthSys authSys) {
@Override
public READDIR3Response readdir(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -1180,7 +1263,12 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
dirStatus.getModificationTime(), dirList);
}
public READDIRPLUS3Response readdirplus(XDR xdr, RpcAuthSys authSys) {
public READDIRPLUS3Response readdirplus(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -1325,8 +1413,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
dirStatus.getModificationTime(), dirListPlus);
}
public FSSTAT3Response fsstat(XDR xdr, RpcAuthSys authSys) {
@Override
public FSSTAT3Response fsstat(XDR xdr, RpcAuthSys authSys, InetAddress client) {
FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -1376,8 +1471,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public FSINFO3Response fsinfo(XDR xdr, RpcAuthSys authSys) {
@Override
public FSINFO3Response fsinfo(XDR xdr, RpcAuthSys authSys, InetAddress client) {
FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -1421,8 +1523,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public PATHCONF3Response pathconf(XDR xdr, RpcAuthSys authSys) {
@Override
public PATHCONF3Response pathconf(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
response.setStatus(Nfs3Status.NFS3ERR_ACCES);
return response;
}
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
if (dfsClient == null) {
@ -1461,7 +1571,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
}
public COMMIT3Response commit(XDR xdr, RpcAuthSys authSys) {
@Override
public COMMIT3Response commit(XDR xdr, RpcAuthSys authSys, InetAddress client) {
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
String uname = authSysCheck(authSys);
DFSClient dfsClient = clientCache.get(uname);
@ -1486,13 +1597,20 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
String fileIdPath = Nfs3Utils.getFileIdPath(handle);
WccAttr preOpAttr = null;
Nfs3FileAttributes preOpAttr = null;
try {
preOpAttr = Nfs3Utils.getWccAttr(dfsClient, fileIdPath);
preOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
if (preOpAttr == null) {
LOG.info("Can't get path for fileId:" + handle.getFileId());
return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
}
if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
Nfs3Constant.WRITE_COMMIT_VERF);
}
long commitOffset = (request.getCount() == 0) ? 0
: (request.getOffset() + request.getCount());
@ -1504,7 +1622,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
handle, iug);
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
return new COMMIT3Response(status, fileWcc,
Nfs3Constant.WRITE_COMMIT_VERF);
@ -1516,7 +1634,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} catch (IOException e1) {
LOG.info("Can't get postOpAttr for fileId: " + handle.getFileId());
}
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
return new COMMIT3Response(Nfs3Status.NFS3ERR_IO, fileWcc,
Nfs3Constant.WRITE_COMMIT_VERF);
}
@ -1554,47 +1672,47 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
if (nfsproc3 == NFSPROC3.NULL) {
response = nullProcedure();
} else if (nfsproc3 == NFSPROC3.GETATTR) {
response = getattr(xdr, authSys);
response = getattr(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.SETATTR) {
response = setattr(xdr, authSys);
response = setattr(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.LOOKUP) {
response = lookup(xdr, authSys);
response = lookup(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.ACCESS) {
response = access(xdr, authSys);
response = access(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.READLINK) {
response = readlink(xdr, authSys);
response = readlink(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.READ) {
response = read(xdr, authSys);
response = read(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.WRITE) {
response = write(xdr, channel, xid, authSys);
response = write(xdr, channel, xid, authSys, client);
} else if (nfsproc3 == NFSPROC3.CREATE) {
response = create(xdr, authSys);
response = create(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.MKDIR) {
response = mkdir(xdr, authSys);
response = mkdir(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.SYMLINK) {
response = symlink(xdr, authSys);
response = symlink(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.MKNOD) {
response = mknod(xdr, authSys);
response = mknod(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.REMOVE) {
response = remove(xdr, authSys);
response = remove(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.RMDIR) {
response = rmdir(xdr, authSys);
response = rmdir(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.RENAME) {
response = rename(xdr, authSys);
response = rename(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.LINK) {
response = link(xdr, authSys);
response = link(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.READDIR) {
response = readdir(xdr, authSys);
response = readdir(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
response = readdirplus(xdr, authSys);
response = readdirplus(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.FSSTAT) {
response = fsstat(xdr, authSys);
response = fsstat(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.FSINFO) {
response = fsinfo(xdr, authSys);
response = fsinfo(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.PATHCONF) {
response = pathconf(xdr, authSys);
response = pathconf(xdr, authSys, client);
} else if (nfsproc3 == NFSPROC3.COMMIT) {
response = commit(xdr, authSys);
response = commit(xdr, authSys, client);
} else {
// Invalid procedure
RpcAcceptedReply.voidReply(out, xid,
@ -1611,4 +1729,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(call.getProcedure());
return nfsproc3 == null || nfsproc3.isIdempotent();
}
private boolean checkAccessPrivilege(final InetAddress client,
final AccessPrivilege expected) {
AccessPrivilege access = exports.getAccessPrivilege(client);
if (access == AccessPrivilege.NONE) {
return false;
}
if (access == AccessPrivilege.READ_ONLY
&& expected == AccessPrivilege.READ_WRITE) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.security;
public enum AccessPrivilege {
READ_ONLY,
READ_WRITE,
NONE;
}

View File

@ -0,0 +1,354 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.security;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.util.SubnetUtils;
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.util.LightWeightCache;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import com.google.common.base.Preconditions;
/**
* This class provides functionality for loading and checking the mapping
* between client hosts and their access privileges.
*/
public class NfsExports {
private static NfsExports exports = null;
public static synchronized NfsExports getInstance(Configuration conf) {
if (exports == null) {
String matchHosts = conf.get(Nfs3Constant.EXPORTS_ALLOWED_HOSTS_KEY,
Nfs3Constant.EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT);
int cacheSize = conf.getInt(Nfs3Constant.EXPORTS_CACHE_SIZE_KEY,
Nfs3Constant.EXPORTS_CACHE_SIZE_DEFAULT);
long expirationPeriodNano = conf.getLong(
Nfs3Constant.EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY,
Nfs3Constant.EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT) * 1000 * 1000;
exports = new NfsExports(cacheSize, expirationPeriodNano, matchHosts);
}
return exports;
}
public static final Log LOG = LogFactory.getLog(NfsExports.class);
// only support IPv4 now
private static final String IP_ADDRESS =
"(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})";
private static final String SLASH_FORMAT_SHORT = IP_ADDRESS + "/(\\d{1,3})";
private static final String SLASH_FORMAT_LONG = IP_ADDRESS + "/" + IP_ADDRESS;
private static final Pattern CIDR_FORMAT_SHORT =
Pattern.compile(SLASH_FORMAT_SHORT);
private static final Pattern CIDR_FORMAT_LONG =
Pattern.compile(SLASH_FORMAT_LONG);
static class AccessCacheEntry implements LightWeightCache.Entry{
private final String hostAddr;
private AccessPrivilege access;
private final long expirationTime;
private LightWeightGSet.LinkedElement next;
AccessCacheEntry(String hostAddr, AccessPrivilege access,
long expirationTime) {
Preconditions.checkArgument(hostAddr != null);
this.hostAddr = hostAddr;
this.access = access;
this.expirationTime = expirationTime;
}
@Override
public int hashCode() {
return hostAddr.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof AccessCacheEntry) {
AccessCacheEntry entry = (AccessCacheEntry) obj;
return this.hostAddr.equals(entry.hostAddr);
}
return false;
}
@Override
public void setNext(LinkedElement next) {
this.next = next;
}
@Override
public LinkedElement getNext() {
return this.next;
}
@Override
public void setExpirationTime(long timeNano) {
// we set expiration time in the constructor, and the expiration time
// does not change
}
@Override
public long getExpirationTime() {
return this.expirationTime;
}
}
private final List<Match> mMatches;
private final LightWeightCache<AccessCacheEntry, AccessCacheEntry> accessCache;
private final long cacheExpirationPeriod;
/**
* Constructor.
* @param cacheSize The size of the access privilege cache.
* @param expirationPeriodNano The period
* @param matchingHosts A string specifying one or multiple matchers.
*/
NfsExports(int cacheSize, long expirationPeriodNano, String matchHosts) {
this.cacheExpirationPeriod = expirationPeriodNano;
accessCache = new LightWeightCache<AccessCacheEntry, AccessCacheEntry>(
cacheSize, cacheSize, expirationPeriodNano, 0);
String[] matchStrings = matchHosts.split(
Nfs3Constant.EXPORTS_ALLOWED_HOSTS_SEPARATOR);
mMatches = new ArrayList<Match>(matchStrings.length);
for(String mStr : matchStrings) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing match string '" + mStr + "'");
}
mStr = mStr.trim();
if(!mStr.isEmpty()) {
mMatches.add(getMatch(mStr));
}
}
}
public AccessPrivilege getAccessPrivilege(InetAddress addr) {
return getAccessPrivilege(addr.getHostAddress(),
addr.getCanonicalHostName());
}
AccessPrivilege getAccessPrivilege(String address, String hostname) {
long now = System.nanoTime();
AccessCacheEntry newEntry = new AccessCacheEntry(address,
AccessPrivilege.NONE, now + this.cacheExpirationPeriod);
// check if there is a cache entry for the given address
AccessCacheEntry cachedEntry = accessCache.get(newEntry);
if (cachedEntry != null && now < cachedEntry.expirationTime) {
// get a non-expired cache entry, use it
return cachedEntry.access;
} else {
for(Match match : mMatches) {
if(match.isIncluded(address, hostname)) {
if (match.accessPrivilege == AccessPrivilege.READ_ONLY) {
newEntry.access = AccessPrivilege.READ_ONLY;
break;
} else if (match.accessPrivilege == AccessPrivilege.READ_WRITE) {
newEntry.access = AccessPrivilege.READ_WRITE;
}
}
}
accessCache.put(newEntry);
return newEntry.access;
}
}
private static abstract class Match {
private final AccessPrivilege accessPrivilege;
private Match(AccessPrivilege accessPrivilege) {
this.accessPrivilege = accessPrivilege;
}
public abstract boolean isIncluded(String address, String hostname);
}
/**
* Matcher covering all client hosts (specified by "*")
*/
private static class AnonymousMatch extends Match {
private AnonymousMatch(AccessPrivilege accessPrivilege) {
super(accessPrivilege);
}
@Override
public boolean isIncluded(String ip, String hostname) {
return true;
}
}
/**
* Matcher using CIDR for client host matching
*/
private static class CIDRMatch extends Match {
private final SubnetInfo subnetInfo;
private CIDRMatch(AccessPrivilege accessPrivilege, SubnetInfo subnetInfo) {
super(accessPrivilege);
this.subnetInfo = subnetInfo;
}
@Override
public boolean isIncluded(String address, String hostname) {
if(subnetInfo.isInRange(address)) {
if(LOG.isDebugEnabled()) {
LOG.debug("CIDRNMatcher low = " + subnetInfo.getLowAddress() +
", high = " + subnetInfo.getHighAddress() +
", allowing client '" + address + "', '" + hostname + "'");
}
return true;
}
if(LOG.isDebugEnabled()) {
LOG.debug("CIDRNMatcher low = " + subnetInfo.getLowAddress() +
", high = " + subnetInfo.getHighAddress() +
", denying client '" + address + "', '" + hostname + "'");
}
return false;
}
}
/**
* Matcher requiring exact string match for client host
*/
private static class ExactMatch extends Match {
private final String ipOrHost;
private ExactMatch(AccessPrivilege accessPrivilege, String ipOrHost) {
super(accessPrivilege);
this.ipOrHost = ipOrHost;
}
@Override
public boolean isIncluded(String address, String hostname) {
if(ipOrHost.equalsIgnoreCase(address) ||
ipOrHost.equalsIgnoreCase(hostname)) {
if(LOG.isDebugEnabled()) {
LOG.debug("ExactMatcher '" + ipOrHost + "', allowing client " +
"'" + address + "', '" + hostname + "'");
}
return true;
}
if(LOG.isDebugEnabled()) {
LOG.debug("ExactMatcher '" + ipOrHost + "', denying client " +
"'" + address + "', '" + hostname + "'");
}
return false;
}
}
/**
* Matcher where client hosts are specified by regular expression
*/
private static class RegexMatch extends Match {
private final Pattern pattern;
private RegexMatch(AccessPrivilege accessPrivilege, String wildcard) {
super(accessPrivilege);
this.pattern = Pattern.compile(wildcard, Pattern.CASE_INSENSITIVE);
}
@Override
public boolean isIncluded(String address, String hostname) {
if (pattern.matcher(address).matches()
|| pattern.matcher(hostname).matches()) {
if (LOG.isDebugEnabled()) {
LOG.debug("RegexMatcher '" + pattern.pattern()
+ "', allowing client '" + address + "', '" + hostname + "'");
}
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("RegexMatcher '" + pattern.pattern()
+ "', denying client '" + address + "', '" + hostname + "'");
}
return false;
}
}
/**
* Loading a matcher from a string. The default access privilege is read-only.
* The string contains 1 or 2 parts, separated by whitespace characters, where
* the first part specifies the client hosts, and the second part (if
* existent) specifies the access privilege of the client hosts. I.e.,
*
* "client-hosts [access-privilege]"
*/
private static Match getMatch(String line) {
String[] parts = line.split("\\s+");
final String host;
AccessPrivilege privilege = AccessPrivilege.READ_ONLY;
switch (parts.length) {
case 1:
host = parts[0].toLowerCase().trim();
break;
case 2:
host = parts[0].toLowerCase().trim();
String option = parts[1].trim();
if ("rw".equalsIgnoreCase(option)) {
privilege = AccessPrivilege.READ_WRITE;
}
break;
default:
throw new IllegalArgumentException("Incorrectly formatted line '" + line
+ "'");
}
if (host.equals("*")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using match all for '" + host + "' and " + privilege);
}
return new AnonymousMatch(privilege);
} else if (CIDR_FORMAT_SHORT.matcher(host).matches()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using CIDR match for '" + host + "' and " + privilege);
}
return new CIDRMatch(privilege, new SubnetUtils(host).getInfo());
} else if (CIDR_FORMAT_LONG.matcher(host).matches()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using CIDR match for '" + host + "' and " + privilege);
}
String[] pair = host.split("/");
return new CIDRMatch(privilege,
new SubnetUtils(pair[0], pair[1]).getInfo());
} else if (host.contains("*") || host.contains("?") || host.contains("[")
|| host.contains("]")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using Regex match for '" + host + "' and " + privilege);
}
return new RegexMatch(privilege, host);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Using exact match for '" + host + "' and " + privilege);
}
return new ExactMatch(privilege, host);
}
}

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.security;
import junit.framework.Assert;
import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
import org.apache.hadoop.hdfs.nfs.security.NfsExports;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.junit.Test;
public class TestNfsExports {
private final String address1 = "192.168.0.1";
private final String address2 = "10.0.0.1";
private final String hostname1 = "a.b.com";
private final String hostname2 = "a.b.org";
private static final long ExpirationPeriod =
Nfs3Constant.EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT * 1000 * 1000;
private static final int CacheSize = Nfs3Constant.EXPORTS_CACHE_SIZE_DEFAULT;
@Test
public void testWildcardRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, "* rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
}
@Test
public void testWildcardRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, "* ro");
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
}
@Test
public void testExactAddressRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, address1
+ " rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertFalse(AccessPrivilege.READ_WRITE == matcher
.getAccessPrivilege(address2, hostname1));
}
@Test
public void testExactAddressRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, address1);
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testExactHostRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, hostname1
+ " rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
}
@Test
public void testExactHostRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod, hostname1);
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
}
@Test
public void testCidrShortRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"192.168.0.0/22 rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testCidrShortRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"192.168.0.0/22");
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testCidrLongRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"192.168.0.0/255.255.252.0 rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testCidrLongRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"192.168.0.0/255.255.252.0");
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testRegexIPRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"192.168.0.[0-9]+ rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testRegexIPRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"192.168.0.[0-9]+");
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, hostname1));
}
@Test
public void testRegexHostRW() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"[a-z]+.b.com rw");
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname1));
// address1 will hit the cache
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address1, hostname2));
}
@Test
public void testRegexHostRO() {
NfsExports matcher = new NfsExports(CacheSize, ExpirationPeriod,
"[a-z]+.b.com");
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
// address1 will hit the cache
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname2));
}
@Test
public void testMultiMatchers() throws Exception {
long shortExpirationPeriod = 1 * 1000 * 1000 * 1000; // 1s
NfsExports matcher = new NfsExports(CacheSize, shortExpirationPeriod,
"192.168.0.[0-9]+;[a-z]+.b.com rw");
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname2));
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, address1));
Assert.assertEquals(AccessPrivilege.READ_ONLY,
matcher.getAccessPrivilege(address1, hostname1));
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address2, hostname1));
// address2 will hit the cache
Assert.assertEquals(AccessPrivilege.READ_WRITE,
matcher.getAccessPrivilege(address2, hostname2));
Thread.sleep(1000);
// no cache for address2 now
Assert.assertEquals(AccessPrivilege.NONE,
matcher.getAccessPrivilege(address2, address2));
}
}

View File

@ -78,6 +78,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5069 Include hadoop-nfs and hadoop-hdfs-nfs into hadoop dist for
NFS deployment (brandonli)
HDFS-4947 Add NFS server export table to control export by hostname or
IP range (Jing Zhao via brandonli)
IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may