HDFS-4762,HDFS-4948. Merging change r1499029,r1499041,r1499152 from trunk to branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1499190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-07-03 03:55:42 +00:00
parent b1d5f9519c
commit 07a08e2941
30 changed files with 4614 additions and 7 deletions

View File

@ -50,7 +50,6 @@ abstract public class MountdBase {
/* Start UDP server */
private void startUDPServer() {
rpcProgram.register(PortmapMapping.TRANSPORT_UDP);
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1);
udpServer.run();
@ -58,14 +57,17 @@ abstract public class MountdBase {
/* Start TCP server */
private void startTCPServer() {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1);
tcpServer.run();
}
public void start() {
public void start(boolean register) {
startUDPServer();
startTCPServer();
if (register) {
rpcProgram.register(PortmapMapping.TRANSPORT_UDP);
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
}
}
}

View File

@ -52,10 +52,12 @@ public abstract class Nfs3Base {
this.rpcProgram = program;
}
public void start() {
mountd.start(); // Start mountd
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
startTCPServer(); // Start TCP server
public void start(boolean register) {
mountd.start(register); // Start mountd
startTCPServer(); // Start TCP server
if (register) {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
}
}
private void startTCPServer() {

View File

@ -96,6 +96,22 @@ public class Nfs3FileAttributes {
this.ctime = this.mtime;
}
public Nfs3FileAttributes(Nfs3FileAttributes other) {
this.type = other.getType();
this.mode = other.getMode();
this.nlink = other.getNlink();
this.uid = other.getUid();
this.gid = other.getGid();
this.size = other.getSize();
this.used = other.getUsed();
this.rdev = new Specdata3();
this.fsid = other.getFsid();
this.fileid = other.getFileid();
this.mtime = new NfsTime(other.getMtime());
this.atime = new NfsTime(other.getAtime());
this.ctime = new NfsTime(other.getCtime());
}
public void serialize(XDR xdr) {
xdr.writeInt(type);
xdr.writeInt(mode);

View File

@ -61,6 +61,7 @@ public class READDIR3Response extends NFS3Response {
public DirList3(Entry3[] entries, boolean eof) {
this.entries = ObjectArrays.newArray(entries, entries.length);
System.arraycopy(this.entries, 0, entries, 0, entries.length);
this.eof = eof;
}
}

View File

@ -65,6 +65,7 @@ public class READDIRPLUS3Response extends NFS3Response {
public DirListPlus3(EntryPlus3[] entries, boolean eof) {
this.entries = ObjectArrays.newArray(entries, entries.length);
System.arraycopy(this.entries, 0, entries, 0, entries.length);
this.eof = eof;
}

View File

@ -0,0 +1,4 @@
-----------------------------------------------------------------------------
HDFS-NFS - NFS implementation for Hadoop HDFS
-----------------------------------------------------------------------------

View File

@ -0,0 +1,18 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
</FindBugsFilter>

View File

@ -0,0 +1,196 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project-dist</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../hadoop-project-dist</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-nfs</artifactId>
<version>3.0.0-SNAPSHOT</version>
<description>Apache Hadoop HDFS-NFS</description>
<name>Apache Hadoop HDFS-NFS</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-nfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.2.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.2</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-daemon</groupId>
<artifactId>commons-daemon</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.mount;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase;
/**
* Main class for starting mountd daemon. This daemon implements the NFS
* mount protocol. When receiving a MOUNT request from an NFS client, it checks
* the request against the list of currently exported file systems. If the
* client is permitted to mount the file system, rpc.mountd obtains a file
* handle for requested directory and returns it to the client.
*/
public class Mountd extends MountdBase {
/**
* Constructor
* @param exports
* @throws IOException
*/
public Mountd(List<String> exports) throws IOException {
super(exports, new RpcProgramMountd(exports));
}
public Mountd(List<String> exports, Configuration config) throws IOException {
super(exports, new RpcProgramMountd(exports, config));
}
public static void main(String[] args) throws IOException {
List<String> exports = new ArrayList<String>();
exports.add("/");
Mountd mountd = new Mountd(exports);
mountd.start(true);
}
}

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.mount;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mount.MountEntry;
import org.apache.hadoop.mount.MountInterface;
import org.apache.hadoop.mount.MountResponse;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.channel.Channel;
/**
* RPC program corresponding to mountd daemon. See {@link Mountd}.
*/
public class RpcProgramMountd extends RpcProgram implements MountInterface {
private static final Log LOG = LogFactory.getLog(RpcProgramMountd.class);
public static final int PROGRAM = 100005;
public static final int VERSION_1 = 1;
public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
public static final int PORT = 4242;
// Need DFSClient for branch-1 to get ExtendedHdfsFileStatus
private final DFSClient dfsClient;
/** Synchronized list */
private final List<MountEntry> mounts;
/** List that is unmodifiable */
private final List<String> exports;
public RpcProgramMountd() throws IOException {
this(new ArrayList<String>(0));
}
public RpcProgramMountd(List<String> exports) throws IOException {
this(exports, new Configuration());
}
public RpcProgramMountd(List<String> exports, Configuration config)
throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
this.exports = Collections.unmodifiableList(exports);
this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
}
public XDR nullOp(XDR out, int xid, InetAddress client) {
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT NULLOP : " + " client: " + client);
}
return RpcAcceptedReply.voidReply(out, xid);
}
public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
String path = xdr.readString();
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT MNT path: " + path + " client: " + client);
}
String host = client.getHostName();
if (LOG.isDebugEnabled()) {
LOG.debug("Got host: " + host + " path: " + path);
}
if (!exports.contains(path)) {
LOG.info("Path " + path + " is not shared.");
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
return out;
}
FileHandle handle = null;
try {
HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path);
handle = new FileHandle(exFileStatus.getFileId());
} catch (IOException e) {
LOG.error("Can't get handle for export:" + path + ", exception:" + e);
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
return out;
}
assert (handle != null);
LOG.info("Giving handle (fileId:" + handle.getFileId()
+ ") to client for export " + path);
mounts.add(new MountEntry(host, path));
MountResponse.writeMNTResponse(Nfs3Status.NFS3_OK, out, xid,
handle.getContent());
return out;
}
public XDR dump(XDR out, int xid, InetAddress client) {
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT NULLOP : " + " client: " + client);
}
List<MountEntry> copy = new ArrayList<MountEntry>(mounts);
MountResponse.writeMountList(out, xid, copy);
return out;
}
public XDR umnt(XDR xdr, XDR out, int xid, InetAddress client) {
String path = xdr.readString();
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT UMNT path: " + path + " client: " + client);
}
String host = client.getHostName();
mounts.remove(new MountEntry(host, path));
RpcAcceptedReply.voidReply(out, xid);
return out;
}
public XDR umntall(XDR out, int xid, InetAddress client) {
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT UMNTALL : " + " client: " + client);
}
mounts.clear();
return RpcAcceptedReply.voidReply(out, xid);
}
@Override
public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
InetAddress client, Channel channel) {
int procedure = rpcCall.getProcedure();
int xid = rpcCall.getXid();
if (procedure == MNTPROC_NULL) {
out = nullOp(out, xid, client);
} else if (procedure == MNTPROC_MNT) {
out = mnt(xdr, out, xid, client);
} else if (procedure == MNTPROC_DUMP) {
out = dump(out, xid, client);
} else if (procedure == MNTPROC_UMNT) {
out = umnt(xdr, out, xid, client);
} else if (procedure == MNTPROC_UMNTALL) {
umntall(out, xid, client);
} else if (procedure == MNTPROC_EXPORT) {
out = MountResponse.writeExportList(out, xid, exports);
} else {
// Invalid procedure
RpcAcceptedReply.voidReply(out, xid,
RpcAcceptedReply.AcceptState.PROC_UNAVAIL); }
return out;
}
@Override
protected boolean isIdempotent(RpcCall call) {
// Not required, because cache is turned off
return false;
}
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This class is a thread pool to easily schedule async data operations.Current
* async data operation is write back operation. In the future, we could use it
* for readahead operations too.
*/
public class AsyncDataService {
static final Log LOG = LogFactory.getLog(AsyncDataService.class);
// ThreadPool core pool size
private static final int CORE_THREADS_PER_VOLUME = 1;
// ThreadPool maximum pool size
private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
// ThreadPool keep-alive time for threads over core pool size
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final ThreadGroup threadGroup = new ThreadGroup("async data service");
private ThreadFactory threadFactory = null;
private ThreadPoolExecutor executor = null;
public AsyncDataService() {
threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(threadGroup, r);
}
};
executor = new ThreadPoolExecutor(CORE_THREADS_PER_VOLUME,
MAXIMUM_THREADS_PER_VOLUME, THREADS_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
}
/**
* Execute the task sometime in the future.
*/
synchronized void execute(Runnable task) {
if (executor == null) {
throw new RuntimeException("AsyncDataService is already shutdown");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Current active thread number: " + executor.getActiveCount()
+ " queue size:" + executor.getQueue().size()
+ " scheduled task number:" + executor.getTaskCount());
}
executor.execute(task);
}
/**
* Gracefully shut down the ThreadPool. Will wait for all data tasks to
* finish.
*/
synchronized void shutdown() {
if (executor == null) {
LOG.warn("AsyncDataService has already shut down.");
} else {
LOG.info("Shutting down all async data service threads...");
executor.shutdown();
// clear the executor so that calling execute again will fail.
executor = null;
LOG.info("All async data service threads have been shut down");
}
}
/**
* Write the data to HDFS asynchronously
*/
void writeAsync(OpenFileCtx openFileCtx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling write back task for fileId: "
+ openFileCtx.copyLatestAttr().getFileId());
}
WriteBackTask wbTask = new WriteBackTask(openFileCtx);
execute(wbTask);
}
/**
* A task for write data back to HDFS for a file. Since only one thread can
* write for a file, any time there should be only one task(in queue or
* executing) for one file existing, and this should be guaranteed by the
* caller.
*/
static class WriteBackTask implements Runnable {
OpenFileCtx openFileCtx;
WriteBackTask(OpenFileCtx openFileCtx) {
this.openFileCtx = openFileCtx;
}
OpenFileCtx getOpenFileCtx() {
return openFileCtx;
}
@Override
public String toString() {
// Called in AsyncDataService.execute for displaying error messages.
return "write back data for fileId"
+ openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+ openFileCtx.getNextOffset();
}
public void run() {
try {
openFileCtx.executeWriteBack();
} catch (Throwable t) {
LOG.error("Asyn data service got error:"
+ ExceptionUtils.getFullStackTrace(t));
}
}
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
/**
* A cache saves DFSClient objects for different users
*/
public class DFSClientCache {
static final Log LOG = LogFactory.getLog(DFSClientCache.class);
private final LruCache<String, DFSClient> lruTable;
private final Configuration config;
public DFSClientCache(Configuration config) {
// By default, keep 256 DFSClient instance for 256 active users
this(config, 256);
}
public DFSClientCache(Configuration config, int size) {
lruTable = new LruCache<String, DFSClient>(size);
this.config = config;
}
public void put(String uname, DFSClient client) {
lruTable.put(uname, client);
}
synchronized public DFSClient get(String uname) {
DFSClient client = lruTable.get(uname);
if (client != null) {
return client;
}
// Not in table, create one.
try {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
public DFSClient run() throws IOException {
return new DFSClient(NameNode.getAddress(config), config);
}
});
} catch (IOException e) {
LOG.error("Create DFSClient failed for user:" + uname);
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
// Add new entry
lruTable.put(uname, client);
return client;
}
public int usedSize() {
return lruTable.usedSize();
}
public boolean containsKey(String key) {
return lruTable.containsKey(key);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* A thread-safe LRU table.
*/
public class LruCache<K, V> {
private final int maxSize;
private final LinkedHashMap<K, V> map;
private static final float hashTableLoadFactor = 0.75f;
public LruCache(int maxSize) {
this.maxSize = maxSize;
int hashTableCapacity = (int) Math.ceil(maxSize / hashTableLoadFactor) + 1;
map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > LruCache.this.maxSize;
}
};
}
// The found entry becomes the most recently used.
public synchronized V get(K key) {
return map.get(key);
}
public synchronized void put(K key, V value) {
map.put(key, value);
}
public synchronized int usedSize() {
return map.size();
}
public synchronized boolean containsKey(K key) {
return map.containsKey(key);
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
import org.apache.hadoop.util.StringUtils;
/**
* Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
* Currently Mountd program is also started inside this class.
* Only TCP server is supported and UDP is not supported.
*/
public class Nfs3 extends Nfs3Base {
public Nfs3(List<String> exports) throws IOException {
super(new Mountd(exports), new RpcProgramNfs3(exports));
}
public Nfs3(List<String> exports, Configuration config) throws IOException {
super(new Mountd(exports, config), new RpcProgramNfs3(exports, config));
}
public static void main(String[] args) throws IOException {
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
List<String> exports = new ArrayList<String>();
exports.add("/");
final Nfs3 nfsServer = new Nfs3(exports);
nfsServer.start(true);
}
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.NfsTime;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
/**
* Utility/helper methods related to NFS
*/
public class Nfs3Utils {
public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
public static String getFileIdPath(FileHandle handle) {
return getFileIdPath(handle.getFileId());
}
public static String getFileIdPath(long fileId) {
return INODEID_PATH_PREFIX + fileId;
}
public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
throws IOException {
return client.getFileInfo(fileIdPath);
}
public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
HdfsFileStatus fs, IdUserGroup iug) {
/**
* Some 32bit Linux client has problem with 64bit fileId: it seems the 32bit
* client takes only the lower 32bit of the fileId and treats it as signed
* int. When the 32th bit is 1, the client considers it invalid.
*/
return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
.getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());
}
public static Nfs3FileAttributes getFileAttr(DFSClient client,
String fileIdPath, IdUserGroup iug) throws IOException {
HdfsFileStatus fs = getFileStatus(client, fileIdPath);
return fs == null ? null : getNfs3FileAttrFromFileStatus(fs, iug);
}
public static WccAttr getWccAttr(DFSClient client, String fileIdPath)
throws IOException {
HdfsFileStatus fstat = getFileStatus(client, fileIdPath);
if (fstat == null) {
return null;
}
long size = fstat.isDir() ? Nfs3FileAttributes.getDirSize(fstat
.getChildrenNum()) : fstat.getLen();
return new WccAttr(size, new NfsTime(fstat.getModificationTime()),
new NfsTime(fstat.getModificationTime()));
}
public static WccAttr getWccAttr(Nfs3FileAttributes attr) {
return new WccAttr(attr.getSize(), attr.getMtime(), attr.getCtime());
}
public static WccData createWccData(final WccAttr preOpAttr,
DFSClient dfsClient, final String fileIdPath, final IdUserGroup iug)
throws IOException {
Nfs3FileAttributes postOpDirAttr = getFileAttr(dfsClient, fileIdPath, iug);
return new WccData(preOpAttr, postOpDirAttr);
}
/**
* Send a write response to the netty network socket channel
*/
public static void writeChannel(Channel channel, XDR out) {
ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
channel.write(outBuf);
}
private static boolean isSet(int access, int bits) {
return (access & bits) == bits;
}
public static int getAccessRights(int mode, int type) {
int rtn = 0;
if (isSet(mode, Nfs3Constant.ACCESS_MODE_READ)) {
rtn |= Nfs3Constant.ACCESS3_READ;
// LOOKUP is only meaningful for dir
if (type == NfsFileType.NFSDIR.toValue()) {
rtn |= Nfs3Constant.ACCESS3_LOOKUP;
}
}
if (isSet(mode, Nfs3Constant.ACCESS_MODE_WRITE)) {
rtn |= Nfs3Constant.ACCESS3_MODIFY;
rtn |= Nfs3Constant.ACCESS3_EXTEND;
// Set delete bit, UNIX may ignore it for regular file since it's up to
// parent dir op permission
rtn |= Nfs3Constant.ACCESS3_DELETE;
}
if (isSet(mode, Nfs3Constant.ACCESS_MODE_EXECUTE)) {
if (type == NfsFileType.NFSREG.toValue()) {
rtn |= Nfs3Constant.ACCESS3_EXECUTE;
}
}
return rtn;
}
public static int getAccessRightsForUserGroup(int uid, int gid,
Nfs3FileAttributes attr) {
int mode = attr.getMode();
if (uid == attr.getUid()) {
return getAccessRights(mode >> 6, attr.getType());
}
if (gid == attr.getGid()) {
return getAccessRights(mode >> 3, attr.getType());
}
return getAccessRights(mode, attr.getType());
}
public static long bytesToLong(byte[] data) {
long n = 0xffL & data[0];
for (int i = 1; i < 8; i++) {
n = (n << 8) | (0xffL & data[i]);
}
return n;
}
public static byte[] longToByte(long v) {
byte[] data = new byte[8];
data[0] = (byte) (v >>> 56);
data[1] = (byte) (v >>> 48);
data[2] = (byte) (v >>> 40);
data[3] = (byte) (v >>> 32);
data[4] = (byte) (v >>> 24);
data[5] = (byte) (v >>> 16);
data[6] = (byte) (v >>> 8);
data[7] = (byte) (v >>> 0);
return data;
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
/**
* OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
* is not a valid range.
*/
public class OffsetRange implements Comparable<OffsetRange> {
private final long min;
private final long max;
OffsetRange(long min, long max) {
if ((min >= max) || (min < 0) || (max < 0)) {
throw new IllegalArgumentException("Wrong offset range: (" + min + ","
+ max + ")");
}
this.min = min;
this.max = max;
}
long getMin() {
return min;
}
long getMax() {
return max;
}
@Override
public int hashCode() {
return (int) (min ^ max);
}
@Override
public boolean equals(Object o) {
assert (o instanceof OffsetRange);
OffsetRange range = (OffsetRange) o;
return (min == range.getMin()) && (max == range.getMax());
}
private static int compareTo(long left, long right) {
if (left < right) {
return -1;
} else if (left > right) {
return 1;
} else {
return 0;
}
}
@Override
public int compareTo(OffsetRange other) {
final int d = compareTo(min, other.getMin());
return d != 0 ? d : compareTo(max, other.getMax());
}
}

View File

@ -0,0 +1,775 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.InvalidParameterException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.io.BytesWritable.Comparator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.channel.Channel;
/**
* OpenFileCtx saves the context of one HDFS file output stream. Access to it is
* synchronized by its member lock.
*/
class OpenFileCtx {
public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
/**
* Lock to synchronize OpenFileCtx changes. Thread should get this lock before
* any read/write operation to an OpenFileCtx object
*/
private final ReentrantLock ctxLock;
// The stream status. False means the stream is closed.
private boolean activeState;
// The stream write-back status. True means one thread is doing write back.
private boolean asyncStatus;
private final FSDataOutputStream fos;
private final Nfs3FileAttributes latestAttr;
private long nextOffset;
private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
// The last write, commit request or write-back event. Updating time to keep
// output steam alive.
private long lastAccessTime;
// Pending writes water mark for dump, 1MB
private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
private FileOutputStream dumpOut;
private long nonSequentialWriteInMemory;
private boolean enabledDump;
private RandomAccessFile raf;
private final String dumpFilePath;
private void updateLastAccessTime() {
lastAccessTime = System.currentTimeMillis();
}
private boolean checkStreamTimeout(long streamTimeout) {
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
}
// Increase or decrease the memory occupation of non-sequential writes
private long updateNonSequentialWriteInMemory(long count) {
nonSequentialWriteInMemory += count;
if (LOG.isDebugEnabled()) {
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
+ nonSequentialWriteInMemory);
}
if (nonSequentialWriteInMemory < 0) {
LOG.error("nonSequentialWriteInMemory is negative after update with count "
+ count);
throw new IllegalArgumentException(
"nonSequentialWriteInMemory is negative after update with count "
+ count);
}
return nonSequentialWriteInMemory;
}
OpenFileCtx(FSDataOutputStream fos, Nfs3FileAttributes latestAttr,
String dumpFilePath) {
this.fos = fos;
this.latestAttr = latestAttr;
pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
updateLastAccessTime();
activeState = true;
asyncStatus = false;
dumpOut = null;
raf = null;
nonSequentialWriteInMemory = 0;
this.dumpFilePath = dumpFilePath;
enabledDump = dumpFilePath == null ? false: true;
ctxLock = new ReentrantLock(true);
}
private void lockCtx() {
if (LOG.isTraceEnabled()) {
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
StackTraceElement e = stacktrace[2];
String methodName = e.getMethodName();
LOG.trace("lock ctx, caller:" + methodName);
}
ctxLock.lock();
}
private void unlockCtx() {
ctxLock.unlock();
if (LOG.isTraceEnabled()) {
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
StackTraceElement e = stacktrace[2];
String methodName = e.getMethodName();
LOG.info("unlock ctx, caller:" + methodName);
}
}
// Make a copy of the latestAttr
public Nfs3FileAttributes copyLatestAttr() {
Nfs3FileAttributes ret;
lockCtx();
try {
ret = new Nfs3FileAttributes(latestAttr);
} finally {
unlockCtx();
}
return ret;
}
private long getNextOffsetUnprotected() {
assert(ctxLock.isLocked());
return nextOffset;
}
public long getNextOffset() {
long ret;
lockCtx();
try {
ret = getNextOffsetUnprotected();
} finally {
unlockCtx();
}
return ret;
}
// Get flushed offset. Note that flushed data may not be persisted.
private long getFlushedOffset() {
return fos.getPos();
}
// Check if need to dump the new writes
private void checkDump(long count) {
assert (ctxLock.isLocked());
// Always update the in memory count
updateNonSequentialWriteInMemory(count);
if (!enabledDump) {
if (LOG.isDebugEnabled()) {
LOG.debug("Do nothing, dump is disabled.");
}
return;
}
if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
return;
}
// Create dump outputstream for the first time
if (dumpOut == null) {
LOG.info("Create dump file:" + dumpFilePath);
File dumpFile = new File(dumpFilePath);
try {
if (dumpFile.exists()) {
LOG.fatal("The dump file should not exist:" + dumpFilePath);
throw new RuntimeException("The dump file should not exist:"
+ dumpFilePath);
}
dumpOut = new FileOutputStream(dumpFile);
} catch (IOException e) {
LOG.error("Got failure when creating dump stream " + dumpFilePath
+ " with error:" + e);
enabledDump = false;
IOUtils.cleanup(LOG, dumpOut);
return;
}
}
// Get raf for the first dump
if (raf == null) {
try {
raf = new RandomAccessFile(dumpFilePath, "r");
} catch (FileNotFoundException e) {
LOG.error("Can't get random access to file " + dumpFilePath);
// Disable dump
enabledDump = false;
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Start dump, current write number:" + pendingWrites.size());
}
Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
while (it.hasNext()) {
OffsetRange key = it.next();
WriteCtx writeCtx = pendingWrites.get(key);
try {
long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
if (dumpedDataSize > 0) {
updateNonSequentialWriteInMemory(-dumpedDataSize);
}
} catch (IOException e) {
LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
// Disable dump
enabledDump = false;
return;
}
}
if (nonSequentialWriteInMemory != 0) {
LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
+ nonSequentialWriteInMemory);
throw new RuntimeException(
"After dump, nonSequentialWriteInMemory is not zero: "
+ nonSequentialWriteInMemory);
}
}
private WriteCtx checkRepeatedWriteRequest(WRITE3Request request,
Channel channel, int xid) {
OffsetRange range = new OffsetRange(request.getOffset(),
request.getOffset() + request.getCount());
WriteCtx writeCtx = pendingWrites.get(range);
if (writeCtx== null) {
return null;
} else {
if (xid != writeCtx.getXid()) {
LOG.warn("Got a repeated request, same range, with a different xid:"
+ xid + " xid in old request:" + writeCtx.getXid());
//TODO: better handling.
}
return writeCtx;
}
}
public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
Channel channel, int xid, AsyncDataService asyncDataService,
IdUserGroup iug) {
lockCtx();
try {
if (!activeState) {
LOG.info("OpenFileCtx is inactive, fileId:"
+ request.getHandle().getFileId());
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
} else {
// Handle repeated write requests(same xid or not).
// If already replied, send reply again. If not replied, drop the
// repeated request.
WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
xid);
if (existantWriteCtx != null) {
if (!existantWriteCtx.getReplied()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Repeated write request which hasn't be served: xid="
+ xid + ", drop it.");
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Repeated write request which is already served: xid="
+ xid + ", resend response.");
}
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, request.getCount(), request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
}
updateLastAccessTime();
} else {
receivedNewWriteInternal(dfsClient, request, channel, xid,
asyncDataService, iug);
}
}
} finally {
unlockCtx();
}
}
private void receivedNewWriteInternal(DFSClient dfsClient,
WRITE3Request request, Channel channel, int xid,
AsyncDataService asyncDataService, IdUserGroup iug) {
long offset = request.getOffset();
int count = request.getCount();
WriteStableHow stableHow = request.getStableHow();
// Get file length, fail non-append call
WccAttr preOpAttr = latestAttr.getWccAttr();
if (LOG.isDebugEnabled()) {
LOG.debug("requesed offset=" + offset + " and current filesize="
+ preOpAttr.getSize());
}
long nextOffset = getNextOffsetUnprotected();
if (offset == nextOffset) {
LOG.info("Add to the list, update nextOffset and notify the writer,"
+ " nextOffset:" + nextOffset);
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), request.getStableHow(),
request.getData().array(), channel, xid, false, WriteCtx.NO_DUMP);
addWrite(writeCtx);
// Create an async task and change openFileCtx status to indicate async
// task pending
if (!asyncStatus) {
asyncStatus = true;
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
}
// Update the write time first
updateLastAccessTime();
Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
// Send response immediately for unstable write
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
writeCtx.setReplied(true);
}
} else if (offset > nextOffset) {
LOG.info("Add new write to the list but not update nextOffset:"
+ nextOffset);
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), request.getStableHow(),
request.getData().array(), channel, xid, false, WriteCtx.ALLOW_DUMP);
addWrite(writeCtx);
// Check if need to dump some pending requests to file
checkDump(request.getCount());
updateLastAccessTime();
Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
// In test, noticed some Linux client sends a batch (e.g., 1MB)
// of reordered writes and won't send more writes until it gets
// responses of the previous batch. So here send response immediately for
// unstable non-sequential write
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
writeCtx.setReplied(true);
}
} else {
// offset < nextOffset
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ nextOffset + ")");
WccData wccData = new WccData(preOpAttr, null);
WRITE3Response response;
if (offset + count > nextOffset) {
LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
+ "write requests, so treat it as a real random write, no support.");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
WriteStableHow.UNSTABLE, 0);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Process perfectOverWrite");
}
response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
request.getData().array(),
Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
}
updateLastAccessTime();
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
}
}
/**
* Honor 2 kinds of overwrites: 1). support some application like touch(write
* the same content back to change mtime), 2) client somehow sends the same
* write again in a different RPC.
*/
private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
long offset, int count, WriteStableHow stableHow, byte[] data,
String path, WccData wccData, IdUserGroup iug) {
assert (ctxLock.isLocked());
WRITE3Response response = null;
// Read the content back
byte[] readbuffer = new byte[count];
int readCount = 0;
FSDataInputStream fis = null;
try {
// Sync file data and length to avoid partial read failure
((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
fis = new FSDataInputStream(dfsClient.open(path));
readCount = fis.read(offset, readbuffer, 0, count);
if (readCount < count) {
LOG.error("Can't read back " + count + " bytes, partial read size:"
+ readCount);
return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
}
} catch (IOException e) {
LOG.info("Read failed when processing possible perfect overwrite, path="
+ path + " error:" + e);
return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
} finally {
IOUtils.cleanup(LOG, fis);
}
// Compare with the request
Comparator comparator = new Comparator();
if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
LOG.info("Perfect overwrite has different content");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
stableHow, 0);
} else {
LOG.info("Perfect overwrite has same content,"
+ " updating the mtime, then return success");
Nfs3FileAttributes postOpAttr = null;
try {
dfsClient.setTimes(path, System.currentTimeMillis(), -1);
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
} catch (IOException e) {
LOG.info("Got error when processing perfect overwrite, path=" + path
+ " error:" + e);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
0);
}
wccData.setPostOpAttr(postOpAttr);
response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
stableHow, 0);
}
return response;
}
public final static int COMMIT_FINISHED = 0;
public final static int COMMIT_WAIT = 1;
public final static int COMMIT_INACTIVE_CTX = 2;
public final static int COMMIT_ERROR = 3;
/**
* return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
* COMMIT_INACTIVE_CTX, COMMIT_ERROR
*/
public int checkCommit(long commitOffset) {
int ret = COMMIT_WAIT;
lockCtx();
try {
if (!activeState) {
ret = COMMIT_INACTIVE_CTX;
} else {
ret = checkCommitInternal(commitOffset);
}
} finally {
unlockCtx();
}
return ret;
}
private int checkCommitInternal(long commitOffset) {
if (commitOffset == 0) {
// Commit whole file
commitOffset = getNextOffsetUnprotected();
}
long flushed = getFlushedOffset();
LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
if (flushed < commitOffset) {
// Keep stream active
updateLastAccessTime();
return COMMIT_WAIT;
}
int ret = COMMIT_WAIT;
try {
// Sync file data and length
((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
// Nothing to do for metadata since attr related change is pass-through
ret = COMMIT_FINISHED;
} catch (IOException e) {
LOG.error("Got stream error during data sync:" + e);
// Do nothing. Stream will be closed eventually by StreamMonitor.
ret = COMMIT_ERROR;
}
// Keep stream active
updateLastAccessTime();
return ret;
}
private void addWrite(WriteCtx writeCtx) {
assert (ctxLock.isLocked());
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
}
/**
* Check stream status to decide if it should be closed
* @return true, remove stream; false, keep stream
*/
public boolean streamCleanup(long fileId, long streamTimeout) {
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
throw new InvalidParameterException("StreamTimeout" + streamTimeout
+ "ms is less than MINIMIUM_STREAM_TIMEOUT "
+ WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
}
boolean flag = false;
if (!ctxLock.tryLock()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Another thread is working on it" + ctxLock.toString());
}
return flag;
}
try {
// Check the stream timeout
if (checkStreamTimeout(streamTimeout)) {
LOG.info("closing stream for fileId:" + fileId);
cleanup();
flag = true;
}
} finally {
unlockCtx();
}
return flag;
}
// Invoked by AsynDataService to do the write back
public void executeWriteBack() {
long nextOffset;
OffsetRange key;
WriteCtx writeCtx;
try {
// Don't lock OpenFileCtx for all writes to reduce the timeout of other
// client request to the same file
while (true) {
lockCtx();
if (!asyncStatus) {
// This should never happen. There should be only one thread working
// on one OpenFileCtx anytime.
LOG.fatal("The openFileCtx has false async status");
throw new RuntimeException("The openFileCtx has false async status");
}
// Any single write failure can change activeState to false, so do the
// check each loop.
if (pendingWrites.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("The asyn write task has no pendding writes, fileId: "
+ latestAttr.getFileId());
}
break;
}
if (!activeState) {
if (LOG.isDebugEnabled()) {
LOG.debug("The openFileCtx is not active anymore, fileId: "
+ latestAttr.getFileId());
}
break;
}
// Get the next sequential write
nextOffset = getNextOffsetUnprotected();
key = pendingWrites.firstKey();
if (LOG.isTraceEnabled()) {
LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
+ nextOffset);
}
if (key.getMin() > nextOffset) {
if (LOG.isDebugEnabled()) {
LOG.info("The next sequencial write has not arrived yet");
}
break;
} else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
// Can't handle overlapping write. Didn't see it in tests yet.
LOG.fatal("Got a overlapping write (" + key.getMin() + ","
+ key.getMax() + "), nextOffset=" + nextOffset);
throw new RuntimeException("Got a overlapping write (" + key.getMin()
+ "," + key.getMax() + "), nextOffset=" + nextOffset);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
+ ") from the list");
}
writeCtx = pendingWrites.remove(key);
// Do the write
doSingleWrite(writeCtx);
updateLastAccessTime();
}
unlockCtx();
}
} finally {
// Always reset the async status so another async task can be created
// for this file
asyncStatus = false;
if (ctxLock.isHeldByCurrentThread()) {
unlockCtx();
}
}
}
private void doSingleWrite(final WriteCtx writeCtx) {
assert(ctxLock.isLocked());
Channel channel = writeCtx.getChannel();
int xid = writeCtx.getXid();
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
WriteStableHow stableHow = writeCtx.getStableHow();
byte[] data = null;
try {
data = writeCtx.getData();
} catch (IOException e1) {
LOG.error("Failed to get request data offset:" + offset + " count:"
+ count + " error:" + e1);
// Cleanup everything
cleanup();
return;
}
assert (data.length == count);
FileHandle handle = writeCtx.getHandle();
LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
+ " length:" + count + " stableHow:" + stableHow.getValue());
try {
fos.write(data, 0, count);
if (fos.getPos() != (offset + count)) {
throw new IOException("output stream is out of sync, pos="
+ fos.getPos() + " and nextOffset should be" + (offset + count));
}
nextOffset = fos.getPos();
// Reduce memory occupation size if request was allowed dumped
if (writeCtx.getDataState() == WriteCtx.ALLOW_DUMP) {
updateNonSequentialWriteInMemory(-count);
}
if (!writeCtx.getReplied()) {
WccAttr preOpAttr = latestAttr.getWccAttr();
WccData fileWcc = new WccData(preOpAttr, latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
}
} catch (IOException e) {
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
+ offset + " and length " + data.length, e);
if (!writeCtx.getReplied()) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
// Keep stream open. Either client retries or SteamMonitor closes it.
}
LOG.info("Clean up open file context for fileId: "
+ latestAttr.getFileid());
cleanup();
}
}
private void cleanup() {
assert(ctxLock.isLocked());
activeState = false;
// Close stream
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
LOG.info("Can't close stream for fileId:" + latestAttr.getFileid()
+ ", error:" + e);
}
// Reply error for pending writes
LOG.info("There are " + pendingWrites.size() + " pending writes.");
WccAttr preOpAttr = latestAttr.getWccAttr();
while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey();
LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
+ "), nextOffset=" + getNextOffsetUnprotected());
WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) {
WccData fileWcc = new WccData(preOpAttr, latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(writeCtx.getChannel(),
response.send(new XDR(), writeCtx.getXid()));
}
}
// Cleanup dump file
if (dumpOut!=null){
try {
dumpOut.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (raf!=null) {
try {
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}
File dumpFile = new File(dumpFilePath);
if (dumpFile.delete()) {
LOG.error("Failed to delete dumpfile: "+ dumpFile);
}
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel;
/**
* WriteCtx saves the context of one write request, such as request, channel,
* xid and reply status.
*/
class WriteCtx {
public static final Log LOG = LogFactory.getLog(WriteCtx.class);
private final FileHandle handle;
private final long offset;
private final int count;
private final WriteStableHow stableHow;
private byte[] data;
private final Channel channel;
private final int xid;
private boolean replied;
/**
* In memory write data has 3 states. ALLOW_DUMP: not sequential write, still
* wait for prerequisit writes. NO_DUMP: sequential write, no need to dump
* since it will be written to HDFS soon. DUMPED: already dumped to a file.
*/
public final static int ALLOW_DUMP = 0;
public final static int NO_DUMP = 1;
public final static int DUMPED = 2;
private int dataState;
public int getDataState() {
return dataState;
}
public void setDataState(int dataState) {
this.dataState = dataState;
}
private RandomAccessFile raf;
private long dumpFileOffset;
// Return the dumped data size
public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
throws IOException {
if (dataState != ALLOW_DUMP) {
if (LOG.isTraceEnabled()) {
LOG.trace("No need to dump with status(replied,dataState):" + "("
+ replied + "," + dataState + ")");
}
return 0;
}
this.raf = raf;
dumpFileOffset = dumpOut.getChannel().position();
dumpOut.write(data, 0, count);
if (LOG.isDebugEnabled()) {
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
}
data = null;
dataState = DUMPED;
return count;
}
public FileHandle getHandle() {
return handle;
}
public long getOffset() {
return offset;
}
public int getCount() {
return count;
}
public WriteStableHow getStableHow() {
return stableHow;
}
public byte[] getData() throws IOException {
if (dataState != DUMPED) {
if (data == null) {
throw new IOException("Data is not dumpted but has null:" + this);
}
} else {
// read back
if (data != null) {
throw new IOException("Data is dumpted but not null");
}
data = new byte[count];
raf.seek(dumpFileOffset);
int size = raf.read(data, 0, count);
if (size != count) {
throw new IOException("Data count is " + count + ", but read back "
+ size + "bytes");
}
}
return data;
}
Channel getChannel() {
return channel;
}
int getXid() {
return xid;
}
boolean getReplied() {
return replied;
}
void setReplied(boolean replied) {
this.replied = replied;
}
WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
byte[] data, Channel channel, int xid, boolean replied, int dataState) {
this.handle = handle;
this.offset = offset;
this.count = count;
this.stableHow = stableHow;
this.data = data;
this.channel = channel;
this.xid = xid;
this.replied = replied;
this.dataState = dataState;
raf = null;
}
@Override
public String toString() {
return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
+ " stableHow:" + stableHow + " replied:" + replied + " dataState:"
+ dataState + " xid:" + xid;
}
}

View File

@ -0,0 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
import com.google.common.collect.Maps;
/**
* Manage the writes and responds asynchronously.
*/
public class WriteManager {
public static final Log LOG = LogFactory.getLog(WriteManager.class);
private final IdUserGroup iug;
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
.newConcurrentMap();
private AsyncDataService asyncDataService;
private boolean asyncDataServiceStarted = false;
private final StreamMonitor streamMonitor;
/**
* The time limit to wait for accumulate reordered sequential writes to the
* same file before the write is considered done.
*/
private long streamTimeout;
public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
openFileMap.put(h, ctx);
if (LOG.isDebugEnabled()) {
LOG.debug("After add the new stream " + h.getFileId()
+ ", the stream number:" + openFileMap.size());
}
}
WriteManager(IdUserGroup iug, final Configuration config) {
this.iug = iug;
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
DEFAULT_STREAM_TIMEOUT);
LOG.info("Stream timeout is " + streamTimeout + "ms.");
if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) {
LOG.info("Reset stream timeout to minimum value "
+ MINIMIUM_STREAM_TIMEOUT + "ms.");
streamTimeout = MINIMIUM_STREAM_TIMEOUT;
}
this.streamMonitor = new StreamMonitor();
}
private void startAsyncDataSerivce() {
streamMonitor.start();
this.asyncDataService = new AsyncDataService();
asyncDataServiceStarted = true;
}
private void shutdownAsyncDataService() {
asyncDataService.shutdown();
asyncDataServiceStarted = false;
streamMonitor.interrupt();
}
void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
int xid, Nfs3FileAttributes preOpAttr) throws IOException {
// First write request starts the async data service
if (!asyncDataServiceStarted) {
startAsyncDataSerivce();
}
long offset = request.getOffset();
int count = request.getCount();
WriteStableHow stableHow = request.getStableHow();
byte[] data = request.getData().array();
if (data.length < count) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
return;
}
FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: "
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
}
// Check if there is a stream to write
FileHandle fileHandle = request.getHandle();
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, count, request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
return;
}
// Add write into the async job queue
openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
asyncDataService, iug);
// Block stable write
if (request.getStableHow() != WriteStableHow.UNSTABLE) {
if (handleCommit(fileHandle, offset + count)) {
Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
postOpAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, request.getStableHow(),
Nfs3Constant.WRITE_COMMIT_VERF);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
} else {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
}
}
return;
}
boolean handleCommit(FileHandle fileHandle, long commitOffset) {
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset);
return true;
}
long timeout = 30 * 1000; // 30 seconds
long startCommit = System.currentTimeMillis();
while (true) {
int ret = openFileCtx.checkCommit(commitOffset);
if (ret == OpenFileCtx.COMMIT_FINISHED) {
// Committed
return true;
} else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset);
return true;
}
assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
if (ret == OpenFileCtx.COMMIT_ERROR) {
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset);
}
if (System.currentTimeMillis() - startCommit > timeout) {
// Commit took too long, return error
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset);
return false;
}
}// while
}
/**
* If the file is in cache, update the size based on the cached data size
*/
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle,
IdUserGroup iug) throws IOException {
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
if (attr != null) {
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
if (openFileCtx != null) {
attr.setSize(openFileCtx.getNextOffset());
attr.setUsed(openFileCtx.getNextOffset());
}
}
return attr;
}
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
String fileName) throws IOException {
String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
OpenFileCtx openFileCtx = openFileMap
.get(new FileHandle(attr.getFileId()));
if (openFileCtx != null) {
attr.setSize(openFileCtx.getNextOffset());
attr.setUsed(openFileCtx.getNextOffset());
}
}
return attr;
}
/**
* StreamMonitor wakes up periodically to find and closes idle streams.
*/
class StreamMonitor extends Daemon {
private int rotation = 5 * 1000; // 5 seconds
private long lastWakeupTime = 0;
@Override
public void run() {
while (true) {
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
.iterator();
if (LOG.isTraceEnabled()) {
LOG.trace("openFileMap size:" + openFileMap.size());
}
while (it.hasNext()) {
Entry<FileHandle, OpenFileCtx> pairs = it.next();
OpenFileCtx ctx = pairs.getValue();
if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
it.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("After remove stream " + pairs.getKey().getFileId()
+ ", the stream number:" + openFileMap.size());
}
}
}
// Check if it can sleep
try {
long workedTime = System.currentTimeMillis() - lastWakeupTime;
if (workedTime < rotation) {
if (LOG.isTraceEnabled()) {
LOG.trace("StreamMonitor can still have a sleep:"
+ ((rotation - workedTime) / 1000));
}
Thread.sleep(rotation - workedTime);
}
lastWakeupTime = System.currentTimeMillis();
} catch (InterruptedException e) {
LOG.info("StreamMonitor got interrupted");
return;
}
}
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
import org.apache.hadoop.oncrpc.XDR;
import org.junit.Test;
public class TestMountd {
public static final Log LOG = LogFactory.getLog(TestMountd.class);
@Test
public void testStart() throws IOException {
// Start minicluster
Configuration config = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
.manageNameDfsDirs(false).build();
cluster.waitActive();
// Start nfs
List<String> exports = new ArrayList<String>();
exports.add("/");
Nfs3 nfs3 = new Nfs3(exports, config);
nfs3.start(false);
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase()
.getRpcProgram();
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
nfsd.nullProcedure();
cluster.shutdown();
}
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.RpcReply;
import org.apache.hadoop.oncrpc.SimpleTcpClient;
import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
public class TestOutOfOrderWrite {
public final static Log LOG = LogFactory.getLog(TestOutOfOrderWrite.class);
static FileHandle handle = null;
static Channel channel;
static byte[] data1 = new byte[1000];
static byte[] data2 = new byte[1000];
static byte[] data3 = new byte[1000];
static XDR create() {
XDR request = new XDR();
RpcCall.write(request, 0x8000004c, Nfs3Constant.PROGRAM,
Nfs3Constant.VERSION, Nfs3Constant.NFSPROC3_CREATE);
// credentials
request.writeInt(0); // auth null
request.writeInt(0); // length zero
// verifier
request.writeInt(0); // auth null
request.writeInt(0); // length zero
SetAttr3 objAttr = new SetAttr3();
CREATE3Request createReq = new CREATE3Request(new FileHandle("/"),
"out-of-order-write" + System.currentTimeMillis(), 0, objAttr, 0);
createReq.serialize(request);
return request;
}
static XDR write(FileHandle handle, int xid, long offset, int count,
byte[] data) {
XDR request = new XDR();
RpcCall.write(request, xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
Nfs3Constant.NFSPROC3_WRITE);
// credentials
request.writeInt(0); // auth null
request.writeInt(0); // length zero
// verifier
request.writeInt(0); // auth null
request.writeInt(0); // length zero
WRITE3Request write1 = new WRITE3Request(handle, offset, count,
WriteStableHow.UNSTABLE, ByteBuffer.wrap(data));
write1.serialize(request);
return request;
}
static void testRequest(XDR request) {
RegistrationClient registrationClient = new RegistrationClient("localhost",
Nfs3Constant.SUN_RPCBIND, request);
registrationClient.run();
}
static class WriteHandler extends SimpleTcpClientHandler {
public WriteHandler(XDR request) {
super(request);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Get handle from create response
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR rsp = new XDR(buf.array());
if (rsp.getBytes().length == 0) {
LOG.info("rsp length is zero, why?");
return;
}
LOG.info("rsp length=" + rsp.getBytes().length);
RpcReply reply = RpcReply.read(rsp);
int xid = reply.getXid();
// Only process the create response
if (xid != 0x8000004c) {
return;
}
int status = rsp.readInt();
if (status != Nfs3Status.NFS3_OK) {
LOG.error("Create failed, status =" + status);
return;
}
LOG.info("Create succeeded");
rsp.readBoolean(); // value follow
handle = new FileHandle();
handle.deserialize(rsp);
channel = e.getChannel();
}
}
static class WriteClient extends SimpleTcpClient {
public WriteClient(String host, int port, XDR request, Boolean oneShot) {
super(host, port, request, oneShot);
}
@Override
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
request));
}
};
return this.pipelineFactory;
}
}
public static void main(String[] args) throws InterruptedException {
Arrays.fill(data1, (byte) 7);
Arrays.fill(data2, (byte) 8);
Arrays.fill(data3, (byte) 9);
// NFS3 Create request
WriteClient client = new WriteClient("localhost", Nfs3Constant.PORT,
create(), false);
client.run();
while (handle == null) {
Thread.sleep(1000);
System.out.println("handle is still null...");
}
LOG.info("Send write1 request");
XDR writeReq;
writeReq = write(handle, 0x8000005c, 2000, 1000, data3);
Nfs3Utils.writeChannel(channel, writeReq);
writeReq = write(handle, 0x8000005d, 1000, 1000, data2);
Nfs3Utils.writeChannel(channel, writeReq);
writeReq = write(handle, 0x8000005e, 0, 1000, data1);
Nfs3Utils.writeChannel(channel, writeReq);
// TODO: convert to Junit test, and validate result automatically
}
}

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
public class TestPortmapRegister {
public static final Log LOG = LogFactory.getLog(TestPortmapRegister.class);
static void testRequest(XDR request, XDR request2) {
RegistrationClient registrationClient = new RegistrationClient(
"localhost", Nfs3Constant.SUN_RPCBIND, request);
registrationClient.run();
}
public static void main(String[] args) throws InterruptedException {
PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM,
RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP,
RpcProgramMountd.PORT);
XDR mappingRequest = PortmapRequest.create(mapEntry);
RegistrationClient registrationClient = new RegistrationClient(
"localhost", Nfs3Constant.SUN_RPCBIND, mappingRequest);
registrationClient.run();
Thread t1 = new Runtest1();
//Thread t2 = testa.new Runtest2();
t1.start();
//t2.start();
t1.join();
//t2.join();
//testDump();
}
static class Runtest1 extends Thread {
@Override
public void run() {
//testGetportMount();
PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM,
RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP,
RpcProgramMountd.PORT);
XDR req = PortmapRequest.create(mapEntry);
testRequest(req, req);
}
}
static class Runtest2 extends Thread {
@Override
public void run() {
testDump();
}
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// TODO: Move this to RpcRequest
RpcCall.write(xdr_out, 0, 100000, 2, procedure);
xdr_out.writeInt(0); //no auth
xdr_out.writeInt(0);
xdr_out.writeInt(0);
xdr_out.writeInt(0);
/*
xdr_out.putInt(1); //unix auth
xdr_out.putVariableOpaque(new byte[20]);
xdr_out.putInt(0);
xdr_out.putInt(0);
*/
}
static void testGetportMount() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(100005);
xdr_out.writeInt(1);
xdr_out.writeInt(6);
xdr_out.writeInt(0);
XDR request2 = new XDR();
createPortmapXDRheader(xdr_out, 3);
request2.writeInt(100005);
request2.writeInt(1);
request2.writeInt(6);
request2.writeInt(0);
testRequest(xdr_out, request2);
}
static void testGetport() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(100003);
xdr_out.writeInt(3);
xdr_out.writeInt(6);
xdr_out.writeInt(0);
XDR request2 = new XDR();
createPortmapXDRheader(xdr_out, 3);
request2.writeInt(100003);
request2.writeInt(3);
request2.writeInt(6);
request2.writeInt(0);
testRequest(xdr_out, request2);
}
static void testDump() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 4);
testRequest(xdr_out, xdr_out);
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
// TODO: convert this to Junit
public class TestUdpServer {
static void testRequest(XDR request, XDR request2) {
try {
DatagramSocket clientSocket = new DatagramSocket();
InetAddress IPAddress = InetAddress.getByName("localhost");
byte[] sendData = request.getBytes();
byte[] receiveData = new byte[65535];
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
IPAddress, Nfs3Constant.SUN_RPCBIND);
clientSocket.send(sendPacket);
DatagramPacket receivePacket = new DatagramPacket(receiveData,
receiveData.length);
clientSocket.receive(receivePacket);
clientSocket.close();
} catch (UnknownHostException e) {
System.err.println("Don't know about host: localhost.");
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn't get I/O for "
+ "the connection to: localhost.");
System.exit(1);
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Runtest1();
// TODO: cleanup
//Thread t2 = new Runtest2();
t1.start();
//t2.start();
t1.join();
//t2.join();
//testDump();
}
static class Runtest1 extends Thread {
@Override
public void run() {
testGetportMount();
}
}
static class Runtest2 extends Thread {
@Override
public void run() {
testDump();
}
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// Make this a method
RpcCall.write(xdr_out, 0, 100000, 2, procedure);
}
static void testGetportMount() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(100005);
xdr_out.writeInt(1);
xdr_out.writeInt(6);
xdr_out.writeInt(0);
XDR request2 = new XDR();
createPortmapXDRheader(xdr_out, 3);
request2.writeInt(100005);
request2.writeInt(1);
request2.writeInt(6);
request2.writeInt(0);
testRequest(xdr_out, request2);
}
static void testGetport() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(100003);
xdr_out.writeInt(3);
xdr_out.writeInt(6);
xdr_out.writeInt(0);
XDR request2 = new XDR();
createPortmapXDRheader(xdr_out, 3);
request2.writeInt(100003);
request2.writeInt(3);
request2.writeInt(6);
request2.writeInt(0);
testRequest(xdr_out, request2);
}
static void testDump() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 4);
testRequest(xdr_out, xdr_out);
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.junit.Test;
import org.mockito.Mockito;
public class TestDFSClientCache {
@Test
public void testLruTable() throws IOException {
DFSClientCache cache = new DFSClientCache(new Configuration(), 3);
DFSClient client = Mockito.mock(DFSClient.class);
cache.put("a", client);
assertTrue(cache.containsKey("a"));
cache.put("b", client);
cache.put("c", client);
cache.put("d", client);
assertTrue(cache.usedSize() == 3);
assertFalse(cache.containsKey("a"));
// Cache should have d,c,b in LRU order
assertTrue(cache.containsKey("b"));
// Do a lookup to make b the most recently used
assertTrue(cache.get("b") != null);
cache.put("e", client);
assertTrue(cache.usedSize() == 3);
// c should be replaced with e, and cache has e,b,d
assertFalse(cache.containsKey("c"));
assertTrue(cache.containsKey("e"));
assertTrue(cache.containsKey("b"));
assertTrue(cache.containsKey("d"));
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.junit.Test;
public class TestOffsetRange {
@Test(expected = IllegalArgumentException.class)
public void testConstructor1() throws IOException {
new OffsetRange(0, 0);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructor2() throws IOException {
new OffsetRange(-1, 0);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructor3() throws IOException {
new OffsetRange(-3, -1);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructor4() throws IOException {
new OffsetRange(-3, 100);
}
@Test
public void testCompare() throws IOException {
OffsetRange r1 = new OffsetRange(0, 1);
OffsetRange r2 = new OffsetRange(1, 3);
OffsetRange r3 = new OffsetRange(1, 3);
OffsetRange r4 = new OffsetRange(3, 4);
assertTrue(r2.compareTo(r3) == 0);
assertTrue(r2.compareTo(r1) == 1);
assertTrue(r2.compareTo(r4) == -1);
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for {@link RpcProgramNfs3}
*/
public class TestRpcProgramNfs3 {
@Test(timeout=1000)
public void testIdempotent() {
int[][] procedures = {
{ Nfs3Constant.NFSPROC3_NULL, 1 },
{ Nfs3Constant.NFSPROC3_GETATTR, 1 },
{ Nfs3Constant.NFSPROC3_SETATTR, 1 },
{ Nfs3Constant.NFSPROC3_LOOKUP, 1 },
{ Nfs3Constant.NFSPROC3_ACCESS, 1 },
{ Nfs3Constant.NFSPROC3_READLINK, 1 },
{ Nfs3Constant.NFSPROC3_READ, 1 },
{ Nfs3Constant.NFSPROC3_WRITE, 1 },
{ Nfs3Constant.NFSPROC3_CREATE, 0 },
{ Nfs3Constant.NFSPROC3_MKDIR, 0 },
{ Nfs3Constant.NFSPROC3_SYMLINK, 0 },
{ Nfs3Constant.NFSPROC3_MKNOD, 0 },
{ Nfs3Constant.NFSPROC3_REMOVE, 0 },
{ Nfs3Constant.NFSPROC3_RMDIR, 0 },
{ Nfs3Constant.NFSPROC3_RENAME, 0 },
{ Nfs3Constant.NFSPROC3_LINK, 0 },
{ Nfs3Constant.NFSPROC3_READDIR, 1 },
{ Nfs3Constant.NFSPROC3_READDIRPLUS, 1 },
{ Nfs3Constant.NFSPROC3_FSSTAT, 1 },
{ Nfs3Constant.NFSPROC3_FSINFO, 1 },
{ Nfs3Constant.NFSPROC3_PATHCONF, 1 },
{ Nfs3Constant.NFSPROC3_COMMIT, 1 } };
for (int[] procedure : procedures) {
boolean idempotent = procedure[1] == 1;
int proc = procedure[0];
if (idempotent) {
Assert.assertTrue(("Procedure " + proc + " should be idempotent"),
RpcProgramNfs3.isIdempotent(proc));
} else {
Assert.assertFalse(("Procedure " + proc + " should be non-idempotent"),
RpcProgramNfs3.isIdempotent(proc));
}
}
}
}

View File

@ -6,12 +6,16 @@ Release 2.3.0 - UNRELEASED
NEW FEATURES
HDFS-4762 Provide HDFS based NFSv3 and Mountd implementation (brandonli)
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
HDFS-4948. mvn site for hadoop-hdfs-nfs fails. (brandonli)
Release 2.2.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -34,6 +34,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<module>hadoop-hdfs</module>
<module>hadoop-hdfs-httpfs</module>
<module>hadoop-hdfs/src/contrib/bkjournal</module>
<module>hadoop-hdfs-nfs</module>
</modules>
<build>

View File

@ -92,6 +92,11 @@
<artifactId>hadoop-auth</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-nfs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>