HDFS-5252. Stable write is not handled correctly in someplace. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1539740 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-11-07 18:02:37 +00:00
parent 87dc1b6553
commit 16c6755554
7 changed files with 189 additions and 1 deletions

View File

@ -19,8 +19,11 @@ package org.apache.hadoop.nfs.nfs3.request;
import java.io.IOException;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.oncrpc.XDR;
import com.google.common.annotations.VisibleForTesting;
/**
* READ3 Request
*/
@ -34,6 +37,13 @@ public class READ3Request extends RequestWithHandle {
count = xdr.readInt();
}
@VisibleForTesting
public READ3Request(FileHandle handle, long offset, int count) {
super(handle);
this.offset = offset;
this.count = count;
}
public long getOffset() {
return this.offset;
}
@ -41,4 +51,11 @@ public class READ3Request extends RequestWithHandle {
public int getCount() {
return this.count;
}
@Override
public void serialize(XDR xdr) {
handle.serialize(xdr);
xdr.writeLongAsHyper(offset);
xdr.writeInt(count);
}
}

View File

@ -109,6 +109,12 @@ public class Nfs3Utils {
* Send a write response to the netty network socket channel
*/
public static void writeChannel(Channel channel, XDR out, int xid) {
if (channel == null) {
RpcProgramNfs3.LOG
.info("Null channel should only happen in tests. Do nothing.");
return;
}
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
}

View File

@ -1007,6 +1007,23 @@ class OpenFileCtx {
}
if (!writeCtx.getReplied()) {
if (stableHow != WriteStableHow.UNSTABLE) {
LOG.info("Do sync for stable write:" + writeCtx);
try {
if (stableHow == WriteStableHow.DATA_SYNC) {
fos.hsync();
} else {
Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC,
"Unknown WriteStableHow:" + stableHow);
// Sync file data and length
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
} catch (IOException e) {
LOG.error("hsync failed with writeCtx:" + writeCtx + " error:" + e);
throw e;
}
}
WccAttr preOpAttr = latestAttr.getWccAttr();
WccData fileWcc = new WccData(preOpAttr, latestAttr);
if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {

View File

@ -126,6 +126,8 @@ import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import com.google.common.annotations.VisibleForTesting;
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
*/
@ -1975,4 +1977,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
return true;
}
@VisibleForTesting
WriteManager getWriteManager() {
return this.writeManager;
}
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
@ -263,6 +264,11 @@ public class WriteManager {
return attr;
}
@VisibleForTesting
ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
return this.openFileMap;
}
/**
* StreamMonitor wakes up periodically to find and closes idle streams.
*/

View File

@ -17,21 +17,41 @@
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import junit.framework.Assert;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
import org.apache.hadoop.nfs.nfs3.request.READ3Request;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
import org.apache.hadoop.nfs.nfs3.response.READ3Response;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.junit.Test;
import org.mockito.Mockito;
@ -162,4 +182,117 @@ public class TestWrites {
ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
throws InterruptedException {
int waitedTime = 0;
ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
.getOpenFileMap();
OpenFileCtx ctx = openFileMap.get(handle);
assertTrue(ctx != null);
do {
Thread.sleep(3000);
waitedTime += 3000;
if (ctx.getPendingWritesForTest().size() == 0) {
return;
}
} while (waitedTime < maxWaitTime);
fail("Write can't finish.");
}
@Test
public void testWriteStableHow() throws IOException, InterruptedException {
HdfsConfiguration config = new HdfsConfiguration();
DFSClient client = null;
MiniDFSCluster cluster = null;
RpcProgramNfs3 nfsd;
SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
Mockito.when(securityHandler.getUser()).thenReturn(
System.getProperty("user.name"));
try {
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
client = new DFSClient(NameNode.getAddress(config), config);
// Start nfs
List<String> exports = new ArrayList<String>();
exports.add("/");
Nfs3 nfs3 = new Nfs3(exports, config);
nfs3.start(false);
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
HdfsFileStatus status = client.getFileInfo("/");
FileHandle rootHandle = new FileHandle(status.getFileId());
// Create file1
CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
XDR createXdr = new XDR();
createReq.serialize(createXdr);
CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
securityHandler, InetAddress.getLocalHost());
FileHandle handle = createRsp.getObjHandle();
// Test DATA_SYNC
byte[] buffer = new byte[10];
for (int i = 0; i < 10; i++) {
buffer[i] = (byte) i;
}
WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
XDR writeXdr = new XDR();
writeReq.serialize(writeXdr);
nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
InetAddress.getLocalHost());
waitWrite(nfsd, handle, 60000);
// Readback
READ3Request readReq = new READ3Request(handle, 0, 10);
XDR readXdr = new XDR();
readReq.serialize(readXdr);
READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
securityHandler, InetAddress.getLocalHost());
assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
// Test FILE_SYNC
// Create file2
CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2",
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
XDR createXdr2 = new XDR();
createReq2.serialize(createXdr2);
CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
securityHandler, InetAddress.getLocalHost());
FileHandle handle2 = createRsp2.getObjHandle();
WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer));
XDR writeXdr2 = new XDR();
writeReq2.serialize(writeXdr2);
nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
InetAddress.getLocalHost());
waitWrite(nfsd, handle2, 60000);
// Readback
READ3Request readReq2 = new READ3Request(handle2, 0, 10);
XDR readXdr2 = new XDR();
readReq2.serialize(readXdr2);
READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
securityHandler, InetAddress.getLocalHost());
assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
// FILE_SYNC should sync the file size
status = client.getFileInfo("/file2");
assertTrue(status.getLen() == 10);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -588,6 +588,8 @@ Release 2.2.1 - UNRELEASED
HDFS-5458. Datanode failed volume threshold ignored if exception is thrown
in getDataDirsFromURIs. (Mike Mellenthin via wang)
HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES