HDFS-4762 Provide HDFS based NFSv3 and Mountd implementation. Contributed by Brandon Li
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1499029 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
05fd0a706a
commit
37f587563a
|
@ -0,0 +1,4 @@
|
|||
-----------------------------------------------------------------------------
|
||||
HDFS-NFS - NFS implementation for Hadoop HDFS
|
||||
|
||||
-----------------------------------------------------------------------------
|
|
@ -0,0 +1,18 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<FindBugsFilter>
|
||||
</FindBugsFilter>
|
|
@ -0,0 +1,268 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-project-dist</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<relativePath>../../hadoop-project-dist</relativePath>
|
||||
</parent>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs-nfs</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<description>Apache Hadoop HDFS-NFS</description>
|
||||
<name>Apache Hadoop HDFS-NFS</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-auth</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-nfs</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<version>3.6.2.Final</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<version>3.4.2</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-lang</groupId>
|
||||
<artifactId>commons-lang</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-daemon</groupId>
|
||||
<artifactId>commons-daemon</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet.jsp</groupId>
|
||||
<artifactId>jsp-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-core-asl</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>tomcat</groupId>
|
||||
<artifactId>jasper-runtime</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>xmlenc</groupId>
|
||||
<artifactId>xmlenc</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<systemPropertyVariables>
|
||||
<startKdc>${startKdc}</startKdc>
|
||||
<kdc.resource.dir>${kdc.resource.dir}</kdc.resource.dir>
|
||||
</systemPropertyVariables>
|
||||
<properties>
|
||||
<property>
|
||||
<name>listener</name>
|
||||
<value>org.apache.hadoop.test.TimedOutTestsListener</value>
|
||||
</property>
|
||||
</properties>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>build-helper-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>add-jsp-generated-sources-directory</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>${project.build.directory}/generated-sources/java</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<configuration>
|
||||
<skipTests>false</skipTests>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create-jsp-generated-sources-directory</id>
|
||||
<phase>initialize</phase>
|
||||
<goals>
|
||||
<goal>run</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<target>
|
||||
<mkdir dir="${project.build.directory}/generated-sources/java" />
|
||||
</target>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>pre-site</phase>
|
||||
<goals>
|
||||
<goal>run</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<tasks>
|
||||
<copy file="src/main/resources/hdfs-nfs-default.xml" todir="src/site/resources"/>
|
||||
<copy file="src/main/xsl/configuration.xsl" todir="src/site/resources"/>
|
||||
</tasks>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,54 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.mount;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mount.MountdBase;
|
||||
|
||||
/**
|
||||
* Main class for starting mountd daemon. This daemon implements the NFS
|
||||
* mount protocol. When receiving a MOUNT request from an NFS client, it checks
|
||||
* the request against the list of currently exported file systems. If the
|
||||
* client is permitted to mount the file system, rpc.mountd obtains a file
|
||||
* handle for requested directory and returns it to the client.
|
||||
*/
|
||||
public class Mountd extends MountdBase {
|
||||
/**
|
||||
* Constructor
|
||||
* @param exports
|
||||
* @throws IOException
|
||||
*/
|
||||
public Mountd(List<String> exports) throws IOException {
|
||||
super(exports, new RpcProgramMountd(exports));
|
||||
}
|
||||
|
||||
public Mountd(List<String> exports, Configuration config) throws IOException {
|
||||
super(exports, new RpcProgramMountd(exports, config));
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
List<String> exports = new ArrayList<String>();
|
||||
exports.add("/");
|
||||
Mountd mountd = new Mountd(exports);
|
||||
mountd.start(true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,183 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.mount;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.mount.MountEntry;
|
||||
import org.apache.hadoop.mount.MountInterface;
|
||||
import org.apache.hadoop.mount.MountResponse;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
|
||||
import org.apache.hadoop.oncrpc.RpcCall;
|
||||
import org.apache.hadoop.oncrpc.RpcProgram;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
||||
/**
|
||||
* RPC program corresponding to mountd daemon. See {@link Mountd}.
|
||||
*/
|
||||
public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||
private static final Log LOG = LogFactory.getLog(RpcProgramMountd.class);
|
||||
public static final int PROGRAM = 100005;
|
||||
public static final int VERSION_1 = 1;
|
||||
public static final int VERSION_2 = 2;
|
||||
public static final int VERSION_3 = 3;
|
||||
public static final int PORT = 4242;
|
||||
|
||||
// Need DFSClient for branch-1 to get ExtendedHdfsFileStatus
|
||||
private final DFSClient dfsClient;
|
||||
|
||||
/** Synchronized list */
|
||||
private final List<MountEntry> mounts;
|
||||
|
||||
/** List that is unmodifiable */
|
||||
private final List<String> exports;
|
||||
|
||||
public RpcProgramMountd() throws IOException {
|
||||
this(new ArrayList<String>(0));
|
||||
}
|
||||
|
||||
public RpcProgramMountd(List<String> exports) throws IOException {
|
||||
this(exports, new Configuration());
|
||||
}
|
||||
|
||||
public RpcProgramMountd(List<String> exports, Configuration config)
|
||||
throws IOException {
|
||||
// Note that RPC cache is not enabled
|
||||
super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
|
||||
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
|
||||
this.exports = Collections.unmodifiableList(exports);
|
||||
this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
|
||||
}
|
||||
|
||||
public XDR nullOp(XDR out, int xid, InetAddress client) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("MOUNT NULLOP : " + " client: " + client);
|
||||
}
|
||||
return RpcAcceptedReply.voidReply(out, xid);
|
||||
}
|
||||
|
||||
public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
|
||||
String path = xdr.readString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("MOUNT MNT path: " + path + " client: " + client);
|
||||
}
|
||||
|
||||
String host = client.getHostName();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got host: " + host + " path: " + path);
|
||||
}
|
||||
if (!exports.contains(path)) {
|
||||
LOG.info("Path " + path + " is not shared.");
|
||||
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
|
||||
return out;
|
||||
}
|
||||
|
||||
FileHandle handle = null;
|
||||
try {
|
||||
HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path);
|
||||
|
||||
handle = new FileHandle(exFileStatus.getFileId());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't get handle for export:" + path + ", exception:" + e);
|
||||
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
|
||||
return out;
|
||||
}
|
||||
|
||||
assert (handle != null);
|
||||
LOG.info("Giving handle (fileId:" + handle.getFileId()
|
||||
+ ") to client for export " + path);
|
||||
mounts.add(new MountEntry(host, path));
|
||||
|
||||
MountResponse.writeMNTResponse(Nfs3Status.NFS3_OK, out, xid,
|
||||
handle.getContent());
|
||||
return out;
|
||||
}
|
||||
|
||||
public XDR dump(XDR out, int xid, InetAddress client) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("MOUNT NULLOP : " + " client: " + client);
|
||||
}
|
||||
|
||||
List<MountEntry> copy = new ArrayList<MountEntry>(mounts);
|
||||
MountResponse.writeMountList(out, xid, copy);
|
||||
return out;
|
||||
}
|
||||
|
||||
public XDR umnt(XDR xdr, XDR out, int xid, InetAddress client) {
|
||||
String path = xdr.readString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("MOUNT UMNT path: " + path + " client: " + client);
|
||||
}
|
||||
|
||||
String host = client.getHostName();
|
||||
mounts.remove(new MountEntry(host, path));
|
||||
RpcAcceptedReply.voidReply(out, xid);
|
||||
return out;
|
||||
}
|
||||
|
||||
public XDR umntall(XDR out, int xid, InetAddress client) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("MOUNT UMNTALL : " + " client: " + client);
|
||||
}
|
||||
mounts.clear();
|
||||
return RpcAcceptedReply.voidReply(out, xid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
|
||||
InetAddress client, Channel channel) {
|
||||
int procedure = rpcCall.getProcedure();
|
||||
int xid = rpcCall.getXid();
|
||||
if (procedure == MNTPROC_NULL) {
|
||||
out = nullOp(out, xid, client);
|
||||
} else if (procedure == MNTPROC_MNT) {
|
||||
out = mnt(xdr, out, xid, client);
|
||||
} else if (procedure == MNTPROC_DUMP) {
|
||||
out = dump(out, xid, client);
|
||||
} else if (procedure == MNTPROC_UMNT) {
|
||||
out = umnt(xdr, out, xid, client);
|
||||
} else if (procedure == MNTPROC_UMNTALL) {
|
||||
umntall(out, xid, client);
|
||||
} else if (procedure == MNTPROC_EXPORT) {
|
||||
out = MountResponse.writeExportList(out, xid, exports);
|
||||
} else {
|
||||
// Invalid procedure
|
||||
RpcAcceptedReply.voidReply(out, xid,
|
||||
RpcAcceptedReply.AcceptState.PROC_UNAVAIL); }
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isIdempotent(RpcCall call) {
|
||||
// Not required, because cache is turned off
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* This class is a thread pool to easily schedule async data operations.Current
|
||||
* async data operation is write back operation. In the future, we could use it
|
||||
* for readahead operations too.
|
||||
*/
|
||||
public class AsyncDataService {
|
||||
static final Log LOG = LogFactory.getLog(AsyncDataService.class);
|
||||
|
||||
// ThreadPool core pool size
|
||||
private static final int CORE_THREADS_PER_VOLUME = 1;
|
||||
// ThreadPool maximum pool size
|
||||
private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
|
||||
// ThreadPool keep-alive time for threads over core pool size
|
||||
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
||||
private final ThreadGroup threadGroup = new ThreadGroup("async data service");
|
||||
private ThreadFactory threadFactory = null;
|
||||
private ThreadPoolExecutor executor = null;
|
||||
|
||||
public AsyncDataService() {
|
||||
threadFactory = new ThreadFactory() {
|
||||
public Thread newThread(Runnable r) {
|
||||
return new Thread(threadGroup, r);
|
||||
}
|
||||
};
|
||||
|
||||
executor = new ThreadPoolExecutor(CORE_THREADS_PER_VOLUME,
|
||||
MAXIMUM_THREADS_PER_VOLUME, THREADS_KEEP_ALIVE_SECONDS,
|
||||
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
|
||||
|
||||
// This can reduce the number of running threads
|
||||
executor.allowCoreThreadTimeOut(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the task sometime in the future.
|
||||
*/
|
||||
synchronized void execute(Runnable task) {
|
||||
if (executor == null) {
|
||||
throw new RuntimeException("AsyncDataService is already shutdown");
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Current active thread number: " + executor.getActiveCount()
|
||||
+ " queue size:" + executor.getQueue().size()
|
||||
+ " scheduled task number:" + executor.getTaskCount());
|
||||
}
|
||||
executor.execute(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gracefully shut down the ThreadPool. Will wait for all data tasks to
|
||||
* finish.
|
||||
*/
|
||||
synchronized void shutdown() {
|
||||
if (executor == null) {
|
||||
LOG.warn("AsyncDataService has already shut down.");
|
||||
} else {
|
||||
LOG.info("Shutting down all async data service threads...");
|
||||
executor.shutdown();
|
||||
|
||||
// clear the executor so that calling execute again will fail.
|
||||
executor = null;
|
||||
LOG.info("All async data service threads have been shut down");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the data to HDFS asynchronously
|
||||
*/
|
||||
void writeAsync(OpenFileCtx openFileCtx) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Scheduling write back task for fileId: "
|
||||
+ openFileCtx.copyLatestAttr().getFileId());
|
||||
}
|
||||
WriteBackTask wbTask = new WriteBackTask(openFileCtx);
|
||||
execute(wbTask);
|
||||
}
|
||||
|
||||
/**
|
||||
* A task for write data back to HDFS for a file. Since only one thread can
|
||||
* write for a file, any time there should be only one task(in queue or
|
||||
* executing) for one file existing, and this should be guaranteed by the
|
||||
* caller.
|
||||
*/
|
||||
static class WriteBackTask implements Runnable {
|
||||
|
||||
OpenFileCtx openFileCtx;
|
||||
|
||||
WriteBackTask(OpenFileCtx openFileCtx) {
|
||||
this.openFileCtx = openFileCtx;
|
||||
}
|
||||
|
||||
OpenFileCtx getOpenFileCtx() {
|
||||
return openFileCtx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
// Called in AsyncDataService.execute for displaying error messages.
|
||||
return "write back data for fileId"
|
||||
+ openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
|
||||
+ openFileCtx.getNextOffset();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
openFileCtx.executeWriteBack();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Asyn data service got error:"
|
||||
+ ExceptionUtils.getFullStackTrace(t));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* A cache saves DFSClient objects for different users
|
||||
*/
|
||||
public class DFSClientCache {
|
||||
static final Log LOG = LogFactory.getLog(DFSClientCache.class);
|
||||
private final LruCache<String, DFSClient> lruTable;
|
||||
private final Configuration config;
|
||||
|
||||
public DFSClientCache(Configuration config) {
|
||||
// By default, keep 256 DFSClient instance for 256 active users
|
||||
this(config, 256);
|
||||
}
|
||||
|
||||
public DFSClientCache(Configuration config, int size) {
|
||||
lruTable = new LruCache<String, DFSClient>(size);
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public void put(String uname, DFSClient client) {
|
||||
lruTable.put(uname, client);
|
||||
}
|
||||
|
||||
synchronized public DFSClient get(String uname) {
|
||||
DFSClient client = lruTable.get(uname);
|
||||
if (client != null) {
|
||||
return client;
|
||||
}
|
||||
|
||||
// Not in table, create one.
|
||||
try {
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
|
||||
client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
||||
public DFSClient run() throws IOException {
|
||||
return new DFSClient(NameNode.getAddress(config), config);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
LOG.error("Create DFSClient failed for user:" + uname);
|
||||
e.printStackTrace();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// Add new entry
|
||||
lruTable.put(uname, client);
|
||||
return client;
|
||||
}
|
||||
|
||||
public int usedSize() {
|
||||
return lruTable.usedSize();
|
||||
}
|
||||
|
||||
public boolean containsKey(String key) {
|
||||
return lruTable.containsKey(key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A thread-safe LRU table.
|
||||
*/
|
||||
public class LruCache<K, V> {
|
||||
private final int maxSize;
|
||||
private final LinkedHashMap<K, V> map;
|
||||
private static final float hashTableLoadFactor = 0.75f;
|
||||
|
||||
public LruCache(int maxSize) {
|
||||
this.maxSize = maxSize;
|
||||
int hashTableCapacity = (int) Math.ceil(maxSize / hashTableLoadFactor) + 1;
|
||||
map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
|
||||
return size() > LruCache.this.maxSize;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// The found entry becomes the most recently used.
|
||||
public synchronized V get(K key) {
|
||||
return map.get(key);
|
||||
}
|
||||
|
||||
public synchronized void put(K key, V value) {
|
||||
map.put(key, value);
|
||||
}
|
||||
|
||||
public synchronized int usedSize() {
|
||||
return map.size();
|
||||
}
|
||||
|
||||
public synchronized boolean containsKey(K key) {
|
||||
return map.containsKey(key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
|
||||
* Currently Mountd program is also started inside this class.
|
||||
* Only TCP server is supported and UDP is not supported.
|
||||
*/
|
||||
public class Nfs3 extends Nfs3Base {
|
||||
public Nfs3(List<String> exports) throws IOException {
|
||||
super(new Mountd(exports), new RpcProgramNfs3(exports));
|
||||
}
|
||||
|
||||
public Nfs3(List<String> exports, Configuration config) throws IOException {
|
||||
super(new Mountd(exports, config), new RpcProgramNfs3(exports, config));
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
|
||||
List<String> exports = new ArrayList<String>();
|
||||
exports.add("/");
|
||||
final Nfs3 nfsServer = new Nfs3(exports);
|
||||
nfsServer.start(true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.nfs.NfsFileType;
|
||||
import org.apache.hadoop.nfs.NfsTime;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WccData;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
||||
/**
|
||||
* Utility/helper methods related to NFS
|
||||
*/
|
||||
public class Nfs3Utils {
|
||||
public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
|
||||
|
||||
public static String getFileIdPath(FileHandle handle) {
|
||||
return getFileIdPath(handle.getFileId());
|
||||
}
|
||||
|
||||
public static String getFileIdPath(long fileId) {
|
||||
return INODEID_PATH_PREFIX + fileId;
|
||||
}
|
||||
|
||||
public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
|
||||
throws IOException {
|
||||
return client.getFileInfo(fileIdPath);
|
||||
}
|
||||
|
||||
public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
|
||||
HdfsFileStatus fs, IdUserGroup iug) {
|
||||
/**
|
||||
* Some 32bit Linux client has problem with 64bit fileId: it seems the 32bit
|
||||
* client takes only the lower 32bit of the fileId and treats it as signed
|
||||
* int. When the 32th bit is 1, the client considers it invalid.
|
||||
*/
|
||||
return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
|
||||
.getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
|
||||
iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
|
||||
fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());
|
||||
}
|
||||
|
||||
public static Nfs3FileAttributes getFileAttr(DFSClient client,
|
||||
String fileIdPath, IdUserGroup iug) throws IOException {
|
||||
HdfsFileStatus fs = getFileStatus(client, fileIdPath);
|
||||
return fs == null ? null : getNfs3FileAttrFromFileStatus(fs, iug);
|
||||
}
|
||||
|
||||
public static WccAttr getWccAttr(DFSClient client, String fileIdPath)
|
||||
throws IOException {
|
||||
HdfsFileStatus fstat = getFileStatus(client, fileIdPath);
|
||||
if (fstat == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
long size = fstat.isDir() ? Nfs3FileAttributes.getDirSize(fstat
|
||||
.getChildrenNum()) : fstat.getLen();
|
||||
return new WccAttr(size, new NfsTime(fstat.getModificationTime()),
|
||||
new NfsTime(fstat.getModificationTime()));
|
||||
}
|
||||
|
||||
public static WccAttr getWccAttr(Nfs3FileAttributes attr) {
|
||||
return new WccAttr(attr.getSize(), attr.getMtime(), attr.getCtime());
|
||||
}
|
||||
|
||||
public static WccData createWccData(final WccAttr preOpAttr,
|
||||
DFSClient dfsClient, final String fileIdPath, final IdUserGroup iug)
|
||||
throws IOException {
|
||||
Nfs3FileAttributes postOpDirAttr = getFileAttr(dfsClient, fileIdPath, iug);
|
||||
return new WccData(preOpAttr, postOpDirAttr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a write response to the netty network socket channel
|
||||
*/
|
||||
public static void writeChannel(Channel channel, XDR out) {
|
||||
ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
|
||||
channel.write(outBuf);
|
||||
}
|
||||
|
||||
private static boolean isSet(int access, int bits) {
|
||||
return (access & bits) == bits;
|
||||
}
|
||||
|
||||
public static int getAccessRights(int mode, int type) {
|
||||
int rtn = 0;
|
||||
if (isSet(mode, Nfs3Constant.ACCESS_MODE_READ)) {
|
||||
rtn |= Nfs3Constant.ACCESS3_READ;
|
||||
// LOOKUP is only meaningful for dir
|
||||
if (type == NfsFileType.NFSDIR.toValue()) {
|
||||
rtn |= Nfs3Constant.ACCESS3_LOOKUP;
|
||||
}
|
||||
}
|
||||
if (isSet(mode, Nfs3Constant.ACCESS_MODE_WRITE)) {
|
||||
rtn |= Nfs3Constant.ACCESS3_MODIFY;
|
||||
rtn |= Nfs3Constant.ACCESS3_EXTEND;
|
||||
// Set delete bit, UNIX may ignore it for regular file since it's up to
|
||||
// parent dir op permission
|
||||
rtn |= Nfs3Constant.ACCESS3_DELETE;
|
||||
}
|
||||
if (isSet(mode, Nfs3Constant.ACCESS_MODE_EXECUTE)) {
|
||||
if (type == NfsFileType.NFSREG.toValue()) {
|
||||
rtn |= Nfs3Constant.ACCESS3_EXECUTE;
|
||||
}
|
||||
}
|
||||
return rtn;
|
||||
}
|
||||
|
||||
public static int getAccessRightsForUserGroup(int uid, int gid,
|
||||
Nfs3FileAttributes attr) {
|
||||
int mode = attr.getMode();
|
||||
if (uid == attr.getUid()) {
|
||||
return getAccessRights(mode >> 6, attr.getType());
|
||||
}
|
||||
if (gid == attr.getGid()) {
|
||||
return getAccessRights(mode >> 3, attr.getType());
|
||||
}
|
||||
return getAccessRights(mode, attr.getType());
|
||||
}
|
||||
|
||||
public static long bytesToLong(byte[] data) {
|
||||
long n = 0xffL & data[0];
|
||||
for (int i = 1; i < 8; i++) {
|
||||
n = (n << 8) | (0xffL & data[i]);
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
public static byte[] longToByte(long v) {
|
||||
byte[] data = new byte[8];
|
||||
data[0] = (byte) (v >>> 56);
|
||||
data[1] = (byte) (v >>> 48);
|
||||
data[2] = (byte) (v >>> 40);
|
||||
data[3] = (byte) (v >>> 32);
|
||||
data[4] = (byte) (v >>> 24);
|
||||
data[5] = (byte) (v >>> 16);
|
||||
data[6] = (byte) (v >>> 8);
|
||||
data[7] = (byte) (v >>> 0);
|
||||
return data;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
/**
|
||||
* OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
|
||||
* is not a valid range.
|
||||
*/
|
||||
public class OffsetRange implements Comparable<OffsetRange> {
|
||||
private final long min;
|
||||
private final long max;
|
||||
|
||||
OffsetRange(long min, long max) {
|
||||
if ((min >= max) || (min < 0) || (max < 0)) {
|
||||
throw new IllegalArgumentException("Wrong offset range: (" + min + ","
|
||||
+ max + ")");
|
||||
}
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
long getMin() {
|
||||
return min;
|
||||
}
|
||||
|
||||
long getMax() {
|
||||
return max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (int) (min ^ max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
assert (o instanceof OffsetRange);
|
||||
OffsetRange range = (OffsetRange) o;
|
||||
return (min == range.getMin()) && (max == range.getMax());
|
||||
}
|
||||
|
||||
private static int compareTo(long left, long right) {
|
||||
if (left < right) {
|
||||
return -1;
|
||||
} else if (left > right) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(OffsetRange other) {
|
||||
final int d = compareTo(min, other.getMin());
|
||||
return d != 0 ? d : compareTo(max, other.getMax());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,775 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.security.InvalidParameterException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||
import org.apache.hadoop.io.BytesWritable.Comparator;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WccData;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
||||
/**
|
||||
* OpenFileCtx saves the context of one HDFS file output stream. Access to it is
|
||||
* synchronized by its member lock.
|
||||
*/
|
||||
class OpenFileCtx {
|
||||
public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
|
||||
|
||||
/**
|
||||
* Lock to synchronize OpenFileCtx changes. Thread should get this lock before
|
||||
* any read/write operation to an OpenFileCtx object
|
||||
*/
|
||||
private final ReentrantLock ctxLock;
|
||||
|
||||
// The stream status. False means the stream is closed.
|
||||
private boolean activeState;
|
||||
// The stream write-back status. True means one thread is doing write back.
|
||||
private boolean asyncStatus;
|
||||
|
||||
private final FSDataOutputStream fos;
|
||||
private final Nfs3FileAttributes latestAttr;
|
||||
private long nextOffset;
|
||||
|
||||
private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
|
||||
|
||||
// The last write, commit request or write-back event. Updating time to keep
|
||||
// output steam alive.
|
||||
private long lastAccessTime;
|
||||
|
||||
// Pending writes water mark for dump, 1MB
|
||||
private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
|
||||
private FileOutputStream dumpOut;
|
||||
private long nonSequentialWriteInMemory;
|
||||
private boolean enabledDump;
|
||||
private RandomAccessFile raf;
|
||||
private final String dumpFilePath;
|
||||
|
||||
private void updateLastAccessTime() {
|
||||
lastAccessTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
private boolean checkStreamTimeout(long streamTimeout) {
|
||||
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
|
||||
}
|
||||
|
||||
// Increase or decrease the memory occupation of non-sequential writes
|
||||
private long updateNonSequentialWriteInMemory(long count) {
|
||||
nonSequentialWriteInMemory += count;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
|
||||
+ nonSequentialWriteInMemory);
|
||||
}
|
||||
|
||||
if (nonSequentialWriteInMemory < 0) {
|
||||
LOG.error("nonSequentialWriteInMemory is negative after update with count "
|
||||
+ count);
|
||||
throw new IllegalArgumentException(
|
||||
"nonSequentialWriteInMemory is negative after update with count "
|
||||
+ count);
|
||||
}
|
||||
return nonSequentialWriteInMemory;
|
||||
}
|
||||
|
||||
OpenFileCtx(FSDataOutputStream fos, Nfs3FileAttributes latestAttr,
|
||||
String dumpFilePath) {
|
||||
this.fos = fos;
|
||||
this.latestAttr = latestAttr;
|
||||
pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
|
||||
updateLastAccessTime();
|
||||
activeState = true;
|
||||
asyncStatus = false;
|
||||
dumpOut = null;
|
||||
raf = null;
|
||||
nonSequentialWriteInMemory = 0;
|
||||
this.dumpFilePath = dumpFilePath;
|
||||
enabledDump = dumpFilePath == null ? false: true;
|
||||
ctxLock = new ReentrantLock(true);
|
||||
}
|
||||
|
||||
private void lockCtx() {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
|
||||
StackTraceElement e = stacktrace[2];
|
||||
String methodName = e.getMethodName();
|
||||
LOG.trace("lock ctx, caller:" + methodName);
|
||||
}
|
||||
ctxLock.lock();
|
||||
}
|
||||
|
||||
private void unlockCtx() {
|
||||
ctxLock.unlock();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
|
||||
StackTraceElement e = stacktrace[2];
|
||||
String methodName = e.getMethodName();
|
||||
LOG.info("unlock ctx, caller:" + methodName);
|
||||
}
|
||||
}
|
||||
|
||||
// Make a copy of the latestAttr
|
||||
public Nfs3FileAttributes copyLatestAttr() {
|
||||
Nfs3FileAttributes ret;
|
||||
lockCtx();
|
||||
try {
|
||||
ret = new Nfs3FileAttributes(latestAttr);
|
||||
} finally {
|
||||
unlockCtx();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private long getNextOffsetUnprotected() {
|
||||
assert(ctxLock.isLocked());
|
||||
return nextOffset;
|
||||
}
|
||||
|
||||
public long getNextOffset() {
|
||||
long ret;
|
||||
lockCtx();
|
||||
try {
|
||||
ret = getNextOffsetUnprotected();
|
||||
} finally {
|
||||
unlockCtx();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Get flushed offset. Note that flushed data may not be persisted.
|
||||
private long getFlushedOffset() {
|
||||
return fos.getPos();
|
||||
}
|
||||
|
||||
// Check if need to dump the new writes
|
||||
private void checkDump(long count) {
|
||||
assert (ctxLock.isLocked());
|
||||
|
||||
// Always update the in memory count
|
||||
updateNonSequentialWriteInMemory(count);
|
||||
|
||||
if (!enabledDump) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Do nothing, dump is disabled.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Create dump outputstream for the first time
|
||||
if (dumpOut == null) {
|
||||
LOG.info("Create dump file:" + dumpFilePath);
|
||||
File dumpFile = new File(dumpFilePath);
|
||||
try {
|
||||
if (dumpFile.exists()) {
|
||||
LOG.fatal("The dump file should not exist:" + dumpFilePath);
|
||||
throw new RuntimeException("The dump file should not exist:"
|
||||
+ dumpFilePath);
|
||||
}
|
||||
dumpOut = new FileOutputStream(dumpFile);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Got failure when creating dump stream " + dumpFilePath
|
||||
+ " with error:" + e);
|
||||
enabledDump = false;
|
||||
IOUtils.cleanup(LOG, dumpOut);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Get raf for the first dump
|
||||
if (raf == null) {
|
||||
try {
|
||||
raf = new RandomAccessFile(dumpFilePath, "r");
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.error("Can't get random access to file " + dumpFilePath);
|
||||
// Disable dump
|
||||
enabledDump = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Start dump, current write number:" + pendingWrites.size());
|
||||
}
|
||||
Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
OffsetRange key = it.next();
|
||||
WriteCtx writeCtx = pendingWrites.get(key);
|
||||
try {
|
||||
long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
|
||||
if (dumpedDataSize > 0) {
|
||||
updateNonSequentialWriteInMemory(-dumpedDataSize);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
|
||||
// Disable dump
|
||||
enabledDump = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (nonSequentialWriteInMemory != 0) {
|
||||
LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
|
||||
+ nonSequentialWriteInMemory);
|
||||
throw new RuntimeException(
|
||||
"After dump, nonSequentialWriteInMemory is not zero: "
|
||||
+ nonSequentialWriteInMemory);
|
||||
}
|
||||
}
|
||||
|
||||
private WriteCtx checkRepeatedWriteRequest(WRITE3Request request,
|
||||
Channel channel, int xid) {
|
||||
OffsetRange range = new OffsetRange(request.getOffset(),
|
||||
request.getOffset() + request.getCount());
|
||||
WriteCtx writeCtx = pendingWrites.get(range);
|
||||
if (writeCtx== null) {
|
||||
return null;
|
||||
} else {
|
||||
if (xid != writeCtx.getXid()) {
|
||||
LOG.warn("Got a repeated request, same range, with a different xid:"
|
||||
+ xid + " xid in old request:" + writeCtx.getXid());
|
||||
//TODO: better handling.
|
||||
}
|
||||
return writeCtx;
|
||||
}
|
||||
}
|
||||
|
||||
public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
|
||||
Channel channel, int xid, AsyncDataService asyncDataService,
|
||||
IdUserGroup iug) {
|
||||
|
||||
lockCtx();
|
||||
try {
|
||||
if (!activeState) {
|
||||
LOG.info("OpenFileCtx is inactive, fileId:"
|
||||
+ request.getHandle().getFileId());
|
||||
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
||||
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
} else {
|
||||
// Handle repeated write requests(same xid or not).
|
||||
// If already replied, send reply again. If not replied, drop the
|
||||
// repeated request.
|
||||
WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
|
||||
xid);
|
||||
if (existantWriteCtx != null) {
|
||||
if (!existantWriteCtx.getReplied()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Repeated write request which hasn't be served: xid="
|
||||
+ xid + ", drop it.");
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Repeated write request which is already served: xid="
|
||||
+ xid + ", resend response.");
|
||||
}
|
||||
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||
fileWcc, request.getCount(), request.getStableHow(),
|
||||
Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
}
|
||||
updateLastAccessTime();
|
||||
|
||||
} else {
|
||||
receivedNewWriteInternal(dfsClient, request, channel, xid,
|
||||
asyncDataService, iug);
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
unlockCtx();
|
||||
}
|
||||
}
|
||||
|
||||
private void receivedNewWriteInternal(DFSClient dfsClient,
|
||||
WRITE3Request request, Channel channel, int xid,
|
||||
AsyncDataService asyncDataService, IdUserGroup iug) {
|
||||
long offset = request.getOffset();
|
||||
int count = request.getCount();
|
||||
WriteStableHow stableHow = request.getStableHow();
|
||||
|
||||
// Get file length, fail non-append call
|
||||
WccAttr preOpAttr = latestAttr.getWccAttr();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("requesed offset=" + offset + " and current filesize="
|
||||
+ preOpAttr.getSize());
|
||||
}
|
||||
|
||||
long nextOffset = getNextOffsetUnprotected();
|
||||
if (offset == nextOffset) {
|
||||
LOG.info("Add to the list, update nextOffset and notify the writer,"
|
||||
+ " nextOffset:" + nextOffset);
|
||||
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
|
||||
request.getOffset(), request.getCount(), request.getStableHow(),
|
||||
request.getData().array(), channel, xid, false, WriteCtx.NO_DUMP);
|
||||
addWrite(writeCtx);
|
||||
|
||||
// Create an async task and change openFileCtx status to indicate async
|
||||
// task pending
|
||||
if (!asyncStatus) {
|
||||
asyncStatus = true;
|
||||
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
|
||||
}
|
||||
|
||||
// Update the write time first
|
||||
updateLastAccessTime();
|
||||
Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
|
||||
|
||||
// Send response immediately for unstable write
|
||||
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
|
||||
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
writeCtx.setReplied(true);
|
||||
}
|
||||
|
||||
} else if (offset > nextOffset) {
|
||||
LOG.info("Add new write to the list but not update nextOffset:"
|
||||
+ nextOffset);
|
||||
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
|
||||
request.getOffset(), request.getCount(), request.getStableHow(),
|
||||
request.getData().array(), channel, xid, false, WriteCtx.ALLOW_DUMP);
|
||||
addWrite(writeCtx);
|
||||
|
||||
// Check if need to dump some pending requests to file
|
||||
checkDump(request.getCount());
|
||||
updateLastAccessTime();
|
||||
Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
|
||||
|
||||
// In test, noticed some Linux client sends a batch (e.g., 1MB)
|
||||
// of reordered writes and won't send more writes until it gets
|
||||
// responses of the previous batch. So here send response immediately for
|
||||
// unstable non-sequential write
|
||||
if (request.getStableHow() == WriteStableHow.UNSTABLE) {
|
||||
WccData fileWcc = new WccData(preOpAttr, postOpAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
writeCtx.setReplied(true);
|
||||
}
|
||||
|
||||
} else {
|
||||
// offset < nextOffset
|
||||
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
|
||||
+ nextOffset + ")");
|
||||
WccData wccData = new WccData(preOpAttr, null);
|
||||
WRITE3Response response;
|
||||
|
||||
if (offset + count > nextOffset) {
|
||||
LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
|
||||
+ "write requests, so treat it as a real random write, no support.");
|
||||
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
|
||||
WriteStableHow.UNSTABLE, 0);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Process perfectOverWrite");
|
||||
}
|
||||
response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
|
||||
request.getData().array(),
|
||||
Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
|
||||
}
|
||||
|
||||
updateLastAccessTime();
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Honor 2 kinds of overwrites: 1). support some application like touch(write
|
||||
* the same content back to change mtime), 2) client somehow sends the same
|
||||
* write again in a different RPC.
|
||||
*/
|
||||
private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
|
||||
long offset, int count, WriteStableHow stableHow, byte[] data,
|
||||
String path, WccData wccData, IdUserGroup iug) {
|
||||
assert (ctxLock.isLocked());
|
||||
WRITE3Response response = null;
|
||||
|
||||
// Read the content back
|
||||
byte[] readbuffer = new byte[count];
|
||||
|
||||
int readCount = 0;
|
||||
FSDataInputStream fis = null;
|
||||
try {
|
||||
// Sync file data and length to avoid partial read failure
|
||||
((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
||||
|
||||
fis = new FSDataInputStream(dfsClient.open(path));
|
||||
readCount = fis.read(offset, readbuffer, 0, count);
|
||||
if (readCount < count) {
|
||||
LOG.error("Can't read back " + count + " bytes, partial read size:"
|
||||
+ readCount);
|
||||
return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
|
||||
stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.info("Read failed when processing possible perfect overwrite, path="
|
||||
+ path + " error:" + e);
|
||||
return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
|
||||
stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, fis);
|
||||
}
|
||||
|
||||
// Compare with the request
|
||||
Comparator comparator = new Comparator();
|
||||
if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
|
||||
LOG.info("Perfect overwrite has different content");
|
||||
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
|
||||
stableHow, 0);
|
||||
} else {
|
||||
LOG.info("Perfect overwrite has same content,"
|
||||
+ " updating the mtime, then return success");
|
||||
Nfs3FileAttributes postOpAttr = null;
|
||||
try {
|
||||
dfsClient.setTimes(path, System.currentTimeMillis(), -1);
|
||||
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Got error when processing perfect overwrite, path=" + path
|
||||
+ " error:" + e);
|
||||
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
|
||||
0);
|
||||
}
|
||||
|
||||
wccData.setPostOpAttr(postOpAttr);
|
||||
response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
|
||||
stableHow, 0);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
public final static int COMMIT_FINISHED = 0;
|
||||
public final static int COMMIT_WAIT = 1;
|
||||
public final static int COMMIT_INACTIVE_CTX = 2;
|
||||
public final static int COMMIT_ERROR = 3;
|
||||
|
||||
/**
|
||||
* return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
|
||||
* COMMIT_INACTIVE_CTX, COMMIT_ERROR
|
||||
*/
|
||||
public int checkCommit(long commitOffset) {
|
||||
int ret = COMMIT_WAIT;
|
||||
|
||||
lockCtx();
|
||||
try {
|
||||
if (!activeState) {
|
||||
ret = COMMIT_INACTIVE_CTX;
|
||||
} else {
|
||||
ret = checkCommitInternal(commitOffset);
|
||||
}
|
||||
} finally {
|
||||
unlockCtx();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private int checkCommitInternal(long commitOffset) {
|
||||
if (commitOffset == 0) {
|
||||
// Commit whole file
|
||||
commitOffset = getNextOffsetUnprotected();
|
||||
}
|
||||
|
||||
long flushed = getFlushedOffset();
|
||||
LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
|
||||
if (flushed < commitOffset) {
|
||||
// Keep stream active
|
||||
updateLastAccessTime();
|
||||
return COMMIT_WAIT;
|
||||
}
|
||||
|
||||
int ret = COMMIT_WAIT;
|
||||
try {
|
||||
// Sync file data and length
|
||||
((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
||||
// Nothing to do for metadata since attr related change is pass-through
|
||||
ret = COMMIT_FINISHED;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Got stream error during data sync:" + e);
|
||||
// Do nothing. Stream will be closed eventually by StreamMonitor.
|
||||
ret = COMMIT_ERROR;
|
||||
}
|
||||
|
||||
// Keep stream active
|
||||
updateLastAccessTime();
|
||||
return ret;
|
||||
}
|
||||
|
||||
private void addWrite(WriteCtx writeCtx) {
|
||||
assert (ctxLock.isLocked());
|
||||
long offset = writeCtx.getOffset();
|
||||
int count = writeCtx.getCount();
|
||||
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check stream status to decide if it should be closed
|
||||
* @return true, remove stream; false, keep stream
|
||||
*/
|
||||
public boolean streamCleanup(long fileId, long streamTimeout) {
|
||||
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
|
||||
throw new InvalidParameterException("StreamTimeout" + streamTimeout
|
||||
+ "ms is less than MINIMIUM_STREAM_TIMEOUT "
|
||||
+ WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
|
||||
}
|
||||
|
||||
boolean flag = false;
|
||||
if (!ctxLock.tryLock()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Another thread is working on it" + ctxLock.toString());
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
|
||||
try {
|
||||
// Check the stream timeout
|
||||
if (checkStreamTimeout(streamTimeout)) {
|
||||
LOG.info("closing stream for fileId:" + fileId);
|
||||
cleanup();
|
||||
flag = true;
|
||||
}
|
||||
} finally {
|
||||
unlockCtx();
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
|
||||
// Invoked by AsynDataService to do the write back
|
||||
public void executeWriteBack() {
|
||||
long nextOffset;
|
||||
OffsetRange key;
|
||||
WriteCtx writeCtx;
|
||||
|
||||
try {
|
||||
// Don't lock OpenFileCtx for all writes to reduce the timeout of other
|
||||
// client request to the same file
|
||||
while (true) {
|
||||
lockCtx();
|
||||
if (!asyncStatus) {
|
||||
// This should never happen. There should be only one thread working
|
||||
// on one OpenFileCtx anytime.
|
||||
LOG.fatal("The openFileCtx has false async status");
|
||||
throw new RuntimeException("The openFileCtx has false async status");
|
||||
}
|
||||
// Any single write failure can change activeState to false, so do the
|
||||
// check each loop.
|
||||
if (pendingWrites.isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The asyn write task has no pendding writes, fileId: "
|
||||
+ latestAttr.getFileId());
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (!activeState) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The openFileCtx is not active anymore, fileId: "
|
||||
+ latestAttr.getFileId());
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Get the next sequential write
|
||||
nextOffset = getNextOffsetUnprotected();
|
||||
key = pendingWrites.firstKey();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
|
||||
+ nextOffset);
|
||||
}
|
||||
|
||||
if (key.getMin() > nextOffset) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("The next sequencial write has not arrived yet");
|
||||
}
|
||||
break;
|
||||
|
||||
} else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
|
||||
// Can't handle overlapping write. Didn't see it in tests yet.
|
||||
LOG.fatal("Got a overlapping write (" + key.getMin() + ","
|
||||
+ key.getMax() + "), nextOffset=" + nextOffset);
|
||||
throw new RuntimeException("Got a overlapping write (" + key.getMin()
|
||||
+ "," + key.getMax() + "), nextOffset=" + nextOffset);
|
||||
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
|
||||
+ ") from the list");
|
||||
}
|
||||
writeCtx = pendingWrites.remove(key);
|
||||
// Do the write
|
||||
doSingleWrite(writeCtx);
|
||||
updateLastAccessTime();
|
||||
}
|
||||
|
||||
unlockCtx();
|
||||
}
|
||||
|
||||
} finally {
|
||||
// Always reset the async status so another async task can be created
|
||||
// for this file
|
||||
asyncStatus = false;
|
||||
if (ctxLock.isHeldByCurrentThread()) {
|
||||
unlockCtx();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doSingleWrite(final WriteCtx writeCtx) {
|
||||
assert(ctxLock.isLocked());
|
||||
Channel channel = writeCtx.getChannel();
|
||||
int xid = writeCtx.getXid();
|
||||
|
||||
long offset = writeCtx.getOffset();
|
||||
int count = writeCtx.getCount();
|
||||
WriteStableHow stableHow = writeCtx.getStableHow();
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = writeCtx.getData();
|
||||
} catch (IOException e1) {
|
||||
LOG.error("Failed to get request data offset:" + offset + " count:"
|
||||
+ count + " error:" + e1);
|
||||
// Cleanup everything
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
assert (data.length == count);
|
||||
|
||||
FileHandle handle = writeCtx.getHandle();
|
||||
LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
|
||||
+ " length:" + count + " stableHow:" + stableHow.getValue());
|
||||
|
||||
try {
|
||||
fos.write(data, 0, count);
|
||||
|
||||
if (fos.getPos() != (offset + count)) {
|
||||
throw new IOException("output stream is out of sync, pos="
|
||||
+ fos.getPos() + " and nextOffset should be" + (offset + count));
|
||||
}
|
||||
nextOffset = fos.getPos();
|
||||
|
||||
// Reduce memory occupation size if request was allowed dumped
|
||||
if (writeCtx.getDataState() == WriteCtx.ALLOW_DUMP) {
|
||||
updateNonSequentialWriteInMemory(-count);
|
||||
}
|
||||
|
||||
if (!writeCtx.getReplied()) {
|
||||
WccAttr preOpAttr = latestAttr.getWccAttr();
|
||||
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
|
||||
+ offset + " and length " + data.length, e);
|
||||
if (!writeCtx.getReplied()) {
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
// Keep stream open. Either client retries or SteamMonitor closes it.
|
||||
}
|
||||
|
||||
LOG.info("Clean up open file context for fileId: "
|
||||
+ latestAttr.getFileid());
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
assert(ctxLock.isLocked());
|
||||
activeState = false;
|
||||
|
||||
// Close stream
|
||||
try {
|
||||
if (fos != null) {
|
||||
fos.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Can't close stream for fileId:" + latestAttr.getFileid()
|
||||
+ ", error:" + e);
|
||||
}
|
||||
|
||||
// Reply error for pending writes
|
||||
LOG.info("There are " + pendingWrites.size() + " pending writes.");
|
||||
WccAttr preOpAttr = latestAttr.getWccAttr();
|
||||
while (!pendingWrites.isEmpty()) {
|
||||
OffsetRange key = pendingWrites.firstKey();
|
||||
LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
|
||||
+ "), nextOffset=" + getNextOffsetUnprotected());
|
||||
|
||||
WriteCtx writeCtx = pendingWrites.remove(key);
|
||||
if (!writeCtx.getReplied()) {
|
||||
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
||||
fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(writeCtx.getChannel(),
|
||||
response.send(new XDR(), writeCtx.getXid()));
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup dump file
|
||||
if (dumpOut!=null){
|
||||
try {
|
||||
dumpOut.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
if (raf!=null) {
|
||||
try {
|
||||
raf.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
File dumpFile = new File(dumpFilePath);
|
||||
if (dumpFile.delete()) {
|
||||
LOG.error("Failed to delete dumpfile: "+ dumpFile);
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,162 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
||||
/**
|
||||
* WriteCtx saves the context of one write request, such as request, channel,
|
||||
* xid and reply status.
|
||||
*/
|
||||
class WriteCtx {
|
||||
public static final Log LOG = LogFactory.getLog(WriteCtx.class);
|
||||
|
||||
private final FileHandle handle;
|
||||
private final long offset;
|
||||
private final int count;
|
||||
private final WriteStableHow stableHow;
|
||||
private byte[] data;
|
||||
|
||||
private final Channel channel;
|
||||
private final int xid;
|
||||
private boolean replied;
|
||||
|
||||
/**
|
||||
* In memory write data has 3 states. ALLOW_DUMP: not sequential write, still
|
||||
* wait for prerequisit writes. NO_DUMP: sequential write, no need to dump
|
||||
* since it will be written to HDFS soon. DUMPED: already dumped to a file.
|
||||
*/
|
||||
public final static int ALLOW_DUMP = 0;
|
||||
public final static int NO_DUMP = 1;
|
||||
public final static int DUMPED = 2;
|
||||
private int dataState;
|
||||
|
||||
public int getDataState() {
|
||||
return dataState;
|
||||
}
|
||||
|
||||
public void setDataState(int dataState) {
|
||||
this.dataState = dataState;
|
||||
}
|
||||
|
||||
private RandomAccessFile raf;
|
||||
private long dumpFileOffset;
|
||||
|
||||
// Return the dumped data size
|
||||
public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
|
||||
throws IOException {
|
||||
if (dataState != ALLOW_DUMP) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("No need to dump with status(replied,dataState):" + "("
|
||||
+ replied + "," + dataState + ")");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
this.raf = raf;
|
||||
dumpFileOffset = dumpOut.getChannel().position();
|
||||
dumpOut.write(data, 0, count);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
|
||||
}
|
||||
data = null;
|
||||
dataState = DUMPED;
|
||||
return count;
|
||||
}
|
||||
|
||||
public FileHandle getHandle() {
|
||||
return handle;
|
||||
}
|
||||
|
||||
public long getOffset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
public int getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
public WriteStableHow getStableHow() {
|
||||
return stableHow;
|
||||
}
|
||||
|
||||
public byte[] getData() throws IOException {
|
||||
if (dataState != DUMPED) {
|
||||
if (data == null) {
|
||||
throw new IOException("Data is not dumpted but has null:" + this);
|
||||
}
|
||||
} else {
|
||||
// read back
|
||||
if (data != null) {
|
||||
throw new IOException("Data is dumpted but not null");
|
||||
}
|
||||
data = new byte[count];
|
||||
raf.seek(dumpFileOffset);
|
||||
int size = raf.read(data, 0, count);
|
||||
if (size != count) {
|
||||
throw new IOException("Data count is " + count + ", but read back "
|
||||
+ size + "bytes");
|
||||
}
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
Channel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
int getXid() {
|
||||
return xid;
|
||||
}
|
||||
|
||||
boolean getReplied() {
|
||||
return replied;
|
||||
}
|
||||
|
||||
void setReplied(boolean replied) {
|
||||
this.replied = replied;
|
||||
}
|
||||
|
||||
WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
|
||||
byte[] data, Channel channel, int xid, boolean replied, int dataState) {
|
||||
this.handle = handle;
|
||||
this.offset = offset;
|
||||
this.count = count;
|
||||
this.stableHow = stableHow;
|
||||
this.data = data;
|
||||
this.channel = channel;
|
||||
this.xid = xid;
|
||||
this.replied = replied;
|
||||
this.dataState = dataState;
|
||||
raf = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
|
||||
+ " stableHow:" + stableHow + " replied:" + replied + " dataState:"
|
||||
+ dataState + " xid:" + xid;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,284 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.nfs.NfsFileType;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WccData;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Manage the writes and responds asynchronously.
|
||||
*/
|
||||
public class WriteManager {
|
||||
public static final Log LOG = LogFactory.getLog(WriteManager.class);
|
||||
|
||||
private final IdUserGroup iug;
|
||||
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
|
||||
.newConcurrentMap();
|
||||
|
||||
private AsyncDataService asyncDataService;
|
||||
private boolean asyncDataServiceStarted = false;
|
||||
|
||||
private final StreamMonitor streamMonitor;
|
||||
|
||||
/**
|
||||
* The time limit to wait for accumulate reordered sequential writes to the
|
||||
* same file before the write is considered done.
|
||||
*/
|
||||
private long streamTimeout;
|
||||
|
||||
public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
|
||||
public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
|
||||
|
||||
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
|
||||
openFileMap.put(h, ctx);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After add the new stream " + h.getFileId()
|
||||
+ ", the stream number:" + openFileMap.size());
|
||||
}
|
||||
}
|
||||
|
||||
WriteManager(IdUserGroup iug, final Configuration config) {
|
||||
this.iug = iug;
|
||||
|
||||
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
|
||||
DEFAULT_STREAM_TIMEOUT);
|
||||
LOG.info("Stream timeout is " + streamTimeout + "ms.");
|
||||
if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) {
|
||||
LOG.info("Reset stream timeout to minimum value "
|
||||
+ MINIMIUM_STREAM_TIMEOUT + "ms.");
|
||||
streamTimeout = MINIMIUM_STREAM_TIMEOUT;
|
||||
}
|
||||
|
||||
this.streamMonitor = new StreamMonitor();
|
||||
}
|
||||
|
||||
private void startAsyncDataSerivce() {
|
||||
streamMonitor.start();
|
||||
this.asyncDataService = new AsyncDataService();
|
||||
asyncDataServiceStarted = true;
|
||||
}
|
||||
|
||||
private void shutdownAsyncDataService() {
|
||||
asyncDataService.shutdown();
|
||||
asyncDataServiceStarted = false;
|
||||
streamMonitor.interrupt();
|
||||
}
|
||||
|
||||
void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
int xid, Nfs3FileAttributes preOpAttr) throws IOException {
|
||||
// First write request starts the async data service
|
||||
if (!asyncDataServiceStarted) {
|
||||
startAsyncDataSerivce();
|
||||
}
|
||||
|
||||
long offset = request.getOffset();
|
||||
int count = request.getCount();
|
||||
WriteStableHow stableHow = request.getStableHow();
|
||||
byte[] data = request.getData().array();
|
||||
if (data.length < count) {
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
return;
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: "
|
||||
+ offset + " length:" + count + " stableHow:" + stableHow.getValue());
|
||||
}
|
||||
|
||||
// Check if there is a stream to write
|
||||
FileHandle fileHandle = request.getHandle();
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
|
||||
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
||||
fileWcc, count, request.getStableHow(),
|
||||
Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
return;
|
||||
}
|
||||
|
||||
// Add write into the async job queue
|
||||
openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
|
||||
asyncDataService, iug);
|
||||
// Block stable write
|
||||
if (request.getStableHow() != WriteStableHow.UNSTABLE) {
|
||||
if (handleCommit(fileHandle, offset + count)) {
|
||||
Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
|
||||
WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
|
||||
postOpAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
||||
fileWcc, count, request.getStableHow(),
|
||||
Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
} else {
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
|
||||
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
boolean handleCommit(FileHandle fileHandle, long commitOffset) {
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
|
||||
+ " commitOffset=" + commitOffset);
|
||||
return true;
|
||||
}
|
||||
long timeout = 30 * 1000; // 30 seconds
|
||||
long startCommit = System.currentTimeMillis();
|
||||
while (true) {
|
||||
int ret = openFileCtx.checkCommit(commitOffset);
|
||||
if (ret == OpenFileCtx.COMMIT_FINISHED) {
|
||||
// Committed
|
||||
return true;
|
||||
} else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
|
||||
LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
|
||||
+ " commitOffset=" + commitOffset);
|
||||
return true;
|
||||
}
|
||||
assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
|
||||
if (ret == OpenFileCtx.COMMIT_ERROR) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
|
||||
+ " commitOffset=" + commitOffset);
|
||||
}
|
||||
if (System.currentTimeMillis() - startCommit > timeout) {
|
||||
// Commit took too long, return error
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
|
||||
+ " commitOffset=" + commitOffset);
|
||||
return false;
|
||||
}
|
||||
}// while
|
||||
}
|
||||
|
||||
/**
|
||||
* If the file is in cache, update the size based on the cached data size
|
||||
*/
|
||||
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle,
|
||||
IdUserGroup iug) throws IOException {
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
|
||||
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
|
||||
if (attr != null) {
|
||||
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
||||
if (openFileCtx != null) {
|
||||
attr.setSize(openFileCtx.getNextOffset());
|
||||
attr.setUsed(openFileCtx.getNextOffset());
|
||||
}
|
||||
}
|
||||
return attr;
|
||||
}
|
||||
|
||||
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
|
||||
String fileName) throws IOException {
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
|
||||
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
|
||||
|
||||
if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
|
||||
OpenFileCtx openFileCtx = openFileMap
|
||||
.get(new FileHandle(attr.getFileId()));
|
||||
|
||||
if (openFileCtx != null) {
|
||||
attr.setSize(openFileCtx.getNextOffset());
|
||||
attr.setUsed(openFileCtx.getNextOffset());
|
||||
}
|
||||
}
|
||||
return attr;
|
||||
}
|
||||
|
||||
/**
|
||||
* StreamMonitor wakes up periodically to find and closes idle streams.
|
||||
*/
|
||||
class StreamMonitor extends Daemon {
|
||||
private int rotation = 5 * 1000; // 5 seconds
|
||||
private long lastWakeupTime = 0;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
|
||||
.iterator();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("openFileMap size:" + openFileMap.size());
|
||||
}
|
||||
while (it.hasNext()) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = it.next();
|
||||
OpenFileCtx ctx = pairs.getValue();
|
||||
if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
|
||||
it.remove();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After remove stream " + pairs.getKey().getFileId()
|
||||
+ ", the stream number:" + openFileMap.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if it can sleep
|
||||
try {
|
||||
long workedTime = System.currentTimeMillis() - lastWakeupTime;
|
||||
if (workedTime < rotation) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("StreamMonitor can still have a sleep:"
|
||||
+ ((rotation - workedTime) / 1000));
|
||||
}
|
||||
Thread.sleep(rotation - workedTime);
|
||||
}
|
||||
lastWakeupTime = System.currentTimeMillis();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("StreamMonitor got interrupted");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.nfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestMountd {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestMountd.class);
|
||||
|
||||
@Test
|
||||
public void testStart() throws IOException {
|
||||
// Start minicluster
|
||||
Configuration config = new Configuration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
|
||||
.manageNameDfsDirs(false).build();
|
||||
cluster.waitActive();
|
||||
|
||||
// Start nfs
|
||||
List<String> exports = new ArrayList<String>();
|
||||
exports.add("/");
|
||||
Nfs3 nfs3 = new Nfs3(exports, config);
|
||||
nfs3.start(false);
|
||||
|
||||
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase()
|
||||
.getRpcProgram();
|
||||
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
|
||||
|
||||
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
||||
nfsd.nullProcedure();
|
||||
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.nfs;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||
import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
|
||||
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||
import org.apache.hadoop.oncrpc.RegistrationClient;
|
||||
import org.apache.hadoop.oncrpc.RpcCall;
|
||||
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
|
||||
import org.apache.hadoop.oncrpc.RpcReply;
|
||||
import org.apache.hadoop.oncrpc.SimpleTcpClient;
|
||||
import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
|
||||
public class TestOutOfOrderWrite {
|
||||
public final static Log LOG = LogFactory.getLog(TestOutOfOrderWrite.class);
|
||||
|
||||
static FileHandle handle = null;
|
||||
static Channel channel;
|
||||
|
||||
static byte[] data1 = new byte[1000];
|
||||
static byte[] data2 = new byte[1000];
|
||||
static byte[] data3 = new byte[1000];
|
||||
|
||||
static XDR create() {
|
||||
XDR request = new XDR();
|
||||
RpcCall.write(request, 0x8000004c, Nfs3Constant.PROGRAM,
|
||||
Nfs3Constant.VERSION, Nfs3Constant.NFSPROC3_CREATE);
|
||||
|
||||
// credentials
|
||||
request.writeInt(0); // auth null
|
||||
request.writeInt(0); // length zero
|
||||
// verifier
|
||||
request.writeInt(0); // auth null
|
||||
request.writeInt(0); // length zero
|
||||
|
||||
SetAttr3 objAttr = new SetAttr3();
|
||||
CREATE3Request createReq = new CREATE3Request(new FileHandle("/"),
|
||||
"out-of-order-write" + System.currentTimeMillis(), 0, objAttr, 0);
|
||||
createReq.serialize(request);
|
||||
return request;
|
||||
}
|
||||
|
||||
static XDR write(FileHandle handle, int xid, long offset, int count,
|
||||
byte[] data) {
|
||||
XDR request = new XDR();
|
||||
RpcCall.write(request, xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
|
||||
Nfs3Constant.NFSPROC3_WRITE);
|
||||
|
||||
// credentials
|
||||
request.writeInt(0); // auth null
|
||||
request.writeInt(0); // length zero
|
||||
// verifier
|
||||
request.writeInt(0); // auth null
|
||||
request.writeInt(0); // length zero
|
||||
WRITE3Request write1 = new WRITE3Request(handle, offset, count,
|
||||
WriteStableHow.UNSTABLE, ByteBuffer.wrap(data));
|
||||
write1.serialize(request);
|
||||
return request;
|
||||
}
|
||||
|
||||
static void testRequest(XDR request) {
|
||||
RegistrationClient registrationClient = new RegistrationClient("localhost",
|
||||
Nfs3Constant.SUN_RPCBIND, request);
|
||||
registrationClient.run();
|
||||
}
|
||||
|
||||
static class WriteHandler extends SimpleTcpClientHandler {
|
||||
|
||||
public WriteHandler(XDR request) {
|
||||
super(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||
// Get handle from create response
|
||||
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
|
||||
XDR rsp = new XDR(buf.array());
|
||||
if (rsp.getBytes().length == 0) {
|
||||
LOG.info("rsp length is zero, why?");
|
||||
return;
|
||||
}
|
||||
LOG.info("rsp length=" + rsp.getBytes().length);
|
||||
|
||||
RpcReply reply = RpcReply.read(rsp);
|
||||
int xid = reply.getXid();
|
||||
// Only process the create response
|
||||
if (xid != 0x8000004c) {
|
||||
return;
|
||||
}
|
||||
int status = rsp.readInt();
|
||||
if (status != Nfs3Status.NFS3_OK) {
|
||||
LOG.error("Create failed, status =" + status);
|
||||
return;
|
||||
}
|
||||
LOG.info("Create succeeded");
|
||||
rsp.readBoolean(); // value follow
|
||||
handle = new FileHandle();
|
||||
handle.deserialize(rsp);
|
||||
channel = e.getChannel();
|
||||
}
|
||||
}
|
||||
|
||||
static class WriteClient extends SimpleTcpClient {
|
||||
|
||||
public WriteClient(String host, int port, XDR request, Boolean oneShot) {
|
||||
super(host, port, request, oneShot);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelPipelineFactory setPipelineFactory() {
|
||||
this.pipelineFactory = new ChannelPipelineFactory() {
|
||||
public ChannelPipeline getPipeline() {
|
||||
return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
|
||||
request));
|
||||
}
|
||||
};
|
||||
return this.pipelineFactory;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
Arrays.fill(data1, (byte) 7);
|
||||
Arrays.fill(data2, (byte) 8);
|
||||
Arrays.fill(data3, (byte) 9);
|
||||
|
||||
// NFS3 Create request
|
||||
WriteClient client = new WriteClient("localhost", Nfs3Constant.PORT,
|
||||
create(), false);
|
||||
client.run();
|
||||
|
||||
while (handle == null) {
|
||||
Thread.sleep(1000);
|
||||
System.out.println("handle is still null...");
|
||||
}
|
||||
LOG.info("Send write1 request");
|
||||
|
||||
XDR writeReq;
|
||||
|
||||
writeReq = write(handle, 0x8000005c, 2000, 1000, data3);
|
||||
Nfs3Utils.writeChannel(channel, writeReq);
|
||||
writeReq = write(handle, 0x8000005d, 1000, 1000, data2);
|
||||
Nfs3Utils.writeChannel(channel, writeReq);
|
||||
writeReq = write(handle, 0x8000005e, 0, 1000, data1);
|
||||
Nfs3Utils.writeChannel(channel, writeReq);
|
||||
|
||||
// TODO: convert to Junit test, and validate result automatically
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.nfs;
|
||||
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.oncrpc.RegistrationClient;
|
||||
import org.apache.hadoop.oncrpc.RpcCall;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.apache.hadoop.portmap.PortmapMapping;
|
||||
import org.apache.hadoop.portmap.PortmapRequest;
|
||||
|
||||
public class TestPortmapRegister {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestPortmapRegister.class);
|
||||
|
||||
static void testRequest(XDR request, XDR request2) {
|
||||
RegistrationClient registrationClient = new RegistrationClient(
|
||||
"localhost", Nfs3Constant.SUN_RPCBIND, request);
|
||||
registrationClient.run();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM,
|
||||
RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP,
|
||||
RpcProgramMountd.PORT);
|
||||
XDR mappingRequest = PortmapRequest.create(mapEntry);
|
||||
RegistrationClient registrationClient = new RegistrationClient(
|
||||
"localhost", Nfs3Constant.SUN_RPCBIND, mappingRequest);
|
||||
registrationClient.run();
|
||||
|
||||
Thread t1 = new Runtest1();
|
||||
//Thread t2 = testa.new Runtest2();
|
||||
t1.start();
|
||||
//t2.start();
|
||||
t1.join();
|
||||
//t2.join();
|
||||
//testDump();
|
||||
}
|
||||
|
||||
static class Runtest1 extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
//testGetportMount();
|
||||
PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM,
|
||||
RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP,
|
||||
RpcProgramMountd.PORT);
|
||||
XDR req = PortmapRequest.create(mapEntry);
|
||||
testRequest(req, req);
|
||||
}
|
||||
}
|
||||
|
||||
static class Runtest2 extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
testDump();
|
||||
}
|
||||
}
|
||||
|
||||
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
|
||||
// TODO: Move this to RpcRequest
|
||||
RpcCall.write(xdr_out, 0, 100000, 2, procedure);
|
||||
xdr_out.writeInt(0); //no auth
|
||||
xdr_out.writeInt(0);
|
||||
xdr_out.writeInt(0);
|
||||
xdr_out.writeInt(0);
|
||||
|
||||
/*
|
||||
xdr_out.putInt(1); //unix auth
|
||||
xdr_out.putVariableOpaque(new byte[20]);
|
||||
xdr_out.putInt(0);
|
||||
xdr_out.putInt(0);
|
||||
*/
|
||||
}
|
||||
|
||||
static void testGetportMount() {
|
||||
XDR xdr_out = new XDR();
|
||||
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
|
||||
xdr_out.writeInt(100005);
|
||||
xdr_out.writeInt(1);
|
||||
xdr_out.writeInt(6);
|
||||
xdr_out.writeInt(0);
|
||||
|
||||
XDR request2 = new XDR();
|
||||
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
request2.writeInt(100005);
|
||||
request2.writeInt(1);
|
||||
request2.writeInt(6);
|
||||
request2.writeInt(0);
|
||||
|
||||
testRequest(xdr_out, request2);
|
||||
}
|
||||
|
||||
static void testGetport() {
|
||||
XDR xdr_out = new XDR();
|
||||
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
|
||||
xdr_out.writeInt(100003);
|
||||
xdr_out.writeInt(3);
|
||||
xdr_out.writeInt(6);
|
||||
xdr_out.writeInt(0);
|
||||
|
||||
XDR request2 = new XDR();
|
||||
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
request2.writeInt(100003);
|
||||
request2.writeInt(3);
|
||||
request2.writeInt(6);
|
||||
request2.writeInt(0);
|
||||
|
||||
testRequest(xdr_out, request2);
|
||||
}
|
||||
|
||||
static void testDump() {
|
||||
XDR xdr_out = new XDR();
|
||||
createPortmapXDRheader(xdr_out, 4);
|
||||
testRequest(xdr_out, xdr_out);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.nfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.oncrpc.RpcCall;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
|
||||
// TODO: convert this to Junit
|
||||
public class TestUdpServer {
|
||||
static void testRequest(XDR request, XDR request2) {
|
||||
try {
|
||||
DatagramSocket clientSocket = new DatagramSocket();
|
||||
InetAddress IPAddress = InetAddress.getByName("localhost");
|
||||
byte[] sendData = request.getBytes();
|
||||
byte[] receiveData = new byte[65535];
|
||||
|
||||
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
|
||||
IPAddress, Nfs3Constant.SUN_RPCBIND);
|
||||
clientSocket.send(sendPacket);
|
||||
DatagramPacket receivePacket = new DatagramPacket(receiveData,
|
||||
receiveData.length);
|
||||
clientSocket.receive(receivePacket);
|
||||
clientSocket.close();
|
||||
|
||||
} catch (UnknownHostException e) {
|
||||
System.err.println("Don't know about host: localhost.");
|
||||
System.exit(1);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Couldn't get I/O for "
|
||||
+ "the connection to: localhost.");
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
Thread t1 = new Runtest1();
|
||||
// TODO: cleanup
|
||||
//Thread t2 = new Runtest2();
|
||||
t1.start();
|
||||
//t2.start();
|
||||
t1.join();
|
||||
//t2.join();
|
||||
//testDump();
|
||||
}
|
||||
|
||||
static class Runtest1 extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
testGetportMount();
|
||||
}
|
||||
}
|
||||
|
||||
static class Runtest2 extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
testDump();
|
||||
}
|
||||
}
|
||||
|
||||
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
|
||||
// Make this a method
|
||||
RpcCall.write(xdr_out, 0, 100000, 2, procedure);
|
||||
}
|
||||
|
||||
static void testGetportMount() {
|
||||
XDR xdr_out = new XDR();
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
xdr_out.writeInt(100005);
|
||||
xdr_out.writeInt(1);
|
||||
xdr_out.writeInt(6);
|
||||
xdr_out.writeInt(0);
|
||||
|
||||
XDR request2 = new XDR();
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
request2.writeInt(100005);
|
||||
request2.writeInt(1);
|
||||
request2.writeInt(6);
|
||||
request2.writeInt(0);
|
||||
|
||||
testRequest(xdr_out, request2);
|
||||
}
|
||||
|
||||
static void testGetport() {
|
||||
XDR xdr_out = new XDR();
|
||||
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
|
||||
xdr_out.writeInt(100003);
|
||||
xdr_out.writeInt(3);
|
||||
xdr_out.writeInt(6);
|
||||
xdr_out.writeInt(0);
|
||||
|
||||
XDR request2 = new XDR();
|
||||
|
||||
createPortmapXDRheader(xdr_out, 3);
|
||||
request2.writeInt(100003);
|
||||
request2.writeInt(3);
|
||||
request2.writeInt(6);
|
||||
request2.writeInt(0);
|
||||
|
||||
testRequest(xdr_out, request2);
|
||||
}
|
||||
|
||||
static void testDump() {
|
||||
XDR xdr_out = new XDR();
|
||||
createPortmapXDRheader(xdr_out, 4);
|
||||
testRequest(xdr_out, xdr_out);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestDFSClientCache {
|
||||
@Test
|
||||
public void testLruTable() throws IOException {
|
||||
DFSClientCache cache = new DFSClientCache(new Configuration(), 3);
|
||||
DFSClient client = Mockito.mock(DFSClient.class);
|
||||
cache.put("a", client);
|
||||
assertTrue(cache.containsKey("a"));
|
||||
|
||||
cache.put("b", client);
|
||||
cache.put("c", client);
|
||||
cache.put("d", client);
|
||||
assertTrue(cache.usedSize() == 3);
|
||||
assertFalse(cache.containsKey("a"));
|
||||
|
||||
// Cache should have d,c,b in LRU order
|
||||
assertTrue(cache.containsKey("b"));
|
||||
// Do a lookup to make b the most recently used
|
||||
assertTrue(cache.get("b") != null);
|
||||
|
||||
cache.put("e", client);
|
||||
assertTrue(cache.usedSize() == 3);
|
||||
// c should be replaced with e, and cache has e,b,d
|
||||
assertFalse(cache.containsKey("c"));
|
||||
assertTrue(cache.containsKey("e"));
|
||||
assertTrue(cache.containsKey("b"));
|
||||
assertTrue(cache.containsKey("d"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestOffsetRange {
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConstructor1() throws IOException {
|
||||
new OffsetRange(0, 0);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConstructor2() throws IOException {
|
||||
new OffsetRange(-1, 0);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConstructor3() throws IOException {
|
||||
new OffsetRange(-3, -1);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConstructor4() throws IOException {
|
||||
new OffsetRange(-3, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompare() throws IOException {
|
||||
OffsetRange r1 = new OffsetRange(0, 1);
|
||||
OffsetRange r2 = new OffsetRange(1, 3);
|
||||
OffsetRange r3 = new OffsetRange(1, 3);
|
||||
OffsetRange r4 = new OffsetRange(3, 4);
|
||||
|
||||
assertTrue(r2.compareTo(r3) == 0);
|
||||
assertTrue(r2.compareTo(r1) == 1);
|
||||
assertTrue(r2.compareTo(r4) == -1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
/**
|
||||
* Tests for {@link RpcProgramNfs3}
|
||||
*/
|
||||
public class TestRpcProgramNfs3 {
|
||||
@Test(timeout=1000)
|
||||
public void testIdempotent() {
|
||||
int[][] procedures = {
|
||||
{ Nfs3Constant.NFSPROC3_NULL, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_GETATTR, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_SETATTR, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_LOOKUP, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_ACCESS, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_READLINK, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_READ, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_WRITE, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_CREATE, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_MKDIR, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_SYMLINK, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_MKNOD, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_REMOVE, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_RMDIR, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_RENAME, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_LINK, 0 },
|
||||
{ Nfs3Constant.NFSPROC3_READDIR, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_READDIRPLUS, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_FSSTAT, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_FSINFO, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_PATHCONF, 1 },
|
||||
{ Nfs3Constant.NFSPROC3_COMMIT, 1 } };
|
||||
for (int[] procedure : procedures) {
|
||||
boolean idempotent = procedure[1] == 1;
|
||||
int proc = procedure[0];
|
||||
if (idempotent) {
|
||||
Assert.assertTrue(("Procedure " + proc + " should be idempotent"),
|
||||
RpcProgramNfs3.isIdempotent(proc));
|
||||
} else {
|
||||
Assert.assertFalse(("Procedure " + proc + " should be non-idempotent"),
|
||||
RpcProgramNfs3.isIdempotent(proc));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -12,6 +12,8 @@ Trunk (Unreleased)
|
|||
|
||||
HDFS-4659 Support setting execution bit for regular files (Brandon Li via sanjay)
|
||||
|
||||
HDFS-4762 Provide HDFS based NFSv3 and Mountd implementation (brandonli)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
|
||||
|
|
|
@ -34,6 +34,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|||
<module>hadoop-hdfs</module>
|
||||
<module>hadoop-hdfs-httpfs</module>
|
||||
<module>hadoop-hdfs/src/contrib/bkjournal</module>
|
||||
<module>hadoop-hdfs-nfs</module>
|
||||
</modules>
|
||||
|
||||
<build>
|
||||
|
|
Loading…
Reference in New Issue